import os
import tarfile
import tempfile
from collections.abc import Generator
from typing import Any
import requests
from chariot import _apis
from chariot.config import getLogger, settings
from chariot.datasets import _utils, exceptions, models
from chariot_api._openapi.datasets_v3 import exceptions as api_exceptions
from chariot_api._openapi.datasets_v3 import models as openapi_models
__all__ = [
"get_uploads",
"get_upload_statistics",
"get_upload",
"upload_file",
"upload_file_and_wait",
"upload_folder",
"upload_folder_and_wait",
"upload_bytes",
"upload_bytes_and_wait",
"upload_files_from_urls",
"upload_files_from_urls_and_wait",
"delete_upload",
"delete_upload_and_wait",
"wait_for_upload",
"retry_upload",
"retry_upload_and_wait",
]
log = getLogger(__name__)
DEFAULT_UPLOAD_WAIT_TIMEOUT = 3600
DEFAULT_UPLOAD_WAIT_INTERVAL = 0.5
def get_uploads(
dataset_id: str,
*,
type: list[models.UploadType] | None = None,
status: list[models.UploadStatus] | None = None,
sort: models.UploadSortColumn | None = None,
direction: models.SortDirection | None = None,
max_items: int | None = None,
) -> Generator[models.Upload, None, None]:
"""Get uploads for a dataset
:param dataset_id: Id of the dataset to get uploads for
:type dataset_id: str
    :param type: Filter uploads by upload type
    :type type: Optional[List[models.UploadType]]
    :param status: Filter uploads by upload status
    :type status: Optional[List[models.UploadStatus]]
:param sort: How to sort the uploads
:type sort: Optional[models.UploadSortColumn]
:param direction: Whether to sort in ascending or descending order
:type direction: Optional[models.SortDirection]
:param max_items: The maximum number of uploads to return
:type max_items: Optional[int]
    :return: Upload details for uploads matching the criteria
:rtype: Generator[models.Upload, None, None]
"""
params = locals()
if "max_items" in params:
del params["max_items"]
return _utils.paginate_items(_get_uploads, params, max_items)
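
# Usage sketch (the dataset id is a placeholder):
#
#   failed_uploads = list(
#       get_uploads(
#           "my-dataset-id",
#           status=[models.UploadStatus.ERROR],
#           max_items=10,
#       )
#   )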
def _get_uploads(
dataset_id: str,
*,
type: list[models.UploadType] | None = None,
status: list[models.UploadStatus] | None = None,
sort: models.UploadSortColumn | None = None,
direction: models.SortDirection | None = None,
limit: int | None = None,
offset: int | None = None,
) -> list[models.Upload]:
response = _apis.datasets_v3.uploads_api.get_uploads(
dataset_id=dataset_id,
limit=limit,
offset=offset,
sort=_utils.enum_value(sort),
direction=_utils.enum_value(direction),
type=[_utils.enum_value(t) for t in type] if type is not None else None,
status=[_utils.enum_value(s) for s in status] if status is not None else None,
)
if not response.data:
return []
return [_utils.convert_to_dataclass(s.model_dump(), models.Upload) for s in response.data]
def get_upload_statistics(
*,
dataset_id: str,
type: list[models.UploadType] | None = None,
status: list[models.UploadStatus] | None = None,
) -> models.UploadStatistics:
"""Get upload statistics with various criteria.
:param dataset_id: Id of the dataset to get uploads for
:type dataset_id: str
    :param type: Filter uploads by upload type
    :type type: Optional[List[models.UploadType]]
    :param status: Filter uploads by upload status
    :type status: Optional[List[models.UploadStatus]]
:return: Statistics of uploads matching the criteria
:rtype: models.UploadStatistics
"""
response = _apis.datasets_v3.uploads_api.upload_statistics(
dataset_id=dataset_id,
type=[_utils.enum_value(t) for t in type] if type is not None else None,
status=[_utils.enum_value(s) for s in status] if status is not None else None,
)
if not response.data:
raise RuntimeError(
"Received malformed response (missing `data`) from get_upload_statistics"
)
return _utils.convert_to_dataclass(response.data.model_dump(), models.UploadStatistics)
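
# Usage sketch (placeholder dataset id):
#
#   stats = get_upload_statistics(
#       dataset_id="my-dataset-id",
#       type=[models.UploadType.ARCHIVE],
#   )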
def get_upload(id: str) -> models.Upload:
    """Get a single upload by id.
    :param id: Id of the upload to fetch
    :type id: str
    :return: The upload details
    :rtype: models.Upload
    """
    response = _apis.datasets_v3.uploads_api.get_upload(upload_id=id)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from get_upload")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)
def wait_for_upload(
id: str,
*,
timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
"""Polls the given upload until it has finished processing.
:param id: Id of the upload to wait for
:type id: str
:param timeout: Number of seconds to wait for upload to complete (default 3600)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for completion (default 0.5)
:type wait_interval: float
:return: The upload details
:rtype: models.Upload
:raises TimeoutError: If the timeout has been reached
:raises exceptions.UploadValidationError: If the upload fails and has validation errors
:raises exceptions.UploadUnknownError: If the upload fails without a specified reason
:raises exceptions.UploadIncompleteError: If the upload has stopped making progress without reaching a terminal state. Upload should probably be retried
"""
def upload_terminal_status() -> tuple[bool, models.Upload | None]:
upload = get_upload(id)
if not upload.job:
return (True, upload)
return (False, None)
upload = _utils.wait_for(
upload_terminal_status,
f"Timed out waiting for upload {id} to be in terminal status after {timeout} seconds",
timeout,
wait_interval,
)
if upload.status in [models.UploadStatus.COMPLETE, models.UploadStatus.CLEANUP]:
return upload
elif upload.status == models.UploadStatus.ERROR:
if upload.validation_errors:
raise exceptions.UploadValidationError(upload.id, upload.validation_errors)
raise exceptions.UploadUnknownError(upload.id)
else:
raise exceptions.UploadIncompleteError(upload.id, upload.status)
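
# Usage sketch: block until an upload settles, retrying once if it stalls
# (the upload id is a placeholder; retry_upload_and_wait is defined below):
#
#   try:
#       upload = wait_for_upload("my-upload-id", timeout=600)
#   except exceptions.UploadIncompleteError:
#       upload = retry_upload_and_wait("my-upload-id", timeout=600)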
def _create_upload(
dataset_id: str,
type: models.UploadType,
size: int | None,
is_gzipped: bool | None,
delete_source: bool | None,
max_validation_errors: int | None,
image_validation: bool | None,
split: models.SplitName | None,
source_urls: list[str] | None,
datum_metadata: list[dict[str, Any]] | None,
annotations_url: str | None,
video_sampling_type: models.VideoSamplingType | None,
video_sampling_value: float | None,
video_deinterlace: bool | None,
) -> models.Upload:
video_opts = None
if type == models.UploadType.VIDEO:
        video_opts = openapi_models.ModelCreateUploadVideoOptions(
            deinterlace=video_deinterlace,
            # RATE passes the sampling value through as a frame rate, NONE maps
            # to 0 (extract every frame), and RATIO leaves the rate unset in
            # favor of frame_sample_ratio below
            frame_sample_rate=(
                video_sampling_value
                if video_sampling_type == models.VideoSamplingType.RATE
                else 0
                if video_sampling_type == models.VideoSamplingType.NONE
                else None
            ),
            frame_sample_ratio=(
                video_sampling_value
                if video_sampling_type == models.VideoSamplingType.RATIO
                else None
            ),
        )
request = openapi_models.InputCreateUploadRequest(
type=_utils.enum_value(type),
size=size,
is_gzipped=is_gzipped,
delete_source=delete_source,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=_utils.enum_value(split),
source_urls=source_urls,
datum_metadata=datum_metadata,
annotations_url=annotations_url,
video_options=video_opts,
)
response = _apis.datasets_v3.uploads_api.create_upload(dataset_id=dataset_id, body=request)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from _create_upload")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)
def _complete_upload(
upload_id: str,
etags: list[str],
) -> models.Upload:
request = openapi_models.InputCompleteUploadRequest(etags=etags)
response = _apis.datasets_v3.uploads_api.complete_upload(upload_id=upload_id, body=request)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from _create_upload")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)
def _upload(
dataset_id: str,
type: models.UploadType,
file: _utils.Path | bytes | None,
delete_source: bool | None,
max_validation_errors: int | None,
image_validation: bool | None,
split: models.SplitName | None,
source_urls: list[str] | None,
datum_metadata: list[dict[str, Any]] | None,
annotations_url: str | None,
video_sampling_type: models.VideoSamplingType | None = None,
video_sampling_value: float | None = None,
video_deinterlace: bool | None = None,
) -> models.Upload:
size = None
if file:
if isinstance(file, _utils.Path):
size = os.path.getsize(file)
else:
size = len(file)
    is_gzipped = None
    if type == models.UploadType.ANNOTATION:
        if isinstance(file, _utils.Path):
            with open(file, "rb") as f:
                magic = f.read(2)
        else:
            magic = file[:2]
        # Check for the gzip magic number (the first two bytes of a gzip stream)
        is_gzipped = magic == b"\x1f\x8b"
upload = _create_upload(
dataset_id=dataset_id,
type=type,
size=size,
is_gzipped=is_gzipped,
delete_source=delete_source,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
source_urls=source_urls,
datum_metadata=datum_metadata,
annotations_url=annotations_url,
video_sampling_type=video_sampling_type,
video_sampling_value=video_sampling_value,
video_deinterlace=video_deinterlace,
)
if not source_urls:
try:
if not upload.presigned_urls:
raise RuntimeError("Received no presigned urls from _create_upload")
etags = []
with _utils.chunks(
num_chunks=len(upload.presigned_urls), total_size=size, file=file
) as chunks:
for chunk, url in zip(chunks, upload.presigned_urls):
response = requests.request(
method=url.method, url=url.url, data=chunk, verify=settings.verify_ssl
)
response.raise_for_status()
if "ETag" not in response.headers:
raise RuntimeError("Bad response from uploading, missing ETag")
etags.append(response.headers["ETag"])
upload = _complete_upload(upload_id=upload.id, etags=etags)
    except BaseException:
        # Attempt to clean up the failed upload before re-raising
        try:
            delete_upload(upload.id)
        except Exception:
            log.exception("Error cleaning up failed upload")
        raise
return upload
def upload_file(
dataset_id: str,
*,
type: models.UploadType,
path: str,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
datum_metadata: dict[str, Any] | None = None,
video_sampling_type: models.VideoSamplingType | None = None,
video_sampling_value: float | None = None,
video_deinterlace: bool | None = None,
) -> models.Upload:
"""Uploads a single file. Does not wait for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param type: The type of file being uploaded.
:type type: models.UploadType
:param path: Path of file to upload
:type path: str
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM), include custom metadata on this datum
:type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are sampled (at a constant rate, by a ratio of the video's frame rate, or none [all frames are extracted])
:type video_sampling_type: Optional[models.VideoSamplingType]
:param video_sampling_value: When uploading a video with a video_sampling_type of VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate or ratio of sampling (either an FPS value or a multiplier for the video's FPS, respectively)
:type video_sampling_value: Optional[float]
:param video_deinterlace: When uploading a video, optionally have a deinterlacing filter applied prior to extracting frames
:type video_deinterlace: Optional[bool]
:return: The upload details
:rtype: models.Upload
"""
return _upload(
dataset_id=dataset_id,
type=type,
file=_utils.Path(path),
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
delete_source=None,
source_urls=None,
datum_metadata=[datum_metadata] if datum_metadata else None,
annotations_url=None,
video_sampling_type=video_sampling_type,
video_sampling_value=video_sampling_value,
video_deinterlace=video_deinterlace,
)
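
# Usage sketch (placeholder dataset id and path):
#
#   upload = upload_file(
#       "my-dataset-id",
#       type=models.UploadType.ARCHIVE,
#       path="/tmp/datums.tar.gz",
#   )
#   upload = wait_for_upload(upload.id)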
def upload_file_and_wait(
dataset_id: str,
*,
type: models.UploadType,
path: str,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
datum_metadata: dict[str, Any] | None = None,
video_sampling_type: models.VideoSamplingType | None = None,
video_sampling_value: float | None = None,
video_deinterlace: bool | None = None,
timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
"""Uploads a single file, and waits for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param type: The type of file being uploaded.
:type type: models.UploadType
:param path: Path of file to upload
:type path: str
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM), include custom metadata on this datum
:type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are sampled (at a constant rate, by a ratio of the video's frame rate, or none [all frames are extracted])
:type video_sampling_type: Optional[models.VideoSamplingType]
:param video_sampling_value: When uploading a video with a video_sampling_type of VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate or ratio of sampling (either an FPS value or a multiplier for the video's FPS, respectively)
:type video_sampling_value: Optional[float]
:param video_deinterlace: When uploading a video, optionally have a deinterlacing filter applied prior to extracting frames
:type video_deinterlace: Optional[bool]
:param timeout: Number of seconds to wait for upload to complete (default 3600)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for completion (default 0.5)
:type wait_interval: float
:return: The upload details
:rtype: models.Upload
:raises TimeoutError: If the timeout has been reached
:raises exceptions.UploadValidationError: If the upload fails and has validation errors
:raises exceptions.UploadUnknownError: If the upload fails without a specified reason
:raises exceptions.UploadIncompleteError: If the upload has stopped making progress without reaching a terminal state. Upload should probably be retried
"""
upload = upload_file(
dataset_id=dataset_id,
type=type,
path=path,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
datum_metadata=datum_metadata,
video_sampling_type=video_sampling_type,
video_sampling_value=video_sampling_value,
video_deinterlace=video_deinterlace,
)
return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)
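
# Usage sketch: upload a video, sampling frames at a constant 2 FPS
# (placeholder dataset id and path):
#
#   upload = upload_file_and_wait(
#       "my-dataset-id",
#       type=models.UploadType.VIDEO,
#       path="./clip.mp4",
#       video_sampling_type=models.VideoSamplingType.RATE,
#       video_sampling_value=2.0,
#       video_deinterlace=True,
#   )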
def upload_folder(
dataset_id: str,
*,
path: str,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
) -> models.Upload:
"""Uploads the contents of a folder. Equivalent to creating an archive from that folder
and then uploading that archive with type=UploadType.ARCHIVE.
Does not wait for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param path: Path of folder to upload
:type path: str
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:return: The upload details
:rtype: models.Upload
"""
with tempfile.NamedTemporaryFile() as tmp, tarfile.open(fileobj=tmp, mode="w:gz") as tar:
empty = True
for dir, _, files in os.walk(path):
reldir = os.path.relpath(dir, path)
for file in files:
empty = False
tar.add(os.path.join(dir, file), os.path.join(reldir, file))
        if empty:
            raise RuntimeError(f"No files found in the folder `{path}` or its subfolders.")
        # Finalize the gzip stream and flush the archive to disk before uploading it
        tar.close()
        tmp.flush()
return upload_file(
dataset_id=dataset_id,
type=models.UploadType.ARCHIVE,
path=tmp.name,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
)
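
# Usage sketch (placeholder dataset id and folder):
#
#   upload = upload_folder("my-dataset-id", path="./images")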
def upload_folder_and_wait(
dataset_id: str,
*,
path: str,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
"""Uploads the contents of a folder. Equivalent to creating an archive from that folder
and then uploading that archive with type=UploadType.ARCHIVE.
Waits for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param path: Path of folder to upload
:type path: str
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:param timeout: Number of seconds to wait for upload to complete (default 3600)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for completion (default 0.5)
:type wait_interval: float
:return: The upload details
:rtype: models.Upload
:raises TimeoutError: If the timeout has been reached
:raises exceptions.UploadValidationError: If the upload fails and has validation errors
:raises exceptions.UploadUnknownError: If the upload fails without a specified reason
:raises exceptions.UploadIncompleteError: If the upload has stopped making progress without reaching a terminal state. Upload should probably be retried
"""
upload = upload_folder(
dataset_id=dataset_id,
path=path,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
)
return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)
def upload_bytes(
dataset_id: str,
*,
type: models.UploadType,
data: bytes,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
datum_metadata: dict[str, Any] | None = None,
video_sampling_type: models.VideoSamplingType | None = None,
video_sampling_value: float | None = None,
video_deinterlace: bool | None = None,
) -> models.Upload:
"""Uploads a set of bytes as a single file. Does not wait for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param type: The type of file being uploaded.
:type type: models.UploadType
:param data: Bytes to upload
:type data: bytes
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM), include custom metadata on this datum
:type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are sampled (at a constant rate, by a ratio of the video's frame rate, or none [all frames are extracted])
:type video_sampling_type: Optional[models.VideoSamplingType]
:param video_sampling_value: When uploading a video with a video_sampling_type of VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate or ratio of sampling (either an FPS value or a multiplier for the video's FPS, respectively)
:type video_sampling_value: Optional[float]
:param video_deinterlace: When uploading a video, optionally have a deinterlacing filter applied prior to extracting frames
:type video_deinterlace: Optional[bool]
:return: The upload details
:rtype: models.Upload
"""
return _upload(
dataset_id=dataset_id,
type=type,
file=data,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
delete_source=None,
source_urls=None,
datum_metadata=[datum_metadata] if datum_metadata else None,
annotations_url=None,
video_sampling_type=video_sampling_type,
video_sampling_value=video_sampling_value,
video_deinterlace=video_deinterlace,
)
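
# Usage sketch: upload a single in-memory datum with custom metadata
# (placeholder dataset id; the bytes stand in for real file contents):
#
#   upload = upload_bytes(
#       "my-dataset-id",
#       type=models.UploadType.DATUM,
#       data=b"<image bytes>",
#       datum_metadata={"source": "camera-1"},
#   )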
def upload_bytes_and_wait(
dataset_id: str,
*,
type: models.UploadType,
data: bytes,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
datum_metadata: dict[str, Any] | None = None,
video_sampling_type: models.VideoSamplingType | None = None,
video_sampling_value: float | None = None,
video_deinterlace: bool | None = None,
timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
"""Uploads a set of bytes as a single file, and waits for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param type: The type of file being uploaded.
:type type: models.UploadType
:param data: Bytes to upload
:type data: bytes
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:param datum_metadata: When uploading a single datum (type=models.UploadType.DATUM), include custom metadata on this datum
:type datum_metadata: Optional[Dict[str, Any]]
    :param video_sampling_type: When uploading a video, optionally control how frames are sampled (at a constant rate, by a ratio of the video's frame rate, or none [all frames are extracted])
:type video_sampling_type: Optional[models.VideoSamplingType]
:param video_sampling_value: When uploading a video with a video_sampling_type of VideoSamplingType.RATE or VideoSamplingType.RATIO, this value controls the rate or ratio of sampling (either an FPS value or a multiplier for the video's FPS, respectively)
:type video_sampling_value: Optional[float]
:param video_deinterlace: When uploading a video, optionally have a deinterlacing filter applied prior to extracting frames
:type video_deinterlace: Optional[bool]
:param timeout: Number of seconds to wait for upload to complete (default 3600)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for completion (default 0.5)
:type wait_interval: float
:return: The upload details
:rtype: models.Upload
:raises TimeoutError: If the timeout has been reached
:raises exceptions.UploadValidationError: If the upload fails and has validation errors
:raises exceptions.UploadUnknownError: If the upload fails without a specified reason
:raises exceptions.UploadIncompleteError: If the upload has stopped making progress without reaching a terminal state. Upload should probably be retried
"""
upload = upload_bytes(
dataset_id=dataset_id,
type=type,
data=data,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
        datum_metadata=datum_metadata,
video_sampling_type=video_sampling_type,
video_sampling_value=video_sampling_value,
video_deinterlace=video_deinterlace,
)
return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)
def upload_files_from_urls(
dataset_id: str,
*,
type: models.UploadType,
source_urls: list[str],
source_urls_datum_metadata: list[dict[str, Any]] | None = None,
annotations_url: str | None = None,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
) -> models.Upload:
"""Uploads a list of urls to a dataset as individual datums. Does not wait for the upload to complete processing.
:param type: The type of file being uploaded. Must be one of models.UploadType.{ARCHIVE|DATUM}
:type type: models.UploadType
:param source_urls: List of URLs from which the datums are read. len() must be equal to 1 for ARCHIVE upload type.
:type source_urls: List[str]
:param source_urls_datum_metadata: When uploading individual datums (type=models.UploadType.DATUM), include custom metadata for datums created by each URL. List index should match the desired source_urls list index, empty array elements should include empty Dicts.
:type source_urls_datum_metadata: Optional[List[Dict[str, Any]]]
:param annotations_url: URL from which a gzipped annotations file in jsonl format will be downloaded and processed along datums from source_urls. Attribute path in the annotations file will be datum index in source_urls.
:type annotations_url: Optional[str]
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:return: The upload details
:rtype: models.Upload
"""
return _upload(
dataset_id=dataset_id,
type=type,
file=None,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
delete_source=None,
source_urls=source_urls,
datum_metadata=source_urls_datum_metadata,
annotations_url=annotations_url,
)
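
# Usage sketch: two datums with per-URL metadata (all values are placeholders):
#
#   upload = upload_files_from_urls(
#       "my-dataset-id",
#       type=models.UploadType.DATUM,
#       source_urls=["https://example.com/a.png", "https://example.com/b.png"],
#       source_urls_datum_metadata=[{"camera": "north"}, {}],
#   )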
def upload_files_from_urls_and_wait(
dataset_id: str,
*,
type: models.UploadType,
source_urls: list[str],
source_urls_datum_metadata: list[dict[str, Any]] | None = None,
annotations_url: str | None = None,
max_validation_errors: int | None = None,
image_validation: bool | None = None,
split: models.SplitName | None = None,
timeout: float = DEFAULT_UPLOAD_WAIT_TIMEOUT,
wait_interval: float = DEFAULT_UPLOAD_WAIT_INTERVAL,
) -> models.Upload:
"""Uploads a set of bytes as a single file, and waits for the upload to complete processing.
:param dataset_id: Id of the dataset to upload to
:type dataset_id: str
:param type: The type of file being uploaded. Must be one of models.UploadType.{ARCHIVE|DATUM}
:type type: models.UploadType
    :param source_urls: List of URLs from which the datums are read. len() must be equal to 1 for the ARCHIVE upload type.
    :type source_urls: List[str]
    :param source_urls_datum_metadata: When uploading individual datums (type=models.UploadType.DATUM), include custom metadata for the datums created by each URL. List indices should match the corresponding source_urls indices; use empty dicts for datums without metadata.
    :type source_urls_datum_metadata: Optional[List[Dict[str, Any]]]
    :param annotations_url: URL from which a gzipped annotations file in jsonl format will be downloaded and processed along with the datums from source_urls. The attribute path in the annotations file will be the datum's index in source_urls.
:type annotations_url: Optional[str]
:param max_validation_errors: Maximum number of validation errors to tolerate before failing the upload
:type max_validation_errors: Optional[int]
:param image_validation: Whether or not to perform extra validations on image datums
:type image_validation: Optional[bool]
:param split: Name of split to upload datums to.
:type split: Optional[models.SplitName]
:param timeout: Number of seconds to wait for upload to complete (default 3600)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for completion (default 0.5)
:type wait_interval: float
:return: The upload details
:rtype: models.Upload
:raises TimeoutError: If the timeout has been reached
:raises exceptions.UploadValidationError: If the upload fails and has validation errors
:raises exceptions.UploadUnknownError: If the upload fails without a specified reason
:raises exceptions.UploadIncompleteError: If the upload has stopped making progress without reaching a terminal state. Upload should probably be retried
"""
upload = upload_files_from_urls(
dataset_id=dataset_id,
type=type,
source_urls=source_urls,
source_urls_datum_metadata=source_urls_datum_metadata,
annotations_url=annotations_url,
max_validation_errors=max_validation_errors,
image_validation=image_validation,
split=split,
)
return wait_for_upload(upload.id, timeout=timeout, wait_interval=wait_interval)
def delete_upload(id: str) -> models.Upload:
"""Delete an upload by id. This can only be done if the upload's status is not `COMPLETE` or `CLEANUP`.
:param id: Id of the upload to delete
:type id: str
:return: The upload details
:rtype: models.Upload
"""
response = _apis.datasets_v3.uploads_api.delete_upload(upload_id=id)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from delete_upload")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)
def delete_upload_and_wait(
id: str,
*,
timeout: float = 5,
wait_interval: float = 0.5,
) -> None:
"""Delete an upload by id. This can only be done if the upload's status is not `COMPLETE` or `CLEANUP`.
Polls for the upload, blocking until the upload has been deleted or the timeout has been reached.
:param id: Id of the upload to delete
:type id: str
    :param timeout: Number of seconds to wait for upload deletion (default 5)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for deletion (default 0.5)
:type wait_interval: float
:raises TimeoutError: If the timeout has been reached
"""
delete_upload(id)
def upload_not_found_condition() -> tuple[bool, None]:
try:
get_upload(id)
except api_exceptions.NotFoundException:
return (True, None)
return (False, None)
return _utils.wait_for(
upload_not_found_condition,
f"Timed out waiting for deletion of upload {id} after {timeout} seconds",
timeout,
wait_interval,
)
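
# Usage sketch (placeholder upload id):
#
#   delete_upload_and_wait("my-upload-id", timeout=10)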
def retry_upload(id: str) -> models.Upload:
"""Retry processing of an upload that previously did not succeed.
    :param id: Id of the upload to retry
:type id: str
:return: The upload details
:rtype: models.Upload
"""
response = _apis.datasets_v3.uploads_api.retry_upload(upload_id=id)
if not response.data:
raise RuntimeError("Received malformed response (missing `data`) from retry_upload")
return _utils.convert_to_dataclass(response.data.model_dump(), models.Upload)
def retry_upload_and_wait(
id: str,
*,
timeout: float = 5,
wait_interval: float = 0.5,
) -> models.Upload:
"""Retry processing of an upload that previously did not succeed.
Polls for the upload, blocking until the upload has finished processing or the timeout has been reached.
    :param id: Id of the upload to retry
    :type id: str
    :param timeout: Number of seconds to wait for the upload to finish processing (default 5)
:type timeout: float
:param wait_interval: Number of seconds between successive calls to check the upload for completion (default 0.5)
:type wait_interval: float
:return: The upload details
:rtype: models.Upload
:raises TimeoutError: If the timeout has been reached
"""
retry_upload(id)
return wait_for_upload(id, timeout=timeout, wait_interval=wait_interval)
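
# Usage sketch: retry a stalled upload and block until it finishes
# (placeholder upload id):
#
#   upload = retry_upload_and_wait("my-upload-id", timeout=600)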