# Source code for chariot.datasets.models

"""Datasets models."""

from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum, StrEnum
from typing import Any

from chariot.datasets._utils import (
    Base,
    convert_to_dataclass,
    convert_to_dataclass_list,
    convert_to_datetime,
    format_datetime_utc,
)

# Explicit public API of this module: controls `from ... import *` and
# documents which names are supported for external use.
__all__ = [
    "SortDirection",
    "DatasetSortColumn",
    "DatumSortColumn",
    "SnapshotSortColumn",
    "ViewSortColumn",
    "UploadSortColumn",
    "TaskSortColumn",
    "TaskType",
    "ContextLabelFilter",
    "TaskTypeLabelFilter",
    "TimestampRange",
    "DatumFilter",
    "SplitName",
    "SplitConfig",
    "SplitAlgorithm",
    "SnapshotStatus",
    "Snapshot",
    "GeoPoint",
    "Circle",
    "Rectangle",
    "Distribution",
    "DatumStatistics",
    "Datum",
    "Annotation",
    "Point",
    "BoundingBox",
    "OrientedBoundingBox",
    "TextClassification",
    "TokenClassification",
    "TextGeneration",
    "ApprovalStatus",
    "MigrationStatus",
    "DatasetType",
    "DatasetSummary",
    "Dataset",
    "DatasetStatistics",
    "DatasetTimelineEvent",
    "View",
    "FileType",
    "ManifestType",
    "FileFormat",
    "FileStatus",
    "JobType",
    "JobStatus",
    "UploadStatus",
    "UploadType",
    "File",
    "Job",
    "PresignedUrl",
    "Upload",
    "UploadStatistics",
    "VideoSamplingType",
    "VideoSamplingOptions",
    "DatumTask",
    "DatasetConfig",
    "DatumTaskDetails",
    "DatumTaskStatistics",
    "DatumTaskActivityCode",
    "DatumTaskActivity",
    "TaskActivitySortColumn",
]


[docs] class SplitName(StrEnum): TRAIN = "train" VAL = "val" TEST = "test"
class SplitAlgorithm(Enum):
    """Strategy used to assign datums to splits."""

    RANDOM = "random"
class MigrationStatus(Enum):
    """Lifecycle states of a dataset migration."""

    IDENTIFIED = "identified"
    PLANNED = "planned"
    DOWNLOADING = "downloading"
    UPLOADING_HORIZONTALS = "uploading_horizontals"
    UPLOADING_VERTICAL = "uploading_vertical"
    CLEANUP = "cleanup"
    COMPLETE = "complete"
    EXCEPTION = "exception"
    ERROR = "error"
class DatasetType(Enum):
    """Modality of the data held by a dataset."""

    IMAGE = "image"
    TEXT = "text"
class SortDirection(Enum):
    """Direction of a sort applied to a listing query."""

    ASCENDING = "asc"
    DESCENDING = "desc"
class DatasetSortColumn(Enum):
    """Columns datasets can be sorted by."""

    NAME = "name"
    CREATION_TIMESTAMP = "creation timestamp"
    UPDATED_TIMESTAMP = "updated timestamp"
    DATUM_COUNT = "datum count"
class ViewSortColumn(Enum):
    """Columns views can be sorted by."""

    NAME = "name"
    CREATION_TIMESTAMP = "creation timestamp"
    SAMPLE_COUNT = "sample count"
    ID = "id"
class DatumSortColumn(Enum):
    """Columns datums can be sorted by."""

    CREATION_TIMESTAMP = "creation timestamp"
class SnapshotSortColumn(Enum):
    """Columns snapshots can be sorted by."""

    NAME = "name"
    TIMESTAMP = "timestamp"
    CREATION_TIMESTAMP = "creation timestamp"
    ID = "id"
class UploadSortColumn(Enum):
    """Columns uploads can be sorted by."""

    TYPE = "type"
    STATUS = "status"
    CREATION_TIMESTAMP = "creation timestamp"
class TaskSortColumn(Enum):
    """Columns datum tasks can be sorted by."""

    NAME = "name"
    ID = "id"
class TaskActivitySortColumn(Enum):
    """Columns task-activity records can be sorted by."""

    ACTIVITY_START_TIME = "activity start time"
    ACTIVITY_END_TIME = "activity end time"
[docs] class TaskType(StrEnum): IMAGE_CLASSIFICATION = "Image Classification" OBJECT_DETECTION = "Object Detection" ORIENTED_OBJECT_DETECTION = "Oriented Object Detection" IMAGE_SEGMENTATION = "Image Segmentation" TOKEN_CLASSIFICATION = "Token Classification" TEXT_CLASSIFICATION = "Text Classification" TEXT_GENERATION = "Text Generation"
class JobType(Enum):
    """Kinds of background jobs tracked by the service."""

    DELETE_DATASET = "delete_dataset"
    UPLOAD = "upload"
    DELETE_UPLOAD = "delete_upload"
    FILE = "file"
    DELETE_FILE = "delete_file"
    SNAPSHOT = "snapshot"
class JobStatus(Enum):
    """Execution states of a background job."""

    READY = "ready"
    IN_PROGRESS = "in progress"
class FileType(Enum):
    """Kinds of downloadable dataset files."""

    ARCHIVE = "archive"
    MANIFEST = "manifest"
class ManifestType(Enum):
    """Scope of datums included in a manifest file."""

    ANNOTATED = "annotated"
    ALL = "all"
class FileFormat(Enum):
    """Compression/packaging formats for dataset files."""

    TGZ = "tgz"
    ZIP = "zip"
    GZ = "gz"
class FileStatus(Enum):
    """Processing states of a dataset file."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"
    ARCHIVED = "archived"
class UploadType(Enum):
    """Kinds of payload an upload can carry."""

    ARCHIVE = "archive"
    DATUM = "datum"
    INFERENCE = "inference"
    VIDEO = "video"
    TEXT = "text"
    ANNOTATION = "annotation"
    RAIC = "raic"
class UploadStatus(Enum):
    """Lifecycle states of an upload."""

    CREATED = "created"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"
    CLEANUP = "cleanup"
class ApprovalStatus(Enum):
    """Review states of an annotation.

    NOTE: NOT_REVIEWED is deliberately the empty string — presumably to match
    the API's representation of "no review yet".
    """

    NOT_REVIEWED = ""
    NEEDS_REVIEW = "needs_review"
    VERIFIED = "verified"
    REJECTED = "rejected"
[docs] class DatumTaskActivityCode(StrEnum): LOCKED = "locked" VIEWED = "viewed" SKIPPED = "skipped"
@dataclass
class GeoPoint(Base):
    """A geographic coordinate pair (presumably degrees — confirm with API docs)."""

    latitude: float
    longitude: float
@dataclass
class Circle(Base):
    """A circular geographic region: a center point plus a radius."""

    center: GeoPoint
    radius: float
@dataclass
class Rectangle(Base):
    """A rectangular geographic region defined by two corner points."""

    p1: GeoPoint
    p2: GeoPoint
@dataclass
class Distribution(Base):
    """A per-context histogram mapping label -> count."""

    context: str | None
    distribution: dict[str, int]
@dataclass
class Point(Base):
    """A 2-D point in image/annotation space."""

    x: float
    y: float
@dataclass
class BoundingBox(Base):
    """An axis-aligned bounding box given by its min/max extents."""

    xmin: float
    xmax: float
    ymin: float
    ymax: float
@dataclass
class OrientedBoundingBox(Base):
    """A rotated bounding box: center, width/height, and rotation `r`
    (units of `r` not shown here — presumably radians; confirm with API)."""

    cx: float
    cy: float
    w: float
    h: float
    r: float
@dataclass
class TextClassification(Base):
    """A text-classification label, optionally scoped to a context."""

    context: str | None
    label: str
@dataclass
class TokenClassification(Base):
    """A token-level label over the character span [start, end)."""

    label: str
    start: int
    end: int
@dataclass
class TextGeneration(Base):
    """Generated text for a datum, inline or via a presigned URL."""

    context: str | None
    generated_text: str | None
    generated_text_presigned_url: str | None
@dataclass
class DatumStatistics(Base):
    """Aggregate datum/annotation counts and label distributions."""

    datum_count: int
    available_datum_count: int
    new_datum_count: int
    annotation_count: int
    class_label_count: int
    bounding_box_count: int
    oriented_bounding_box_count: int
    contour_count: int
    text_classification_count: int
    token_classification_count: int
    text_generation_count: int
    class_label_distribution: dict[str, int] | None
    text_classification_distribution: list[Distribution] | None
    token_classification_distribution: dict[str, int] | None
    text_generation_distribution: dict[str, int] | None
    annotation_count_by_approval_status: dict[str, int] | None

    def __post_init__(self):
        super().__post_init__()
        # Only the text-classification distribution carries nested dataclasses;
        # the other distributions are plain dicts and need no conversion.
        if self.text_classification_distribution is not None:
            self.text_classification_distribution = convert_to_dataclass_list(
                self.text_classification_distribution, Distribution
            )
@dataclass
class DatasetSummary(DatumStatistics):
    """DatumStatistics extended with size and unannotated-count totals."""

    total_datum_size: int
    largest_datum_size: int
    unannotated_datum_count: int
@dataclass
class Dataset(Base):
    """A dataset record as returned by the API."""

    id: str
    name: str
    type: DatasetType
    project_id: str
    is_public: bool
    is_test: bool
    delete_lock: bool
    created_at: datetime
    updated_at: datetime
    description: str | None = None
    archived_at: datetime | None = None
    archived_by: str | None = None
    summary: DatasetSummary | None = None
    migration_status: MigrationStatus | None = None

    def __post_init__(self):
        super().__post_init__()
        # Coerce raw API values (strings/dicts) into their typed forms.
        self.type = DatasetType(self.type)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        # archived_at is converted unconditionally — convert_to_datetime is
        # presumably None-safe (matches usage elsewhere in this module).
        self.archived_at = convert_to_datetime(self.archived_at)
        if self.summary is not None:
            self.summary = convert_to_dataclass(self.summary, DatasetSummary)
        if self.migration_status is not None:
            self.migration_status = MigrationStatus(self.migration_status)
@dataclass
class Annotation(Base):
    """A single annotation on a datum (box, contour, text label, ...).

    Exactly which payload fields are populated depends on `task_type`.
    """

    id: str
    datum_id: str | None
    upload_id: str | None
    task_type: TaskType
    class_label: str | None
    contour: list[list[Point]] | None
    bbox: BoundingBox | None
    oriented_bbox: OrientedBoundingBox | None
    text_classification: TextClassification | None
    token_classification: TokenClassification | None
    text_generation: TextGeneration | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None
    archived_upload_id: str | None
    size: int | None
    approval_status: str
    metadata: dict[str, Any] | None = None
    previous_annotation_id: str | None = None
    datum_annotation_updated_at: str | None = None
    prev_datum_annotation_updated_at: str | None = None

    def __post_init__(self):
        super().__post_init__()
        self.task_type = TaskType(self.task_type)
        # A contour is a list of polygons, each a list of points.
        if self.contour is not None:
            self.contour = [convert_to_dataclass_list(points, Point) for points in self.contour]
        if self.bbox is not None:
            self.bbox = convert_to_dataclass(self.bbox, BoundingBox)
        if self.oriented_bbox is not None:
            self.oriented_bbox = convert_to_dataclass(self.oriented_bbox, OrientedBoundingBox)
        if self.text_classification is not None:
            self.text_classification = convert_to_dataclass(
                self.text_classification, TextClassification
            )
        if self.token_classification is not None:
            self.token_classification = convert_to_dataclass(
                self.token_classification, TokenClassification
            )
        if self.text_generation is not None:
            self.text_generation = convert_to_dataclass(self.text_generation, TextGeneration)
        self.created_at = convert_to_datetime(self.created_at)
        # Fix: updated_at is declared `datetime` but was previously never
        # converted, unlike every other model in this module.
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
@dataclass
class Datum(Base):
    """A single datum (image/text sample) plus its optional annotations."""

    id: str
    coordinates: GeoPoint | None
    timestamp: datetime | None
    metadata: dict[str, Any] | None
    created_at: datetime
    archived_at: datetime | None
    dataset: Dataset | None
    annotations: list[Annotation] | None
    presigned_url: str
    signature: str
    size: int
    split: SplitName | None
    datum_annotation_updated_at: str | None = None

    def __post_init__(self):
        super().__post_init__()
        # Hydrate nested objects and timestamps from their raw API forms.
        if self.coordinates is not None:
            self.coordinates = convert_to_dataclass(self.coordinates, GeoPoint)
        if self.timestamp is not None:
            self.timestamp = convert_to_datetime(self.timestamp)
        self.created_at = convert_to_datetime(self.created_at)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.annotations is not None:
            self.annotations = convert_to_dataclass_list(self.annotations, Annotation)
        if self.split is not None:
            self.split = SplitName(self.split)
@dataclass
class ContextLabelFilter(Base):
    """Filter matching labels within a single context."""

    context: str | None = None
    labels: list[str] | None = None
@dataclass
class TaskTypeLabelFilter(Base):
    """Filter selecting annotations of one task type by label/context."""

    task_type: TaskType
    labels: list[str] | None = None
    contexts: list[str | None] | None = None
    context_labels: list[ContextLabelFilter] | None = None

    def __post_init__(self):
        super().__post_init__()
        self.task_type = TaskType(self.task_type)
        if self.context_labels is not None:
            self.context_labels = convert_to_dataclass_list(self.context_labels, ContextLabelFilter)
@dataclass
class TimestampRange(Base):
    """A half-open(?) timestamp interval; either bound may be omitted."""

    start: datetime | None
    end: datetime | None

    def __post_init__(self):
        super().__post_init__()
        if self.start is not None:
            self.start = convert_to_datetime(self.start)
        if self.end is not None:
            self.end = convert_to_datetime(self.end)

    def to_query_param(self) -> str:
        """Render as the API's `(start,end)` query-string form, with empty
        strings for missing bounds."""
        lo = format_datetime_utc(self.start) if self.start else ""
        hi = format_datetime_utc(self.end) if self.end else ""
        return f"({lo},{hi})"
@dataclass
class DatumFilter(Base):
    """Composite filter for datum queries (labels, geo, time, metadata, ...)."""

    task_type_label_filters: list[TaskTypeLabelFilter] | None
    gps_coordinates_circle: Circle | None
    gps_coordinates_rectangle: Rectangle | None
    gps_coordinates_polygon: list[GeoPoint] | None
    capture_timestamp_range: TimestampRange | None
    metadata: dict[str, str] | None
    asof_timestamp: datetime | None
    unannotated: bool | None
    datum_ids: list[str] | None
    # NOTE(review): approval_status entries are not coerced to ApprovalStatus
    # in __post_init__ — presumably callers pass enum members directly; confirm.
    approval_status: list[ApprovalStatus] | None
    annotation_metadata: dict[str, str] | None

    def __post_init__(self):
        super().__post_init__()
        if self.task_type_label_filters is not None:
            self.task_type_label_filters = convert_to_dataclass_list(
                self.task_type_label_filters, TaskTypeLabelFilter
            )
        if self.gps_coordinates_circle is not None:
            self.gps_coordinates_circle = convert_to_dataclass(self.gps_coordinates_circle, Circle)
        if self.gps_coordinates_rectangle is not None:
            self.gps_coordinates_rectangle = convert_to_dataclass(
                self.gps_coordinates_rectangle, Rectangle
            )
        if self.gps_coordinates_polygon is not None:
            self.gps_coordinates_polygon = convert_to_dataclass_list(
                self.gps_coordinates_polygon, GeoPoint
            )
        if self.capture_timestamp_range is not None:
            self.capture_timestamp_range = convert_to_dataclass(
                self.capture_timestamp_range, TimestampRange
            )
        if self.asof_timestamp is not None:
            self.asof_timestamp = convert_to_datetime(self.asof_timestamp)
@dataclass
class DatumConfig(DatumFilter):
    """A DatumFilter that can be built from a partial keyword set.

    Not exported in ``__all__`` — internal helper type.
    """

    @classmethod
    def from_partial(cls, **kwargs):
        """Construct an instance, defaulting every unspecified field to None."""
        names = {f.name for f in cls.__dataclass_fields__.values()}
        return cls(**{name: kwargs.get(name) for name in names})
@dataclass
class SplitConfig(Base):
    """How a view samples datums and assigns them to splits."""

    sample_count: int | None
    split_algorithm: SplitAlgorithm | None
    apply_default_split: bool | None
    splits: dict[SplitName, float] | None
@dataclass
class View(SplitConfig, DatumFilter):
    """A saved, filtered view over a dataset (filter + split config)."""

    id: str
    name: str
    snapshot_count: int | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None = None
    archived_by: str | None = None
    dataset: Dataset | None = None

    def __post_init__(self):
        # Runs the DatumFilter/SplitConfig conversions via the MRO first.
        super().__post_init__()
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
class SnapshotStatus(Enum):
    """Lifecycle states of a snapshot."""

    PENDING = "pending"
    COMPLETE = "complete"
    ERROR = "error"
    PREVIEW = "preview"
@dataclass
class Snapshot(Base):
    """A frozen capture of a view at a point in time."""

    id: str
    view: View
    name: str
    timestamp: datetime
    summary: DatasetSummary | None
    split_summaries: dict[SplitName, DatasetSummary] | None
    status: SnapshotStatus
    created_at: datetime | None
    updated_at: datetime | None

    def __post_init__(self):
        # Fix: every other Base subclass in this module calls
        # super().__post_init__(); this class previously skipped it —
        # presumably an oversight (TODO confirm Base.__post_init__ semantics).
        super().__post_init__()
        self.view = convert_to_dataclass(self.view, View)
        if self.timestamp is not None:
            self.timestamp = convert_to_datetime(self.timestamp)
        if self.summary is not None:
            self.summary = convert_to_dataclass(self.summary, DatasetSummary)
        if self.split_summaries is not None:
            # Keys arrive as raw strings; values as raw dicts.
            self.split_summaries = {
                SplitName(k): convert_to_dataclass(v, DatasetSummary)
                for k, v in self.split_summaries.items()
            }
        self.status = SnapshotStatus(self.status)
        if self.created_at is not None:
            self.created_at = convert_to_datetime(self.created_at)
        if self.updated_at is not None:
            self.updated_at = convert_to_datetime(self.updated_at)
@dataclass
class DatasetStatistics(DatumStatistics):
    """DatumStatistics aggregated across multiple datasets."""

    dataset_count: int
    total_datum_size: int
    largest_datum_size: int
    unannotated_datum_count: int
@dataclass
class DatasetTimelineEvent(Base):
    """One grouped event in a dataset's activity timeline."""

    event_timestamp: datetime
    dataset_id: str
    event_associated_record_id: str | None
    event_operation: str | None
    event_user_id: str | None
    datums_created: int | None
    # NOTE(review): the *_deleted/*_modified fields below are typed `str`
    # while datums_created is `int` — looks inconsistent; confirm against the
    # API schema before changing.
    datums_deleted: str | None
    datums_modified: str | None
    annotations_created: str | None
    annotations_deleted: str | None
    annotations_modified: str | None
    snapshots: list[Snapshot] | None
    event_group_num_timestamps: int
    event_group_num_users: int
    event_group_start_timestamp: datetime
    event_group_description: str

    def __post_init__(self):
        super().__post_init__()
        self.event_timestamp = convert_to_datetime(self.event_timestamp)
        self.event_group_start_timestamp = convert_to_datetime(self.event_group_start_timestamp)
        if self.snapshots is not None:
            self.snapshots = convert_to_dataclass_list(self.snapshots, Snapshot)
@dataclass
class Job(Base):
    """A background job and the records it operates on."""

    id: str
    type: JobType
    status: JobStatus
    progress_message: str | None
    dataset: Dataset | None
    # I don't _think_ there's anywhere in our api we would
    # actually return an upload and adding this would cause
    # a cyclical reference
    # upload: Optional[Upload]
    upload: Any | None
    file: Any | None
    view: View | None
    execution_count: int
    created_at: datetime
    updated_at: datetime
    start_after: datetime | None
    schedule_cron: str | None

    def __post_init__(self):
        super().__post_init__()
        self.type = JobType(self.type)
        self.status = JobStatus(self.status)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.file is not None:
            # File is defined later in this module; resolved at call time.
            self.file = convert_to_dataclass(self.file, File)
        if self.view is not None:
            self.view = convert_to_dataclass(self.view, View)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.start_after is not None:
            self.start_after = convert_to_datetime(self.start_after)
@dataclass
class File(Base):
    """A downloadable dataset file (archive or manifest)."""

    id: str
    dataset: Dataset | None
    dataset_timestamp: datetime | None
    snapshot: Snapshot | None
    split: SplitName | None
    type: FileType
    manifest_type: ManifestType | None
    file_format: FileFormat
    presigned_url: str | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None
    expires_at: datetime | None
    job: Job | None
    status: FileStatus | None  # TODO FEED-1156 change to required field after 0.9.0 patch

    def __post_init__(self):
        super().__post_init__()
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.dataset_timestamp is not None:
            self.dataset_timestamp = convert_to_datetime(self.dataset_timestamp)
        if self.snapshot is not None:
            self.snapshot = convert_to_dataclass(self.snapshot, Snapshot)
        if self.split is not None:
            self.split = SplitName(self.split)
        self.type = FileType(self.type)
        if self.manifest_type is not None:
            self.manifest_type = ManifestType(self.manifest_type)
        self.file_format = FileFormat(self.file_format)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        # Fix: archived_at/expires_at are declared `datetime | None` but were
        # previously never converted, unlike every other timestamp here.
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        if self.expires_at is not None:
            self.expires_at = convert_to_datetime(self.expires_at)
        if self.job is not None:
            self.job = convert_to_dataclass(self.job, Job)
        if self.status is not None:
            # TODO FEED-1156 change to required field after 0.9.0 patch
            self.status = FileStatus(self.status)
@dataclass
class PresignedUrl(Base):
    """An HTTP method/URL pair for a presigned request."""

    method: str
    url: str
class VideoSamplingType(str, Enum):
    """How frames are sampled from an uploaded video.

    Kept as ``(str, Enum)`` rather than ``StrEnum`` deliberately: switching
    would change ``str(member)`` behavior for existing callers.
    """

    RATE = "rate"
    RATIO = "ratio"
    NONE = "none"
@dataclass
class VideoSamplingOptions:
    """Frame-sampling parameters for video uploads.

    NOTE: unlike most models here this does not subclass Base — presumably
    intentional (it is built client-side, not parsed from API payloads).
    """

    sampling_type: VideoSamplingType
    sampling_value: int
    deinterlace: bool
@dataclass
class Upload(Base):
    """An upload request and its processing state."""

    id: str
    job: Job | None
    type: UploadType
    is_gzipped: bool | None
    split: SplitName | None
    status: UploadStatus
    name: str | None
    size: int | None
    delete_source: bool
    max_validation_errors: int
    image_validation: bool
    validation_errors: list[str] | None
    created_at: datetime
    updated_at: datetime
    data_created_at: datetime | None
    presigned_urls: list[PresignedUrl] | None
    source_urls: list[str] | None
    datum_metadata: list[dict[str, Any]] | None
    dataset: Dataset | None
    video_options: VideoSamplingOptions | None

    def __post_init__(self):
        super().__post_init__()
        if self.job is not None:
            self.job = convert_to_dataclass(self.job, Job)
        self.type = UploadType(self.type)
        if self.split is not None:
            self.split = SplitName(self.split)
        self.status = UploadStatus(self.status)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.data_created_at is not None:
            self.data_created_at = convert_to_datetime(self.data_created_at)
        if self.presigned_urls is not None:
            self.presigned_urls = convert_to_dataclass_list(self.presigned_urls, PresignedUrl)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.video_options is not None:
            self.video_options = convert_to_dataclass(self.video_options, VideoSamplingOptions)
@dataclass
class UploadStatistics(Base):
    """Aggregate statistics over uploads."""

    upload_count: int
@dataclass
class DatasetConfig(Base):
    """Criteria selecting which datasets a datum task applies to."""

    dataset_ids: list[str] | None = None
    dataset_names: list[str] | None = None
    exact_name_match: bool | None = None
    limit_to_write_access: bool | None = None
    dataset_type: str | None = None
@dataclass
class DatumTask(Base):
    """A labeling/review task over a configurable set of datums."""

    id: str
    name: str
    description: str | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None
    created_by: str
    updated_by: str
    archived_by: str | None
    project_id: str
    dataset_config: DatasetConfig | None
    datum_config: DatumConfig | None

    def __post_init__(self):
        super().__post_init__()
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        if self.dataset_config is not None:
            self.dataset_config = convert_to_dataclass(self.dataset_config, DatasetConfig)
        if self.datum_config is not None:
            self.datum_config = convert_to_dataclass(self.datum_config, DatumConfig)
@dataclass
class DatumTaskStatistics(Base):
    """Statistics for a datum task. Currently carries no fields of its own."""
@dataclass
class DatumTaskDetails(DatumTask):
    """A DatumTask augmented with its statistics."""

    statistics: DatumTaskStatistics

    def __post_init__(self):
        super().__post_init__()
        if self.statistics is not None:
            self.statistics = convert_to_dataclass(self.statistics, DatumTaskStatistics)

    def _base_task(self) -> DatumTask:
        """Strip the statistics and rebuild the plain DatumTask.

        asdict() recursively dict-ifies nested dataclasses; DatumTask's
        __post_init__ re-hydrates them on construction.
        """
        props = asdict(self)
        del props["statistics"]
        return DatumTask(**props)
@dataclass
class DatumTaskActivity(Base):
    """A record of one user activity on a datum within a task."""

    dataset_id: str
    datum_id: str
    task_id: str
    user_id: str
    activity: DatumTaskActivityCode
    activity_start_time: datetime
    activity_end_time: datetime

    def __post_init__(self):
        super().__post_init__()
        self.activity = DatumTaskActivityCode(self.activity)
        self.activity_start_time = convert_to_datetime(self.activity_start_time)
        self.activity_end_time = convert_to_datetime(self.activity_end_time)