Source code for chariot.video.video

"""Chariot Video API."""

import datetime
import os
import time

from chariot import _apis
from chariot.config import getLogger
from chariot.models._utils import _upload_parts
from chariot_api._openapi.video.models import (
    InputCompleteUploadRequest,
    InputCreateUploadRequest,
    InputCreateVideoRequest,
)

__all__ = [
    "get_videos",
    "delete_video",
    "upload_video",
    "process_video",
    "get_video_request",
    "get_video_requests",
    "wait_for_video_request",
]

# defined in go/apps/video/internal/app/upload.go
PART_SIZE = 100 * 1024 * 1024

log = getLogger(__name__)


[docs] def get_videos(*, project_id: str, to_dict: bool = False, **kwargs): """get_videos returns all videos in the requested project_id and kwargs, optionally converts results to_dict """ data = ( _apis.video.videos_api.api_videos_v1_videos_get(project_id=project_id, **kwargs).data or [] # pyright: ignore[reportAttributeAccessIssue] ) if to_dict: return [v.to_dict() for v in data] else: return data
[docs] def delete_video(video_id): """delete_video deletes a video by video_id""" return _apis.video.videos_api.api_videos_v1_videos_video_id_delete(video_id)
[docs] def upload_video(*, project_id: str, name: str, description: str, video_file: str): """upload_video uploads the local video located at path `video_file` with the specified name, description, and project_id """ log.debug(f"Upload video '{video_file}' to project_id {project_id}") resp = _apis.video.videos_api.api_videos_v1_videos_post( video=InputCreateVideoRequest( project_id=project_id, name=name, description=description, video_type="batch" ) ) video_id = resp.data.id log.debug(f"... VideoID: {video_id}") upload = _apis.video.uploads_api.api_videos_v1_videos_video_id_uploads_post( video_id=video_id, upload=InputCreateUploadRequest( size=os.path.getsize(video_file), type="video", ), ) log.debug( f"... UploadID: {upload.upload_id} with {len(upload.urls)} parts part_size={PART_SIZE}" ) etags = _upload_parts(video_file, [u.url for u in upload.urls], PART_SIZE) log.debug(f"... ETags: {etags}") _apis.video.uploads_api.api_videos_v1_uploads_upload_id_complete_post( upload_id=upload.upload_id, request=InputCompleteUploadRequest( video_id=video_id, parts=[{"etag": etag, "part_number": idx + 1} for idx, etag in enumerate(etags)], ), ) return _apis.video.videos_api.api_videos_v1_videos_video_id_get(video_id=video_id).data
[docs] def process_video(*, video_id: str, model_id: str = None): """Process an uploaded video so that it can be playedin the UI. Optionally supply a model-id to process video frames through an inference server. """ log.debug(f"Process video-id '{video_id}' with model-id '{model_id}'") resp = _apis.video.video_requests_api.api_videos_v1_video_requests_post( {"video_id": video_id, "model_id": model_id} ) log.debug(f"... videoRequest: {resp}") vrid = resp.data.id resp = ( _apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_process_post( video_request_id=vrid ) ) log.debug(f"... processing: {resp}") return get_video_request(vrid)
[docs] def get_video_requests(*, project_id: str = None, video_id: str = None): """Get all video-requests in a project or by video-id""" if not project_id and not video_id: raise ValueError("Must supply one of project_id or video_id") if not project_id: project_id = _apis.video.videos_api.api_videos_v1_videos_video_id_get( video_id ).data.project_id resp = _apis.video.video_requests_api.api_videos_v1_video_requests_get( project_id=project_id, video_id=video_id ) return resp.data
[docs] def get_video_request(video_request_id: str): """Get one video-request by id""" resp = _apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_get( video_request_id=video_request_id ) return resp.data
[docs] def wait_for_video_request(video_request_id: str, wait_interval: int = 5, timeout: int = 60): """Wait for `timeout` seconds for the video-request to be processed.""" start = datetime.datetime.now() vr = get_video_request(video_request_id) while vr.status != "success": if (datetime.datetime.now() - start).total_seconds() > timeout: raise TimeoutError( f"video_request {video_request_id!r} did not complete within {timeout} seconds" ) time.sleep(wait_interval) vr = get_video_request(video_request_id) return vr