Source code for chariot.video.video

"""Chariot Video API."""

import datetime
import os
import time
from pathlib import Path
from typing import Any

import urllib3

from chariot import _apis
from chariot.config import getLogger
from chariot.models._utils import _upload_parts
from chariot_api._openapi.video.models import (
    InputCompleteUploadRequest,
    InputCreateTrackerRequestRequest,
    InputCreateUploadRequest,
    InputCreateVideoRequest,
    InputCreateVideoRequestRequest,
    InputUpdateTrackerRequestRequest,
    OutputDeleteTrackerRequestResponse,
    OutputDeleteVideoRequestResponse,
    OutputGetVideoRequestResponse,
    OutputGetVideoResponse,
    OutputTrackerRequestResponse,
)

# Names exported by `from chariot.video.video import *`; one entry per public
# function defined in this module.
__all__ = [
    "get_videos",
    "delete_video",
    "upload_video",
    "wait_for_stream_connection",
    "stream_video",
    "process_video",
    "get_video_request",
    "get_video_requests",
    "wait_for_video_request",
    "delete_video_request",
    "create_tracker_request",
    "get_tracker_request",
    "wait_for_tracker_request",
    "update_tracker_request",
    "delete_tracker_request",
    "get_video_asset",
]

# defined in go/apps/video/internal/app/upload.go
# 100 MiB. upload_video hands this value to _upload_parts together with the
# signed URLs -- presumably the byte size of each uploaded part, so it should
# stay in sync with the Go definition referenced above (TODO confirm).
PART_SIZE = 100 * 1024 * 1024

# Module-level logger, named after this module.
log = getLogger(__name__)


def get_videos(
    *, project_id: str, to_dict: bool = False, **kwargs
) -> list[dict[str, Any]] | list[OutputGetVideoResponse]:
    """Return all videos within a project.

    Args:
        project_id: ID of the project whose videos are listed.
        to_dict: When true, every video is converted with its ``to_dict()``
            method before being returned.
        **kwargs: Forwarded unchanged to the underlying list-videos API call.

    Returns:
        The videos from the API response, either as returned by the client or
        as dictionaries when ``to_dict`` is set.

    Raises:
        Exception: If the API response carries no data.
    """
    videos = _apis.video.videos_api.api_videos_v1_videos_get(
        project_id=project_id, **kwargs
    ).data
    if videos is None:
        raise Exception("unable to get videos, empty response")
    if not to_dict:
        return videos
    return [video.to_dict() for video in videos]
def delete_video(video_id: str) -> None:
    """Delete a video by id.

    Args:
        video_id: ID of the video to delete.

    Raises:
        ValueError: If ``video_id`` is empty.
    """
    # Same guard the other delete helpers in this module apply: refuse an
    # empty id locally instead of sending it to the API.
    if not video_id:
        raise ValueError("unable to delete video, video id is empty")
    # The client call's result is passed through unchanged, as before.
    return _apis.video.videos_api.api_videos_v1_videos_video_id_delete(video_id)
def upload_video(
    *, project_id: str, name: str, description: str, video_file: str | Path, **kwargs
) -> OutputGetVideoResponse | None:
    """Upload the local video at ``video_file`` as a batch video.

    The function creates the video record, opens an upload for it, sends the
    file's parts to the signed URLs returned by the API, completes the upload
    and finally returns the freshly fetched video.

    Args:
        project_id: ID of the project the video belongs to.
        name: Name of the video.
        description: Description of the video.
        video_file: Path of the local file to upload.
        **kwargs: Extra fields merged into the create-video request body
            (they take precedence over the fields set here).

    Returns:
        The ``data`` of the get-video response for the new video.

    Raises:
        ValueError: If ``project_id``, ``name``, ``description`` or
            ``video_file`` is empty.
        OSError: If the size of ``video_file`` cannot be read, e.g. the file
            does not exist.
        Exception: If the request body is invalid or an API response is
            missing the video id, upload id, upload URLs or a signed URL.
    """
    if project_id == "":
        raise ValueError("unable to create static video, project id is empty")
    if name == "":
        raise ValueError("unable to create static video, name is empty")
    if description == "":
        raise ValueError("unable to create static video, description is empty")
    if isinstance(video_file, Path):
        video_file = str(video_file)
    if video_file == "":
        raise ValueError("unable to create static video, video file is empty")

    # Read the size before any API call so that a missing or unreadable file
    # fails fast instead of leaving a video record behind with no upload.
    file_size = os.path.getsize(video_file)

    body = {
        "project_id": project_id,
        "name": name,
        "description": description,
        "video_file": video_file,
        "video_type": "batch",
        **kwargs,
    }
    request_body = InputCreateVideoRequest.from_dict(body)
    if request_body is None:
        raise Exception("unable to create static video, request body is invalid")

    response = _apis.video.videos_api.api_videos_v1_videos_post(video=request_body)
    if response.data is None or response.data.id is None:
        raise Exception("unable to create static video, empty response")
    video_id = response.data.id

    upload = _apis.video.uploads_api.api_videos_v1_videos_video_id_uploads_post(
        video_id=video_id,
        upload=InputCreateUploadRequest(
            size=file_size,
            type="video",
        ),
    )
    upload_id = upload.upload_id
    if upload_id is None:
        raise Exception("unable to create static video, empty upload id")
    upload_urls = upload.urls
    if upload_urls is None:
        raise Exception("unable to create static video, empty upload urls")
    log.debug(
        f"created upload {upload_id} for video {video_id} with {len(upload_urls)} parts and part_size={PART_SIZE}"
    )

    signed_urls: list[str] = [u.url for u in upload_urls if u.url is not None]
    # A list comprehension is never None, so test for what can actually go
    # wrong: no URLs at all, or an entry without a URL. Part numbers are
    # positional, so silently skipping an entry would misnumber the parts.
    if not signed_urls or len(signed_urls) != len(upload_urls):
        raise Exception("unable to create static video, empty signed upload urls")

    etags = _upload_parts(video_file, signed_urls, PART_SIZE)
    log.debug(f"upload contained the following etags: {etags}")

    _apis.video.uploads_api.api_videos_v1_uploads_upload_id_complete_post(
        upload_id=upload_id,
        request=InputCompleteUploadRequest(
            video_id=video_id,
            # Part numbers are 1-based.
            parts=[{"etag": etag, "part_number": idx + 1} for idx, etag in enumerate(etags)],
        ),
    )
    return _apis.video.videos_api.api_videos_v1_videos_video_id_get(video_id=video_id).data
def wait_for_stream_connection(
    address: str, max_attempts: int = 6, wait_interval: int = 10
) -> bool:
    """Wait for a stream to be available.

    Args:
        address: Address of the stream to check.
        max_attempts: Maximum number of connection checks to perform.
        wait_interval: Seconds to sleep between two consecutive checks.

    Returns:
        True as soon as a check responds with HTTP status 200, False once
        ``max_attempts`` checks have failed (or when ``max_attempts`` is not
        positive).
    """
    for attempt in range(1, max_attempts + 1):
        connection_response = (
            _apis.video.h_l_s_api.api_videos_v1_hls_check_get_without_preload_content(address)
        )
        if connection_response.status == 200:
            log.info(f"stream found {address} to be connectable")
            return True
        log.debug(
            f"unable to connect to stream at {address}, attempt ({attempt}/{max_attempts})"
        )
        # Only wait when another attempt follows; sleeping after the final
        # failure would just delay the False result.
        if attempt < max_attempts:
            time.sleep(wait_interval)
    return False
def stream_video(
    *, project_id: str, name: str, description: str, url: str, **kwargs
) -> OutputGetVideoResponse | None:
    """Create a streaming video.

    Args:
        project_id: ID of the project the video belongs to.
        name: Name of the video.
        description: Description of the video.
        url: URL of the stream.
        **kwargs: Extra fields merged into the create-video request body
            (they take precedence over the fields set here).

    Returns:
        The ``data`` of the get-video response for the new video.

    Raises:
        ValueError: If ``project_id``, ``name``, ``description`` or ``url``
            is empty.
        Exception: If the request body is invalid or the create response
            carries no video id.
    """
    # Checked in declaration order so the first empty field is the one reported.
    required_fields = (
        (project_id, "project id"),
        (name, "name"),
        (description, "description"),
        (url, "url"),
    )
    for value, label in required_fields:
        if value == "":
            raise ValueError(f"unable to create streaming video, {label} is empty")

    payload = {
        "project_id": project_id,
        "name": name,
        "description": description,
        "url": url,
        "video_type": "stream",
    }
    payload.update(kwargs)

    create_request = InputCreateVideoRequest.from_dict(payload)
    if create_request is None:
        raise Exception("unable to create streaming video, request body is invalid")

    videos_api = _apis.video.videos_api
    created = videos_api.api_videos_v1_videos_post(video=create_request)
    if created.data is None or created.data.id is None:
        raise Exception("unable to create streaming video, empty response")

    fetched = videos_api.api_videos_v1_videos_video_id_get(video_id=str(created.data.id))
    return fetched.data
def process_video(video_id: str, **kwargs) -> OutputGetVideoRequestResponse | None:
    """Process a video.

    Args:
        video_id: ID of the video to process.
        **kwargs: Extra fields added to the video-request body.

    Returns:
        The newly created video request, re-fetched via ``get_video_request``.

    Raises:
        Exception: If the request body is invalid or the create response
            carries no video request id.
    """
    payload = {"video_id": video_id}
    payload.update(kwargs)

    create_request = InputCreateVideoRequestRequest.from_dict(payload)
    if create_request is None:
        raise Exception("unable to process video, request body is invalid")

    created = _apis.video.video_requests_api.api_videos_v1_video_requests_post(
        request_body=create_request,
    )
    if created.data is None or created.data.id is None:
        raise Exception("unable to process video, empty response")

    return get_video_request(created.data.id)
def get_video_requests(
    *, project_id: str | None = None, video_id: str | None = None
) -> list[OutputGetVideoRequestResponse] | None:
    """Get video requests within a project or associated to a video.

    At least one of ``project_id`` and ``video_id`` must be given. When only
    ``video_id`` is given, the project id is read from that video.

    Args:
        project_id: ID of the project to list video requests for.
        video_id: ID of the video to list video requests for.

    Returns:
        The ``data`` of the list-video-requests response.

    Raises:
        ValueError: If neither ``project_id`` nor ``video_id`` is provided.
        Exception: If the project id has to be resolved from the video but
            the video lookup returns no data.
    """
    if not project_id and not video_id:
        raise ValueError("unable to get video requests, project id or video id must be provided")

    if not project_id:
        video = _apis.video.videos_api.api_videos_v1_videos_video_id_get(video_id).data
        # Fail with a clear message rather than an AttributeError on None.
        if video is None:
            raise Exception("unable to get video requests, unable to get video")
        project_id = video.project_id

    response = _apis.video.video_requests_api.api_videos_v1_video_requests_get(
        project_id=project_id, video_id=video_id
    )
    return response.data
def get_video_request(video_request_id: str) -> OutputGetVideoRequestResponse | None:
    """Get a video request by id.

    Args:
        video_request_id: ID of the video request to fetch.

    Returns:
        The ``data`` of the get-video-request response.

    Raises:
        ValueError: If ``video_request_id`` is an empty string.
    """
    if video_request_id == "":
        raise ValueError("unable to get video request, video request id is empty")

    fetch = _apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_get
    return fetch(video_request_id=video_request_id).data
def wait_for_video_request(
    *,
    video_request_id: str,
    wait_interval: int = 5,
    timeout: int = 60,
    statuses: list[str] | None = None,
) -> OutputGetVideoRequestResponse:
    """Periodically check if a video request has achieved one of statuses provided.

    Args:
        video_request_id: ID of the video request to poll.
        wait_interval: Seconds to sleep between two polls.
        timeout: Seconds after which polling gives up.
        statuses: Statuses that end the wait. Defaults to ``["success"]``.

    Returns:
        The video request once its status is one of ``statuses``.

    Raises:
        ValueError: If ``video_request_id`` is empty.
        TimeoutError: If none of the statuses is reached within ``timeout``
            seconds.
        Exception: If ``statuses`` is empty or the video request cannot be
            fetched.
    """
    if video_request_id == "":
        raise ValueError("unable to wait for video request, video request id is empty")
    # None stands in for the default so no mutable list is shared across calls.
    if statuses is None:
        statuses = ["success"]
    if len(statuses) == 0:
        raise Exception("unable to wait for video request, no statuses provided")

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout

    video_request = get_video_request(video_request_id=video_request_id)
    if video_request is None:
        raise Exception("unable to wait for video request, unable to get video request")

    while video_request.status not in statuses:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"video request {video_request_id!r} did not achieve one of the provided statuses within {timeout} seconds"
            )
        time.sleep(wait_interval)
        video_request = get_video_request(video_request_id=video_request_id)
        if video_request is None:
            raise Exception("unable to wait for video request, unable to get video request")
    return video_request
def delete_video_request(video_request_id: str) -> OutputDeleteVideoRequestResponse | None:
    """Delete a video request by id.

    Args:
        video_request_id: ID of the video request to delete.

    Returns:
        The ``data`` of the delete-video-request response.

    Raises:
        ValueError: If ``video_request_id`` is an empty string.
    """
    if video_request_id == "":
        # The message names the delete operation (it previously said "get").
        raise ValueError("unable to delete video request, video request id is empty")
    response = _apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_delete(
        video_request_id=video_request_id
    )
    return response.data
def create_tracker_request(video_request_id: str, **kwargs) -> OutputTrackerRequestResponse | None:
    """Create a tracker request.

    Args:
        video_request_id: ID of the video request the tracker request refers to.
        **kwargs: Extra fields added to the tracker-request body.

    Returns:
        The ``data`` of the create-tracker-request response.

    Raises:
        Exception: If the request body is invalid.
    """
    payload = {"video_request_id": video_request_id}
    payload.update(kwargs)

    create_request = InputCreateTrackerRequestRequest.from_dict(payload)
    if create_request is None:
        raise Exception("unable to create tracker request, request body is invalid")

    tracker_api = _apis.video.tracker_requests_api
    return tracker_api.api_videos_v1_tracker_request_post(request_body=create_request).data
def get_tracker_request(tracker_request_id: str) -> OutputTrackerRequestResponse | None:
    """Get a tracker request.

    Args:
        tracker_request_id: ID of the tracker request to fetch.

    Returns:
        The ``data`` of the get-tracker-request response.

    Raises:
        ValueError: If ``tracker_request_id`` is an empty string.
    """
    if tracker_request_id == "":
        raise ValueError("unable to get tracker request, tracker request id is empty")

    tracker_api = _apis.video.tracker_requests_api
    fetched = tracker_api.api_videos_v1_tracker_requests_tracker_request_id_get(
        tracker_request_id=tracker_request_id
    )
    return fetched.data
def wait_for_tracker_request(
    *,
    tracker_request_id: str,
    wait_interval: int = 5,
    timeout: int = 60,
    statuses: list[str] | None = None,
) -> OutputTrackerRequestResponse:
    """Periodically check if a tracker request has achieved one of statuses provided.

    Args:
        tracker_request_id: ID of the tracker request to poll.
        wait_interval: Seconds to sleep between two polls.
        timeout: Seconds after which polling gives up.
        statuses: Statuses that end the wait. Defaults to ``["success"]``.

    Returns:
        The tracker request once its status is one of ``statuses``.

    Raises:
        ValueError: If ``tracker_request_id`` is empty.
        TimeoutError: If none of the statuses is reached within ``timeout``
            seconds.
        Exception: If ``statuses`` is empty or the tracker request cannot be
            fetched.
    """
    if tracker_request_id == "":
        raise ValueError("unable to wait for tracker request, tracker request id is empty")
    # None stands in for the default so no mutable list is shared across calls.
    if statuses is None:
        statuses = ["success"]
    if len(statuses) == 0:
        raise Exception("unable to wait for tracker request, no statuses provided")

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout

    tracker_request = get_tracker_request(tracker_request_id=tracker_request_id)
    if tracker_request is None:
        raise Exception("unable to wait for tracker request, unable to get tracker request")

    while tracker_request.status not in statuses:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"tracker request {tracker_request_id!r} did not achieve one of the provided statuses within {timeout} seconds"
            )
        time.sleep(wait_interval)
        tracker_request = get_tracker_request(tracker_request_id=tracker_request_id)
        if tracker_request is None:
            raise Exception("unable to wait for tracker request, unable to get tracker request")
    return tracker_request
def update_tracker_request(
    *, tracker_request_id: str, **kwargs
) -> OutputTrackerRequestResponse | None:
    """Update a tracker request.

    Args:
        tracker_request_id: ID of the tracker request to update.
        **kwargs: Fields of the update request body.

    Returns:
        The ``data`` of the update-tracker-request response.

    Raises:
        ValueError: If ``tracker_request_id`` is an empty string.
        Exception: If the request body is invalid.
    """
    # Same guard as get_tracker_request / delete_tracker_request: refuse an
    # empty id locally instead of sending it to the API.
    if tracker_request_id == "":
        raise ValueError("unable to update tracker request, tracker request id is empty")

    request_body = InputUpdateTrackerRequestRequest.from_dict(dict(kwargs))
    if request_body is None:
        raise Exception("unable to update tracker request, request body is invalid")
    response = (
        _apis.video.tracker_requests_api.api_videos_v1_tracker_requests_tracker_request_id_patch(
            tracker_request_id=tracker_request_id,
            request_body=request_body,
        )
    )
    return response.data
def delete_tracker_request(tracker_request_id: str) -> OutputDeleteTrackerRequestResponse | None:
    """Delete a tracker request.

    Args:
        tracker_request_id: ID of the tracker request to delete.

    Returns:
        The ``data`` of the delete-tracker-request response.

    Raises:
        ValueError: If ``tracker_request_id`` is an empty string.
    """
    if tracker_request_id == "":
        raise ValueError("unable to delete tracker request, tracker request id is empty")

    tracker_api = _apis.video.tracker_requests_api
    deleted = tracker_api.api_videos_v1_tracker_request_tracker_request_id_delete(
        tracker_request_id=tracker_request_id
    )
    return deleted.data
def get_video_asset(key: str) -> urllib3.HTTPResponse:
    """Get a video asset.

    Args:
        key: Key of the asset, passed as-is to the HLS key endpoint.

    Returns:
        The response of the ``without_preload_content`` client call,
        returned unchanged.
    """
    hls_api = _apis.video.h_l_s_api
    asset_response = hls_api.api_videos_v1_hls_key_get_without_preload_content(key)
    return asset_response