Source code for chariot.datasets.views

from collections.abc import Generator, Iterator
from dataclasses import asdict

from chariot import _apis
from chariot.datasets import _utils, models
from chariot_api._openapi.datasets_v3 import models as openapi_models

__all__ = [
    "get_dataset_views",
    "get_dataset_view_count",
    "get_all_views",
    "get_all_view_count",
    "create_view",
    "get_view",
    "update_view",
    "delete_view",
    "get_view_timeline",
]


[docs] def get_dataset_views( dataset_id: str, *, name: str | None = None, exact_name_match: bool | None = None, view_ids: list[str] | None = None, sort: models.ViewSortColumn | None = None, direction: models.SortDirection | None = None, max_items: int | None = None, ) -> Generator[models.View, None, None]: """Get views for dataset id with various criteria. Returns a generator over all matching views :param dataset_id: Dataset ID to search for views in. :type dataset_id: str :param name: Filter by view name :type name: Optional[str] :param exact_name_match: Require name filter to match exactly (defaults to false) :type exact_name_match: Optional[bool] :param view_ids: Filter by view ids :type view_ids: Optional[List[str]] :param sort: What column to sort the views by (defaults to name) :type sort: Optional[models.ViewSortColumn] :param direction: Whether to sort in ascending or descending order :type direction: Optional[models.SortDirection] :param max_items: Limit the returned generator to only produce this many items :type max_items: Optional[int] :return: View details for views matching the criteria :rtype: Generator[models.View, None, None] """ params = locals() if "max_items" in params: del params["max_items"] if view_ids: return iter( _get_dataset_views( dataset_id=dataset_id, name=name, exact_name_match=exact_name_match, view_ids=view_ids, ) ) else: return _utils.paginate_items(_get_dataset_views, params, max_items)
def _get_dataset_views( dataset_id: str, *, name: str | None = None, exact_name_match: bool | None = None, view_ids: list[str] | None = None, sort: models.ViewSortColumn | None = None, direction: models.SortDirection | None = None, limit: int | None = None, offset: int | None = None, ) -> list[models.View]: response = _apis.datasets_v3.views_api.get_dataset_views( dataset_id=dataset_id, name=name, exact_name_match=exact_name_match, view_ids=view_ids, direction=_utils.enum_value(direction), sort=_utils.enum_value(sort), limit=limit, offset=offset, ) if not response.data: return [] return [_utils.convert_to_dataclass(v.model_dump(), models.View) for v in response.data]
[docs] def get_all_views( *, name: str | None = None, exact_name_match: bool | None = None, view_ids: list[str] | None = None, sort: models.ViewSortColumn | None = None, direction: models.SortDirection | None = None, max_items: int | None = None, ) -> Generator[models.View, None, None]: """Get views for all datasets with various criteria. Returns a generator over all matching views Only admin user can access this function :param name: Filter by view name :type name: Optional[str] :param exact_name_match: Require name filter to match exactly (defaults to false) :type exact_name_match: Optional[bool] :param view_ids: Filter by view ids :type view_ids: Optional[List[str]] :param sort: What column to sort the views by (defaults to name) :type sort: Optional[models.ViewSortColumn] :param direction: Whether to sort in ascending or descending order :type direction: Optional[models.SortDirection] :param max_items: Limit the returned generator to only produce this many items :type max_items: Optional[int] :return: View details for views matching the criteria :rtype: Generator[models.View, None, None] """ params = locals() if "max_items" in params: del params["max_items"] if view_ids: return iter( _get_all_views( name=name, exact_name_match=exact_name_match, view_ids=view_ids, ) ) else: return _utils.paginate_items(_get_all_views, params, max_items)
def _get_all_views( *, name: str | None = None, exact_name_match: bool | None = None, view_ids: list[str] | None = None, sort: models.ViewSortColumn | None = None, direction: models.SortDirection | None = None, limit: int | None = None, offset: int | None = None, ) -> list[models.View]: response = _apis.datasets_v3.views_api.get_all_views( name=name, exact_name_match=exact_name_match, view_ids=view_ids, direction=_utils.enum_value(direction), sort=_utils.enum_value(sort), limit=limit, offset=offset, ) if not response.data: return [] return [_utils.convert_to_dataclass(v.model_dump(), models.View) for v in response.data]
[docs] def create_view( *, dataset_id: str, name: str, split_algorithm: models.SplitAlgorithm | None = None, apply_default_split: bool | None = None, splits: dict[models.SplitName, float] | None = None, metadata: dict[str, str] | None = None, capture_timestamp_range: models.TimestampRange | None = None, gps_coordinates_circle: models.Circle | None = None, gps_coordinates_polygon: list[models.GeoPoint] | None = None, gps_coordinates_rectangle: models.Rectangle | None = None, task_type_label_filters: list[models.TaskTypeLabelFilter] | None = None, approval_status: list[str] | None = None, annotation_metadata: dict[str, str] | None = None, sample_count: int | None = None, ) -> models.View: """Create a new view in the dataset ID given. :param dataset_id: Id of dataset to create new view in :type dataset_id: str :param name: View name :type name: str :param split_algorithm: Splitting algorithm for the view (defaults to Random) :type split_algorithm: Optional[models.SplitAlgorithm], :param apply_default_split: Whether default splits are used when splitting (defaults to true) :type apply_default_split: Optional[bool] :param splits: Split weights for splitting datums in the view :type splits: Optional[Dict[models.SplitName, float]] :param metadata: Add metadata filter to view :type metadata: Optional[Dict[str, str]] :param capture_timestamp_range: Add capture timestamp range filter to view :type capture_timestamp_range: Optional[models.TimestampRange] :param gps_coordinates_circle: Add circle filter to view :type gps_coordinates_circle: Optional[models.Circle] :param gps_coordinates_polygon: Add polygon filter to view :type gps_coordinates_polygon: Optional[List[models.GeoPoint]] :param gps_coordinates_rectangle: Add rectangle filter to view :type gps_coordinates_rectangle: Optional[models.Rectangle] :param task_type_label_filters: Add filter for task types and associated labels to view :type task_type_label_filters: Optional[List[models.TaskTypeLabelFilter]] :param approval_status: Filter by annotation approval status :type approval_status: Optional[List[str]] :param annotation_metadata: Filter by annotation metadata values :type annotation_metadata: Optional[Dict[str, str]] :param sample_count: Sample count for the view :type sample_count: Optional[int] :return: View details for the newly created view :rtype: models.View """ if task_type_label_filters is not None: task_type_label_filters = [ asdict(f, dict_factory=_utils.dict_factory) for f in task_type_label_filters ] if gps_coordinates_circle is not None: gps_coordinates_circle = asdict(gps_coordinates_circle, dict_factory=_utils.dict_factory) if gps_coordinates_polygon is not None: gps_coordinates_polygon = [ asdict(p, dict_factory=_utils.dict_factory) for p in gps_coordinates_polygon ] if gps_coordinates_rectangle is not None: gps_coordinates_rectangle = asdict( gps_coordinates_rectangle, dict_factory=_utils.dict_factory ) if capture_timestamp_range is not None: capture_timestamp_range = asdict(capture_timestamp_range, dict_factory=_utils.dict_factory) request = openapi_models.InputCreateViewRequest( name=name, split_algorithm=_utils.enum_value(split_algorithm), apply_default_split=apply_default_split, capture_timestamp_range=capture_timestamp_range, gps_coordinates_circle=gps_coordinates_circle, gps_coordinates_polygon=gps_coordinates_polygon, gps_coordinates_rectangle=gps_coordinates_rectangle, task_type_label_filters=task_type_label_filters, splits=splits, metadata=metadata, approval_status=approval_status, annotation_metadata=annotation_metadata, sample_count=sample_count, ) response = _apis.datasets_v3.views_api.create_view(dataset_id=dataset_id, body=request) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from create_view") return _utils.convert_to_dataclass(response.data.model_dump(), models.View)
[docs] def get_dataset_view_count( dataset_id: str, *, name: str | None = None, exact_name_match: bool | None = None, view_ids: list[str] | None = None, ) -> int: """Get number of views in the dataset id given. :param dataset_id: Id of dataset to get number of views in :type dataset_id: str :param name: Filter views counted by name :type name: Optional[str] :param exact_name_match: Require name filter to match exactly (defaults to false) :type exact_name_match: Optional[bool] :param view_ids: Filter by view ids :type view_ids: Optional[List[str]] :return: Number of views in the provided dataset id :rtype: int """ response = _apis.datasets_v3.views_api.get_dataset_view_count( dataset_id=dataset_id, name=name, exact_name_match=exact_name_match, view_ids=view_ids, ) return response.data or 0
[docs] def get_all_view_count( *, name: str | None = None, exact_name_match: bool | None = None, view_ids: list[str] | None = None, ) -> int: """Get number of views in the dataset id given. Only admin user can access this function :param name: Filter views counted by name :type name: Optional[str] :param exact_name_match: Require name filter to match exactly (defaults to false) :type exact_name_match: Optional[bool] :param view_ids: Filter by view ids :type view_ids: Optional[List[str]] :return: Number of views in all datasets :rtype: int """ response = _apis.datasets_v3.views_api.get_all_view_count( name=name, exact_name_match=exact_name_match, view_ids=view_ids, ) return response.data or 0
[docs] def get_view(id: str) -> models.View: response = _apis.datasets_v3.views_api.get_view(view_id=id) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from get_view") return _utils.convert_to_dataclass(response.data.model_dump(), models.View)
[docs] def update_view( *, id: str, name: str, ) -> models.View: """Update the name of a view by id. :param id: Id of view to update the name of :type id: str :param name: New name for the view :type name: str :return: View with same id but new name :rtype: models.View """ request = openapi_models.InputUpdateViewRequest(name=name) response = _apis.datasets_v3.views_api.update_view(view_id=id, body=request) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from update_view") return _utils.convert_to_dataclass(response.data.model_dump(), models.View)
[docs] def delete_view(id: str) -> models.View: """Delete a view by id. The artifacts for the view will be deleted as well. :param id: Id of view to delete :type id: str :return: View that was deleted :rtype: models.View """ response = _apis.datasets_v3.views_api.archive_view(view_id=id) if not response.data: raise RuntimeError("Received malformed response (missing `data`) from archive_view") return _utils.convert_to_dataclass(response.data.model_dump(), models.View)
[docs] def get_view_timeline( id: str, *, max_items: int | None = None, direction: models.SortDirection | None = None, since_last_snapshot: bool | None = None, min_groups: int | None = None, max_ungrouped_events: int | None = None, ) -> Iterator[models.DatasetTimelineEvent]: """Get a series of dataset change events affecting the given view ordered by time and grouped by event type. :param id: Id of view to get events for :type id: str :param max_items: Limit the returned generator to only produce this many items :type max_items: Optional[int] :param direction: Whether to sort in ascending or descending order :type direction: Optional[models.SortDirection] :param since_last_snapshot: Whether or not to only return events since the last snapshot for this view (defaults to false) :type since_last_snapshot: Optional[bool] :param min_groups: How many groups are required before grouping behavior is turned on :type min_groups: Optional[int] :param max_ungrouped_events: The maximum number of events allowed before grouping behavior is turned on :type max_ungrouped_events: Optional[int] :return: Events for the view :rtype: Iterator[models.DatasetTimelineEvent] """ if "upload_filter" in locals(): raise TypeError( "get_view_timeline no longer supports 'upload_filter' - use 'since_last_snapshot' instead" ) response = _apis.datasets_v3.views_api.get_view_timeline( view_id=id, limit=max_items, direction=_utils.enum_value(direction), since_last_snapshot=since_last_snapshot, min_groups=min_groups, max_ungrouped_events=max_ungrouped_events, ) data = response.data or [] return (_utils.convert_to_dataclass(d.model_dump(), models.DatasetTimelineEvent) for d in data)