Source code for chariot.datasets.tasks

from collections.abc import Generator
from dataclasses import asdict
from datetime import datetime
from typing import cast

from chariot import _apis
from chariot.datasets import _utils, models
from chariot_api._openapi.datasets_v3 import models as openapi_models
from chariot_api._openapi.datasets_v3.exceptions import NotFoundException
from chariot_api._openapi.datasets_v3.models.model_dataset_filter_config import (
    ModelDatasetFilterConfig,
)
from chariot_api._openapi.datasets_v3.models.model_datum_filter_config import ModelDatumFilterConfig

__all__ = [
    "get_task",
    "get_tasks",
    "count_tasks",
    "create_task",
    "archive_task",
    "get_datum_for_task",
    "get_datum_for_task_by_id",
    "get_task_datum_count",
    "get_task_statistics",
    "get_task_activity",
    "count_task_activity",
    "get_tasks_activity",
    "count_tasks_activity",
    "delete_datum_lock_for_task",
]


[docs] def get_tasks( *, search: str | None = None, exact_name_match: bool | None = None, include_archived: bool | None = None, project_ids: list[str] | None = None, task_ids: list[str] | None = None, sort: models.TaskSortColumn | None = None, direction: models.SortDirection | None = None, max_items: int | None = None, ) -> Generator[models.DatumTask, None, None]: """Get datum annotation tasks that match various criteria. Returns a generator over all matching tasks. :param search: Search string (full text search against name and description fields) :type search: Optional[str] :param exact_name_match: Require search to exactly match the task name (defaults to false) :type exact_name_match: Optional[bool] :param include_archived: If true, archived tasks will be included in the results (defaults to false) :type include_archived: Optional[bool] :param project_ids: Filter by project ids :type project_ids: Optional[List[str]] :param task_ids: Filter by task ids :type task_ids: Optional[List[str]] :param sort: What column to sort the tasks by (defaults to name) :type sort: Optional[models.TaskSortColumn] :param direction: Whether to sort in ascending or descending order :type direction: Optional[models.SortDirection] :param max_items: Limit the returned generator to only produce this many items :type max_items: Optional[int] :return: Task definitions for tasks matching the criteria :rtype: Generator[models.DatumTask, None, None] """ params = locals() if "max_items" in params: del params["max_items"] if task_ids is not None: return _get_task_generator(**params) return _utils.paginate_items(_get_tasks, params, max_items)
def _get_task_generator(**params) -> Generator[models.DatumTask, None, None]: yield from _get_tasks(**params) def _get_tasks( *, search: str | None = None, exact_name_match: bool | None = None, include_archived: bool | None = None, project_ids: list[str] | None = None, task_ids: list[str] | None = None, sort: models.TaskSortColumn | None = None, direction: models.SortDirection | None = None, limit: int | None = None, offset: int | None = None, ) -> list[models.DatumTask]: response = _apis.datasets_v3.tasks_api.get_datum_tasks( openapi_models.InputGetDatumTasksRequest( search=search, exact_name_match=exact_name_match, include_archived=include_archived, project_ids=project_ids, task_ids=task_ids, direction=_utils.enum_value(direction), sort=_utils.enum_value(sort), limit=limit, offset=offset, ) ) return [ _utils.convert_to_dataclass(t.model_dump(), models.DatumTask) for t in response.data or [] ]
[docs] def count_tasks( *, search: str | None = None, exact_name_match: bool | None = None, include_archived: bool | None = None, project_ids: list[str] | None = None, task_ids: list[str] | None = None, ) -> int: """Get number of tasks that match given criteria. :param search: Search string (full text search against name and description fields) :type name: Optional[str] :param exact_name_match: Require search to exactly match the task name (defaults to false) :type exact_name_match: Optional[bool] :param include_archived: If true, archived tasks will be included in the results (defaults to false) :type include_archived: Optional[bool] :param project_ids: Filter by project ids :type project_ids: Optional[List[str]] :param task_ids: Filter by task ids :type task_ids: Optional[List[str]] :return: Number of tasks that match given criteria :rtype: int """ response = _apis.datasets_v3.tasks_api.count_datum_tasks( openapi_models.InputCountDatumTasksRequest( search=search, exact_name_match=exact_name_match, include_archived=include_archived, project_ids=project_ids, task_ids=task_ids, ) ) return response.data or 0
[docs] def create_task( *, name: str, project_id: str, description: str | None = None, dataset_config: models.DatasetConfig | None = None, datum_config: models.DatumConfig | None = None, ) -> models.DatumTask: """Create a new datum annotation task. :param name: Datum annotation task name :type name: str :param project_id: Project id that datum annotation Task belongs to :type project_id: str :param description: Datum annotation task description :type description: Optional[str] :return: New datum annotation task detail :rtype: models.DatumTask """ datum_config_dict = ( asdict(datum_config, dict_factory=_utils.dict_factory) if datum_config else None ) dataset_config_dict = ( asdict(dataset_config, dict_factory=_utils.dict_factory) if dataset_config else None ) response = _apis.datasets_v3.tasks_api.create_datum_task( openapi_models.InputCreateDatumTaskRequest( name=name, project_id=project_id, description=description, dataset_config=cast(ModelDatasetFilterConfig, dataset_config_dict), datum_config=cast(ModelDatumFilterConfig, datum_config_dict), ) ) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from create_datum_task") return _utils.convert_to_dataclass(response.data.model_dump(), models.DatumTask)
[docs] def get_task(id: str) -> models.DatumTaskDetails: """Get a datum annotation task by id. :param id: Datum annotation task id :type id: str :return: The datum annotation task details :rtype: models.DatumTaskDetails """ response = _apis.datasets_v3.tasks_api.get_datum_task(task_id=id) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from get_datum_task") return _utils.convert_to_dataclass(response.data.model_dump(), models.DatumTaskDetails)
[docs] def archive_task(id: str) -> models.DatumTask: """Archive a datum annotation task. :param id: Datum annotation task id :type id: str :return: The datum annotation task that has been archived :rtype: models.DatumTask """ response = _apis.datasets_v3.tasks_api.archive_datum_task(task_id=id) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from archive_datum_task") return _utils.convert_to_dataclass(response.data.model_dump(), models.DatumTask)
[docs] def get_task_statistics( id: str, *, task_type_label_filters: list[models.TaskTypeLabelFilter] | None = None, gps_coordinates_circle: models.Circle | None = None, gps_coordinates_rectangle: models.Rectangle | None = None, gps_coordinates_polygon: list[models.GeoPoint] | None = None, capture_timestamp_range: models.TimestampRange | None = None, metadata: dict[str, str] | None = None, asof_timestamp: datetime | None = None, unannotated: bool | None = None, datum_ids: list[str] | None = None, approval_status: list[str] | None = None, annotation_metadata: dict[str, str] | None = None, ) -> models.DatumTaskStatistics: """Get dataset datum statistics with various criteria :param id: Id of datum task to get statistics for :type id: str :param task_type_label_filters: Filter by task types and associated labels :type task_type_label_filters: Optional[List[models.TaskTypeLabelFilter]] :param gps_coordinates_circle: Filter datums within the given circle :type gps_coordinates_circle: Optional[models.Circle] :param gps_coordinates_rectangle: Filter datums within the given rectangle :type gps_coordinates_rectangle: Optional[models.Rectangle] :param gps_coordinates_polygon: Filter datums within the given polygon :type gps_coordinates_polygon: Optional[List[models.GeoPoint]] :param capture_timestamp_range: Filter by datum capture timestamp :type capture_timestamp_range: Optional[models.TimestampRange] :param metadata: Filter by datum metadata values :type metadata: Optional[Dict[str, str]] :param datum_ids: Filter datums with a list of datum ids :type datum_ids: Optional[List[str]] :param approval_status: Filter by annotation approval status :type approval_status: Optional[List[str]] :param annotation_metadata: Filter by annotation metadata values :type annotation_metadata: Optional[Dict[str, str]] :return: Datum task statistics :rtype: models.DatumTaskStatistics """ if task_type_label_filters is not None: task_type_label_filters = [ asdict(f, dict_factory=_utils.dict_factory) for f in task_type_label_filters ] if gps_coordinates_circle is not None: gps_coordinates_circle = asdict(gps_coordinates_circle, dict_factory=_utils.dict_factory) if gps_coordinates_rectangle is not None: gps_coordinates_rectangle = asdict( gps_coordinates_rectangle, dict_factory=_utils.dict_factory ) if gps_coordinates_polygon is not None: gps_coordinates_polygon = [ asdict(p, dict_factory=_utils.dict_factory) for p in gps_coordinates_polygon ] if capture_timestamp_range is not None: capture_timestamp_range = asdict(capture_timestamp_range, dict_factory=_utils.dict_factory) request = openapi_models.InputGetDatumTaskStatsRequest( task_type_label_filters=task_type_label_filters, gps_coordinates_circle=gps_coordinates_circle, gps_coordinates_rectangle=gps_coordinates_rectangle, gps_coordinates_polygon=gps_coordinates_polygon, capture_timestamp_range=capture_timestamp_range, metadata=metadata, datum_ids=datum_ids, approval_status=approval_status, annotation_metadata=annotation_metadata, ) response = _apis.datasets_v3.tasks_api.get_datum_task_stats(task_id=id, body=request) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from get_datum_task_stats") return _utils.convert_to_dataclass(response.data.model_dump(), models.DatumTaskStatistics)
[docs] def get_datum_for_task( task_id: str, *, unannotated: bool = False, random: bool = False, id_after: str | None = None, prev_datum_id: str | None = None, skip_prev: bool | None = None, ) -> models.Datum | None: """Get the next available datum for the given task. Returns None if there are no datums available. :param task_id: The id of the task :type task_id: str :param unannotated: If true, only unannotated datums will be returned (defaults to false) :type unannotated: Optional[bool] :param random: If true, returns a random available datum instead of the next available datum (defaults to false) :type random: Optional[bool] :param id_after: If provided, will return a datum that is after the given datum id (can be used to resume a task from a specific point, or to skip a specific datum) :type id_after: Optional[str] :param prev_datum_id: if specified, any lock held by the user on this datum will be released if a new datum is acquired :type prev_datum_id: Optional[str] :param skip_prev: if true, the datum specified by prev_datum_id will be marked as 'skipped' rather than 'viewed' when its lock is released, putting it at the end of the task queue :type skip_prev: Optional[bool] :return: The datum, or None if no datums matching the request are available :rtype: Optional[models.Datum] """ response = _apis.datasets_v3.datums_api.get_next_datum_for_task( task_id=task_id, unannotated=unannotated, sort="random" if random else "", id_after=id_after, prev_datum_id=prev_datum_id, skip_prev=skip_prev, ) if not response.data or len(response.data) == 0: return None return _utils.convert_to_dataclass(response.data[0].model_dump(), models.Datum)
[docs] def delete_datum_lock_for_task( id: str, task_id: str, user_id: str | None = None, ) -> None: """Delete the specified datum's lock for a given task. Must be the current user holding the lock. :param id: The id of the datum :type id: str :param task_id: The id of the task :type task_id: str :param user_id: The id of the user who holds the lock :type user_id: Optional[str] :return: None """ _apis.datasets_v3.datums_api.delete_datum_task_activity_lock( datum_id=id, task_id=task_id, user_id=user_id )
[docs] def get_datum_for_task_by_id( id: str, task_id: str, ) -> models.Datum | None: """Get the specific datum designated by id. :param id: The id of the datum :type id: str :param task_id: The id of the task :type task_id: str :return: The datum, None if no datum is found or the datum does not apply to the specified task. :rtype: Optional[models.Datum] """ try: response = _apis.datasets_v3.datums_api.retrieve_task_datum(datum_id=id, task_id=task_id) except NotFoundException: return None if not response.data: raise RuntimeError( "Received malformed response (missing `data`) from get_datum_for_task_by_id" ) return _utils.convert_to_dataclass(response.data.model_dump(), models.Datum)
[docs] def get_task_datum_count( task_id: str, ) -> int: """Get the number of datums in the provided task. :param task_id: The id of the task :type task_id: str :return: The datum count :rtype: int """ response = _apis.datasets_v3.datums_api.get_task_datum_count(task_id=task_id) return response.data or 0
[docs] def get_task_activity( task_id: str, *, activities: list[models.DatumTaskActivityCode] | None = None, dataset_ids: list[str] | None = None, user_ids: list[str] | None = None, direction: models.SortDirection | None = None, sort: models.TaskActivitySortColumn | None = None, max_items: int | None = None, ) -> Generator[models.DatumTaskActivity, None, None]: """Get the activities for the provided task and filters. :param task_id: Id of the task :type task_id: str :param activities: List of activity types to filter by :type activities: Optional[List[models.DatumTaskActivityCode]] :param dataset_ids: List of dataset ids to filter by :type dataset_ids: Optional[List[str]] :param user_ids: List of user ids to filter by :type user_ids: Optional[List[str]] :param direction: Sort direction :type direction: Optional[models.SortDirection] :param sort: Sort column :type sort: Optional[models.TaskActivitySortColumn] :param max_items: Limit the returned generator to only produce this many items :type max_items: Optional[int] :return: Generator over the matching task activities :rtype: Generator[models.DatumTaskActivity, None, None] """ params = locals() if "max_items" in params: del params["max_items"] return _utils.paginate_items(_get_task_activity, params, max_items)
def _get_task_activity( task_id: str, *, activities: list[models.DatumTaskActivityCode] | None = None, dataset_ids: list[str] | None = None, user_ids: list[str] | None = None, direction: models.SortDirection | None = None, limit: int | None = None, offset: int | None = None, sort: models.TaskActivitySortColumn | None = None, ) -> list[models.DatumTaskActivity]: response = _apis.datasets_v3.tasks_api.get_datum_task_activity( task_id=task_id, activities=[_utils.enum_value(a) for a in activities] if activities else None, dataset_ids=dataset_ids, user_ids=user_ids, direction=_utils.enum_value(direction), limit=limit, offset=offset, sort=_utils.enum_value(sort), ) if not response.data: return [] return [ _utils.convert_to_dataclass(t.model_dump(), models.DatumTaskActivity) for t in response.data or [] ]
[docs] def count_task_activity( task_id: str, *, activities: list[models.DatumTaskActivityCode] | None = None, dataset_ids: list[str] | None = None, user_ids: list[str] | None = None, ) -> int: """Count the activities for the provided task and filters. :param task_id: Id of the task :type task_id: str :param activities: List of activity types to filter by :type activities: Optional[List[models.DatumTaskActivityCode]] :param dataset_ids: List of dataset ids to filter by :type dataset_ids: Optional[List[str]] :param user_ids: List of user ids to filter by :type user_ids: Optional[List[str]] :return: Number of matching task activities :rtype: int """ response = _apis.datasets_v3.tasks_api.count_datum_task_activity( task_id=task_id, activities=[_utils.enum_value(a) for a in activities] if activities else None, dataset_ids=dataset_ids, user_ids=user_ids, ) return response.data or 0
[docs] def get_tasks_activity( exact_name_match: bool | None = None, search: str | None = None, project_ids: list[str] | None = None, task_ids: list[str] | None = None, activities: list[models.DatumTaskActivityCode] | None = None, dataset_ids: list[str] | None = None, user_ids: list[str] | None = None, direction: models.SortDirection | None = None, sort: models.TaskActivitySortColumn | None = None, max_items: int | None = None, ) -> Generator[models.DatumTaskActivity, None, None]: """Get the matching activities. :param exact_name_match: Require search filter to match exactly (defaults to false) :type exact_name_match: Optional[bool] :param search: Search string (full text search against task name and description fields) :type search: Optional[str] :param project_ids: List of project ids to filter by :type project_ids: Optional[List[str]] :param task_ids: List of task ids to filter by :type task_ids: Optional[List[str]] :param activities: List of activity types to filter by :type activities: Optional[List[models.DatumTaskActivityCode]] :param dataset_ids: List of dataset ids to filter by :type dataset_ids: Optional[List[str]] :param user_ids: List of user ids to filter by :type user_ids: Optional[List[str]] :param direction: Sort direction :type direction: Optional[models.SortDirection] :param sort: Sort column :type sort: Optional[models.TaskActivitySortColumn] :param max_items: Limit the returned generator to only produce this many items :type max_items: Optional[int] :return: Generator over the matching task activities :rtype: Generator[models.DatumTaskActivity, None, None] """ params = locals() if "max_items" in params: del params["max_items"] return _utils.paginate_items(_get_tasks_activity, params, max_items)
def _get_tasks_activity( exact_name_match: bool | None = None, search: str | None = None, project_ids: list[str] | None = None, task_ids: list[str] | None = None, activities: list[models.DatumTaskActivityCode] | None = None, dataset_ids: list[str] | None = None, user_ids: list[str] | None = None, direction: models.SortDirection | None = None, limit: int | None = None, offset: int | None = None, sort: models.TaskActivitySortColumn | None = None, ) -> list[models.DatumTaskActivity]: response = _apis.datasets_v3.tasks_api.get_datum_tasks_activity( exact_name_match=exact_name_match, search=search, project_ids=project_ids, task_ids=task_ids, activities=[_utils.enum_value(a) for a in activities] if activities else None, dataset_ids=dataset_ids, user_ids=user_ids, direction=_utils.enum_value(direction), limit=limit, offset=offset, sort=_utils.enum_value(sort), ) if not response.data: return [] return [ _utils.convert_to_dataclass(t.model_dump(), models.DatumTaskActivity) for t in response.data or [] ]
[docs] def count_tasks_activity( exact_name_match: bool | None = None, search: str | None = None, project_ids: list[str] | None = None, task_ids: list[str] | None = None, activities: list[models.DatumTaskActivityCode] | None = None, dataset_ids: list[str] | None = None, user_ids: list[str] | None = None, ) -> int: """Count matching activities . :param exact_name_match: Require search filter to match exactly (defaults to false) :type exact_name_match: Optional[bool] :param search: Search string (full text search against task name and description fields) :type search: Optional[str] :param project_ids: List of project ids to filter by :type project_ids: Optional[List[str]] :param task_ids: List of task ids to filter by :type task_ids: Optional[List[str]] :param activities: List of activity types to filter by :type activities: Optional[List[models.DatumTaskActivityCode]] :param dataset_ids: List of dataset ids to filter by :type dataset_ids: Optional[List[str]] :param user_ids: List of user ids to filter by :type user_ids: Optional[List[str]] :return: Number of matching task activities :rtype: int """ response = _apis.datasets_v3.tasks_api.count_datum_tasks_activity( exact_name_match=exact_name_match, search=search, project_ids=project_ids, task_ids=task_ids, activities=[_utils.enum_value(a) for a in activities] if activities else None, dataset_ids=dataset_ids, user_ids=user_ids, ) return response.data or 0