"""Chariot Video API."""
import datetime
import os
import time
from chariot import _apis
from chariot.config import getLogger
from chariot.models._utils import _upload_parts
from chariot_api._openapi.video.models import (
InputCompleteUploadRequest,
InputCreateUploadRequest,
InputCreateVideoRequest,
)
__all__ = [
"get_videos",
"delete_video",
"upload_video",
"process_video",
"get_video_request",
"get_video_requests",
"wait_for_video_request",
]
# defined in go/apps/video/internal/app/upload.go
PART_SIZE = 100 * 1024 * 1024
log = getLogger(__name__)
[docs]
def get_videos(*, project_id: str, to_dict: bool = False, **kwargs):
"""get_videos returns all videos in the requested project_id and kwargs,
optionally converts results to_dict
"""
data = (
_apis.video.videos_api.api_videos_v1_videos_get(project_id=project_id, **kwargs).data or [] # pyright: ignore[reportAttributeAccessIssue]
)
if to_dict:
return [v.to_dict() for v in data]
else:
return data
[docs]
def delete_video(video_id):
"""delete_video deletes a video by video_id"""
return _apis.video.videos_api.api_videos_v1_videos_video_id_delete(video_id)
[docs]
def upload_video(*, project_id: str, name: str, description: str, video_file: str):
"""upload_video uploads the local video located at path `video_file` with the
specified name, description, and project_id
"""
log.debug(f"Upload video '{video_file}' to project_id {project_id}")
resp = _apis.video.videos_api.api_videos_v1_videos_post(
video=InputCreateVideoRequest(
project_id=project_id, name=name, description=description, video_type="batch"
)
)
video_id = resp.data.id
log.debug(f"... VideoID: {video_id}")
upload = _apis.video.uploads_api.api_videos_v1_videos_video_id_uploads_post(
video_id=video_id,
upload=InputCreateUploadRequest(
size=os.path.getsize(video_file),
type="video",
),
)
log.debug(
f"... UploadID: {upload.upload_id} with {len(upload.urls)} parts part_size={PART_SIZE}"
)
etags = _upload_parts(video_file, [u.url for u in upload.urls], PART_SIZE)
log.debug(f"... ETags: {etags}")
_apis.video.uploads_api.api_videos_v1_uploads_upload_id_complete_post(
upload_id=upload.upload_id,
request=InputCompleteUploadRequest(
video_id=video_id,
parts=[{"etag": etag, "part_number": idx + 1} for idx, etag in enumerate(etags)],
),
)
return _apis.video.videos_api.api_videos_v1_videos_video_id_get(video_id=video_id).data
[docs]
def process_video(*, video_id: str, model_id: str = None):
"""Process an uploaded video so that it can be playedin the UI.
Optionally supply a model-id to process video frames through an inference server.
"""
log.debug(f"Process video-id '{video_id}' with model-id '{model_id}'")
resp = _apis.video.video_requests_api.api_videos_v1_video_requests_post(
{"video_id": video_id, "model_id": model_id}
)
log.debug(f"... videoRequest: {resp}")
vrid = resp.data.id
resp = (
_apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_process_post(
video_request_id=vrid
)
)
log.debug(f"... processing: {resp}")
return get_video_request(vrid)
[docs]
def get_video_requests(*, project_id: str = None, video_id: str = None):
"""Get all video-requests in a project or by video-id"""
if not project_id and not video_id:
raise ValueError("Must supply one of project_id or video_id")
if not project_id:
project_id = _apis.video.videos_api.api_videos_v1_videos_video_id_get(
video_id
).data.project_id
resp = _apis.video.video_requests_api.api_videos_v1_video_requests_get(
project_id=project_id, video_id=video_id
)
return resp.data
[docs]
def get_video_request(video_request_id: str):
"""Get one video-request by id"""
resp = _apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_get(
video_request_id=video_request_id
)
return resp.data
[docs]
def wait_for_video_request(video_request_id: str, wait_interval: int = 5, timeout: int = 60):
"""Wait for `timeout` seconds for the video-request to be processed."""
start = datetime.datetime.now()
vr = get_video_request(video_request_id)
while vr.status != "success":
if (datetime.datetime.now() - start).total_seconds() > timeout:
raise TimeoutError(
f"video_request {video_request_id!r} did not complete within {timeout} seconds"
)
time.sleep(wait_interval)
vr = get_video_request(video_request_id)
return vr