"""Chariot Video API."""
import datetime
import os
import time
from pathlib import Path
from typing import Any
import urllib3
from chariot import _apis
from chariot.config import getLogger
from chariot.models._utils import _upload_parts
from chariot_api._openapi.video.models import (
InputCompleteUploadRequest,
InputCreateTrackerRequestRequest,
InputCreateUploadRequest,
InputCreateVideoRequest,
InputCreateVideoRequestRequest,
InputUpdateTrackerRequestRequest,
OutputDeleteTrackerRequestResponse,
OutputDeleteVideoRequestResponse,
OutputGetVideoRequestResponse,
OutputGetVideoResponse,
OutputTrackerRequestResponse,
)
# Public API of this module: the names exported by a star-import of it.
__all__ = [
"get_videos",
"delete_video",
"upload_video",
"wait_for_stream_connection",
"stream_video",
"process_video",
"get_video_request",
"get_video_requests",
"wait_for_video_request",
"delete_video_request",
"create_tracker_request",
"get_tracker_request",
"wait_for_tracker_request",
"update_tracker_request",
"delete_tracker_request",
"get_video_asset",
]
# Part size in bytes (100 MiB) handed to `_upload_parts` by `upload_video`.
# The value is defined in go/apps/video/internal/app/upload.go
PART_SIZE = 100 * 1024 * 1024
# Module-level logger shared by the functions in this file.
log = getLogger(__name__)
[docs]
def get_videos(
    *, project_id: str, to_dict: bool = False, **kwargs
) -> list[dict[str, Any]] | list[OutputGetVideoResponse]:
    """List all videos that belong to a project.

    Args:
        project_id: Project whose videos are requested.
        to_dict: When truthy, every video is converted with its ``to_dict()``
            method; otherwise the response objects are returned as-is.
        **kwargs: Forwarded unchanged to the underlying videos API call.

    Returns:
        The content of the response's ``data`` field, optionally converted
        to dictionaries.

    Raises:
        Exception: If the response carries no data.
    """
    videos = _apis.video.videos_api.api_videos_v1_videos_get(
        project_id=project_id, **kwargs
    ).data
    if videos is None:
        raise Exception("unable to get videos, empty response")
    if not to_dict:
        return videos
    return [video.to_dict() for video in videos]
[docs]
def delete_video(video_id: str) -> None:
    """Delete a video by id.

    Args:
        video_id: Identifier of the video to delete.

    Returns:
        The result of the underlying delete call, passed through unchanged.

    Raises:
        ValueError: If ``video_id`` is an empty string.
    """
    # Same empty-id guard as the other id-based get/delete helpers in this module.
    if video_id == "":
        raise ValueError("unable to delete video, video id is empty")
    return _apis.video.videos_api.api_videos_v1_videos_video_id_delete(video_id)
[docs]
def upload_video(
    *, project_id: str, name: str, description: str, video_file: str | Path, **kwargs
) -> OutputGetVideoResponse | None:
    """Create a batch video from the local file at ``video_file`` and upload it.

    The video record is created first, an upload is then opened for it, the
    file is handed to ``_upload_parts`` together with the signed URLs and
    ``PART_SIZE``, and finally the upload is completed with the returned etags.

    Args:
        project_id: Project the video is created in.
        name: Name of the video.
        description: Description of the video.
        video_file: Path of the local file to upload.
        **kwargs: Extra fields merged into the create-video request body.

    Returns:
        The ``data`` of a fresh lookup of the created video.

    Raises:
        ValueError: If ``project_id``, ``name``, ``description`` or
            ``video_file`` is empty.
        OSError: If the size of ``video_file`` cannot be read (for example
            because the file does not exist).
        Exception: If the request body is invalid or a response lacks the
            video id, upload id or upload URLs.
    """
    if project_id == "":
        raise ValueError("unable to create static video, project id is empty")
    if name == "":
        raise ValueError("unable to create static video, name is empty")
    if description == "":
        raise ValueError("unable to create static video, description is empty")
    if isinstance(video_file, Path):
        video_file = str(video_file)
    if video_file == "":
        raise ValueError("unable to create static video, video file is empty")

    # Read the size before anything is created remotely, so a missing or
    # unreadable file fails fast instead of leaving a video without content.
    video_size = os.path.getsize(video_file)

    body = {
        "project_id": project_id,
        "name": name,
        "description": description,
        "video_file": video_file,
        "video_type": "batch",
        **kwargs,
    }
    request_body = InputCreateVideoRequest.from_dict(body)
    if request_body is None:
        raise Exception("unable to create static video, request body is invalid")

    response = _apis.video.videos_api.api_videos_v1_videos_post(video=request_body)
    if response.data is None or response.data.id is None:
        raise Exception("unable to create static video, empty response")
    video_id = response.data.id

    upload = _apis.video.uploads_api.api_videos_v1_videos_video_id_uploads_post(
        video_id=video_id,
        upload=InputCreateUploadRequest(
            size=video_size,
            type="video",
        ),
    )
    upload_id = upload.upload_id
    if upload_id is None:
        raise Exception("unable to create static video, empty upload id")
    upload_urls = upload.urls
    if upload_urls is None:
        raise Exception("unable to create static video, empty upload urls")
    log.debug(
        f"created upload {upload_id} for video {video_id} with {len(upload_urls)} parts and part_size={PART_SIZE}"
    )

    signed_urls: list[str] = [u.url for u in upload_urls if u.url is not None]
    # Part numbers sent on completion are derived from position, so a dropped
    # URL would presumably misalign them; refuse to continue in that case.
    if len(signed_urls) != len(upload_urls):
        raise Exception("unable to create static video, empty signed upload urls")

    etags = _upload_parts(video_file, signed_urls, PART_SIZE)
    log.debug(f"upload contained the following etags: {etags}")

    _apis.video.uploads_api.api_videos_v1_uploads_upload_id_complete_post(
        upload_id=upload_id,
        request=InputCompleteUploadRequest(
            video_id=video_id,
            # Part numbers are 1-based.
            parts=[{"etag": etag, "part_number": idx + 1} for idx, etag in enumerate(etags)],
        ),
    )
    return _apis.video.videos_api.api_videos_v1_videos_video_id_get(video_id=video_id).data
[docs]
def wait_for_stream_connection(
    address: str, max_attempts: int = 6, wait_interval: int = 10
) -> bool:
    """Poll the HLS check endpoint until the stream at ``address`` answers with 200.

    Args:
        address: Stream address passed to the HLS check endpoint.
        max_attempts: Maximum number of checks to perform.
        wait_interval: Seconds to sleep between two consecutive checks.

    Returns:
        ``True`` as soon as a check responds with status 200, ``False`` when
        all attempts are used up (or ``max_attempts`` is not positive).
    """
    for attempt in range(max_attempts):
        # NOTE(review): the response comes from a `without_preload_content`
        # call and is never read or released here - confirm whether the
        # client expects the caller to release it.
        connection_response = (
            _apis.video.h_l_s_api.api_videos_v1_hls_check_get_without_preload_content(address)
        )
        if connection_response.status == 200:
            log.info(f"stream found {address} to be connectable")
            return True
        log.debug(
            f"unable to connect to stream at {address}, attempt ({attempt + 1}/{max_attempts})"
        )
        # Nothing follows the last attempt, so sleeping there would only
        # delay the negative result.
        if attempt + 1 < max_attempts:
            time.sleep(wait_interval)
    return False
[docs]
def stream_video(
    *, project_id: str, name: str, description: str, url: str, **kwargs
) -> OutputGetVideoResponse | None:
    """Register a streaming video that points at ``url``.

    Args:
        project_id: Project the video is created in.
        name: Name of the video.
        description: Description of the video.
        url: Address of the stream.
        **kwargs: Extra fields merged into the create-video request body.

    Returns:
        The ``data`` of a fresh lookup of the created video.

    Raises:
        ValueError: If ``project_id``, ``name``, ``description`` or ``url``
            is an empty string.
        Exception: If the request body is invalid or the create response
            carries no video id.
    """
    # Checked in declaration order; the first empty field wins.
    required_fields = (
        (project_id, "unable to create streaming video, project id is empty"),
        (name, "unable to create streaming video, name is empty"),
        (description, "unable to create streaming video, description is empty"),
        (url, "unable to create streaming video, url is empty"),
    )
    for value, message in required_fields:
        if value == "":
            raise ValueError(message)

    payload = {
        "project_id": project_id,
        "name": name,
        "description": description,
        "url": url,
        "video_type": "stream",
    }
    # Caller-supplied extras may override the fields set above.
    payload.update(kwargs)

    create_request = InputCreateVideoRequest.from_dict(payload)
    if create_request is None:
        raise Exception("unable to create streaming video, request body is invalid")

    videos_api = _apis.video.videos_api
    created = videos_api.api_videos_v1_videos_post(video=create_request)
    if created.data is None or created.data.id is None:
        raise Exception("unable to create streaming video, empty response")

    return videos_api.api_videos_v1_videos_video_id_get(video_id=str(created.data.id)).data
[docs]
def process_video(video_id: str, **kwargs) -> OutputGetVideoRequestResponse | None:
    """Create a video request for ``video_id`` and return the stored request.

    Args:
        video_id: Identifier of the video to process.
        **kwargs: Extra fields merged into the request body.

    Returns:
        The video request as returned by ``get_video_request`` for the newly
        created request id.

    Raises:
        Exception: If the request body is invalid or the create response
            carries no request id.
    """
    payload = {"video_id": video_id}
    payload.update(kwargs)

    create_request = InputCreateVideoRequestRequest.from_dict(payload)
    if create_request is None:
        raise Exception("unable to process video, request body is invalid")

    created = _apis.video.video_requests_api.api_videos_v1_video_requests_post(
        request_body=create_request,
    )
    if created.data is None or created.data.id is None:
        raise Exception("unable to process video, empty response")

    # Look the request up again rather than returning the create response.
    return get_video_request(created.data.id)
[docs]
def get_video_requests(
    *, project_id: str | None = None, video_id: str | None = None
) -> list[OutputGetVideoRequestResponse] | None:
    """Get video requests within a project or associated with a video.

    At least one of ``project_id`` and ``video_id`` must be given. When only
    ``video_id`` is given, the project id is taken from a lookup of that video.

    Args:
        project_id: Project to list video requests for.
        video_id: Video to list video requests for.

    Returns:
        The ``data`` field of the list response.

    Raises:
        ValueError: If both ``project_id`` and ``video_id`` are empty or ``None``.
        Exception: If the project id has to be resolved from the video and the
            video lookup returns no data.
    """
    if not project_id and not video_id:
        raise ValueError("unable to get video requests, project id or video id must be provided")
    if not project_id:
        video = _apis.video.videos_api.api_videos_v1_videos_video_id_get(video_id).data
        # Fail with a clear message instead of an AttributeError on None.
        if video is None:
            raise Exception("unable to get video requests, unable to get video")
        project_id = video.project_id
    response = _apis.video.video_requests_api.api_videos_v1_video_requests_get(
        project_id=project_id, video_id=video_id
    )
    return response.data
[docs]
def get_video_request(video_request_id: str) -> OutputGetVideoRequestResponse | None:
    """Fetch a single video request.

    Args:
        video_request_id: Identifier of the video request.

    Returns:
        The ``data`` field of the lookup response.

    Raises:
        ValueError: If ``video_request_id`` is an empty string.
    """
    if video_request_id == "":
        raise ValueError("unable to get video request, video request id is empty")
    requests_api = _apis.video.video_requests_api
    return requests_api.api_videos_v1_video_requests_video_request_id_get(
        video_request_id=video_request_id
    ).data
[docs]
def wait_for_video_request(
    *,
    video_request_id: str,
    wait_interval: int = 5,
    timeout: int = 60,
    statuses: list[str] | None = None,
) -> OutputGetVideoRequestResponse:
    """Poll a video request until its status is one of ``statuses``.

    Args:
        video_request_id: Identifier of the video request to watch.
        wait_interval: Seconds to sleep between two lookups.
        timeout: Maximum number of seconds to keep polling.
        statuses: Statuses that end the wait. Defaults to ``["success"]``
            when omitted or ``None``.

    Returns:
        The video request as last fetched, with a status in ``statuses``.

    Raises:
        ValueError: If ``video_request_id`` is an empty string.
        TimeoutError: If none of the statuses is reached within ``timeout``
            seconds.
        Exception: If ``statuses`` is empty or a lookup returns nothing.
    """
    if video_request_id == "":
        raise ValueError("unable to wait for video request, video request id is empty")
    # A None sentinel avoids sharing one mutable default list across calls.
    if statuses is None:
        statuses = ["success"]
    if len(statuses) <= 0:
        raise Exception("unable to wait for video request, no statuses provided")

    # The monotonic clock is immune to wall-clock adjustments during the wait.
    start = time.monotonic()
    video_request = get_video_request(video_request_id=video_request_id)
    if video_request is None:
        raise Exception("unable to wait for video request, unable to get video request")
    while video_request.status not in statuses:
        if time.monotonic() - start > timeout:
            raise TimeoutError(
                f"video request {video_request_id!r} did not achieve one of the provided statuses within {timeout} seconds"
            )
        time.sleep(wait_interval)
        video_request = get_video_request(video_request_id=video_request_id)
        if video_request is None:
            raise Exception("unable to wait for video request, unable to get video request")
    return video_request
[docs]
def delete_video_request(video_request_id: str) -> OutputDeleteVideoRequestResponse | None:
    """Delete a video request by id.

    Args:
        video_request_id: Identifier of the video request to delete.

    Returns:
        The ``data`` field of the delete response.

    Raises:
        ValueError: If ``video_request_id`` is an empty string.
    """
    if video_request_id == "":
        # The message names the delete operation (it used to say "get").
        raise ValueError("unable to delete video request, video request id is empty")
    response = _apis.video.video_requests_api.api_videos_v1_video_requests_video_request_id_delete(
        video_request_id=video_request_id
    )
    return response.data
[docs]
def create_tracker_request(video_request_id: str, **kwargs) -> OutputTrackerRequestResponse | None:
    """Create a tracker request for a video request.

    Args:
        video_request_id: Identifier of the video request the tracker
            request refers to.
        **kwargs: Extra fields merged into the request body.

    Returns:
        The ``data`` field of the create response.

    Raises:
        Exception: If the request body is invalid.
    """
    payload = {"video_request_id": video_request_id}
    payload.update(kwargs)

    create_request = InputCreateTrackerRequestRequest.from_dict(payload)
    if create_request is None:
        raise Exception("unable to create tracker request, request body is invalid")

    tracker_api = _apis.video.tracker_requests_api
    return tracker_api.api_videos_v1_tracker_request_post(request_body=create_request).data
[docs]
def get_tracker_request(tracker_request_id: str) -> OutputTrackerRequestResponse | None:
    """Fetch a single tracker request.

    Args:
        tracker_request_id: Identifier of the tracker request.

    Returns:
        The ``data`` field of the lookup response.

    Raises:
        ValueError: If ``tracker_request_id`` is an empty string.
    """
    if tracker_request_id == "":
        raise ValueError("unable to get tracker request, tracker request id is empty")
    tracker_api = _apis.video.tracker_requests_api
    return tracker_api.api_videos_v1_tracker_requests_tracker_request_id_get(
        tracker_request_id=tracker_request_id
    ).data
[docs]
def wait_for_tracker_request(
    *,
    tracker_request_id: str,
    wait_interval: int = 5,
    timeout: int = 60,
    statuses: list[str] | None = None,
) -> OutputTrackerRequestResponse:
    """Poll a tracker request until its status is one of ``statuses``.

    Args:
        tracker_request_id: Identifier of the tracker request to watch.
        wait_interval: Seconds to sleep between two lookups.
        timeout: Maximum number of seconds to keep polling.
        statuses: Statuses that end the wait. Defaults to ``["success"]``
            when omitted or ``None``.

    Returns:
        The tracker request as last fetched, with a status in ``statuses``.

    Raises:
        ValueError: If ``tracker_request_id`` is an empty string.
        TimeoutError: If none of the statuses is reached within ``timeout``
            seconds.
        Exception: If ``statuses`` is empty or a lookup returns nothing.
    """
    if tracker_request_id == "":
        raise ValueError("unable to wait for tracker request, tracker request id is empty")
    # A None sentinel avoids sharing one mutable default list across calls.
    if statuses is None:
        statuses = ["success"]
    if len(statuses) <= 0:
        raise Exception("unable to wait for tracker request, no statuses provided")

    # The monotonic clock is immune to wall-clock adjustments during the wait.
    start = time.monotonic()
    tracker_request = get_tracker_request(tracker_request_id=tracker_request_id)
    if tracker_request is None:
        raise Exception("unable to wait for tracker request, unable to get tracker request")
    while tracker_request.status not in statuses:
        if time.monotonic() - start > timeout:
            raise TimeoutError(
                f"tracker request {tracker_request_id!r} did not achieve one of the provided statuses within {timeout} seconds"
            )
        time.sleep(wait_interval)
        tracker_request = get_tracker_request(tracker_request_id=tracker_request_id)
        if tracker_request is None:
            raise Exception("unable to wait for tracker request, unable to get tracker request")
    return tracker_request
[docs]
def update_tracker_request(
    *, tracker_request_id: str, **kwargs
) -> OutputTrackerRequestResponse | None:
    """Patch an existing tracker request.

    Args:
        tracker_request_id: Identifier of the tracker request to update.
        **kwargs: Fields that make up the update request body.

    Returns:
        The ``data`` field of the patch response.

    Raises:
        Exception: If the request body is invalid.
    """
    update_request = InputUpdateTrackerRequestRequest.from_dict(dict(kwargs))
    if update_request is None:
        raise Exception("unable to update tracker request, request body is invalid")

    tracker_api = _apis.video.tracker_requests_api
    return tracker_api.api_videos_v1_tracker_requests_tracker_request_id_patch(
        tracker_request_id=tracker_request_id,
        request_body=update_request,
    ).data
[docs]
def delete_tracker_request(tracker_request_id: str) -> OutputDeleteTrackerRequestResponse | None:
    """Remove a tracker request by id.

    Args:
        tracker_request_id: Identifier of the tracker request to delete.

    Returns:
        The ``data`` field of the delete response.

    Raises:
        ValueError: If ``tracker_request_id`` is an empty string.
    """
    if tracker_request_id == "":
        raise ValueError("unable to delete tracker request, tracker request id is empty")
    tracker_api = _apis.video.tracker_requests_api
    return tracker_api.api_videos_v1_tracker_request_tracker_request_id_delete(
        tracker_request_id=tracker_request_id
    ).data
[docs]
def get_video_asset(key: str) -> urllib3.HTTPResponse:
    """Request the HLS asset identified by ``key``.

    Args:
        key: Key of the asset, passed straight to the HLS endpoint.

    Returns:
        The response object of the ``without_preload_content`` API call.
    """
    hls_api = _apis.video.h_l_s_api
    return hls_api.api_videos_v1_hls_key_get_without_preload_content(key)