"""Datasets models."""
from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum, StrEnum
from typing import Any
from chariot.datasets._utils import (
Base,
convert_to_dataclass,
convert_to_dataclass_list,
convert_to_datetime,
format_datetime_utc,
)
__all__ = [
"SortDirection",
"DatasetSortColumn",
"DatumSortColumn",
"SnapshotSortColumn",
"ViewSortColumn",
"UploadSortColumn",
"TaskSortColumn",
"TaskType",
"ContextLabelFilter",
"TaskTypeLabelFilter",
"TimestampRange",
"DatumFilter",
"SplitName",
"SplitConfig",
"SplitAlgorithm",
"SnapshotStatus",
"Snapshot",
"GeoPoint",
"Circle",
"Rectangle",
"Distribution",
"DatumStatistics",
"Datum",
"Annotation",
"Point",
"BoundingBox",
"OrientedBoundingBox",
"TextClassification",
"TokenClassification",
"TextGeneration",
"ApprovalStatus",
"MigrationStatus",
"DatasetType",
"DatasetSummary",
"Dataset",
"DatasetStatistics",
"DatasetTimelineEvent",
"View",
"FileType",
"ManifestType",
"FileFormat",
"FileStatus",
"JobType",
"JobStatus",
"UploadStatus",
"UploadType",
"File",
"Job",
"PresignedUrl",
"Upload",
"UploadStatistics",
"VideoSamplingType",
"VideoSamplingOptions",
"DatumTask",
"DatasetConfig",
"DatumTaskDetails",
"DatumTaskStatistics",
"DatumTaskActivityCode",
"DatumTaskActivity",
"TaskActivitySortColumn",
]
[docs]
class SplitName(StrEnum):
TRAIN = "train"
VAL = "val"
TEST = "test"
class SplitAlgorithm(Enum):
    """Strategies for assigning datums to splits."""

    RANDOM = "random"
class MigrationStatus(Enum):
    """Lifecycle states of a dataset migration."""

    IDENTIFIED = "identified"
    PLANNED = "planned"
    DOWNLOADING = "downloading"
    UPLOADING_HORIZONTALS = "uploading_horizontals"
    UPLOADING_VERTICAL = "uploading_vertical"
    CLEANUP = "cleanup"
    COMPLETE = "complete"
    EXCEPTION = "exception"
    ERROR = "error"
class DatasetType(Enum):
    """Modality of the data a dataset holds."""

    IMAGE = "image"
    TEXT = "text"
class SortDirection(Enum):
    """Ascending/descending sort order for list queries."""

    ASCENDING = "asc"
    DESCENDING = "desc"
class DatasetSortColumn(Enum):
    """Columns datasets can be sorted by."""

    NAME = "name"
    CREATION_TIMESTAMP = "creation timestamp"
    UPDATED_TIMESTAMP = "updated timestamp"
    DATUM_COUNT = "datum count"
class ViewSortColumn(Enum):
    """Columns views can be sorted by."""

    NAME = "name"
    CREATION_TIMESTAMP = "creation timestamp"
    SAMPLE_COUNT = "sample count"
    ID = "id"
class DatumSortColumn(Enum):
    """Columns datums can be sorted by."""

    CREATION_TIMESTAMP = "creation timestamp"
class SnapshotSortColumn(Enum):
    """Columns snapshots can be sorted by."""

    NAME = "name"
    TIMESTAMP = "timestamp"
    CREATION_TIMESTAMP = "creation timestamp"
    ID = "id"
class UploadSortColumn(Enum):
    """Columns uploads can be sorted by."""

    TYPE = "type"
    STATUS = "status"
    CREATION_TIMESTAMP = "creation timestamp"
class TaskSortColumn(Enum):
    """Columns datum tasks can be sorted by."""

    NAME = "name"
    ID = "id"
class TaskActivitySortColumn(Enum):
    """Columns task-activity records can be sorted by."""

    ACTIVITY_START_TIME = "activity start time"
    ACTIVITY_END_TIME = "activity end time"
[docs]
class TaskType(StrEnum):
IMAGE_CLASSIFICATION = "Image Classification"
OBJECT_DETECTION = "Object Detection"
ORIENTED_OBJECT_DETECTION = "Oriented Object Detection"
IMAGE_SEGMENTATION = "Image Segmentation"
TOKEN_CLASSIFICATION = "Token Classification"
TEXT_CLASSIFICATION = "Text Classification"
TEXT_GENERATION = "Text Generation"
class JobType(Enum):
    """Kinds of background jobs."""

    DELETE_DATASET = "delete_dataset"
    UPLOAD = "upload"
    DELETE_UPLOAD = "delete_upload"
    FILE = "file"
    DELETE_FILE = "delete_file"
    SNAPSHOT = "snapshot"
class JobStatus(Enum):
    """Execution state of a background job."""

    READY = "ready"
    IN_PROGRESS = "in progress"
class FileType(Enum):
    """Kinds of downloadable dataset files."""

    ARCHIVE = "archive"
    MANIFEST = "manifest"
class ManifestType(Enum):
    """Whether a manifest covers only annotated datums or all datums."""

    ANNOTATED = "annotated"
    ALL = "all"
class FileStatus(Enum):
    """Processing state of a dataset file."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"
    ARCHIVED = "archived"
class UploadType(Enum):
    """Kinds of payloads an upload can carry."""

    ARCHIVE = "archive"
    DATUM = "datum"
    INFERENCE = "inference"
    VIDEO = "video"
    TEXT = "text"
    ANNOTATION = "annotation"
    RAIC = "raic"
class UploadStatus(Enum):
    """Processing state of an upload."""

    CREATED = "created"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"
    CLEANUP = "cleanup"
class ApprovalStatus(Enum):
    """Review state of an annotation.

    NOT_REVIEWED is deliberately the empty string (the unset value).
    """

    NOT_REVIEWED = ""
    NEEDS_REVIEW = "needs_review"
    VERIFIED = "verified"
    REJECTED = "rejected"
[docs]
class DatumTaskActivityCode(StrEnum):
LOCKED = "locked"
VIEWED = "viewed"
SKIPPED = "skipped"
@dataclass
class GeoPoint(Base):
    """A geographic coordinate as a latitude/longitude pair."""

    latitude: float
    longitude: float
@dataclass
class Circle(Base):
    """A circular geographic region: center point plus radius."""

    center: GeoPoint
    # Radius units are not stated in this module — presumably meters; confirm upstream.
    radius: float
@dataclass
class Rectangle(Base):
    """A rectangular geographic region defined by two corner points."""

    p1: GeoPoint
    p2: GeoPoint
@dataclass
class Distribution(Base):
    """Per-context label counts: label name -> occurrence count."""

    context: str | None
    distribution: dict[str, int]
@dataclass
class Point(Base):
    """A 2D point (used for contour vertices)."""

    x: float
    y: float
@dataclass
class BoundingBox(Base):
    """An axis-aligned bounding box in min/max coordinates."""

    xmin: float
    xmax: float
    ymin: float
    ymax: float
@dataclass
class OrientedBoundingBox(Base):
    """A rotated bounding box: center (cx, cy), size (w, h), rotation r."""

    cx: float
    cy: float
    w: float
    h: float
    # Rotation units (radians vs. degrees) are not stated here — confirm upstream.
    r: float
@dataclass
class TextClassification(Base):
    """A text-classification annotation: a label within an optional context."""

    context: str | None
    label: str
@dataclass
class TokenClassification(Base):
    """A token-classification span: label plus start/end offsets."""

    label: str
    start: int
    end: int
@dataclass
class TextGeneration(Base):
    """A text-generation annotation; text may be inline or behind a presigned URL."""

    context: str | None
    generated_text: str | None
    generated_text_presigned_url: str | None
@dataclass
class DatumStatistics(Base):
    """Aggregate datum/annotation counts and label distributions."""

    datum_count: int
    available_datum_count: int
    new_datum_count: int
    annotation_count: int
    class_label_count: int
    bounding_box_count: int
    oriented_bounding_box_count: int
    contour_count: int
    text_classification_count: int
    token_classification_count: int
    text_generation_count: int
    class_label_distribution: dict[str, int] | None
    text_classification_distribution: list[Distribution] | None
    token_classification_distribution: dict[str, int] | None
    text_generation_distribution: dict[str, int] | None
    annotation_count_by_approval_status: dict[str, int] | None

    def __post_init__(self):
        """Normalize the raw text-classification distribution into Distribution objects."""
        super().__post_init__()
        if self.text_classification_distribution is not None:
            self.text_classification_distribution = convert_to_dataclass_list(
                self.text_classification_distribution, Distribution
            )
@dataclass
class DatasetSummary(DatumStatistics):
    """DatumStatistics extended with size and unannotated-count totals for a dataset."""

    total_datum_size: int
    largest_datum_size: int
    unannotated_datum_count: int
@dataclass
class Dataset(Base):
    """A dataset record with ownership, lifecycle timestamps, and an optional summary."""

    id: str
    name: str
    type: DatasetType
    project_id: str
    is_public: bool
    is_test: bool
    delete_lock: bool
    created_at: datetime
    updated_at: datetime
    description: str | None = None
    archived_at: datetime | None = None
    archived_by: str | None = None
    summary: DatasetSummary | None = None
    migration_status: MigrationStatus | None = None

    def __post_init__(self):
        """Coerce raw API values into enums, datetimes, and nested dataclasses."""
        super().__post_init__()
        self.type = DatasetType(self.type)
        # archived_at is converted without a None-guard, matching the original;
        # convert_to_datetime is therefore assumed to tolerate None.
        for attr in ("created_at", "updated_at", "archived_at"):
            setattr(self, attr, convert_to_datetime(getattr(self, attr)))
        if self.summary is not None:
            self.summary = convert_to_dataclass(self.summary, DatasetSummary)
        if self.migration_status is not None:
            self.migration_status = MigrationStatus(self.migration_status)
@dataclass
class Annotation(Base):
    """A single annotation on a datum; exactly one payload field per task type is populated."""

    id: str
    datum_id: str | None
    upload_id: str | None
    task_type: TaskType
    class_label: str | None
    contour: list[list[Point]] | None
    bbox: BoundingBox | None
    oriented_bbox: OrientedBoundingBox | None
    text_classification: TextClassification | None
    token_classification: TokenClassification | None
    text_generation: TextGeneration | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None
    archived_upload_id: str | None
    size: int | None
    approval_status: str
    metadata: dict[str, Any] | None = None
    previous_annotation_id: str | None = None
    datum_annotation_updated_at: str | None = None
    prev_datum_annotation_updated_at: str | None = None

    def __post_init__(self):
        """Coerce raw API values into enums, nested dataclasses, and datetimes."""
        super().__post_init__()
        self.task_type = TaskType(self.task_type)
        if self.contour is not None:
            # A contour is a list of polygons, each polygon a list of points.
            self.contour = [convert_to_dataclass_list(points, Point) for points in self.contour]
        if self.bbox is not None:
            self.bbox = convert_to_dataclass(self.bbox, BoundingBox)
        if self.oriented_bbox is not None:
            self.oriented_bbox = convert_to_dataclass(self.oriented_bbox, OrientedBoundingBox)
        if self.text_classification is not None:
            self.text_classification = convert_to_dataclass(
                self.text_classification, TextClassification
            )
        if self.token_classification is not None:
            self.token_classification = convert_to_dataclass(
                self.token_classification, TokenClassification
            )
        if self.text_generation is not None:
            self.text_generation = convert_to_dataclass(self.text_generation, TextGeneration)
        self.created_at = convert_to_datetime(self.created_at)
        # FIX: updated_at is declared as datetime but was never converted, unlike
        # every other model in this module (Dataset, View, Job, File, Upload).
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
@dataclass
class Datum(Base):
    """A single dataset sample with its annotations and access metadata."""

    id: str
    coordinates: GeoPoint | None
    timestamp: datetime | None
    metadata: dict[str, Any] | None
    created_at: datetime
    archived_at: datetime | None
    dataset: Dataset | None
    annotations: list[Annotation] | None
    presigned_url: str
    signature: str
    size: int
    split: SplitName | None
    datum_annotation_updated_at: str | None = None

    def __post_init__(self):
        """Coerce raw API values into enums, datetimes, and nested dataclasses."""
        super().__post_init__()
        # Timestamps first: created_at is required, the others are optional.
        self.created_at = convert_to_datetime(self.created_at)
        if self.timestamp is not None:
            self.timestamp = convert_to_datetime(self.timestamp)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        # Nested payloads may arrive as plain mappings; normalize them.
        if self.coordinates is not None:
            self.coordinates = convert_to_dataclass(self.coordinates, GeoPoint)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.annotations is not None:
            self.annotations = convert_to_dataclass_list(self.annotations, Annotation)
        if self.split is not None:
            self.split = SplitName(self.split)
@dataclass
class ContextLabelFilter(Base):
    """Restrict labels within a single context (None context = context-less labels)."""

    context: str | None = None
    labels: list[str] | None = None
@dataclass
class TaskTypeLabelFilter(Base):
    """Filter annotations by task type, optionally narrowed by labels and/or contexts."""

    task_type: TaskType
    labels: list[str] | None = None
    contexts: list[str | None] | None = None
    context_labels: list[ContextLabelFilter] | None = None

    def __post_init__(self):
        """Coerce the raw task type and nested context-label filters."""
        super().__post_init__()
        self.task_type = TaskType(self.task_type)
        if self.context_labels is not None:
            self.context_labels = convert_to_dataclass_list(self.context_labels, ContextLabelFilter)
@dataclass
class TimestampRange(Base):
    """A half-open-looking timestamp interval; either endpoint may be omitted."""

    start: datetime | None
    end: datetime | None

    def __post_init__(self):
        """Coerce both endpoints to datetimes when present."""
        super().__post_init__()
        self.start = None if self.start is None else convert_to_datetime(self.start)
        self.end = None if self.end is None else convert_to_datetime(self.end)

    def to_query_param(self) -> str:
        """Render as "(start,end)" with UTC-formatted endpoints; missing ends are empty."""
        lower = format_datetime_utc(self.start) if self.start else ""
        upper = format_datetime_utc(self.end) if self.end else ""
        return f"({lower},{upper})"
@dataclass
class DatumFilter(Base):
    """Composite query filter for datums: labels, geography, time, metadata, and status."""

    task_type_label_filters: list[TaskTypeLabelFilter] | None
    gps_coordinates_circle: Circle | None
    gps_coordinates_rectangle: Rectangle | None
    gps_coordinates_polygon: list[GeoPoint] | None
    capture_timestamp_range: TimestampRange | None
    # asof_timestamp restricts results to the dataset state at that time —
    # presumed from the name; confirm against the API docs.
    metadata: dict[str, str] | None
    asof_timestamp: datetime | None
    unannotated: bool | None
    datum_ids: list[str] | None
    approval_status: list[ApprovalStatus] | None
    annotation_metadata: dict[str, str] | None

    def __post_init__(self):
        """Coerce nested filter payloads into their dataclass/datetime forms."""
        super().__post_init__()
        if self.task_type_label_filters is not None:
            self.task_type_label_filters = convert_to_dataclass_list(
                self.task_type_label_filters, TaskTypeLabelFilter
            )
        if self.gps_coordinates_circle is not None:
            self.gps_coordinates_circle = convert_to_dataclass(self.gps_coordinates_circle, Circle)
        if self.gps_coordinates_rectangle is not None:
            self.gps_coordinates_rectangle = convert_to_dataclass(
                self.gps_coordinates_rectangle, Rectangle
            )
        if self.gps_coordinates_polygon is not None:
            self.gps_coordinates_polygon = convert_to_dataclass_list(
                self.gps_coordinates_polygon, GeoPoint
            )
        if self.capture_timestamp_range is not None:
            self.capture_timestamp_range = convert_to_dataclass(
                self.capture_timestamp_range, TimestampRange
            )
        if self.asof_timestamp is not None:
            self.asof_timestamp = convert_to_datetime(self.asof_timestamp)
@dataclass
class DatumConfig(DatumFilter):
    """A DatumFilter that can be built from a partial set of keyword arguments."""

    @classmethod
    def from_partial(cls, **kwargs):
        """Construct an instance, defaulting every field absent from kwargs to None."""
        # __dataclass_fields__ maps field name -> Field, including inherited fields.
        return cls(**{name: kwargs.get(name) for name in cls.__dataclass_fields__})
@dataclass
class SplitConfig(Base):
    """How datums are sampled and partitioned into train/val/test splits."""

    sample_count: int | None
    split_algorithm: SplitAlgorithm | None
    apply_default_split: bool | None
    # Mapping of split name -> fraction; fractions presumably sum to 1 — confirm upstream.
    splits: dict[SplitName, float] | None
@dataclass
class View(SplitConfig, DatumFilter):
    """A saved, filtered slice of a dataset together with its split configuration."""

    id: str
    name: str
    snapshot_count: int | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None = None
    archived_by: str | None = None
    dataset: Dataset | None = None

    def __post_init__(self):
        """Run parent normalization, then coerce timestamps and the nested dataset."""
        # Chains through SplitConfig and DatumFilter __post_init__ via the MRO.
        super().__post_init__()
        for attr in ("created_at", "updated_at"):
            setattr(self, attr, convert_to_datetime(getattr(self, attr)))
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
class SnapshotStatus(Enum):
    """Lifecycle state of a snapshot."""

    PENDING = "pending"
    COMPLETE = "complete"
    ERROR = "error"
    PREVIEW = "preview"
@dataclass
class Snapshot(Base):
    """A named, point-in-time capture of a View, with per-split summaries."""

    id: str
    view: View
    name: str
    timestamp: datetime
    summary: DatasetSummary | None
    split_summaries: dict[SplitName, DatasetSummary] | None
    status: SnapshotStatus
    created_at: datetime | None
    updated_at: datetime | None

    def __post_init__(self):
        """Coerce raw API values into enums, datetimes, and nested dataclasses."""
        # FIX: every other Base subclass in this module calls the base hook first;
        # this class omitted it, skipping whatever normalization Base performs.
        super().__post_init__()
        self.view = convert_to_dataclass(self.view, View)
        if self.timestamp is not None:
            self.timestamp = convert_to_datetime(self.timestamp)
        if self.summary is not None:
            self.summary = convert_to_dataclass(self.summary, DatasetSummary)
        if self.split_summaries is not None:
            self.split_summaries = {
                SplitName(k): convert_to_dataclass(v, DatasetSummary)
                for k, v in self.split_summaries.items()
            }
        self.status = SnapshotStatus(self.status)
        if self.created_at is not None:
            self.created_at = convert_to_datetime(self.created_at)
        if self.updated_at is not None:
            self.updated_at = convert_to_datetime(self.updated_at)
@dataclass
class DatasetStatistics(DatumStatistics):
    """DatumStatistics aggregated across multiple datasets."""

    dataset_count: int
    total_datum_size: int
    largest_datum_size: int
    unannotated_datum_count: int
@dataclass
class DatasetTimelineEvent(Base):
    """One grouped event in a dataset's change timeline."""

    event_timestamp: datetime
    dataset_id: str
    event_associated_record_id: str | None
    event_operation: str | None
    event_user_id: str | None
    datums_created: int | None
    # NOTE(review): the following count fields are annotated str | None while
    # datums_created is int | None — looks inconsistent; confirm the API's types.
    datums_deleted: str | None
    datums_modified: str | None
    annotations_created: str | None
    annotations_deleted: str | None
    annotations_modified: str | None
    snapshots: list[Snapshot] | None
    event_group_num_timestamps: int
    event_group_num_users: int
    event_group_start_timestamp: datetime
    event_group_description: str

    def __post_init__(self):
        """Coerce timestamps and nested snapshots from their raw forms."""
        super().__post_init__()
        self.event_timestamp = convert_to_datetime(self.event_timestamp)
        self.event_group_start_timestamp = convert_to_datetime(self.event_group_start_timestamp)
        if self.snapshots is not None:
            self.snapshots = convert_to_dataclass_list(self.snapshots, Snapshot)
@dataclass
class Job(Base):
    """A background job (upload, delete, file generation, snapshot) and its progress."""

    id: str
    type: JobType
    status: JobStatus
    progress_message: str | None
    dataset: Dataset | None
    # I don't _think_ there's anywhere in our api we would
    # actually return an upload and adding this would cause
    # a cyclical reference
    # upload: Optional[Upload]
    upload: Any | None
    file: Any | None
    view: View | None
    execution_count: int
    created_at: datetime
    updated_at: datetime
    start_after: datetime | None
    # Cron expression for recurring jobs, when scheduled.
    schedule_cron: str | None

    def __post_init__(self):
        """Coerce raw API values into enums, datetimes, and nested dataclasses."""
        super().__post_init__()
        self.type = JobType(self.type)
        self.status = JobStatus(self.status)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.file is not None:
            # File is defined later in the module; this executes at instance
            # construction time, so the forward reference is resolved by then.
            self.file = convert_to_dataclass(self.file, File)
        if self.view is not None:
            self.view = convert_to_dataclass(self.view, View)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.start_after is not None:
            self.start_after = convert_to_datetime(self.start_after)
@dataclass
class File(Base):
    """A generated dataset file (archive or manifest) and its lifecycle metadata."""

    id: str
    dataset: Dataset | None
    dataset_timestamp: datetime | None
    snapshot: Snapshot | None
    split: SplitName | None
    type: FileType
    manifest_type: ManifestType | None
    # NOTE(review): FileFormat is exported in __all__ but is not defined or
    # imported anywhere in this module view — confirm it exists elsewhere.
    file_format: FileFormat
    presigned_url: str | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None
    expires_at: datetime | None
    job: Job | None
    status: FileStatus | None  # TODO FEED-1156 change to required field after 0.9.0 patch

    def __post_init__(self):
        """Coerce raw API values into enums, datetimes, and nested dataclasses."""
        super().__post_init__()
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.dataset_timestamp is not None:
            self.dataset_timestamp = convert_to_datetime(self.dataset_timestamp)
        if self.snapshot is not None:
            self.snapshot = convert_to_dataclass(self.snapshot, Snapshot)
        if self.split is not None:
            self.split = SplitName(self.split)
        self.type = FileType(self.type)
        if self.manifest_type is not None:
            self.manifest_type = ManifestType(self.manifest_type)
        self.file_format = FileFormat(self.file_format)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.job is not None:
            self.job = convert_to_dataclass(self.job, Job)
        if self.status is not None:  # TODO FEED-1156 change to required field after 0.9.0 patch
            self.status = FileStatus(self.status)
@dataclass
class PresignedUrl(Base):
    """An HTTP method/URL pair for direct object storage access."""

    method: str
    url: str
class VideoSamplingType(str, Enum):
    """How frames are sampled from an uploaded video.

    Deliberately (str, Enum) rather than StrEnum: str(member) keeps the
    "ClassName.MEMBER" form, which StrEnum would change.
    """

    RATE = "rate"
    RATIO = "ratio"
    NONE = "none"
@dataclass
class VideoSamplingOptions:
    """Frame-extraction settings for video uploads."""

    sampling_type: VideoSamplingType
    # Interpretation of sampling_value depends on sampling_type (rate vs. ratio) —
    # presumed from the names; confirm against the upload API docs.
    sampling_value: int
    deinterlace: bool
@dataclass
class Upload(Base):
    """An upload of data into a dataset, with validation and processing state."""

    id: str
    job: Job | None
    type: UploadType
    is_gzipped: bool | None
    split: SplitName | None
    status: UploadStatus
    name: str | None
    size: int | None
    delete_source: bool
    max_validation_errors: int
    image_validation: bool
    validation_errors: list[str] | None
    created_at: datetime
    updated_at: datetime
    data_created_at: datetime | None
    presigned_urls: list[PresignedUrl] | None
    source_urls: list[str] | None
    datum_metadata: list[dict[str, Any]] | None
    dataset: Dataset | None
    video_options: VideoSamplingOptions | None

    def __post_init__(self):
        """Coerce raw API values into enums, datetimes, and nested dataclasses."""
        super().__post_init__()
        if self.job is not None:
            self.job = convert_to_dataclass(self.job, Job)
        self.type = UploadType(self.type)
        if self.split is not None:
            self.split = SplitName(self.split)
        self.status = UploadStatus(self.status)
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.data_created_at is not None:
            self.data_created_at = convert_to_datetime(self.data_created_at)
        if self.presigned_urls is not None:
            self.presigned_urls = convert_to_dataclass_list(self.presigned_urls, PresignedUrl)
        if self.dataset is not None:
            self.dataset = convert_to_dataclass(self.dataset, Dataset)
        if self.video_options is not None:
            self.video_options = convert_to_dataclass(self.video_options, VideoSamplingOptions)
@dataclass
class UploadStatistics(Base):
    """Aggregate statistics over uploads."""

    upload_count: int
@dataclass
class DatasetConfig(Base):
    """Criteria selecting which datasets a datum task applies to."""

    dataset_ids: list[str] | None = None
    dataset_names: list[str] | None = None
    exact_name_match: bool | None = None
    limit_to_write_access: bool | None = None
    dataset_type: str | None = None
@dataclass
class DatumTask(Base):
    """A labeling/review task over selected datasets and datums."""

    id: str
    name: str
    description: str | None
    created_at: datetime
    updated_at: datetime
    archived_at: datetime | None
    created_by: str
    updated_by: str
    archived_by: str | None
    project_id: str
    dataset_config: DatasetConfig | None
    datum_config: DatumConfig | None

    def __post_init__(self):
        """Coerce timestamps and the nested dataset/datum configs."""
        super().__post_init__()
        self.created_at = convert_to_datetime(self.created_at)
        self.updated_at = convert_to_datetime(self.updated_at)
        if self.archived_at is not None:
            self.archived_at = convert_to_datetime(self.archived_at)
        if self.dataset_config is not None:
            self.dataset_config = convert_to_dataclass(self.dataset_config, DatasetConfig)
        if self.datum_config is not None:
            self.datum_config = convert_to_dataclass(self.datum_config, DatumConfig)
@dataclass
class DatumTaskStatistics(Base):
    """Statistics for a datum task; currently carries no fields of its own."""

    pass
@dataclass
class DatumTaskDetails(DatumTask):
    """A DatumTask augmented with its statistics."""

    statistics: DatumTaskStatistics

    def __post_init__(self):
        """Coerce the nested statistics payload after parent normalization."""
        super().__post_init__()
        if self.statistics is not None:
            self.statistics = convert_to_dataclass(self.statistics, DatumTaskStatistics)

    def _base_task(self) -> DatumTask:
        """Return this task as a plain DatumTask, dropping the statistics field.

        asdict() recursively converts nested dataclasses to dicts; DatumTask's
        __post_init__ converts them back when the copy is constructed.
        """
        props = asdict(self)
        del props["statistics"]
        return DatumTask(**props)
@dataclass
class DatumTaskActivity(Base):
    """One user action (lock/view/skip) on a datum within a task, with its time window."""

    dataset_id: str
    datum_id: str
    task_id: str
    user_id: str
    activity: DatumTaskActivityCode
    activity_start_time: datetime
    activity_end_time: datetime

    def __post_init__(self):
        """Coerce the activity code and both timestamps from their raw forms."""
        super().__post_init__()
        self.activity = DatumTaskActivityCode(self.activity)
        self.activity_start_time = convert_to_datetime(self.activity_start_time)
        self.activity_end_time = convert_to_datetime(self.activity_end_time)