# Source code for chariot.datasets.uploads

import os
import tarfile
import tempfile
from collections.abc import Generator
from typing import Any

import requests

from chariot import _apis
from chariot.config import getLogger, settings
from chariot.datasets import _utils, exceptions, models
from chariot_api._openapi.datasets_v3 import exceptions as api_exceptions
from chariot_api._openapi.datasets_v3 import models as openapi_models

__all__ = [
    "get_uploads",
    "get_upload_statistics",
    "get_upload",
    "upload_file",
    "upload_file_and_wait",
    "upload_folder",
    "upload_folder_and_wait",
    "upload_bytes",
    "upload_bytes_and_wait",
    "upload_files_from_urls",
    "upload_files_from_urls_and_wait",
    "delete_upload",
    "delete_upload_and_wait",
    "wait_for_upload",
    "retry_upload",
    "retry_upload_and_wait",
]

log = getLogger(__name__)

DEFAULT_UPLOAD_WAIT_TIMEOUT = 3600
DEFAULT_UPLOAD_WAIT_INTERVAL = 0.5


def get_uploads(
    dataset_id: str,
    *,
    type: list[models.UploadType] | None = None,
    status: list[models.UploadStatus] | None = None,
    sort: models.UploadSortColumn | None = None,
    direction: models.SortDirection | None = None,
    max_items: int | None = None,
) -> Generator[models.Upload, None, None]:
    """Get uploads for a dataset

    :param dataset_id: Id of the dataset to get uploads for
    :type dataset_id: str
    :param type: Filter uploads by upload type
    :type type: Optional[List[models.UploadType]]
    :param status: Filter uploads by upload status
    :type status: Optional[List[models.UploadStatus]]
    :param sort: How to sort the uploads
    :type sort: Optional[models.UploadSortColumn]
    :param direction: Whether to sort in ascending or descending order
    :type direction: Optional[models.SortDirection]
    :param max_items: The maximum number of uploads to return
    :type max_items: Optional[int]
    :return: Upload details for uploads matching the criteria
    :rtype: Generator[models.Upload, None, None]
    """
    params = locals()
    if "max_items" in params:
        del params["max_items"]
    return _utils.paginate_items(_get_uploads, params, max_items)


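# Usage sketch (illustrative; the dataset id below is a hypothetical
# placeholder): iterate over the most recent failed uploads of a dataset.
#
#   for upload in get_uploads(
#       "my-dataset-id",
#       status=[models.UploadStatus.ERROR],
#       max_items=50,
#   ):
#       print(upload.id, upload.status)

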
def _get_uploads(
    dataset_id: str,
    *,
    type: list[models.UploadType] | None = None,
    status: list[models.UploadStatus] | None = None,
    sort: models.UploadSortColumn | None = None,
    direction: models.SortDirection | None = None,
    limit: int | None = None,
    offset: int | None = None,
) -> list[models.Upload]:
    response = _apis.datasets_v3.uploads_api.get_uploads(
        dataset_id=dataset_id,
        limit=limit,
        offset=offset,
        sort=_utils.enum_value(sort),
        direction=_utils.enum_value(direction),
        type=[_utils.enum_value(t) for t in type] if type is not None else None,
        status=[_utils.enum_value(s) for s in status] if status is not None else None,
    )
    if not response.data:
        return []
    return [_utils.convert_to_dataclass(s.model_dump(), models.Upload) for s in response.data]


def get_upload_statistics(
    *,
    dataset_id: str,
    type: list[models.UploadType] | None = None,
    status: list[models.UploadStatus] | None = None,
) -> models.UploadStatistics:
    """Get upload statistics with various criteria.

    :param dataset_id: Id of the dataset to get uploads for
    :type dataset_id: str
    :param type: Filter uploads by upload type
    :type type: Optional[List[models.UploadType]]
    :param status: Filter uploads by upload status
    :type status: Optional[List[models.UploadStatus]]
    :return: Statistics of uploads matching the criteria
    :rtype: models.UploadStatistics
    """
    response = _apis.datasets_v3.uploads_api.upload_statistics(
        dataset_id=dataset_id,
        type=[_utils.enum_value(t) for t in type] if type is not None else None,
        status=[_utils.enum_value(s) for s in status] if status is not None else None,
    )
    if not response.data:
        raise RuntimeError(
            "Received malformed response (missing `data`) from get_upload_statistics"
        )
    return _utils.convert_to_dataclass(response.data.model_dump(), models.UploadStatistics)


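# Usage sketch (illustrative; the dataset id is hypothetical): summarize the
# uploads of a dataset that ended in an error state.
#
#   stats = get_upload_statistics(
#       dataset_id="my-dataset-id",
#       status=[models.UploadStatus.ERROR],
#   )

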
def get_upload(id: str) -> models.Upload:
    """Get a single upload by id.

    :param id: Id of the upload to get
    :type id: str
    :return: The upload details
    :rtype: models.Upload
    """
    response = _apis.datasets_v3.uploads_api.get_upload(upload_id=id)
    if not response.data:
        raise RuntimeError("Received malformed response (missing `data`) from get_upload")
    return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)


def wait_for_upload(
    id: str,
    *,
    timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
    wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
    """Polls the given upload until it has finished processing.

    :param id: Id of the upload to wait for
    :type id: str
    :param timeout: Number of seconds to wait for upload to complete (default 3600)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for completion (default 0.5)
    :type wait_interval: float
    :return: The upload details
    :rtype: models.Upload
    :raises TimeoutError: If the timeout has been reached
    :raises exceptions.UploadValidationError: If the upload fails and has validation errors
    :raises exceptions.UploadUnknownError: If the upload fails without a specified reason
    :raises exceptions.UploadIncompleteError: If the upload has stopped making progress
        without reaching a terminal state. Upload should probably be retried
    """

    def upload_terminal_status() -> tuple[bool, models.Upload | None]:
        upload = get_upload(id)
        if not upload.job:
            return (True, upload)
        return (False, None)

    upload = _utils.wait_for(
        upload_terminal_status,
        f"Timed out waiting for upload {id} to be in terminal status after {timeout} seconds",
        timeout,
        wait_interval,
    )
    if upload.status in [models.UploadStatus.COMPLETE, models.UploadStatus.CLEANUP]:
        return upload
    elif upload.status == models.UploadStatus.ERROR:
        if upload.validation_errors:
            raise exceptions.UploadValidationError(upload.id, upload.validation_errors)
        raise exceptions.UploadUnknownError(upload.id)
    else:
        raise exceptions.UploadIncompleteError(upload.id, upload.status)


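# Usage sketch (illustrative; the upload id is hypothetical): the failure
# modes raised here can be handled separately, e.g. retrying only uploads
# that stalled without reaching a terminal state.
#
#   try:
#       upload = wait_for_upload("upload-id", timeout=600)
#   except exceptions.UploadValidationError as e:
#       log.error("Upload failed validation: %s", e)
#   except exceptions.UploadIncompleteError:
#       upload = retry_upload_and_wait("upload-id")  # defined below

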
def _create_upload(
    dataset_id: str,
    type: models.UploadType,
    size: int | None,
    is_gzipped: bool | None,
    delete_source: bool | None,
    max_validation_errors: int | None,
    image_validation: bool | None,
    split: models.SplitName | None,
    source_urls: list[str] | None,
    datum_metadata: list[dict[str, Any]] | None,
    annotations_url: str | None,
    video_sampling_type: models.VideoSamplingType | None,
    video_sampling_value: float | None,
    video_deinterlace: bool | None,
) -> models.Upload:
    video_opts = None
    if type == models.UploadType.VIDEO:
        video_opts = openapi_models.ModelCreateUploadVideoOptions(
            deinterlace=video_deinterlace,
            frame_sample_rate=video_sampling_value
            if video_sampling_type == models.VideoSamplingType.RATE
            else 0
            if video_sampling_type == models.VideoSamplingType.NONE
            else None,
            frame_sample_ratio=video_sampling_value
            if video_sampling_type == models.VideoSamplingType.RATIO
            else None,
        )
    request = openapi_models.InputCreateUploadRequest(
        type=_utils.enum_value(type),
        size=size,
        is_gzipped=is_gzipped,
        delete_source=delete_source,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=_utils.enum_value(split),
        source_urls=source_urls,
        datum_metadata=datum_metadata,
        annotations_url=annotations_url,
        video_options=video_opts,
    )
    response = _apis.datasets_v3.uploads_api.create_upload(dataset_id=dataset_id, body=request)
    if not response.data:
        raise RuntimeError("Received malformed response (missing `data`) from _create_upload")
    return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)


def _complete_upload(
    upload_id: str,
    etags: list[str],
) -> models.Upload:
    request = openapi_models.InputCompleteUploadRequest(etags=etags)
    response = _apis.datasets_v3.uploads_api.complete_upload(upload_id=upload_id, body=request)
    if not response.data:
        raise RuntimeError("Received malformed response (missing `data`) from _complete_upload")
    return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)


def _upload(
    dataset_id: str,
    type: models.UploadType,
    file: _utils.Path | bytes | None,
    delete_source: bool | None,
    max_validation_errors: int | None,
    image_validation: bool | None,
    split: models.SplitName | None,
    source_urls: list[str] | None,
    datum_metadata: list[dict[str, Any]] | None,
    annotations_url: str | None,
    video_sampling_type: models.VideoSamplingType | None = None,
    video_sampling_value: float | None = None,
    video_deinterlace: bool | None = None,
) -> models.Upload:
    size = None
    if file:
        if isinstance(file, _utils.Path):
            size = os.path.getsize(file)
        else:
            size = len(file)
    is_gzipped = None
    if type == models.UploadType.ANNOTATION:
        if isinstance(file, _utils.Path):
            with open(file, "rb") as f:
                header = f.read(2)
        else:
            header = file[:2]
        # Check for the gzip magic number
        is_gzipped = header == b"\x1f\x8b"
    upload = _create_upload(
        dataset_id=dataset_id,
        type=type,
        size=size,
        is_gzipped=is_gzipped,
        delete_source=delete_source,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
        source_urls=source_urls,
        datum_metadata=datum_metadata,
        annotations_url=annotations_url,
        video_sampling_type=video_sampling_type,
        video_sampling_value=video_sampling_value,
        video_deinterlace=video_deinterlace,
    )
    # When source_urls are given, the server pulls the data itself; otherwise
    # upload each chunk to its presigned URL and collect the returned ETags.
    if not source_urls:
        try:
            if not upload.presigned_urls:
                raise RuntimeError("Received no presigned urls from _create_upload")
            etags = []
            with _utils.chunks(
                num_chunks=len(upload.presigned_urls), total_size=size, file=file
            ) as chunks:
                for chunk, url in zip(chunks, upload.presigned_urls):
                    response = requests.request(
                        method=url.method, url=url.url, data=chunk, verify=settings.verify_ssl
                    )
                    response.raise_for_status()
                    if "ETag" not in response.headers:
                        raise RuntimeError("Bad response from uploading, missing ETag")
                    etags.append(response.headers["ETag"])
            upload = _complete_upload(upload_id=upload.id, etags=etags)
        except BaseException:
            # Attempt to clean up the failed upload
            try:
                delete_upload(upload.id)
            except Exception:
                log.exception("Error cleaning up failed upload")
            raise
    return upload


def upload_file(
    dataset_id: str,
    *,
    type: models.UploadType,
    path: str,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
    datum_metadata: dict[str, Any] | None = None,
    video_sampling_type: models.VideoSamplingType | None = None,
    video_sampling_value: float | None = None,
    video_deinterlace: bool | None = None,
) -> models.Upload:
    """Uploads a single file. Does not wait for the upload to complete processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param type: The type of file being uploaded.
    :type type: models.UploadType
    :param path: Path of file to upload
    :type path: str
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM),
        include custom metadata on this datum
    :type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are
        sampled (at a constant rate, by a ratio of the video's frame rate, or none
        [all frames are extracted])
    :type video_sampling_type: Optional[models.VideoSamplingType]
    :param video_sampling_value: When uploading a video with a video_sampling_type of
        VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate
        or ratio of sampling (either an FPS value or a multiplier for the video's FPS,
        respectively)
    :type video_sampling_value: Optional[float]
    :param video_deinterlace: When uploading a video, optionally have a deinterlacing
        filter applied prior to extracting frames
    :type video_deinterlace: Optional[bool]
    :return: The upload details
    :rtype: models.Upload
    """
    return _upload(
        dataset_id=dataset_id,
        type=type,
        file=_utils.Path(path),
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
        delete_source=None,
        source_urls=None,
        datum_metadata=[datum_metadata] if datum_metadata else None,
        annotations_url=None,
        video_sampling_type=video_sampling_type,
        video_sampling_value=video_sampling_value,
        video_deinterlace=video_deinterlace,
    )


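# Usage sketch (illustrative; the dataset id, path, and metadata are
# hypothetical): upload one file as a single datum with custom metadata.
#
#   upload = upload_file(
#       "my-dataset-id",
#       type=models.UploadType.DATUM,
#       path="/data/images/example.jpg",
#       datum_metadata={"camera": "north-gate"},
#   )
#   upload = wait_for_upload(upload.id)

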
def upload_file_and_wait(
    dataset_id: str,
    *,
    type: models.UploadType,
    path: str,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
    datum_metadata: dict[str, Any] | None = None,
    video_sampling_type: models.VideoSamplingType | None = None,
    video_sampling_value: float | None = None,
    video_deinterlace: bool | None = None,
    timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
    wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
    """Uploads a single file, and waits for the upload to complete processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param type: The type of file being uploaded.
    :type type: models.UploadType
    :param path: Path of file to upload
    :type path: str
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM),
        include custom metadata on this datum
    :type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are
        sampled (at a constant rate, by a ratio of the video's frame rate, or none
        [all frames are extracted])
    :type video_sampling_type: Optional[models.VideoSamplingType]
    :param video_sampling_value: When uploading a video with a video_sampling_type of
        VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate
        or ratio of sampling (either an FPS value or a multiplier for the video's FPS,
        respectively)
    :type video_sampling_value: Optional[float]
    :param video_deinterlace: When uploading a video, optionally have a deinterlacing
        filter applied prior to extracting frames
    :type video_deinterlace: Optional[bool]
    :param timeout: Number of seconds to wait for upload to complete (default 3600)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for completion (default 0.5)
    :type wait_interval: float
    :return: The upload details
    :rtype: models.Upload
    :raises TimeoutError: If the timeout has been reached
    :raises exceptions.UploadValidationError: If the upload fails and has validation errors
    :raises exceptions.UploadUnknownError: If the upload fails without a specified reason
    :raises exceptions.UploadIncompleteError: If the upload has stopped making progress
        without reaching a terminal state. Upload should probably be retried
    """
    upload = upload_file(
        dataset_id=dataset_id,
        type=type,
        path=path,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
        datum_metadata=datum_metadata,
        video_sampling_type=video_sampling_type,
        video_sampling_value=video_sampling_value,
        video_deinterlace=video_deinterlace,
    )
    return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)


def upload_folder(
    dataset_id: str,
    *,
    path: str,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
) -> models.Upload:
    """Uploads the contents of a folder. Equivalent to creating an archive from that
    folder and then uploading that archive with type=UploadType.ARCHIVE. Does not wait
    for the upload to complete processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param path: Path of folder to upload
    :type path: str
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :return: The upload details
    :rtype: models.Upload
    """
    with tempfile.NamedTemporaryFile() as tmp, tarfile.open(fileobj=tmp, mode="w:gz") as tar:
        empty = True
        for dir, _, files in os.walk(path):
            reldir = os.path.relpath(dir, path)
            for file in files:
                empty = False
                tar.add(os.path.join(dir, file), os.path.join(reldir, file))
        if empty:
            raise RuntimeError(f"No files found in the folder `{path}` or its subfolders.")
        # Finish writing the archive before handing the temp file to upload_file
        tar.close()
        tmp.flush()
        return upload_file(
            dataset_id=dataset_id,
            type=models.UploadType.ARCHIVE,
            path=tmp.name,
            max_validation_errors=max_validation_errors,
            image_validation=image_validation,
            split=split,
        )


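# Usage sketch (illustrative; the dataset id and path are hypothetical):
# archive a local folder and upload it in one call.
#
#   upload = upload_folder(
#       "my-dataset-id",
#       path="/data/my_images",
#   )
#   upload = wait_for_upload(upload.id)

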
def upload_folder_and_wait(
    dataset_id: str,
    *,
    path: str,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
    timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
    wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
    """Uploads the contents of a folder. Equivalent to creating an archive from that
    folder and then uploading that archive with type=UploadType.ARCHIVE. Waits for the
    upload to complete processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param path: Path of folder to upload
    :type path: str
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :param timeout: Number of seconds to wait for upload to complete (default 3600)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for completion (default 0.5)
    :type wait_interval: float
    :return: The upload details
    :rtype: models.Upload
    :raises TimeoutError: If the timeout has been reached
    :raises exceptions.UploadValidationError: If the upload fails and has validation errors
    :raises exceptions.UploadUnknownError: If the upload fails without a specified reason
    :raises exceptions.UploadIncompleteError: If the upload has stopped making progress
        without reaching a terminal state. Upload should probably be retried
    """
    upload = upload_folder(
        dataset_id=dataset_id,
        path=path,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
    )
    return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)


def upload_bytes(
    dataset_id: str,
    *,
    type: models.UploadType,
    data: bytes,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
    datum_metadata: dict[str, Any] | None = None,
    video_sampling_type: models.VideoSamplingType | None = None,
    video_sampling_value: float | None = None,
    video_deinterlace: bool | None = None,
) -> models.Upload:
    """Uploads a set of bytes as a single file. Does not wait for the upload to complete
    processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param type: The type of file being uploaded.
    :type type: models.UploadType
    :param data: Bytes to upload
    :type data: bytes
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM),
        include custom metadata on this datum
    :type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are
        sampled (at a constant rate, by a ratio of the video's frame rate, or none
        [all frames are extracted])
    :type video_sampling_type: Optional[models.VideoSamplingType]
    :param video_sampling_value: When uploading a video with a video_sampling_type of
        VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate
        or ratio of sampling (either an FPS value or a multiplier for the video's FPS,
        respectively)
    :type video_sampling_value: Optional[float]
    :param video_deinterlace: When uploading a video, optionally have a deinterlacing
        filter applied prior to extracting frames
    :type video_deinterlace: Optional[bool]
    :return: The upload details
    :rtype: models.Upload
    """
    return _upload(
        dataset_id=dataset_id,
        type=type,
        file=data,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
        delete_source=None,
        source_urls=None,
        datum_metadata=[datum_metadata] if datum_metadata else None,
        annotations_url=None,
        video_sampling_type=video_sampling_type,
        video_sampling_value=video_sampling_value,
        video_deinterlace=video_deinterlace,
    )


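# Usage sketch (illustrative; the dataset id and file path are hypothetical):
# upload in-memory bytes, e.g. an image already read from disk, as one datum.
#
#   with open("/data/example.png", "rb") as f:
#       data = f.read()
#   upload = upload_bytes_and_wait(
#       "my-dataset-id",
#       type=models.UploadType.DATUM,
#       data=data,
#   )

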
def upload_bytes_and_wait(
    dataset_id: str,
    *,
    type: models.UploadType,
    data: bytes,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
    datum_metadata: dict[str, Any] | None = None,
    video_sampling_type: models.VideoSamplingType | None = None,
    video_sampling_value: float | None = None,
    video_deinterlace: bool | None = None,
    timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
    wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
    """Uploads a set of bytes as a single file, and waits for the upload to complete
    processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param type: The type of file being uploaded.
    :type type: models.UploadType
    :param data: Bytes to upload
    :type data: bytes
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM),
        include custom metadata on this datum
    :type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are
        sampled (at a constant rate, by a ratio of the video's frame rate, or none
        [all frames are extracted])
    :type video_sampling_type: Optional[models.VideoSamplingType]
    :param video_sampling_value: When uploading a video with a video_sampling_type of
        VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate
        or ratio of sampling (either an FPS value or a multiplier for the video's FPS,
        respectively)
    :type video_sampling_value: Optional[float]
    :param video_deinterlace: When uploading a video, optionally have a deinterlacing
        filter applied prior to extracting frames
    :type video_deinterlace: Optional[bool]
    :param timeout: Number of seconds to wait for upload to complete (default 3600)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for completion (default 0.5)
    :type wait_interval: float
    :return: The upload details
    :rtype: models.Upload
    :raises TimeoutError: If the timeout has been reached
    :raises exceptions.UploadValidationError: If the upload fails and has validation errors
    :raises exceptions.UploadUnknownError: If the upload fails without a specified reason
    :raises exceptions.UploadIncompleteError: If the upload has stopped making progress
        without reaching a terminal state. Upload should probably be retried
    """
    upload = upload_bytes(
        dataset_id=dataset_id,
        type=type,
        data=data,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
        datum_metadata=datum_metadata,
        video_sampling_type=video_sampling_type,
        video_sampling_value=video_sampling_value,
        video_deinterlace=video_deinterlace,
    )
    return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)


def upload_files_from_urls(
    dataset_id: str,
    *,
    type: models.UploadType,
    source_urls: list[str],
    source_urls_datum_metadata: list[dict[str, Any]] | None = None,
    annotations_url: str | None = None,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
) -> models.Upload:
    """Uploads a list of urls to a dataset as individual datums. Does not wait for the
    upload to complete processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param type: The type of file being uploaded. Must be one of
        models.UploadType.{ARCHIVE|DATUM}
    :type type: models.UploadType
    :param source_urls: List of URLs from which the datums are read. len() must be equal
        to 1 for ARCHIVE upload type.
    :type source_urls: List[str]
    :param source_urls_datum_metadata: When uploading individual datums
        (type=models.UploadType.DATUM), include custom metadata for datums created by
        each URL. List index should match the desired source_urls list index; elements
        without metadata should be empty Dicts.
    :type source_urls_datum_metadata: Optional[List[Dict[str, Any]]]
    :param annotations_url: URL from which a gzipped annotations file in jsonl format
        will be downloaded and processed along with datums from source_urls. The path
        attribute in the annotations file will be the datum's index in source_urls.
    :type annotations_url: Optional[str]
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :return: The upload details
    :rtype: models.Upload
    """
    return _upload(
        dataset_id=dataset_id,
        type=type,
        file=None,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
        delete_source=None,
        source_urls=source_urls,
        datum_metadata=source_urls_datum_metadata,
        annotations_url=annotations_url,
    )


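# Usage sketch (illustrative; the dataset id and URLs are hypothetical):
# ingest two remote files as individual datums with per-URL metadata. The
# second metadata entry is an empty dict, matching its source_urls index.
#
#   upload = upload_files_from_urls(
#       "my-dataset-id",
#       type=models.UploadType.DATUM,
#       source_urls=[
#           "https://example.com/a.jpg",
#           "https://example.com/b.jpg",
#       ],
#       source_urls_datum_metadata=[{"source": "a"}, {}],
#   )

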
def upload_files_from_urls_and_wait(
    dataset_id: str,
    *,
    type: models.UploadType,
    source_urls: list[str],
    source_urls_datum_metadata: list[dict[str, Any]] | None = None,
    annotations_url: str | None = None,
    max_validation_errors: int | None = None,
    image_validation: bool | None = None,
    split: models.SplitName | None = None,
    timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
    wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
    """Uploads a list of urls to a dataset as individual datums, and waits for the upload
    to complete processing.

    :param dataset_id: Id of the dataset to upload to
    :type dataset_id: str
    :param type: The type of file being uploaded. Must be one of
        models.UploadType.{ARCHIVE|DATUM}
    :type type: models.UploadType
    :param source_urls: List of URLs from which the datums are read. len() must be equal
        to 1 for ARCHIVE upload type.
    :type source_urls: List[str]
    :param source_urls_datum_metadata: When uploading individual datums
        (type=models.UploadType.DATUM), include custom metadata for datums created by
        each URL. List index should match the desired source_urls list index; elements
        without metadata should be empty Dicts.
    :type source_urls_datum_metadata: Optional[List[Dict[str, Any]]]
    :param annotations_url: URL from which a gzipped annotations file in jsonl format
        will be downloaded and processed along with datums from source_urls. The path
        attribute in the annotations file will be the datum's index in source_urls.
    :type annotations_url: Optional[str]
    :param max_validation_errors: Maximum number of validation errors to tolerate before
        failing the upload
    :type max_validation_errors: Optional[int]
    :param image_validation: Whether or not to perform extra validations on image datums
    :type image_validation: Optional[bool]
    :param split: Name of split to upload datums to.
    :type split: Optional[models.SplitName]
    :param timeout: Number of seconds to wait for upload to complete (default 3600)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for completion (default 0.5)
    :type wait_interval: float
    :return: The upload details
    :rtype: models.Upload
    :raises TimeoutError: If the timeout has been reached
    :raises exceptions.UploadValidationError: If the upload fails and has validation errors
    :raises exceptions.UploadUnknownError: If the upload fails without a specified reason
    :raises exceptions.UploadIncompleteError: If the upload has stopped making progress
        without reaching a terminal state. Upload should probably be retried
    """
    upload = upload_files_from_urls(
        dataset_id=dataset_id,
        type=type,
        source_urls=source_urls,
        source_urls_datum_metadata=source_urls_datum_metadata,
        annotations_url=annotations_url,
        max_validation_errors=max_validation_errors,
        image_validation=image_validation,
        split=split,
    )
    return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)


def delete_upload(id: str) -> models.Upload:
    """Delete an upload by id. This can only be done if the upload's status is not
    `COMPLETE` or `CLEANUP`.

    :param id: Id of the upload to delete
    :type id: str
    :return: The upload details
    :rtype: models.Upload
    """
    response = _apis.datasets_v3.uploads_api.delete_upload(upload_id=id)
    if not response.data:
        raise RuntimeError("Received malformed response (missing `data`) from delete_upload")
    return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)


def delete_upload_and_wait(
    id: str,
    *,
    timeout: float = 5,
    wait_interval: float = 0.5,
) -> None:
    """Delete an upload by id. This can only be done if the upload's status is not
    `COMPLETE` or `CLEANUP`. Polls for the upload, blocking until the upload has been
    deleted or the timeout has been reached.

    :param id: Id of the upload to delete
    :type id: str
    :param timeout: Number of seconds to wait for upload deletion (default 5)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for deletion (default 0.5)
    :type wait_interval: float
    :raises TimeoutError: If the timeout has been reached
    """
    delete_upload(id)

    def upload_not_found_condition() -> tuple[bool, None]:
        try:
            get_upload(id)
        except api_exceptions.NotFoundException:
            return (True, None)
        return (False, None)

    return _utils.wait_for(
        upload_not_found_condition,
        f"Timed out waiting for deletion of upload {id} after {timeout} seconds",
        timeout,
        wait_interval,
    )


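# Usage sketch (illustrative; the upload id is hypothetical): remove an upload
# that has not reached COMPLETE or CLEANUP and block until it is gone.
#
#   delete_upload_and_wait("upload-id", timeout=10)

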
def retry_upload(id: str) -> models.Upload:
    """Retry processing of an upload that previously did not succeed.

    :param id: Id of the upload to retry
    :type id: str
    :return: The upload details
    :rtype: models.Upload
    """
    response = _apis.datasets_v3.uploads_api.retry_upload(upload_id=id)
    if not response.data:
        raise RuntimeError("Received malformed response (missing `data`) from retry_upload")
    return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)


def retry_upload_and_wait(
    id: str,
    *,
    timeout: float = 5,
    wait_interval: float = 0.5,
) -> models.Upload:
    """Retry processing of an upload that previously did not succeed. Polls for the
    upload, blocking until the upload has finished processing or the timeout has been
    reached.

    :param id: Id of the upload to retry
    :type id: str
    :param timeout: Number of seconds to wait for the upload to complete (default 5)
    :type timeout: float
    :param wait_interval: Number of seconds between successive calls to check the upload
        for completion (default 0.5)
    :type wait_interval: float
    :return: The upload details
    :rtype: models.Upload
    :raises TimeoutError: If the timeout has been reached
    """
    retry_upload(id)
    return wait_for_upload(id, timeout=timeout, wait_interval=wait_interval)


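# Usage sketch (illustrative; the upload id is hypothetical): retry a failed
# or stalled upload, allowing more than the 5-second default to finish.
#
#   upload = retry_upload_and_wait("upload-id", timeout=600, wait_interval=1)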