Source code for chariot.awm.workflows

from collections.abc import Generator
from typing import Any

import chariot_api._openapi.awm.models as api_models
from chariot import _apis, mcp_setting
from chariot.awm import _utils, models
from chariot_api._openapi.awm.api import WorkflowsApi
from chariot_api._openapi.awm.models.model_public_setting_ref import ModelPublicSettingRef

# Public names exported by this module on star-import.
# NOTE(review): ``get_workflow_resources`` is defined in this module but is not
# listed here — confirm whether that omission is intentional.
__all__ = [
    "create_workflow",
    "update_workflow",
    "delete_workflow",
    "get_workflow",
    "get_workflows",
    "initialize_workflow_user",
    "get_public_setting_values",
    "patch_public_setting_values",
]


def _workflow_api() -> WorkflowsApi:
    """Return the workflows API client exposed by ``_apis.awm``.

    The client is looked up on every call rather than cached in this module.

    :rtype: WorkflowsApi
    """
    awm_apis = _apis.awm
    return awm_apis.workflows_api


@mcp_setting(mutating=True)
@_apis.login_required
def create_workflow(
    project_id: str,
    name: str,
    description: str,
    config: dict[str, Any],
    paused: bool = False,
    public_settings: dict[str, models.WorkflowPublicSettingRef] | None = None,
) -> models.Workflow:
    """Create a new workflow inside a project.

    :param project_id: Id of the project that will own the workflow
    :type project_id: str
    :param name: Name of the workflow
    :type name: str
    :param description: Description of the workflow
    :type description: str
    :param config: The workflow configuration
    :type config: dict[str, Any]
    :param paused: Set to true to not create an agent process pod
    :type paused: bool
    :param public_settings: Public settings for the workflow, keyed by setting name.
        ``None`` or an empty mapping is sent to the API as ``None``.
    :type public_settings: Optional[dict[str, models.WorkflowPublicSettingRef]]
    :raises RuntimeError: If the API response carries no ``data``
    :rtype: models.Workflow
    """
    api = _workflow_api()

    # Translate the SDK-level setting refs into their OpenAPI counterparts.
    setting_refs = None
    if public_settings:
        setting_refs = {
            setting_name: ModelPublicSettingRef(ref=setting.ref, schema=setting.schema)
            for setting_name, setting in public_settings.items()
        }

    request = api_models.InputCreateWorkflowRequest(
        project_id=project_id,
        name=name,
        description=description,
        config=config,
        paused=paused,
        publicSettings=setting_refs,
    )
    created = api.create_workflow(request).data
    if created:
        return _utils.convert_to_dataclass(created.model_dump(), models.Workflow)
    raise RuntimeError("Received malformed response (missing `data`) from create_workflow")
@mcp_setting(mutating=True)
@_apis.login_required
def update_workflow(
    workflow_id: str,
    name: str,
    description: str,
    config: dict[str, Any],
    paused: bool = False,
    public_settings: dict[str, models.WorkflowPublicSettingRef] | None = None,
) -> models.Workflow:
    """Update an existing workflow identified by its id.

    Every field is sent on each call, so callers supply the full desired state.

    :param workflow_id: Id of the workflow to update
    :type workflow_id: str
    :param name: Name of the workflow
    :type name: str
    :param description: Description of the workflow
    :type description: str
    :param config: The workflow configuration
    :type config: dict[str, Any]
    :param paused: Set to true to not create an agent process pod
    :type paused: bool
    :param public_settings: Public settings for the workflow, keyed by setting name.
        ``None`` or an empty mapping is sent to the API as ``None``.
    :type public_settings: Optional[dict[str, models.WorkflowPublicSettingRef]]
    :raises RuntimeError: If the API response carries no ``data``
    :rtype: models.Workflow
    """
    api = _workflow_api()

    # Translate the SDK-level setting refs into their OpenAPI counterparts.
    setting_refs = None
    if public_settings:
        setting_refs = {
            setting_name: ModelPublicSettingRef(ref=setting.ref, schema=setting.schema)
            for setting_name, setting in public_settings.items()
        }

    request = api_models.InputUpdateWorkflowRequest(
        name=name,
        description=description,
        config=config,
        paused=paused,
        publicSettings=setting_refs,
    )
    updated = api.update_workflow(workflow_id, request).data
    if updated:
        return _utils.convert_to_dataclass(updated.model_dump(), models.Workflow)
    raise RuntimeError("Received malformed response (missing `data`) from update_workflow")
@_apis.login_required
def get_workflow(workflow_id: str) -> models.Workflow:
    """Fetch a single workflow by its id.

    :param workflow_id: Workflow id
    :type workflow_id: str
    :raises RuntimeError: If the API response carries no ``data``
    :rtype: models.Workflow
    """
    workflow = _workflow_api().get_workflow(workflow_id).data
    if workflow:
        return _utils.convert_to_dataclass(workflow.model_dump(), models.Workflow)
    raise RuntimeError("Received malformed response (missing `data`) from get_workflow")
@_apis.login_required
def get_workflow_resources(workflow_id: str) -> models.WorkflowResourceDetails:
    """Get the resource details of a workflow.

    :param workflow_id: Workflow id
    :type workflow_id: str
    :raises RuntimeError: If the API response carries no ``data``
    :rtype: models.WorkflowResourceDetails
    """
    response = _workflow_api().get_workflow_resource_details(workflow_id)
    if not response.data:
        # Name this function in the message (it previously said `get_workflow`)
        # so the failing call can be identified from the error alone.
        raise RuntimeError(
            "Received malformed response (missing `data`) from get_workflow_resources"
        )
    return _utils.convert_to_dataclass(response.data.model_dump(), models.WorkflowResourceDetails)
@mcp_setting(mutating=True)
@_apis.login_required
def delete_workflow(workflow_id: str):
    """Delete the workflow with the given id.

    The API response is discarded; nothing is returned to the caller.

    :param workflow_id: Workflow id
    :type workflow_id: str
    :rtype: None
    """
    api = _workflow_api()
    api.delete_workflow(workflow_id)
@_apis.login_required
def get_workflows(
    name: str | None = None,
    project_ids: list[str] | None = None,
    include_deleted: bool | None = None,
    limit: int | None = None,
    after_key: str | None = None,
) -> Generator[models.Workflow, None, None]:
    """List workflows that match the given filters.

    Filters left as ``None`` are omitted from the request entirely. Paging is
    delegated to ``_utils.paginate``.

    :param name: Partial, case-insensitive name filter
    :type name: Optional[str]
    :param project_ids: List of project ids to filter
    :type project_ids: Optional[list[str]]
    :param include_deleted: Set to True to include deleted workflows
    :type include_deleted: Optional[bool]
    :param limit: The max number of items to return
    :type limit: Optional[int]
    :param after_key: The pagination key to fetch more items
    :type after_key: Optional[str]
    :rtype: Generator[models.Workflow, None, None]
    """
    # The list of project ids is passed under the singular ``project_id`` key.
    candidate_filters = {
        "name": name,
        "project_id": project_ids,
        "include_deleted": include_deleted,
        "limit": limit,
        "after_key": after_key,
    }
    params = {}
    for filter_name, filter_value in candidate_filters.items():
        if filter_value is not None:
            params[filter_name] = filter_value

    def _fetch_page(**page_params):
        # Resolve the API client per call, exactly as each page is requested.
        return _workflow_api().list_workflows(**page_params)

    def _to_workflow(raw_item):
        return _utils.convert_to_dataclass(raw_item.model_dump(), models.Workflow)

    return _utils.paginate(
        func=_fetch_page,
        params=params,
        transform_item=_to_workflow,
    )
@mcp_setting(ignore=True)
@_apis.login_required
def initialize_workflow_user(
    workflow_id: str,
    agent_name: str,
):
    """Initialize a user for a given workflow and agent.

    The API response is discarded; nothing is returned to the caller.

    :param workflow_id: Workflow id
    :type workflow_id: str
    :param agent_name: Agent name
    :type agent_name: str
    :rtype: None
    """
    api = _workflow_api()
    request = api_models.InputInitializeWorkflowUserRequest(agent_name=agent_name)
    api.initialize_workflow_user(workflow_id, request)
@_apis.login_required
def get_public_setting_values(workflow_id: str) -> dict[str, models.WorkflowPublicSettingValue]:
    """Fetch the public setting values of a workflow.

    A response with empty or missing ``data`` yields an empty dict.

    :param workflow_id: Workflow id
    :type workflow_id: str
    :raises TypeError: If the response ``data`` is non-empty but not a dict
    :rtype: dict[str, models.WorkflowPublicSettingValue]
    """
    payload = _workflow_api().get_workflow_public_settings(workflow_id).data
    if not payload:
        return {}
    if isinstance(payload, dict):
        return _to_public_setting_values(payload)
    raise TypeError(
        "Received malformed response (`data` should be a dict) from get_public_setting_values"
    )
@mcp_setting(mutating=True)
@_apis.login_required
def patch_public_setting_values(
    workflow_id: str, public_setting_values: dict[str, models.WorkflowPublicSettingValue]
) -> dict[str, models.WorkflowPublicSettingValue]:
    """Patch the public setting values of a workflow.

    Only the ``value`` of each supplied setting is sent to the API. A response
    with empty or missing ``data`` yields an empty dict.

    :param workflow_id: Workflow id
    :type workflow_id: str
    :param public_setting_values: The public settings with updated values
    :type public_setting_values: dict[str, models.WorkflowPublicSettingValue]
    :raises TypeError: If the response ``data`` is non-empty but not a dict
    :rtype: dict[str, models.WorkflowPublicSettingValue]
    """
    api = _workflow_api()

    patch_body = {}
    for setting_name, setting in public_setting_values.items():
        patch_body[setting_name] = {"value": setting.value}

    payload = api.patch_workflow_public_settings(workflow_id, patch_body).data
    if not payload:
        return {}
    if isinstance(payload, dict):
        return _to_public_setting_values(payload)
    raise TypeError(
        "Received malformed response (`data` should be a dict) from patch_public_setting_values"
    )
def _to_public_setting_values(
    data: dict[str, dict[str, Any]],
) -> dict[str, models.WorkflowPublicSettingValue]:
    """Convert a raw public-settings mapping into dataclass values.

    Only the ``value`` and ``schema`` entries of each raw setting are carried
    over; either one missing from the raw dict becomes ``None``.

    :param data: Raw settings keyed by setting name
    :type data: dict[str, dict[str, Any]]
    :rtype: dict[str, models.WorkflowPublicSettingValue]
    """
    converted = {}
    for setting_name, raw_setting in data.items():
        fields = {
            "value": raw_setting.get("value"),
            "schema": raw_setting.get("schema"),
        }
        converted[setting_name] = _utils.convert_to_dataclass(
            fields, models.WorkflowPublicSettingValue
        )
    return converted