from collections.abc import Generator
from typing import Any
import chariot_api._openapi.awm.models as api_models
from chariot import _apis, mcp_setting
from chariot.awm import _utils, models
from chariot_api._openapi.awm.api import WorkflowsApi
from chariot_api._openapi.awm.models.model_public_setting_ref import ModelPublicSettingRef
__all__ = [
"create_workflow",
"update_workflow",
"delete_workflow",
"get_workflow",
"get_workflows",
"initialize_workflow_user",
"get_public_setting_values",
"patch_public_setting_values",
]
def _workflow_api() -> WorkflowsApi:
return _apis.awm.workflows_api
[docs]
@mcp_setting(mutating=True)
@_apis.login_required
def create_workflow(
project_id: str,
name: str,
description: str,
config: dict[str, Any],
paused: bool = False,
public_settings: dict[str, models.WorkflowPublicSettingRef] | None = None,
) -> models.Workflow:
"""Create a workflow for a project.
:param project_id: Project id
:type project_id: str
:param name: Name of the workflow
:type name: str
:param description: Description of the workflow
:type description: str
:param config: The workflow configuration
:type config: Dict[str, Any]
:param paused: Set to true to not create an agent process pod
:type paused: bool
:param public_settings: Public settings for the workflow
:type public_settings: Optional[Dict[str, models.WorkflowPublicSettingRef]]
:rtype: models.Workflow
"""
response = _workflow_api().create_workflow(
api_models.InputCreateWorkflowRequest(
project_id=project_id,
name=name,
description=description,
config=config,
paused=paused,
publicSettings={
k: ModelPublicSettingRef(ref=v.ref, schema=v.schema)
for k, v in public_settings.items()
}
if public_settings
else None,
)
)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from create_workflow")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Workflow)
[docs]
@mcp_setting(mutating=True)
@_apis.login_required
def update_workflow(
workflow_id: str,
name: str,
description: str,
config: dict[str, Any],
paused: bool = False,
public_settings: dict[str, models.WorkflowPublicSettingRef] | None = None,
) -> models.Workflow:
"""Update a workflow by the id.
:param workflow_id: Workflow id
:type workflow_id: str
:param name: Name of the workflow
:type name: str
:param description: Description of the workflow
:type description: str
:param config: The workflow configuration
:type config: Dict[str, Any]
:param paused: Set to true to not create an agent process pod
:type paused: bool
:param public_settings: Public settings for the workflow
:type public_settings: Optional[Dict[str, models.WorkflowPublicSettingRef]]
:rtype: models.Workflow
"""
response = _workflow_api().update_workflow(
workflow_id,
api_models.InputUpdateWorkflowRequest(
name=name,
description=description,
config=config,
paused=paused,
publicSettings={
k: ModelPublicSettingRef(ref=v.ref, schema=v.schema)
for k, v in public_settings.items()
}
if public_settings
else None,
),
)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from update_workflow")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Workflow)
[docs]
@_apis.login_required
def get_workflow(workflow_id: str) -> models.Workflow:
"""Get a workflow for a project.
:param workflow_id: Workflow id
:type workflow_id: str
:rtype: models.Workflow
"""
response = _workflow_api().get_workflow(workflow_id)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from get_workflow")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Workflow)
@_apis.login_required
def get_workflow_resources(workflow_id: str) -> models.WorkflowResourceDetails:
"""Get workflow resources for a project.
:param workflow_id: Workflow id
:type workflow_id: str
:rtype: models.WorkflowResourceDetails
"""
response = _workflow_api().get_workflow_resource_details(workflow_id)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from get_workflow")
return _utils.convert_to_dataclass(response.data.model_dump(), models.WorkflowResourceDetails)
[docs]
@mcp_setting(mutating=True)
@_apis.login_required
def delete_workflow(workflow_id: str):
"""Delete a workflow for a project.
:param workflow_id: Workflow id
:type workflow_id: str
:rtype: None
"""
_workflow_api().delete_workflow(workflow_id)
[docs]
@_apis.login_required
def get_workflows(
name: str | None = None,
project_ids: list[str] | None = None,
include_deleted: bool | None = None,
limit: int | None = None,
after_key: str | None = None,
) -> Generator[models.Workflow, None, None]:
"""List workflows matching the specified filters.
:param name: Partial, case-insensitive name filter
:type name: Optional[str]
:param project_ids: List of project ids to filter
:type project_ids: Optional[List[str]]
:param include_deleted: Set to True to include deleted workflows
:type include_deleted: Optional[bool]
:param limit: The max number of items to return
:type limit: Optional[int]
:param after_key: The pagination key to fetch more items
:type after_key: Optional[str]
:rtype: Generator[models.Workflow, None, None]
"""
params = {
k: v
for k, v in {
"name": name,
"project_id": project_ids,
"include_deleted": include_deleted,
"limit": limit,
"after_key": after_key,
}.items()
if v is not None
}
def _paginate_fn(**params):
return _workflow_api().list_workflows(**params)
def _transform_fn(item):
return _utils.convert_to_dataclass(item.model_dump(), models.Workflow)
return _utils.paginate(
func=_paginate_fn,
params=params,
transform_item=_transform_fn,
)
[docs]
@mcp_setting(ignore=True)
@_apis.login_required
def initialize_workflow_user(
workflow_id: str,
agent_name: str,
):
"""Initialize a user for a particular workflow and agent.
:param workflow_id: Workflow id
:type workflow_id: str
:param agent_name: Agent name
:type agent_name: str
:rtype: None
"""
_workflow_api().initialize_workflow_user(
workflow_id,
api_models.InputInitializeWorkflowUserRequest(
agent_name=agent_name,
),
)
[docs]
@_apis.login_required
def get_public_setting_values(workflow_id: str) -> dict[str, models.WorkflowPublicSettingValue]:
"""Get workflow public setting values.
:param workflow_id: Workflow id
:type workflow_id: str
:rtype: dict[str, models.WorkflowPublicSettingValue]
"""
response = _workflow_api().get_workflow_public_settings(workflow_id)
if not response.data:
return {}
if not isinstance(response.data, dict):
raise TypeError(
"Received malformed response (`data` should be a dict) from get_public_setting_values"
)
return _to_public_setting_values(response.data)
[docs]
@mcp_setting(mutating=True)
@_apis.login_required
def patch_public_setting_values(
workflow_id: str, public_setting_values: dict[str, models.WorkflowPublicSettingValue]
) -> dict[str, models.WorkflowPublicSettingValue]:
"""Patch workflow public setting values.
:param workflow_id: Workflow id
:type workflow_id: str
:param public_setting_values: The public settings with updated values
:type public_setting_values: dict[str, models.WorkflowPublicSettingValue]
:rtype: dict[str, models.WorkflowPublicSettingValue]
"""
response = _workflow_api().patch_workflow_public_settings(
workflow_id, {k: {"value": v.value} for k, v in public_setting_values.items()}
)
if not response.data:
return {}
if not isinstance(response.data, dict):
raise TypeError(
"Received malformed response (`data` should be a dict) from patch_public_setting_values"
)
return _to_public_setting_values(response.data)
def _to_public_setting_values(
data: dict[str, dict[str, Any]],
) -> dict[str, models.WorkflowPublicSettingValue]:
return {
k: _utils.convert_to_dataclass(
{
"value": v.get("value"),
"schema": v.get("schema"),
},
models.WorkflowPublicSettingValue,
)
for k, v in data.items()
}