diff --git a/ayon_api/utils.py b/ayon_api/utils.py index 7294c110b..e85f72270 100644 --- a/ayon_api/utils.py +++ b/ayon_api/utils.py @@ -12,9 +12,9 @@ import traceback import collections import itertools -from urllib.parse import urlparse, urlencode +from urllib.parse import urlparse, urlencode, ParseResult import typing -from typing import Optional, Any, Iterable, Union +from typing import Any, Iterable from enum import IntEnum import requests @@ -148,7 +148,7 @@ def content(self): return self._response.content @property - def content_type(self) -> Optional[str]: + def content_type(self) -> str | None: return self.headers.get("Content-Type") @property @@ -256,7 +256,7 @@ def fill_own_attribs(entity: AnyEntityDict) -> None: own_attrib[key] = copy.deepcopy(value) -def _convert_filter_value(value: Any) -> Optional[list[Any]]: +def _convert_filter_value(value: Any) -> list[Any] | None: if value is None: return None @@ -318,11 +318,11 @@ def get_machine_name() -> str: return unidecode.unidecode(platform.node()) -def get_default_site_id() -> Optional[str]: +def get_default_site_id() -> str | None: """Site id used for server connection. Returns: - Optional[str]: Site id from environment variable or None. + str | None: Site id from environment variable or None. """ return os.environ.get(SITE_ID_ENV_KEY) @@ -333,25 +333,25 @@ class ThumbnailContent: Args: project_name (str): Project name. - thumbnail_id (Optional[str]): Thumbnail id. - content (Optional[bytes]): Thumbnail content. - content_type (Optional[str]): Content type e.g. 'image/png'. + thumbnail_id (str | None): Thumbnail id. + content (bytes | None): Thumbnail content. + content_type (str | None): Content type e.g. 'image/png'. """ def __init__( self, project_name: str, - thumbnail_id: Optional[str], - content: Optional[bytes], - content_type: Optional[str], + thumbnail_id: str | None, + content: bytes | None, + content_type: str | None, ): self.project_name: str = project_name - self.thumbnail_id: Optional[str] = thumbnail_id - self.content_type: Optional[str] = content_type + self.thumbnail_id: str | None = thumbnail_id + self.content_type: str | None = content_type self.content: bytes = content or b"" @property - def id(self) -> str: + def id(self) -> str | None: """Wrapper for thumbnail id.""" return self.thumbnail_id @@ -394,14 +394,14 @@ def prepare_query_string( if not key_values: return "" - return "?{}".format(urlencode(key_values)) + return f"?{urlencode(key_values)}" def create_entity_id() -> str: return uuid.uuid1().hex -def convert_entity_id(entity_id) -> Optional[str]: +def convert_entity_id(entity_id) -> str | None: if not entity_id: return None @@ -416,7 +416,7 @@ def convert_entity_id(entity_id) -> Optional[str]: return None -def convert_or_create_entity_id(entity_id: Optional[str] = None) -> str: +def convert_or_create_entity_id(entity_id: str | None = None) -> str: output = convert_entity_id(entity_id) if output is None: output = create_entity_id() @@ -428,7 +428,7 @@ def entity_data_json_default(value: Any) -> Any: return int(value.timestamp()) raise TypeError( - "Object of type {} is not JSON serializable".format(str(type(value))) + f"Object of type {type(value)} is not JSON serializable" ) @@ -440,7 +440,7 @@ def slugify_string( min_length: int = 1, lower: bool = False, make_set: bool = False, -) -> Union[str, set[str]]: +) -> str | set[str]: """Slugify a text string. This function removes transliterates input string to ASCII, removes @@ -460,8 +460,7 @@ def slugify_string( min_length (int): Minimal length of an element (word). Returns: - Union[str, set[str]]: Based on 'make_set' value returns slugified - string. + str | set[str]: Based on 'make_set' value returns slugified string. """ tmp_string = unidecode.unidecode(input_string) @@ -486,14 +485,14 @@ def slugify_string( def failed_json_default(value: Any) -> str: - return "< Failed value {} > {}".format(type(value), str(value)) + return f"< Failed value {type(value)} > {value}" def prepare_attribute_changes( old_entity: AnyEntityDict, new_entity: AnyEntityDict, replace: int = False, -): +) -> dict[str, Any]: attrib_changes = {} new_attrib = new_entity.get("attrib") old_attrib = old_entity.get("attrib") @@ -544,7 +543,7 @@ def prepare_entity_changes( return changes -def _try_parse_url(url: str) -> Optional[str]: +def _try_parse_url(url: str) -> ParseResult | None: try: return urlparse(url) except BaseException: @@ -553,10 +552,10 @@ def _try_parse_url(url: str) -> Optional[str]: def _try_connect_to_server( url: str, - timeout: Optional[float], - verify: Optional[Union[str, bool]], - cert: Optional[str], -) -> Optional[str]: + timeout: float | None, + verify: str | bool | None, + cert: str | None, +) -> str | None: if timeout is None: timeout = get_default_timeout() @@ -590,19 +589,19 @@ def login_to_server( url: str, username: str, password: str, - timeout: Optional[float] = None, -) -> Optional[str]: + timeout: float | None = None, +) -> str | None: """Use login to the server to receive token. Args: url (str): Server url. username (str): User's username. password (str): User's password. - timeout (Optional[float]): Timeout for request. Value from + timeout (float | None): Timeout for request. Value from 'get_default_timeout' is used if not specified. Returns: - Optional[str]: User's token if login was successfull. + str | None: User's token if login was successfull. Otherwise 'None'. """ @@ -610,7 +609,7 @@ def login_to_server( timeout = get_default_timeout() headers = {"Content-Type": "application/json"} response = requests.post( - "{}/api/auth/login".format(url), + f"{url}/api/auth/login", headers=headers, json={ "name": username, @@ -627,13 +626,17 @@ def login_to_server( return token -def logout_from_server(url: str, token: str, timeout: Optional[float] = None): +def logout_from_server( + url: str, + token: str, + timeout: float | None = None, +) -> None: """Logout from server and throw token away. Args: url (str): Url from which should be logged out. token (str): Token which should be used to log out. - timeout (Optional[float]): Timeout for request. Value from + timeout (float | None): Timeout for request. Value from 'get_default_timeout' is used if not specified. """ @@ -641,10 +644,10 @@ def logout_from_server(url: str, token: str, timeout: Optional[float] = None): timeout = get_default_timeout() headers = { "Content-Type": "application/json", - "Authorization": "Bearer {}".format(token) + "Authorization": f"Bearer {token}", } requests.post( - url + "/api/auth/logout", + f"{url}/api/auth/logout", headers=headers, timeout=timeout, ) @@ -653,18 +656,18 @@ def logout_from_server(url: str, token: str, timeout: Optional[float] = None): def get_user_by_token( url: str, token: str, - timeout: Optional[float] = None, -) -> Optional[dict[str, Any]]: + timeout: float | None = None, +) -> dict[str, Any] | None: """Get user information by url and token. Args: url (str): Server url. token (str): User's token. - timeout (Optional[float]): Timeout for request. Value from + timeout (float | None): Timeout for request. Value from 'get_default_timeout' is used if not specified. Returns: - Optional[dict[str, Any]]: User information if url and token are valid. + dict[str, Any] | None: User information if url and token are valid. """ if timeout is None: @@ -674,13 +677,13 @@ def get_user_by_token( "Content-Type": "application/json", } for header_value in ( - {"Authorization": "Bearer {}".format(token)}, + {"Authorization": f"Bearer {token}"}, {"X-Api-Key": token}, ): headers = base_headers.copy() headers.update(header_value) response = requests.get( - "{}/api/users/me".format(url), + f"{url}/api/users/me", headers=headers, timeout=timeout, ) @@ -692,7 +695,7 @@ def get_user_by_token( def is_token_valid( url: str, token: str, - timeout: Optional[float] = None, + timeout: float | None = None, ) -> bool: """Check if token is valid. @@ -701,7 +704,7 @@ def is_token_valid( Args: url (str): Server url. token (str): User's token. - timeout (Optional[float]): Timeout for request. Value from + timeout (float | None): Timeout for request. Value from 'get_default_timeout' is used if not specified. Returns: @@ -715,9 +718,9 @@ def is_token_valid( def validate_url( url: str, - timeout: Optional[int] = None, - verify: Optional[Union[str, bool]] = None, - cert: Optional[str] = None, + timeout: int | None = None, + verify: str | bool | None = None, + cert: str | None = None, ) -> str: """Validate url if is valid and server is available. @@ -740,7 +743,7 @@ def validate_url( Args: url (str): Server url. - timeout (Optional[int]): Timeout in seconds for connection to server. + timeout (int | None): Timeout in seconds for connection to server. Returns: Url which was used to connect to server. @@ -818,25 +821,25 @@ def __init__(self): self._started: bool = False self._transfer_done: bool = False self._transferred: int = 0 - self._content_size: Optional[int] = None + self._content_size: int | None = None self._failed: bool = False - self._fail_reason: Optional[str] = None + self._fail_reason: str | None = None self._source_url: str = "N/A" self._destination_url: str = "N/A" - def get_content_size(self): + def get_content_size(self) -> int | None: """Content size in bytes. Returns: - Union[int, None]: Content size in bytes or None + int | None: Content size in bytes or None if is unknown. """ return self._content_size - def set_content_size(self, content_size: int): + def set_content_size(self, content_size: int) -> None: """Set content size in bytes. Args: @@ -859,7 +862,7 @@ def get_started(self) -> bool: """ return self._started - def set_started(self): + def set_started(self) -> None: """Mark that transfer started. Raises: @@ -890,7 +893,7 @@ def get_transfer_done(self) -> bool: """ return self._transfer_done - def set_transfer_done(self): + def set_transfer_done(self) -> None: """Mark progress as transfer finished. Raises: @@ -913,17 +916,17 @@ def get_failed(self) -> bool: """ return self._failed - def get_fail_reason(self) -> Optional[str]: + def get_fail_reason(self) -> str | None: """Get reason why transfer failed. Returns: - Optional[str]: Reason why transfer + str | None: Reason why transfer failed or None. """ return self._fail_reason - def set_failed(self, reason: str): + def set_failed(self, reason: str) -> None: """Mark progress as failed. Args: @@ -942,7 +945,7 @@ def get_transferred_size(self) -> int: """ return self._transferred - def set_transferred_size(self, transferred: int): + def set_transferred_size(self, transferred: int) -> None: """Set already transferred size in bytes. Args: @@ -955,7 +958,7 @@ def reset_transferred(self) -> None: """Reset transferred size to initial value.""" self._transferred = 0 - def add_transferred_chunk(self, chunk_size: int): + def add_transferred_chunk(self, chunk_size: int) -> None: """Add transferred chunk size in bytes. Args: @@ -978,7 +981,7 @@ def get_source_url(self) -> str: """ return self._source_url - def set_source_url(self, url: str): + def set_source_url(self, url: str) -> None: """Set source url from where transfer happens. Args: @@ -1000,7 +1003,7 @@ def get_destination_url(self) -> str: """ return self._destination_url - def set_destination_url(self, url: str): + def set_destination_url(self, url: str) -> None: """Set destination url where transfer happens. Args: @@ -1026,11 +1029,11 @@ def is_running(self) -> bool: return True @property - def transfer_progress(self) -> Optional[float]: + def transfer_progress(self) -> float | None: """Get transfer progress in percents. Returns: - Optional[float]: Transfer progress in percents or 'None' + float | None: Transfer progress in percents or 'None' if content size is unknown. """ @@ -1049,12 +1052,12 @@ def transfer_progress(self) -> Optional[float]: def create_dependency_package_basename( - platform_name: Optional[str] = None + platform_name: str | None = None ) -> str: """Create basename for dependency package file. Args: - platform_name (Optional[str]): Name of platform for which the + platform_name (str | None): Name of platform for which the bundle is targeted. Default value is current platform. Returns: @@ -1066,11 +1069,11 @@ def create_dependency_package_basename( now_date = datetime.datetime.now() time_stamp = now_date.strftime("%y%m%d%H%M") - return "ayon_{}_{}".format(time_stamp, platform_name) + return f"ayon_{time_stamp}_{platform_name}" -def _get_media_mime_type_from_ftyp(content: bytes) -> Optional[str]: +def _get_media_mime_type_from_ftyp(content: bytes) -> str | None: if content[8:10] == b"qt" or content[8:12] == b"MSNV": return "video/quicktime" @@ -1110,7 +1113,7 @@ def _get_media_mime_type_from_ftyp(content: bytes) -> Optional[str]: return None -def _get_media_mime_type_for_content_base(content: bytes) -> Optional[str]: +def _get_media_mime_type_for_content_base(content: bytes) -> str | None: """Determine Mime-Type of a file. Use header of the file to determine mime type (needs 12 bytes). @@ -1173,14 +1176,14 @@ def _get_media_mime_type_for_content_base(content: bytes) -> Optional[str]: return None -def _get_svg_mime_type(content: bytes) -> Optional[str]: +def _get_svg_mime_type(content: bytes) -> str | None: # SVG if b'xmlns="http://www.w3.org/2000/svg"' in content: return "image/svg+xml" return None -def _get_json_mime_type(content: bytes) -> Optional[str]: +def _get_json_mime_type(content: bytes) -> str | None: # json try: json.loads(content.decode("utf-8")) @@ -1190,14 +1193,14 @@ def _get_json_mime_type(content: bytes) -> Optional[str]: return None -def get_media_mime_type_for_content(content: bytes) -> Optional[str]: +def get_media_mime_type_for_content(content: bytes) -> str | None: mime_type = _get_media_mime_type_for_content_base(content) if mime_type is not None: return mime_type return _get_svg_mime_type(content) or _get_json_mime_type(content) -def get_media_mime_type_for_stream(stream: StreamType) -> Optional[str]: +def get_media_mime_type_for_stream(stream: StreamType) -> str | None: # Read only 12 bytes to determine mime type content = stream.read(12) mime_type = _get_media_mime_type_for_content_base(content) @@ -1208,14 +1211,14 @@ def get_media_mime_type_for_stream(stream: StreamType) -> Optional[str]: return _get_svg_mime_type(content) or _get_json_mime_type(content) -def get_media_mime_type(filepath: str) -> Optional[str]: +def get_media_mime_type(filepath: str) -> str | None: """Determine Mime-Type of a file. Args: filepath (str): Path to file. Returns: - Optional[str]: Mime type or None if is unknown mime type. + str | None: Mime type or None if is unknown mime type. """ if not filepath or not os.path.exists(filepath):