"""Source code for chariot.datasets.snapshots: list, count, fetch, create, and delete dataset/view snapshots."""

from collections.abc import Generator
from datetime import datetime

from chariot import _apis
from chariot.datasets import _utils, models
from chariot_api._openapi.datasets_v3 import exceptions as api_exceptions
from chariot_api._openapi.datasets_v3 import models as openapi_models

# Public API of this module: the names exported by ``from chariot.datasets.snapshots import *``.
# Private page fetchers (``_get_*_snapshots``) are intentionally omitted.
__all__ = [
    "get_dataset_snapshots",
    "get_view_snapshots",
    "get_view_snapshot_count",
    "get_all_snapshots",
    "get_snapshot",
    "delete_snapshot",
    "delete_snapshot_and_wait",
    "create_snapshot",
    "create_snapshot_and_wait",
]


def get_dataset_snapshots(
    dataset_id: str,
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
    sort: models.SnapshotSortColumn | None = None,
    direction: models.SortDirection | None = None,
    max_items: int | None = None,
) -> Generator[models.Snapshot, None, None]:
    """Get a dataset's snapshots with optional filters.

    Returns a generator over all matching snapshots.

    :param dataset_id: Id of the dataset that the snapshots belong to
    :type dataset_id: str
    :param exact_name_match: Require name filter to match exactly (defaults to false)
    :type exact_name_match: Optional[bool]
    :param name: Filter by snapshot name
    :type name: Optional[str]
    :param timestamp_interval: Filter by snapshots occurring during the interval
    :type timestamp_interval: Optional[models.TimestampRange]
    :param snapshot_ids: Filter by snapshot ids. When given, the matching snapshots are
        fetched with a single request (no limit/offset) instead of page by page.
    :type snapshot_ids: Optional[List[str]]
    :param sort: How to sort the returned snapshots
    :type sort: Optional[models.SnapshotSortColumn]
    :param direction: Whether to sort in ascending or descending order
    :type direction: Optional[models.SortDirection]
    :param max_items: The maximum number of snapshots to return
    :type max_items: Optional[int]
    :return: Snapshot details for snapshots matching the criteria
    :rtype: Generator[models.Snapshot, None, None]
    """
    # Snapshot of the call arguments; must run before any other local is bound so
    # that only the parameters are captured. max_items is handled separately.
    params = locals()
    del params["max_items"]

    if snapshot_ids:
        # This path bypasses paginate_items, so forward every filter (sort and
        # direction included) and enforce max_items locally.
        snapshots = _get_dataset_snapshots(**params)
        if max_items is not None:
            snapshots = snapshots[: max(max_items, 0)]
        # A generator expression honours the declared Generator return type.
        return (snapshot for snapshot in snapshots)

    return _utils.paginate_items(_get_dataset_snapshots, params, max_items)
def _get_dataset_snapshots(
    dataset_id: str,
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    sort: models.SnapshotSortColumn | None = None,
    direction: models.SortDirection | None = None,
) -> list[models.Snapshot]:
    """Fetch one page of a dataset's snapshots as ``models.Snapshot`` dataclasses.

    ``limit``/``offset`` select the page. All other keyword arguments are forwarded
    to the datasets API call. An empty list is returned when the response has no data.
    """
    snapshots_api = _apis.datasets_v3.snapshots_api
    interval_param = None
    if timestamp_interval:
        interval_param = timestamp_interval.to_query_param()

    page = snapshots_api.get_dataset_snapshots(
        dataset_id=dataset_id,
        limit=limit,
        offset=offset,
        sort=_utils.enum_value(sort),
        direction=_utils.enum_value(direction),
        name=name,
        exact_name_match=exact_name_match,
        timestamp_interval=interval_param,
        snapshot_ids=snapshot_ids,
    )
    return [
        _utils.convert_to_dataclass(raw.model_dump(), models.Snapshot)
        for raw in (page.data or [])
    ]
def get_view_snapshots(
    view_id: str,
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
    sort: models.SnapshotSortColumn | None = None,
    direction: models.SortDirection | None = None,
    max_items: int | None = None,
) -> Generator[models.Snapshot, None, None]:
    """Get a view's snapshots with optional filters.

    Returns a generator over all matching snapshots.

    :param view_id: Id of the view that the snapshots belong to
    :type view_id: str
    :param exact_name_match: Require name filter to match exactly (defaults to false)
    :type exact_name_match: Optional[bool]
    :param name: Filter by snapshot name
    :type name: Optional[str]
    :param timestamp_interval: Filter by snapshots occurring during the interval
    :type timestamp_interval: Optional[models.TimestampRange]
    :param snapshot_ids: Filter by snapshot ids. When given, the matching snapshots are
        fetched with a single request (no limit/offset) instead of page by page.
    :type snapshot_ids: Optional[List[str]]
    :param sort: How to sort the returned snapshots
    :type sort: Optional[models.SnapshotSortColumn]
    :param direction: Whether to sort in ascending or descending order
    :type direction: Optional[models.SortDirection]
    :param max_items: The maximum number of snapshots to return
    :type max_items: Optional[int]
    :return: Snapshot details for snapshots matching the criteria
    :rtype: Generator[models.Snapshot, None, None]
    """
    # Snapshot of the call arguments; must run before any other local is bound so
    # that only the parameters are captured. max_items is handled separately.
    params = locals()
    del params["max_items"]

    if snapshot_ids:
        # This path bypasses paginate_items, so forward every filter (sort and
        # direction included) and enforce max_items locally.
        snapshots = _get_view_snapshots(**params)
        if max_items is not None:
            snapshots = snapshots[: max(max_items, 0)]
        # A generator expression honours the declared Generator return type.
        return (snapshot for snapshot in snapshots)

    return _utils.paginate_items(_get_view_snapshots, params, max_items)
def _get_view_snapshots(
    view_id: str,
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    sort: models.SnapshotSortColumn | None = None,
    direction: models.SortDirection | None = None,
) -> list[models.Snapshot]:
    """Fetch one page of a view's snapshots as ``models.Snapshot`` dataclasses.

    ``limit``/``offset`` select the page. All other keyword arguments are forwarded
    to the datasets API call. An empty list is returned when the response has no data.
    """
    snapshots_api = _apis.datasets_v3.snapshots_api
    interval_param = None
    if timestamp_interval:
        interval_param = timestamp_interval.to_query_param()

    page = snapshots_api.get_view_snapshots(
        view_id=view_id,
        limit=limit,
        offset=offset,
        sort=_utils.enum_value(sort),
        direction=_utils.enum_value(direction),
        name=name,
        exact_name_match=exact_name_match,
        timestamp_interval=interval_param,
        snapshot_ids=snapshot_ids,
    )
    return [
        _utils.convert_to_dataclass(raw.model_dump(), models.Snapshot)
        for raw in (page.data or [])
    ]
def get_view_snapshot_count(
    view_id: str,
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
) -> int:
    """Count the snapshots of a view, optionally narrowed by filters.

    :param view_id: Id of the view that the snapshots belong to
    :type view_id: str
    :param exact_name_match: Require name filter to match exactly (defaults to false)
    :type exact_name_match: Optional[bool]
    :param name: Filter by snapshot name
    :type name: Optional[str]
    :param timestamp_interval: Filter by snapshots occurring during the interval
    :type timestamp_interval: Optional[models.TimestampRange]
    :param snapshot_ids: Filter by snapshot ids
    :type snapshot_ids: Optional[List[str]]
    :return: Number of snapshots matching the criteria (0 when the response has no data)
    :rtype: int
    """
    interval_param = timestamp_interval.to_query_param() if timestamp_interval else None
    result = _apis.datasets_v3.snapshots_api.get_view_snapshot_count(
        view_id=view_id,
        exact_name_match=exact_name_match,
        name=name,
        timestamp_interval=interval_param,
        snapshot_ids=snapshot_ids,
    )
    count = result.data
    if not count:
        # A missing payload is reported as zero.
        return 0
    return count
def get_all_snapshots(
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
    sort: models.SnapshotSortColumn | None = None,
    direction: models.SortDirection | None = None,
    max_items: int | None = None,
) -> Generator[models.Snapshot, None, None]:
    """Get all snapshots with optional filters.

    Returns a generator over all matching snapshots.
    Only an admin user can access this function.

    :param exact_name_match: Require name filter to match exactly (defaults to false)
    :type exact_name_match: Optional[bool]
    :param name: Filter by snapshot name
    :type name: Optional[str]
    :param timestamp_interval: Filter by snapshots occurring during the interval
    :type timestamp_interval: Optional[models.TimestampRange]
    :param snapshot_ids: Filter by snapshot ids. When given, the matching snapshots are
        fetched with a single request (no limit/offset) instead of page by page.
    :type snapshot_ids: Optional[List[str]]
    :param sort: How to sort the returned snapshots
    :type sort: Optional[models.SnapshotSortColumn]
    :param direction: Whether to sort in ascending or descending order
    :type direction: Optional[models.SortDirection]
    :param max_items: The maximum number of snapshots to return
    :type max_items: Optional[int]
    :return: Snapshot details for snapshots matching the criteria
    :rtype: Generator[models.Snapshot, None, None]
    """
    # Snapshot of the call arguments; must run before any other local is bound so
    # that only the parameters are captured. max_items is handled separately.
    params = locals()
    del params["max_items"]

    if snapshot_ids:
        # This path bypasses paginate_items, so forward every filter (sort and
        # direction included) and enforce max_items locally.
        snapshots = _get_all_snapshots(**params)
        if max_items is not None:
            snapshots = snapshots[: max(max_items, 0)]
        # A generator expression honours the declared Generator return type.
        return (snapshot for snapshot in snapshots)

    return _utils.paginate_items(_get_all_snapshots, params, max_items)
def _get_all_snapshots(
    *,
    exact_name_match: bool | None = None,
    name: str | None = None,
    timestamp_interval: models.TimestampRange | None = None,
    snapshot_ids: list[str] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    sort: models.SnapshotSortColumn | None = None,
    direction: models.SortDirection | None = None,
) -> list[models.Snapshot]:
    """Fetch one page of snapshots across all datasets as ``models.Snapshot`` dataclasses.

    ``limit``/``offset`` select the page. All other keyword arguments are forwarded
    to the datasets API call. An empty list is returned when the response has no data.
    """
    snapshots_api = _apis.datasets_v3.snapshots_api
    interval_param = None
    if timestamp_interval:
        interval_param = timestamp_interval.to_query_param()

    page = snapshots_api.get_all_snapshots(
        limit=limit,
        offset=offset,
        sort=_utils.enum_value(sort),
        direction=_utils.enum_value(direction),
        name=name,
        exact_name_match=exact_name_match,
        timestamp_interval=interval_param,
        snapshot_ids=snapshot_ids,
    )
    return [
        _utils.convert_to_dataclass(raw.model_dump(), models.Snapshot)
        for raw in (page.data or [])
    ]
def get_snapshot(id: str) -> models.Snapshot:
    """Fetch a single snapshot by its id.

    :param id: Snapshot id
    :type id: str
    :return: Snapshot details
    :rtype: models.Snapshot
    :raises RuntimeError: If the API response carries no ``data`` payload
    """
    payload = _apis.datasets_v3.snapshots_api.get_snapshot(snapshot_id=id).data
    if payload:
        return _utils.convert_to_dataclass(payload.model_dump(), models.Snapshot)
    raise RuntimeError("Received malformed response (missing `data`) from get_snapshot")
def delete_snapshot(id: str) -> None:
    """Request deletion of a snapshot by id.

    This can only be done if the snapshot's status is still `PENDING`.
    The call only starts the deletion on the backend and returns without waiting.
    To confirm deletion, call `get_snapshot` with the same id and expect a
    `NotFoundException`. `delete_snapshot_and_wait` does both steps in one call.

    :param id: Id of the snapshot to delete
    :type id: str
    """
    snapshots_api = _apis.datasets_v3.snapshots_api
    snapshots_api.delete_snapshot(snapshot_id=id)
def delete_snapshot_and_wait(
    id: str,
    *,
    timeout: float = 5,
    wait_interval: float = 0.5,
) -> None:
    """Delete a snapshot by id and block until the deletion is confirmed.

    This can only be done if the snapshot's status is still `PENDING`. After the
    delete request is sent, the snapshot is polled with `get_snapshot` until that
    call raises `NotFoundException` or the timeout elapses.

    :param id: Id of the snapshot to delete
    :type id: str
    :param timeout: Number of seconds to wait for snapshot deletion (default 5)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the snapshot for deletion (default 0.5)
    :type wait_interval: float
    :raises RuntimeError: If the timeout has been reached
    """
    delete_snapshot(id)

    def snapshot_not_found_condition() -> tuple[bool, None]:
        # Deletion is confirmed once fetching the snapshot reports NotFound.
        try:
            get_snapshot(id)
        except api_exceptions.NotFoundException:
            return (True, None)
        return (False, None)

    # The condition carries no payload, so there is nothing to return here.
    _utils.wait_for(
        snapshot_not_found_condition,
        f"Timed out waiting for deletion of snapshot {id} after {timeout} seconds",
        timeout,
        wait_interval,
    )
def create_snapshot(
    *,
    view_id: str,
    name: str,
    timestamp: datetime,
    is_dry_run: bool = False,
) -> models.Snapshot:
    """Create a new snapshot for a view at the given event timestamp.

    The new snapshot starts in status `PENDING` while datums are assigned to it.
    Poll `get_snapshot` with the returned id until its status is `COMPLETE`, or
    use `create_snapshot_and_wait` to do that in one call.

    :param view_id: Id of the view that the snapshot should belong to
    :type view_id: str
    :param name: Snapshot name
    :type name: str
    :param timestamp: Event timestamp that the snapshot should reflect (sent formatted as UTC)
    :type timestamp: datetime
    :param is_dry_run: If true, return a preview snapshot instead. The preview carries
        per-split datum counts for the most recent snapshot (if one exists) and the
        expected per-split counts for the new snapshot. It also carries the available
        counts of unassigned datums, with and without default splits. Default is false.
    :type is_dry_run: bool
    :return: The newly created snapshot in a `PENDING` status
    :rtype: models.Snapshot
    :raises RuntimeError: If the API response carries no ``data`` payload
    """
    body = openapi_models.InputCreateSnapshotRequest(
        name=name,
        timestamp=_utils.format_datetime_utc(timestamp),
        is_dry_run=is_dry_run,
    )
    created = _apis.datasets_v3.snapshots_api.create_snapshot(view_id=view_id, body=body).data
    if not created:
        raise RuntimeError("Received malformed response (missing `data`) from create_snapshot")
    return _utils.convert_to_dataclass(created.model_dump(), models.Snapshot)
def create_snapshot_and_wait(
    *,
    view_id: str,
    name: str,
    timestamp: datetime,
    timeout: float = 5,
    wait_interval: float = 0.5,
) -> models.Snapshot:
    """Create a new snapshot for a view and poll until it reaches `COMPLETE` status.

    Polling stops when the snapshot is in `COMPLETE` status or the timeout is reached.

    :param view_id: Id of the view that the snapshot should belong to
    :type view_id: str
    :param name: Snapshot name
    :type name: str
    :param timestamp: Event timestamp that the snapshot should reflect
    :type timestamp: datetime
    :param timeout: Number of seconds to wait for snapshot completion (default 5)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the snapshot for completion (default 0.5)
    :type wait_interval: float
    :return: The `COMPLETE` snapshot after datums have been assigned.
    :rtype: models.Snapshot
    :raises RuntimeError: If the timeout has been reached
    """
    # Forward the creation arguments explicitly instead of filtering locals().
    snapshot = create_snapshot(view_id=view_id, name=name, timestamp=timestamp)

    def snapshot_status_complete_condition() -> tuple[bool, models.Snapshot]:
        # Re-fetch on every poll. The refreshed snapshot is the condition's payload,
        # so the final (COMPLETE) copy is what gets handed back.
        # NOTE(review): a snapshot stuck in a non-COMPLETE terminal state is polled
        # until the timeout; confirm whether failure statuses should short-circuit.
        updated_snapshot = get_snapshot(snapshot.id)
        return (updated_snapshot.status == models.SnapshotStatus.COMPLETE, updated_snapshot)

    return _utils.wait_for(
        snapshot_status_complete_condition,
        f"Timed out waiting for COMPLETE status on snapshot {snapshot.id} after {timeout} seconds",
        timeout,
        wait_interval,
    )