from dataclasses import dataclass
from datetime import datetime
from typing import Any
from chariot.awm._utils import convert_to_datetime
__all__ = [
"Workflow",
"WorkflowResourceDetails",
"WorkflowPublicSettingRef",
"WorkflowPublicSettingValue",
"PodDetails",
]
class Base:
def __post_init__(self):
pass
[docs]
@dataclass
class WorkflowPublicSettingRef(Base):
"""A reference to a public setting, with optional schema."""
ref: str
schema: Any | None = None
[docs]
@dataclass
class WorkflowPublicSettingValue(Base):
"""A public setting value. The schema (if one exists), should be treated as read-only and is used for documentation purposes."""
value: Any
schema: Any | None = None
[docs]
@dataclass
class Workflow(Base):
"""An instance of a workflow."""
id: str
name: str
description: str
project_id: str
config: dict[str, Any] | None
public_settings: dict[str, WorkflowPublicSettingRef] | None
created_at: datetime
created_by: str
updated_at: datetime
updated_by: str
deleted_at: datetime | None
deleted_by: str | None
def __post_init__(self):
super().__post_init__()
self.created_at = convert_to_datetime(self.created_at)
self.updated_at = convert_to_datetime(self.updated_at)
self.deleted_at = (
convert_to_datetime(self.deleted_at) if self.deleted_at is not None else None
)
if self.public_settings is not None:
self.public_settings = {
k: WorkflowPublicSettingRef(
ref=str(v.get("ref")), schema=v.get("schema", v.get("var_schema"))
)
if isinstance(v, dict)
else v
for k, v in self.public_settings.items()
}
[docs]
@dataclass
class PodDetails(Base):
"""Pod details."""
name: str
status: str
[docs]
@dataclass
class WorkflowResourceDetails(Base):
"""Resources of a workflow. Valid is false if any resources are missing, and any attempts to use this workflow will likely fail."""
valid: bool
deployment: str
pods: list[PodDetails]