diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e18addc..2b8ec6b 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -14,4 +14,4 @@ on: jobs: call_ci: - uses: EffectiveRange/ci-workflows/.github/workflows/python-ci.yaml@v5 + uses: EffectiveRange/ci-workflows/.github/workflows/python-ci.yaml@latest-python diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..28c53ee --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "args": [ + "--backend=scipy" + ], + "console": "integratedTerminal" + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json index 23c4671..fbf984b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,23 +1,20 @@ { - "editor.rulers": [ - 120 - ], - "[python]": { - "editor.defaultFormatter": "ms-python.black-formatter", - "editor.formatOnSave": true - }, - "black-formatter.args": [ - "--skip-string-normalization", - "--line-length", - "120" - ], - "python.testing.unittestArgs": [ - "-v", - "-s", - "./tests", - "-p", - "*test.py" - ], - "python.testing.pytestEnabled": false, - "python.testing.unittestEnabled": true + "python.venvPath": "${workspaceFolder}/.venv", + "python.testing.unittestArgs": [ + "-v", + "-s", + "./tests", + "-p", + "*test.py" + ], + "python.testing.pytestEnabled": true, + "python.testing.unittestEnabled": false, + "black-formatter.interpreter": [ + "${workspaceFolder}/.venv/bin/python3" + ], + "black-formatter.args": [ + "--config=setup.cfg" + ], + "python.analysis.typeCheckingMode": "standard", + "python.testing.pytestArgs": [] } diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..328d1b1 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,15 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Create Python venv", + "type": "shell", + "command": "if [ -d /var/chroot/buildroot ];then dpkgdeps -v --arch $(grep TARGET_ARCH /home/crossbuilder/target/target | cut -d'=' -f2 | tr -d \\') .;else dpkgdeps -v .;fi && rm -rf .venv && python3 -m venv --system-site-packages .venv && .venv/bin/pip install -e . && .venv/bin/python3 -m mypy --non-interactive --install-types && .venv/bin/pip install pytest-cov || true", + "group": "build", + "detail": "Creates a Python virtual environment in the .venv folder", + "problemMatcher": [ + "$eslint-compact" + ] + } + ] +} diff --git a/common_utility/configLoader.py b/common_utility/configLoader.py index 2b09780..b505e02 100644 --- a/common_utility/configLoader.py +++ b/common_utility/configLoader.py @@ -3,13 +3,14 @@ # SPDX-License-Identifier: MIT import os -import shutil from configparser import ConfigParser from pathlib import Path from typing import Any from context_logger import get_logger +from common_utility import copy_file + log = get_logger('ConfigLoader') @@ -21,29 +22,37 @@ def load(self, arguments: dict[str, Any]) -> dict[str, Any]: class ConfigLoader(IConfigLoader): - def __init__(self, resource_root: str, default_config: str, config_file_argument: str = 'config_file') -> None: - self._resource_root = resource_root - self._default_config = f'{self._resource_root}/{default_config}' + def __init__(self, default_config_file: Path, config_file_argument: str = 'config_file') -> None: + self._default_config_file = default_config_file self._config_file_argument = config_file_argument def load(self, arguments: dict[str, Any]) -> dict[str, Any]: - config_file = Path(arguments[self._config_file_argument]) + parser = ConfigParser(interpolation=None) - if not os.path.exists(config_file): - log.info('Loading default configuration file', config_file=self._default_config) - config_file.parent.mkdir(parents=True, exist_ok=True) - shutil.copyfile(self._default_config, config_file) - else: - log.info('Using configuration file', config_file=str(config_file)) + log.info('Loading default configuration', config_file=str(self._default_config_file)) + parser.read(self._default_config_file) - parser = ConfigParser(interpolation=None) - parser.read(config_file) + if config_file := arguments.get(self._config_file_argument): + custom_config_file = Path(config_file) + + if os.path.exists(custom_config_file): + log.info('Loading custom configuration', config_file=str(custom_config_file)) + parser.read(custom_config_file) + else: + try: + log.info('Creating custom configuration using default', config_file=str(custom_config_file)) + copy_file(self._default_config_file, custom_config_file) + except Exception as exception: + log.warn('Failed to create custom configuration file', error=str(exception)) configuration = {} for section in parser.sections(): configuration.update(dict(parser[section])) + log.info('Loading command line arguments', arguments=arguments) configuration.update(arguments) + log.info('Configuration loaded', configuration=configuration) + return configuration diff --git a/common_utility/fileDownloader.py b/common_utility/fileDownloader.py index b1a2ad7..cfbce76 100644 --- a/common_utility/fileDownloader.py +++ b/common_utility/fileDownloader.py @@ -3,7 +3,8 @@ # SPDX-License-Identifier: MIT import os -from typing import Optional +from pathlib import Path +from typing import Optional, Union from urllib.parse import urlparse from context_logger import get_logger @@ -16,51 +17,33 @@ class IFileDownloader(object): - def download( - self, - file_url: str, - file_name: Optional[str] = None, - sub_dir: Optional[str] = None, - headers: Optional[dict[str, str]] = None, - skip_if_exists: bool = True, - chunk_size: int = 1000 * 1000, - ) -> str: + def download(self, file_url: str, file_name: Optional[str] = None, sub_dir: Optional[Union[str, Path]] = None, + headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True, chunk_size: int = 1000 * 1000 + ) -> Path: raise NotImplementedError() - def download_and_copy( - self, - file_url: str, - sub_dirs: list[str], - file_name: Optional[str] = None, - headers: Optional[dict[str, str]] = None, - skip_if_exists: bool = True, - chunk_size: int = 1000 * 1000, - ) -> list[str]: + def download_and_copy(self, file_url: str, sub_dirs: list[Union[str, Path]], file_name: Optional[str] = None, + headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True, + chunk_size: int = 1000 * 1000) -> list[Path]: raise NotImplementedError() class FileDownloader(IFileDownloader): - def __init__(self, session_provider: ISessionProvider, download_location: str) -> None: + def __init__(self, session_provider: ISessionProvider, download_location: Path) -> None: self._session_provider = session_provider self._download_location = download_location - def download( - self, - file_url: str, - file_name: Optional[str] = None, - sub_dir: Optional[str] = None, - headers: Optional[dict[str, str]] = None, - skip_if_exists: bool = True, - chunk_size: int = 1000 * 1000, - ) -> str: + def download(self, file_url: str, file_name: Optional[str] = None, sub_dir: Optional[Union[str, Path]] = None, + headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True, chunk_size: int = 1000 * 1000 + ) -> Path: if not urlparse(file_url).scheme: return self._check_local_file(file_url) file_path = self._get_target_path(file_url, file_name, sub_dir) if skip_if_exists and os.path.isfile(file_path): - log.info('File already exists, skipping download', file=file_path) + log.info('File already exists, skipping download', file=str(file_path)) return file_path headers = headers if headers else dict() @@ -71,19 +54,13 @@ def download( self._download_file(response, file_path, chunk_size) - log.info('Downloaded file', file=file_path) + log.info('Downloaded file', file=str(file_path)) return file_path - def download_and_copy( - self, - file_url: str, - sub_dirs: list[str], - file_name: Optional[str] = None, - headers: Optional[dict[str, str]] = None, - skip_if_exists: bool = True, - chunk_size: int = 1000 * 1000, - ) -> list[str]: + def download_and_copy(self, file_url: str, sub_dirs: list[Union[str, Path]], file_name: Optional[str] = None, + headers: Optional[dict[str, str]] = None, skip_if_exists: bool = True, + chunk_size: int = 1000 * 1000) -> list[Path]: if not sub_dirs: raise ValueError('At least one sub directory must be provided') @@ -93,22 +70,22 @@ def download_and_copy( file_path = self._get_target_path(file_url, file_name, sub_dir) if skip_if_exists and os.path.isfile(file_path): - log.info('File already exists, skipping copy', file=file_path) + log.info('File already exists, skipping copy', file=str(file_path)) else: copy_file(downloaded_files[0], file_path) - log.info('Copied downloaded file', file=file_path) + log.info('Copied downloaded file', file=str(file_path)) downloaded_files.append(file_path) return downloaded_files - def _check_local_file(self, file_url: str) -> str: + def _check_local_file(self, file_url: str) -> Path: file_path = os.path.abspath(file_url) if os.path.isfile(file_path): - log.info('Local file path provided, skipping download', file=file_path) - return file_path + log.info('Local file path provided, skipping download', file=str(file_path)) + return Path(file_path) else: - log.error('Local file does not exist', file=file_path) + log.error('Local file does not exist', file=str(file_path)) raise ValueError('Local file does not exist') def _send_request(self, file_url: str, headers: dict[str, str]) -> Response: @@ -121,21 +98,21 @@ def _send_request(self, file_url: str, headers: dict[str, str]) -> Response: return response - def _get_target_path(self, file_url: str, file_name: Optional[str], sub_dir: Optional[str] = None) -> str: - if not file_name and '/' in file_url: + def _get_target_path(self, file_url: str, file_name: Optional[str], + sub_dir: Optional[Union[str, Path]] = None) -> Path: + if not file_name: file_name = file_url.split('/')[-1] download_dir = self._download_location if sub_dir: - download_dir += f'/{sub_dir}' + download_dir = download_dir / sub_dir create_directory(download_dir) - return f'{download_dir}/{file_name}' + return download_dir / file_name - def _download_file(self, response: Response, file_path: str, chunk_size: int) -> None: - if not os.path.exists(self._download_location): - os.makedirs(self._download_location) + def _download_file(self, response: Response, file_path: Path, chunk_size: int) -> None: + create_directory(self._download_location) with open(file_path, 'wb') as asset_file: for chunk in response.iter_content(chunk_size): diff --git a/common_utility/fileUtility.py b/common_utility/fileUtility.py index 20f30ce..bdca737 100644 --- a/common_utility/fileUtility.py +++ b/common_utility/fileUtility.py @@ -6,38 +6,39 @@ import re import shutil from os.path import exists -from typing import Any +from pathlib import Path +from typing import Any, Union from jinja2 import Environment, FileSystemLoader -def create_directory(directory: str) -> None: +def create_directory(directory: Union[str, Path]) -> None: if not os.path.isdir(directory): os.makedirs(directory, exist_ok=True) -def delete_directory(directory: str) -> None: +def delete_directory(directory: Union[str, Path]) -> None: if os.path.isdir(directory): shutil.rmtree(directory) -def create_file(file_path: str, content: str = '\n') -> None: +def create_file(file_path: Union[str, Path], content: str = '\n') -> None: create_directory(os.path.dirname(file_path)) with open(file_path, 'w') as f: f.write(content) -def copy_file(source: str, destination: str) -> None: +def copy_file(source: Union[str, Path], destination: Union[str, Path]) -> None: create_directory(os.path.dirname(destination)) shutil.copy(source, destination) -def append_file(file_path: str, line: str) -> None: +def append_file(file_path: Union[str, Path], line: str) -> None: with open(file_path, 'a+') as file: file.writelines([f'{line}\n']) -def delete_file(file_path: str) -> None: +def delete_file(file_path: Union[str, Path]) -> None: if exists(file_path): if os.path.islink(file_path): os.unlink(file_path) @@ -45,7 +46,7 @@ def delete_file(file_path: str) -> None: os.remove(file_path) -def is_file_matches_pattern(file_path: str, pattern: str) -> bool: +def is_file_matches_pattern(file_path: Union[str, Path], pattern: str) -> bool: if not exists(file_path): return False @@ -53,7 +54,7 @@ def is_file_matches_pattern(file_path: str, pattern: str) -> bool: return re.search(pattern, file.read(), re.MULTILINE) is not None -def is_file_contains_lines(file: str, expected_lines: list[str]) -> bool: +def is_file_contains_lines(file: Union[str, Path], expected_lines: list[str]) -> bool: if not exists(file): return False @@ -65,14 +66,13 @@ def is_file_contains_lines(file: str, expected_lines: list[str]) -> bool: return file_lines_set == expected_lines_set -def render_template_file(resource_root: str, template_file: str, context: dict[str, Any]) -> str: - template_path = f'{resource_root}/{template_file}' - environment = Environment(loader=FileSystemLoader(os.path.dirname(template_path))) - template = environment.get_template(os.path.basename(template_path)) +def render_template_file(template_file: Union[str, Path], context: dict[str, Any]) -> str: + environment = Environment(loader=FileSystemLoader(os.path.dirname(template_file))) + template = environment.get_template(os.path.basename(template_file)) return f'{template.render(context)}\n' -def replace_in_file(file_path: str, pattern: str, replacement: str) -> None: +def replace_in_file(file_path: Union[str, Path], pattern: str, replacement: str) -> None: if not exists(file_path): return diff --git a/common_utility/jsonLoader.py b/common_utility/jsonLoader.py index 2a82d82..57b0493 100644 --- a/common_utility/jsonLoader.py +++ b/common_utility/jsonLoader.py @@ -4,6 +4,7 @@ import json import os +from pathlib import Path from typing import TypeVar, Type, Union, List, Callable, Any from context_logger import get_logger @@ -16,27 +17,27 @@ class IJsonLoader(object): - def load(self, json_file_path: str, model: Type[T]) -> T: + def load(self, json_file_path: Union[str, Path], model: Type[T]) -> T: raise NotImplementedError() - def load_list(self, json_file_path: str, model: Type[T]) -> List[T]: + def load_list(self, json_file_path: Union[str, Path], model: Type[T]) -> List[T]: raise NotImplementedError() class JsonLoader(IJsonLoader): - def load(self, json_data: str, model: Type[T]) -> T: - data = self._load_data(json_data) + def load(self, json_file_path: Union[str, Path], model: Type[T]) -> T: + data = self._load_data(json_file_path) return self._validate(data, dict, lambda: model(**data)) # type: ignore - def load_list(self, json_data: str, model: Type[T]) -> List[T]: - data = self._load_data(json_data) + def load_list(self, json_file_path: Union[str, Path], model: Type[T]) -> List[T]: + data = self._load_data(json_file_path) return self._validate(data, list, lambda: [model(**item) for item in data]) # type: ignore - def _load_data(self, json_data: str) -> Any: - if os.path.isfile(json_data): + def _load_data(self, json_data: Union[str, Path]) -> Any: + if os.path.isfile(json_data) or isinstance(json_data, Path): with open(json_data, 'r') as json_file: return json.load(json_file) else: diff --git a/python-common-utility.code-workspace b/python-common-utility.code-workspace new file mode 100644 index 0000000..a083c85 --- /dev/null +++ b/python-common-utility.code-workspace @@ -0,0 +1,15 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": { + "python.formatting.provider": "black", + "editor.formatOnSave": true, + "mypy-type-checker.ignorePatterns": [ + ], + "flake8.ignorePatterns": [ + ] + } +} diff --git a/python-common-utility.iml b/python-common-utility.iml new file mode 100644 index 0000000..6758849 --- /dev/null +++ b/python-common-utility.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/setup.cfg b/setup.cfg index bbd083a..5bea32d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,7 +8,7 @@ packages = common_utility strict = True [flake8] -exclude = build,dist +exclude = build,dist,.eggs,.venv max-line-length = 120 max-complexity = 10 count = True diff --git a/setup.py b/setup.py index 5196381..8f63338 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,14 @@ -from setuptools import setup +from setuptools import setup, find_packages setup( name='python-common-utility', - version='1.3.0', description='Common utility packages for Python projects', author='Ferenc Nandor Janky & Attila Gombos', author_email='info@effective-range.com', - packages=['common_utility', 'test_utility'], + packages=find_packages(exclude=['tests']), package_data={'common_utility': ['py.typed'], 'test_utility': ['py.typed']}, + use_scm_version=True, + setup_requires=["setuptools_scm"], install_requires=[ 'requests', 'pydantic', diff --git a/test_utility/testUtils.py b/test_utility/testUtils.py index c3f5761..eb8cae8 100644 --- a/test_utility/testUtils.py +++ b/test_utility/testUtils.py @@ -1,9 +1,10 @@ import time from difflib import Differ -from typing import Callable, Any, Optional +from pathlib import Path +from typing import Callable, Any, Optional, Union -def compare_files(file1: str, file2: str, exclude_lines: Optional[list[str]] = None) -> bool: +def compare_files(file1: Union[str, Path], file2: Union[str, Path], exclude_lines: Optional[list[str]] = None) -> bool: with open(file1, 'r') as f1, open(file2, 'r') as f2: lines1 = f1.readlines() lines2 = f2.readlines() @@ -11,7 +12,8 @@ def compare_files(file1: str, file2: str, exclude_lines: Optional[list[str]] = N return compare_lines(lines1, lines2, exclude_lines) -def compare_file_with_lines(file: str, lines: list[str], exclude_lines: Optional[list[str]] = None) -> bool: +def compare_file_with_lines( + file: Union[str, Path], lines: list[str], exclude_lines: Optional[list[str]] = None) -> bool: with open(file, 'r') as f1: file_lines = f1.readlines() diff --git a/tests/config/example.conf b/tests/config/example.conf index 106e425..85cdda9 100644 --- a/tests/config/example.conf +++ b/tests/config/example.conf @@ -1,8 +1,8 @@ [DEFAULT] config_key1 = value1 -config_key2 = value2 +config_key2 = value3 [example] -example_key1 = example1 -example_key2 = example2 +example_key1 = example3 +example_key2 = example4 example_key3 = example3% diff --git a/tests/config/example.default.conf b/tests/config/example.default.conf new file mode 100644 index 0000000..106e425 --- /dev/null +++ b/tests/config/example.default.conf @@ -0,0 +1,8 @@ +[DEFAULT] +config_key1 = value1 +config_key2 = value2 + +[example] +example_key1 = example1 +example_key2 = example2 +example_key3 = example3% diff --git a/tests/configLoaderTest.py b/tests/configLoaderTest.py index 914b7b3..b825991 100644 --- a/tests/configLoaderTest.py +++ b/tests/configLoaderTest.py @@ -1,9 +1,11 @@ +import os.path import unittest +from pathlib import Path from unittest import TestCase from context_logger import setup_logging -from common_utility import delete_file, ConfigLoader +from common_utility import delete_file, ConfigLoader, copy_file from tests import TEST_RESOURCE_ROOT, TEST_FILE_SYSTEM_ROOT @@ -17,11 +19,11 @@ def setUp(self): print() delete_file(f'{TEST_FILE_SYSTEM_ROOT}/etc/example.conf') - def test_load_config(self): + def test_load_config_when_custom_configuration_not_exists(self): # Given - config_loader = ConfigLoader(TEST_RESOURCE_ROOT, 'config/example.conf') + config_loader = ConfigLoader(Path(TEST_RESOURCE_ROOT) / 'config' / 'example.default.conf') arguments = { - 'config_file': f'{TEST_RESOURCE_ROOT}/config/example.conf', + 'config_file': f'{TEST_FILE_SYSTEM_ROOT}/etc/example.conf', 'config_key1': 'new_value1', } @@ -29,26 +31,46 @@ def test_load_config(self): result = config_loader.load(arguments) # Then + self.assertTrue(os.path.exists(f'{TEST_FILE_SYSTEM_ROOT}/etc/example.conf')) self.assertEqual('new_value1', result['config_key1']) self.assertEqual('value2', result['config_key2']) self.assertEqual('example1', result['example_key1']) self.assertEqual('example2', result['example_key2']) - def test_load_default_config_when_config_file_not_found(self): + def test_load_config_when_custom_configuration_exists(self): # Given - config_loader = ConfigLoader(TEST_RESOURCE_ROOT, 'config/example.conf') + config_loader = ConfigLoader(Path(TEST_RESOURCE_ROOT) / 'config' / 'example.default.conf') arguments = { 'config_file': f'{TEST_FILE_SYSTEM_ROOT}/etc/example.conf', 'example_key1': 'new_example1', } + copy_file(f'{TEST_RESOURCE_ROOT}/config/example.conf', f'{TEST_FILE_SYSTEM_ROOT}/etc/example.conf') + # When result = config_loader.load(arguments) # Then self.assertEqual('value1', result['config_key1']) - self.assertEqual('value2', result['config_key2']) + self.assertEqual('value3', result['config_key2']) self.assertEqual('new_example1', result['example_key1']) + self.assertEqual('example4', result['example_key2']) + + def test_load_config_when_fail_to_create_custom_configuration(self): + # Given + config_loader = ConfigLoader(Path(TEST_RESOURCE_ROOT) / 'config' / 'example.default.conf') + arguments = { + 'config_file': '/invalid/path/to/example.conf', + 'config_key1': 'new_value1', + } + + # When + result = config_loader.load(arguments) + + # Then + self.assertEqual('new_value1', result['config_key1']) + self.assertEqual('value2', result['config_key2']) + self.assertEqual('example1', result['example_key1']) self.assertEqual('example2', result['example_key2']) diff --git a/tests/fileDownloaderTest.py b/tests/fileDownloaderTest.py index f46eea0..d167027 100644 --- a/tests/fileDownloaderTest.py +++ b/tests/fileDownloaderTest.py @@ -1,4 +1,5 @@ import unittest +from pathlib import Path from unittest import TestCase from unittest.mock import MagicMock @@ -11,7 +12,7 @@ class FileDownloaderTest(TestCase): - DOWNLOAD_LOCATION = f'{TEST_FILE_SYSTEM_ROOT}/opt/debs' + DOWNLOAD_LOCATION = Path(f'{TEST_FILE_SYSTEM_ROOT}/opt/debs') @classmethod def setUpClass(cls): @@ -31,8 +32,8 @@ def test_download_returns_downloaded_file_path(self): # Then session.get.assert_called_once_with('http://url1/package1.deb', stream=True, headers={}) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/package1.deb', result) - with open(f'{self.DOWNLOAD_LOCATION}/package1.deb', 'rb') as file: + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/package1.deb'), result) + with open(result, 'rb') as file: self.assertEqual(b'content', file.read()) def test_download_returns_downloaded_file_path_when_file_name_specified(self): @@ -46,8 +47,8 @@ def test_download_returns_downloaded_file_path_when_file_name_specified(self): # Then session.get.assert_called_once_with('http://url1/package1.deb', stream=True, headers={}) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/test_package1.deb', result) - with open(f'{self.DOWNLOAD_LOCATION}/test_package1.deb', 'rb') as file: + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/test_package1.deb'), result) + with open(result, 'rb') as file: self.assertEqual(b'content', file.read()) def test_download_returns_downloaded_file_path_when_sub_dir_specified(self): @@ -57,12 +58,12 @@ def test_download_returns_downloaded_file_path_when_sub_dir_specified(self): file_downloader = FileDownloader(session_provider, self.DOWNLOAD_LOCATION) # When - result = file_downloader.download('http://url1/package1.deb', sub_dir='distro') + result = file_downloader.download('http://url1/package1.deb', sub_dir=Path('distro')) # Then session.get.assert_called_once_with('http://url1/package1.deb', stream=True, headers={}) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/distro/package1.deb', result) - with open(f'{self.DOWNLOAD_LOCATION}/distro/package1.deb', 'rb') as file: + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/distro/package1.deb'), result) + with open(result, 'rb') as file: self.assertEqual(b'content', file.read()) def test_download_returns_downloaded_file_path_when_headers_specified(self): @@ -83,8 +84,8 @@ def test_download_returns_downloaded_file_path_when_headers_specified(self): stream=True, headers={'header1': 'value1', 'header2': 'value2'}, ) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/package1.deb', result) - with open(f'{self.DOWNLOAD_LOCATION}/package1.deb', 'rb') as file: + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/package1.deb'), result) + with open(result, 'rb') as file: self.assertEqual(file_content, file.read()) def test_download_raises_error_when_fails_to_download_file(self): @@ -110,7 +111,7 @@ def test_download_returns_downloaded_file_path_when_file_is_present(self): # Then session.get.assert_not_called() - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/package1.deb', result) + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/package1.deb'), result) def test_download_returns_downloaded_file_path_when_file_is_present_and_overwrites(self): # Given @@ -124,7 +125,7 @@ def test_download_returns_downloaded_file_path_when_file_is_present_and_overwrit # Then session.get.assert_called_once_with('http://url1/package1.deb', stream=True, headers={}) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/package1.deb', result) + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/package1.deb'), result) def test_download_returns_local_file_path_when_local_file_is_present(self): # Given @@ -137,7 +138,7 @@ def test_download_returns_local_file_path_when_local_file_is_present(self): result = file_downloader.download(file_path) # Then - self.assertEqual(file_path, result) + self.assertEqual(Path(file_path), result) def test_download_raises_error_when_local_file_is_not_present(self): # Given @@ -158,16 +159,17 @@ def test_download_and_copy_returns_downloaded_and_copied_file_paths(self): file_downloader = FileDownloader(session_provider, self.DOWNLOAD_LOCATION) # When - result = file_downloader.download_and_copy('http://url1/package1.deb', sub_dirs=['distro1', 'distro2']) + result = file_downloader.download_and_copy('http://url1/package1.deb', + sub_dirs=[Path('distro1'), Path('distro2')]) # Then session.get.assert_called_once_with('http://url1/package1.deb', stream=True, headers={}) self.assertEqual(2, len(result)) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/distro1/package1.deb', result[0]) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/distro2/package1.deb', result[1]) - with open(f'{self.DOWNLOAD_LOCATION}/distro1/package1.deb', 'rb') as file: + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/distro1/package1.deb'), result[0]) + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/distro2/package1.deb'), result[1]) + with open(result[0], 'rb') as file: self.assertEqual(b'content', file.read()) - with open(f'{self.DOWNLOAD_LOCATION}/distro2/package1.deb', 'rb') as file: + with open(result[1], 'rb') as file: self.assertEqual(b'content', file.read()) def test_download_and_copy_returns_downloaded_and_copied_file_paths_when_file_is_present(self): @@ -176,19 +178,20 @@ def test_download_and_copy_returns_downloaded_and_copied_file_paths_when_file_is session, session_provider = create_components() file_downloader = FileDownloader(session_provider, self.DOWNLOAD_LOCATION) - file_downloader.download('http://url1/package1.deb', sub_dir='distro2') + file_downloader.download('http://url1/package1.deb', sub_dir=Path('distro2')) # When - result = file_downloader.download_and_copy('http://url1/package1.deb', sub_dirs=['distro1', 'distro2']) + result = file_downloader.download_and_copy('http://url1/package1.deb', + sub_dirs=[Path('distro1'), Path('distro2')]) # Then session.get.assert_called_with('http://url1/package1.deb', stream=True, headers={}) self.assertEqual(2, len(result)) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/distro1/package1.deb', result[0]) - self.assertEqual(f'{self.DOWNLOAD_LOCATION}/distro2/package1.deb', result[1]) - with open(f'{self.DOWNLOAD_LOCATION}/distro1/package1.deb', 'rb') as file: + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/distro1/package1.deb'), result[0]) + self.assertEqual(Path(f'{self.DOWNLOAD_LOCATION}/distro2/package1.deb'), result[1]) + with open(result[0], 'rb') as file: self.assertEqual(b'content', file.read()) - with open(f'{self.DOWNLOAD_LOCATION}/distro2/package1.deb', 'rb') as file: + with open(result[1], 'rb') as file: self.assertEqual(b'content', file.read()) def test_download_and_copy_raises_error_when_sub_directories_is_empty(self): diff --git a/tests/fileUtilityTest.py b/tests/fileUtilityTest.py index d7723a0..b2e5810 100644 --- a/tests/fileUtilityTest.py +++ b/tests/fileUtilityTest.py @@ -230,7 +230,7 @@ def test_render_template_file(self): create_file(file_path, 'Hello, {{ name }}!') # When - result = render_template_file('', file_path, {'name': 'World'}) + result = render_template_file(file_path, {'name': 'World'}) # Then self.assertEqual(result, 'Hello, World!\n') diff --git a/tests/rateLimiterTest.py b/tests/rateLimiterTest.py index 3f8599d..49cbffc 100644 --- a/tests/rateLimiterTest.py +++ b/tests/rateLimiterTest.py @@ -72,6 +72,34 @@ def test_acquire_when_exceeds_limit_and_minute_based(self, mock_monotonic): # Then self.assertFalse(result) + @patch('time.monotonic', side_effect=[0.0, 100.0, 600.0, 1200.0]) + def test_acquire_when_not_exceeds_limit_and_hour_based(self, mock_monotonic): + # Given + rate_limiter = TokenBucketLimiter(10, 3600, 2) + + rate_limiter.acquire(10) + rate_limiter.acquire(10) + + # When + result = rate_limiter.acquire(3) + + # Then + self.assertTrue(result) + + @patch('time.monotonic', side_effect=[0.0, 100.0, 600.0, 1200.0]) + def test_acquire_when_exceeds_limit_and_hour_based(self, mock_monotonic): + # Given + rate_limiter = TokenBucketLimiter(10, 3600, 2) + + rate_limiter.acquire(10) + rate_limiter.acquire(10) + + # When + result = rate_limiter.acquire(10) + + # Then + self.assertFalse(result) + if __name__ == '__main__': unittest.main() diff --git a/tests/reusableTimerTest.py b/tests/reusableTimerTest.py index 6b77051..9dc0295 100644 --- a/tests/reusableTimerTest.py +++ b/tests/reusableTimerTest.py @@ -24,18 +24,18 @@ def test_start(self): timer = ReusableTimer() # When - timer.start(1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) + timer.start(0.1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) # Then self.assertTrue(timer.is_alive()) - wait_for_assertion(1.1, mock.test_method.assert_called_once_with, 1, b=2, c=3) + wait_for_assertion(1, mock.test_method.assert_called_once_with, 1, b=2, c=3) def test_restart(self): # Given mock = MagicMock() timer = ReusableTimer() - timer.start(1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) - wait_for_assertion(1.1, mock.test_method.assert_called_once_with, 1, b=2, c=3) + timer.start(0.1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) + wait_for_assertion(1, mock.test_method.assert_called_once_with, 1, b=2, c=3) mock.reset_mock() # When @@ -43,36 +43,36 @@ def test_restart(self): # Then self.assertTrue(timer.is_alive()) - wait_for_assertion(1.1, mock.test_method.assert_called_once_with, 1, b=2, c=3) + wait_for_assertion(1, mock.test_method.assert_called_once_with, 1, b=2, c=3) def test_cancel(self): # Given mock = MagicMock() timer = ReusableTimer() - timer.start(1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) + timer.start(0.1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) # When timer.cancel() # Then self.assertFalse(timer.is_alive()) - sleep(1.1) + sleep(0.2) mock.test_method.assert_not_called() def test_start_again(self): # Given mock = MagicMock() timer = ReusableTimer() - timer.start(1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) - wait_for_assertion(1.1, mock.test_method.assert_called_once_with, 1, b=2, c=3) + timer.start(0.1, mock.test_method, args=[1], kwargs={'b': 2, 'c': 3}) + wait_for_assertion(1, mock.test_method.assert_called_once_with, 1, b=2, c=3) mock.reset_mock() # When - timer.start(1, mock.test_method, args=[2], kwargs={'b': 4, 'c': 6}) + timer.start(0.1, mock.test_method, args=[2], kwargs={'b': 4, 'c': 6}) # Then self.assertTrue(timer.is_alive()) - wait_for_assertion(1.1, mock.test_method.assert_called_once_with, 2, b=4, c=6) + wait_for_assertion(1, mock.test_method.assert_called_once_with, 2, b=4, c=6) if __name__ == '__main__':