Source code for chariot.client

import json
import os

import urllib3
from pydantic import SecretStr

from chariot import _apis
from chariot.cert import get_cert_and_key_from_p12
from chariot.config import get_bearer_token, getLogger, settings

__all__ = [
    "connect",
    "get_version",
]

log = getLogger(__name__)


[docs] def get_version(): return _apis.get_version(settings)
[docs] def connect( *, credentials: str | None = None, host: str | None = None, bearer_token: str | None = None, client_id: str | None = None, client_secret: str | None = None, client_scope: str | None = None, key_filename: str | None = None, cert_filename: str | None = None, pkcs12: str | bytes | None = None, pkcs12_password: str | None = None, retries: urllib3.Retry | None = None, timeout: urllib3.Timeout | None = None, **kwargs, ) -> None: """Connect to Chariot API. Parameters ---------- credentials: str path to a credentials json file downloaded from Chariot host: str host url of Chariot API bearer_token: str bearer token of Chariot user client_id: str client id client_secret: str client secret client_scope: str space delimited string of desired scopes for access token of client key_filename: str filename of client certificate private key. Client ``key_filename`` and ``cert_filename`` are often required on closed networks to perform PKI authentication. Used for PKI authentication. cert_filename: str filename of client certificate. Used for PKI authentication. pkcs12: str | bytes p12 filename or bytes that will be used to extract client cert and key only if both cert_filename and key_filename are not specified. Used for PKI authentication. pkcs12_password: str password used to decrypt p12 file. Used for PKI authentication. retries: Optional[urllib3.Retry] retry configuration timeout: Optional[urllib3.Timeout] timeout configuration Returns ------- None """ if credentials is not None: loaded = json.loads(open(credentials).read()) for k, v in loaded.items(): setattr(settings, k, v) settings.base_url = loaded["url"] settings.client_scope = loaded["scope"] settings.client_secret = SecretStr(loaded["secret"]) # Set settings to connect to host # Client settings are used to retrieve bearer token if one is not supplied if host and not host.startswith("http"): raise RuntimeError(f"must supply http or https scheme with host '{host}'") base_url = host or settings.base_url if not base_url and "CHARIOT_DOMAIN" in os.environ: base_url = f"https://{os.environ['CHARIOT_DOMAIN']}" settings.base_url = base_url if client_scope is not None: settings.client_scope = client_scope if client_id is not None and client_secret is not None: if bearer_token is not None: raise RuntimeError( "must either provide client_id and client_secret OR bearer_token, but not both" ) settings.client_id = client_id settings.client_secret = SecretStr(client_secret) elif bearer_token is not None: settings.bearer_token = bearer_token _apis.set_bearer_token(settings.bearer_token) elif os.environ.get("access_token"): settings.bearer_token = os.environ["access_token"] _apis.set_bearer_token(settings.bearer_token) if key_filename: settings.key_filename = key_filename if cert_filename: settings.cert_filename = cert_filename if retries is not None: settings.retries = retries if timeout is not None: settings.timeout = timeout for k, v in kwargs.items(): setattr(settings, k, v) if not key_filename and not cert_filename: if pkcs12: settings.pkcs12 = pkcs12 settings.pkcs12_password = pkcs12_password settings.key_filename, settings.cert_filename = get_cert_and_key_from_p12( settings.pkcs12, settings.pkcs12_password ) else: log.debug( "No key_filename and cert_filename or pkcs12 supplied. SDK will not work with endpoint requiring PKIs" ) if not settings.bearer_token: settings.bearer_token = get_bearer_token() _apis.build_apis(settings)