diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index d4a18b47..ac56ef4d 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -1,28 +1,84 @@ name: Integration Tests on: - workflow_dispatch: push: branches: - main + - chore/run-integration-tests-on-codebuild + paths-ignore: + - '**/*.md' + - '**/*.jpg' + - '**/README.txt' + - '**/LICENSE.txt' + - 'docs/**' + - 'ISSUE_TEMPLATE/**' + - '**/remove-old-artifacts.yml' + pull_request_target: + branches: + - main + paths-ignore: + - '**/*.md' + - '**/*.jpg' + - '**/README.txt' + - '**/LICENSE.txt' + - 'docs/**' + - 'ISSUE_TEMPLATE/**' + - '**/remove-old-artifacts.yml' + workflow_dispatch: + inputs: + ref: + description: 'Git ref (branch, tag, or SHA) to test' + required: false + type: string + repository: + description: 'Repository to test in the form owner/repo (use for fork branches)' + required: false + type: string permissions: id-token: write # This is required for requesting the JWT contents: read # This is required for actions/checkout jobs: + approve: + # Auto-approve for non-fork scenarios (push, manual dispatch, or PR from the same repo) + if: > + github.event_name == 'push' || + github.event_name == 'workflow_dispatch' || + github.event.pull_request.head.repo.full_name == github.repository + runs-on: ubuntu-latest + steps: + - run: echo "Approved - not a fork PR" + + approve-fork: + # Require manual maintainer approval (via the integration-tests environment) for fork PRs + if: > + github.event_name == 'pull_request_target' && + github.event.pull_request.head.repo.full_name != github.repository + runs-on: ubuntu-latest + environment: integration-tests + steps: + - run: echo "Fork PR approved by maintainer" + lts-integration-tests: name: Run LTS Integration Tests + needs: [ approve, approve-fork ] + if: | + always() && + (needs.approve.result == 'success' || needs.approve-fork.result == 'success') && + !(needs.approve.result == 'failure' || needs.approve-fork.result == 'failure') runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: [ "3.11", "3.12", "3.13" ] + python-version: [ "3.11", "3.12", "3.13" ] environment: [ "mysql", "pg" ] steps: - name: 'Clone repository' uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} - name: 'Set up JDK 8' uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5 @@ -30,11 +86,20 @@ jobs: distribution: 'corretto' java-version: 8 + - name: 'Set up Python ${{ matrix.python-version }}' + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ matrix.python-version }} + - name: Install poetry shell: bash run: | - pipx install poetry==1.8.2 - poetry config virtualenvs.prefer-active-python true + python -m pip install --upgrade pip + python -m pip install pipx + python -m pipx install poetry==1.8.2 + PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR) + echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH" + "$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true - name: Install dependencies run: poetry install @@ -77,6 +142,8 @@ jobs: steps: - name: 'Clone repository' uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} - name: 'Set up JDK 8' uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5 @@ -84,11 +151,20 @@ jobs: distribution: 'corretto' java-version: 8 + - name: 'Set up Python ${{ matrix.python-version }}' + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ matrix.python-version }} + - name: Install poetry shell: bash run: | - pipx install poetry==1.8.2 - poetry config virtualenvs.prefer-active-python true + python -m pip install --upgrade pip + python -m pip install pipx + python -m pipx install poetry==1.8.2 + PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR) + echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH" + "$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true - name: Install dependencies run: poetry install diff --git a/aws_advanced_python_wrapper/host_list_provider.py b/aws_advanced_python_wrapper/host_list_provider.py index 06cf26c2..ca0c168a 100644 --- a/aws_advanced_python_wrapper/host_list_provider.py +++ b/aws_advanced_python_wrapper/host_list_provider.py @@ -464,7 +464,12 @@ def __init__(self, dialect: db_dialect.TopologyAwareDatabaseDialect, props: Prop self.instance_template: HostInfo = instance_template self._max_timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get_int(props) - self._thread_pool = services_container.get_thread_pool(self._executor_name) + # Need to drain topology query pools before monitor services shut down. + self._thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True) + + @classmethod + def release_resources(cls, wait: bool = True) -> bool: + return services_container.release_thread_pool(cls._executor_name, wait=wait) def _validate_host_pattern(self, host: str): if not self._rds_utils.is_dns_pattern_valid(host): @@ -495,8 +500,9 @@ def query_for_topology( :return: a tuple of :py:class:`HostInfo` objects representing the database topology. If the query results did not include a writer instance, an empty tuple will be returned. """ + thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True) query_for_topology_func_with_timeout = preserve_transaction_status_with_timeout( - self._thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology) + thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology) x = query_for_topology_func_with_timeout(conn) return x diff --git a/aws_advanced_python_wrapper/utils/decorators.py b/aws_advanced_python_wrapper/utils/decorators.py index 7f76fb98..261fb121 100644 --- a/aws_advanced_python_wrapper/utils/decorators.py +++ b/aws_advanced_python_wrapper/utils/decorators.py @@ -17,6 +17,8 @@ import functools from typing import TYPE_CHECKING +from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes + if TYPE_CHECKING: from concurrent.futures import Executor @@ -35,10 +37,13 @@ def func_wrapper(*args, **kwargs): initial_transaction_status: bool = driver_dialect.is_in_transaction(conn) - future = executor.submit(func, *args, **kwargs) - - # raises TimeoutError on timeout - result = future.result(timeout=timeout_sec) + # Run the query synchronously for Psycopg since psycopg supports network timeouts. + if driver_dialect.dialect_code == DriverDialectCodes.PSYCOPG: + result = func(*args, **kwargs) + else: + future = executor.submit(func, *args, **kwargs) + # raises TimeoutError on timeout + result = future.result(timeout=timeout_sec) if not initial_transaction_status and driver_dialect.is_in_transaction(conn): # this condition is True when autocommit is False and the query started a new transaction. diff --git a/aws_advanced_python_wrapper/utils/properties.py b/aws_advanced_python_wrapper/utils/properties.py index efec458f..d2cedf78 100644 --- a/aws_advanced_python_wrapper/utils/properties.py +++ b/aws_advanced_python_wrapper/utils/properties.py @@ -86,8 +86,9 @@ def set(self, props: Properties, value: Any): class WrapperProperties: - DEFAULT_PLUGINS = "initial_connection,aurora_connection_tracker,failover_v2,host_monitoring_v2" - MYSQL_CONNECTOR_DEFAULT_PLUGINS = "initial_connection,aurora_connection_tracker,failover_v2" + # TEMP: aurora_connection_tracker removed from defaults to test mysql heap-corruption crash during failover + DEFAULT_PLUGINS = "initial_connection,failover_v2,host_monitoring_v2" + MYSQL_CONNECTOR_DEFAULT_PLUGINS = "initial_connection,failover_v2" _DEFAULT_TOKEN_EXPIRATION_SEC = 15 * 60 PROFILE_NAME = WrapperProperty( diff --git a/aws_advanced_python_wrapper/utils/services_container.py b/aws_advanced_python_wrapper/utils/services_container.py index 07370d36..865aa5d7 100644 --- a/aws_advanced_python_wrapper/utils/services_container.py +++ b/aws_advanced_python_wrapper/utils/services_container.py @@ -17,7 +17,7 @@ import threading from concurrent.futures import ThreadPoolExecutor from datetime import timedelta -from typing import Dict, Optional +from typing import Dict, Optional, Set from aws_advanced_python_wrapper.allowed_and_blocked_hosts import \ AllowedAndBlockedHosts @@ -37,6 +37,10 @@ def __init__(self) -> None: self._storage_service: Optional[StorageService] = None self._monitor_service: Optional[MonitorService] = None self._thread_pools: Dict[str, ThreadPoolExecutor] = {} + # Some service pools must be drained BEFORE the monitor service is released and connections are closed. + # This prevents worker threads like the topology util threads from continuing to using connections + # after they are closed, causing segfaults. + self._drain_first_pools: Set[str] = set() self._lock = threading.Lock() def _ensure_initialized(self) -> None: @@ -63,19 +67,24 @@ def monitor_service(self) -> MonitorService: self._ensure_initialized() return self._monitor_service # type: ignore - def get_thread_pool(self, name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor: + def get_thread_pool(self, name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor: pool = self._thread_pools.get(name) if pool is not None: + if drain_first: + self._drain_first_pools.add(name) return pool with self._lock: if name not in self._thread_pools: self._thread_pools[name] = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix=name) + if drain_first: + self._drain_first_pools.add(name) return self._thread_pools[name] def release_thread_pool(self, name: str, wait: bool = True) -> bool: with self._lock: pool = self._thread_pools.pop(name, None) + self._drain_first_pools.discard(name) if pool is not None: try: pool.shutdown(wait=wait) @@ -85,6 +94,18 @@ def release_thread_pool(self, name: str, wait: bool = True) -> bool: return False def release_resources(self) -> None: + # Some thread pools need to be drained first before shutting down the monitor services. + # This prevents segfaults when monitor services shut down and close all the active monitoring connections. + with self._lock: + drain_names = list(self._drain_first_pools) + for name in drain_names: + pool = self._thread_pools.get(name) + if pool is not None: + try: + pool.shutdown(wait=True) + except Exception as e: + logger.debug("CoreServices.ErrorShuttingDownPool", name, e) + if self._monitor_service is not None: try: self._monitor_service.release_resources() @@ -114,6 +135,7 @@ def release_resources(self) -> None: except Exception as e: logger.debug("CoreServices.ErrorShuttingDownPool", name, e) self._thread_pools.clear() + self._drain_first_pools.clear() _instance = _ServicesContainer() @@ -132,8 +154,8 @@ def get_monitor_service() -> MonitorService: return _instance.monitor_service -def get_thread_pool(name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor: - return _instance.get_thread_pool(name, max_workers) +def get_thread_pool(name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor: + return _instance.get_thread_pool(name, max_workers, drain_first) def release_thread_pool(name: str, wait: bool = True) -> bool: diff --git a/tests/integration/container/test_aurora_failover.py b/tests/integration/container/test_aurora_failover.py index 18c25c9f..45bc2fb8 100644 --- a/tests/integration/container/test_aurora_failover.py +++ b/tests/integration/container/test_aurora_failover.py @@ -75,6 +75,7 @@ def props(self): "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1" }) @@ -326,7 +327,7 @@ def test_writer_fail_within_transaction_start_transaction( cursor_3.execute("DROP TABLE IF EXISTS test3_3") conn.commit() - @pytest.mark.parametrize("plugins", ["aurora_connection_tracker,failover", "aurora_connection_tracker,failover_v2"]) + @pytest.mark.parametrize("plugins", ["failover", "failover_v2"]) # TEMP: removed aurora_connection_tracker to test mysql crash @enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED]) @pytest.mark.repeat(5) # Run this test case a few more times since it is a flakey test def test_writer_failover_in_idle_connections( diff --git a/tests/integration/container/test_autoscaling.py b/tests/integration/container/test_autoscaling.py index 15c06d84..0fec4ef3 100644 --- a/tests/integration/container/test_autoscaling.py +++ b/tests/integration/container/test_autoscaling.py @@ -51,7 +51,7 @@ def rds_utils(self): @pytest.fixture def props(self): - p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "autocommit": True}) + p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "auxiliary_query_timeout_sec": 3, "autocommit": True}) if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in TestEnvironment.get_current().get_features() \ or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in TestEnvironment.get_current().get_features(): diff --git a/tests/integration/container/test_aws_secrets_manager.py b/tests/integration/container/test_aws_secrets_manager.py index 1a26bcc8..0416a934 100644 --- a/tests/integration/container/test_aws_secrets_manager.py +++ b/tests/integration/container/test_aws_secrets_manager.py @@ -221,6 +221,7 @@ def test_failover_with_secrets_manager( "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "topology_refresh_ms": 10, }) diff --git a/tests/integration/container/test_basic_connectivity.py b/tests/integration/container/test_basic_connectivity.py index 5054b71e..9155031d 100644 --- a/tests/integration/container/test_basic_connectivity.py +++ b/tests/integration/container/test_basic_connectivity.py @@ -52,8 +52,9 @@ def rds_utils(self): def props(self): # By default, don't load the host_monitoring plugin so that the test doesn't require abort connection support p: Properties = Properties({ - WrapperProperties.PLUGINS.name: "aurora_connection_tracker,failover", + WrapperProperties.PLUGINS.name: "failover", "connect_timeout": 3, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1"}) @@ -143,6 +144,7 @@ def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: T "connect_timeout": 5, "monitoring-connect_timeout": 3, "monitoring-socket_timeout": 3, + "auxiliary_query_timeout_sec": 3, "autocommit": True}) target_driver_connect = DriverHelper.get_connect_func(test_driver) conn = AwsWrapperConnection.connect(target_driver_connect, **conn_utils.get_connect_params(conn_utils.reader_cluster_host), **props) diff --git a/tests/integration/container/test_basic_functionality.py b/tests/integration/container/test_basic_functionality.py index a892fffd..2698559b 100644 --- a/tests/integration/container/test_basic_functionality.py +++ b/tests/integration/container/test_basic_functionality.py @@ -59,8 +59,9 @@ def rds_utils(self): @pytest.fixture(scope='class') def props(self): p: Properties = Properties({ - "plugins": "aurora_connection_tracker,failover", + "plugins": "failover", "connect_timeout": 10, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1"}) diff --git a/tests/integration/container/test_custom_endpoint.py b/tests/integration/container/test_custom_endpoint.py index 625e4ae8..dd6dc9f3 100644 --- a/tests/integration/container/test_custom_endpoint.py +++ b/tests/integration/container/test_custom_endpoint.py @@ -63,7 +63,8 @@ def rds_utils(self): @pytest.fixture(scope='class') def default_props(self): p: Properties = Properties( - {"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"}) + {"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1", + "auxiliary_query_timeout_sec": 3}) features = TestEnvironment.get_current().get_features() if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \ diff --git a/tests/integration/container/test_host_monitoring_v2.py b/tests/integration/container/test_host_monitoring_v2.py index ee9ff4b6..cd517ddf 100644 --- a/tests/integration/container/test_host_monitoring_v2.py +++ b/tests/integration/container/test_host_monitoring_v2.py @@ -59,6 +59,7 @@ def props(self): "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "failure_detection_time_ms": 5_000, "failure_detection_interval_ms": 5_000, "failure_detection_count": 1, diff --git a/tests/integration/container/test_iam_authentication.py b/tests/integration/container/test_iam_authentication.py index c83f70a2..346b491e 100644 --- a/tests/integration/container/test_iam_authentication.py +++ b/tests/integration/container/test_iam_authentication.py @@ -150,6 +150,7 @@ def test_failover_with_iam( "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "topology_refresh_ms": 10, "autocommit": True }) diff --git a/tests/integration/container/test_read_write_splitting.py b/tests/integration/container/test_read_write_splitting.py index 931b17c8..835d4565 100644 --- a/tests/integration/container/test_read_write_splitting.py +++ b/tests/integration/container/test_read_write_splitting.py @@ -93,6 +93,7 @@ def props(self, plugin_config, conn_utils): "plugins": plugin_value, "socket_timeout": 10, "connect_timeout": 10, + "auxiliary_query_timeout_sec": 3, "autocommit": True, } ) @@ -134,6 +135,7 @@ def failover_props(self, plugin_config, conn_utils): "plugins": f"{plugin_value},failover", "socket_timeout": 10, "connect_timeout": 10, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1" }