From 68efb5f754b39ab7edda2ba9912a0be7944654fe Mon Sep 17 00:00:00 2001 From: karezche <64801825+karenc-bq@users.noreply.github.com> Date: Tue, 9 Jun 2026 12:35:36 -0700 Subject: [PATCH 1/4] chore: drain topology util thread pools before shutting down monitor services --- .github/workflows/integration_tests.yml | 88 +++++++++++++++++-- .../host_list_provider.py | 10 ++- .../utils/services_container.py | 30 ++++++- .../container/test_aurora_failover.py | 1 + .../integration/container/test_autoscaling.py | 2 +- .../container/test_aws_secrets_manager.py | 1 + .../container/test_basic_connectivity.py | 2 + .../container/test_basic_functionality.py | 1 + .../container/test_custom_endpoint.py | 3 +- .../container/test_host_monitoring_v2.py | 1 + .../container/test_iam_authentication.py | 1 + .../container/test_read_write_splitting.py | 2 + 12 files changed, 128 insertions(+), 14 deletions(-) diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index d4a18b473..ac56ef4d0 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -1,28 +1,84 @@ name: Integration Tests on: - workflow_dispatch: push: branches: - main + - chore/run-integration-tests-on-codebuild + paths-ignore: + - '**/*.md' + - '**/*.jpg' + - '**/README.txt' + - '**/LICENSE.txt' + - 'docs/**' + - 'ISSUE_TEMPLATE/**' + - '**/remove-old-artifacts.yml' + pull_request_target: + branches: + - main + paths-ignore: + - '**/*.md' + - '**/*.jpg' + - '**/README.txt' + - '**/LICENSE.txt' + - 'docs/**' + - 'ISSUE_TEMPLATE/**' + - '**/remove-old-artifacts.yml' + workflow_dispatch: + inputs: + ref: + description: 'Git ref (branch, tag, or SHA) to test' + required: false + type: string + repository: + description: 'Repository to test in the form owner/repo (use for fork branches)' + required: false + type: string permissions: id-token: write # This is required for requesting the JWT contents: read # This is required for actions/checkout jobs: + approve: + # Auto-approve for non-fork scenarios (push, manual dispatch, or PR from the same repo) + if: > + github.event_name == 'push' || + github.event_name == 'workflow_dispatch' || + github.event.pull_request.head.repo.full_name == github.repository + runs-on: ubuntu-latest + steps: + - run: echo "Approved - not a fork PR" + + approve-fork: + # Require manual maintainer approval (via the integration-tests environment) for fork PRs + if: > + github.event_name == 'pull_request_target' && + github.event.pull_request.head.repo.full_name != github.repository + runs-on: ubuntu-latest + environment: integration-tests + steps: + - run: echo "Fork PR approved by maintainer" + lts-integration-tests: name: Run LTS Integration Tests + needs: [ approve, approve-fork ] + if: | + always() && + (needs.approve.result == 'success' || needs.approve-fork.result == 'success') && + !(needs.approve.result == 'failure' || needs.approve-fork.result == 'failure') runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: [ "3.11", "3.12", "3.13" ] + python-version: [ "3.11", "3.12", "3.13" ] environment: [ "mysql", "pg" ] steps: - name: 'Clone repository' uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} - name: 'Set up JDK 8' uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5 @@ -30,11 +86,20 @@ jobs: distribution: 'corretto' java-version: 8 + - name: 'Set up Python ${{ matrix.python-version }}' + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ matrix.python-version }} + - name: Install poetry shell: bash run: | - pipx install poetry==1.8.2 - poetry config virtualenvs.prefer-active-python true + python -m pip install --upgrade pip + python -m pip install pipx + python -m pipx install poetry==1.8.2 + PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR) + echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH" + "$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true - name: Install dependencies run: poetry install @@ -77,6 +142,8 @@ jobs: steps: - name: 'Clone repository' uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} - name: 'Set up JDK 8' uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5 @@ -84,11 +151,20 @@ jobs: distribution: 'corretto' java-version: 8 + - name: 'Set up Python ${{ matrix.python-version }}' + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ matrix.python-version }} + - name: Install poetry shell: bash run: | - pipx install poetry==1.8.2 - poetry config virtualenvs.prefer-active-python true + python -m pip install --upgrade pip + python -m pip install pipx + python -m pipx install poetry==1.8.2 + PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR) + echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH" + "$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true - name: Install dependencies run: poetry install diff --git a/aws_advanced_python_wrapper/host_list_provider.py b/aws_advanced_python_wrapper/host_list_provider.py index 06cf26c21..ca0c168af 100644 --- a/aws_advanced_python_wrapper/host_list_provider.py +++ b/aws_advanced_python_wrapper/host_list_provider.py @@ -464,7 +464,12 @@ def __init__(self, dialect: db_dialect.TopologyAwareDatabaseDialect, props: Prop self.instance_template: HostInfo = instance_template self._max_timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get_int(props) - self._thread_pool = services_container.get_thread_pool(self._executor_name) + # Need to drain topology query pools before monitor services shut down. + self._thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True) + + @classmethod + def release_resources(cls, wait: bool = True) -> bool: + return services_container.release_thread_pool(cls._executor_name, wait=wait) def _validate_host_pattern(self, host: str): if not self._rds_utils.is_dns_pattern_valid(host): @@ -495,8 +500,9 @@ def query_for_topology( :return: a tuple of :py:class:`HostInfo` objects representing the database topology. If the query results did not include a writer instance, an empty tuple will be returned. """ + thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True) query_for_topology_func_with_timeout = preserve_transaction_status_with_timeout( - self._thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology) + thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology) x = query_for_topology_func_with_timeout(conn) return x diff --git a/aws_advanced_python_wrapper/utils/services_container.py b/aws_advanced_python_wrapper/utils/services_container.py index 07370d369..865aa5d79 100644 --- a/aws_advanced_python_wrapper/utils/services_container.py +++ b/aws_advanced_python_wrapper/utils/services_container.py @@ -17,7 +17,7 @@ import threading from concurrent.futures import ThreadPoolExecutor from datetime import timedelta -from typing import Dict, Optional +from typing import Dict, Optional, Set from aws_advanced_python_wrapper.allowed_and_blocked_hosts import \ AllowedAndBlockedHosts @@ -37,6 +37,10 @@ def __init__(self) -> None: self._storage_service: Optional[StorageService] = None self._monitor_service: Optional[MonitorService] = None self._thread_pools: Dict[str, ThreadPoolExecutor] = {} + # Some service pools must be drained BEFORE the monitor service is released and connections are closed. + # This prevents worker threads like the topology util threads from continuing to using connections + # after they are closed, causing segfaults. + self._drain_first_pools: Set[str] = set() self._lock = threading.Lock() def _ensure_initialized(self) -> None: @@ -63,19 +67,24 @@ def monitor_service(self) -> MonitorService: self._ensure_initialized() return self._monitor_service # type: ignore - def get_thread_pool(self, name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor: + def get_thread_pool(self, name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor: pool = self._thread_pools.get(name) if pool is not None: + if drain_first: + self._drain_first_pools.add(name) return pool with self._lock: if name not in self._thread_pools: self._thread_pools[name] = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix=name) + if drain_first: + self._drain_first_pools.add(name) return self._thread_pools[name] def release_thread_pool(self, name: str, wait: bool = True) -> bool: with self._lock: pool = self._thread_pools.pop(name, None) + self._drain_first_pools.discard(name) if pool is not None: try: pool.shutdown(wait=wait) @@ -85,6 +94,18 @@ def release_thread_pool(self, name: str, wait: bool = True) -> bool: return False def release_resources(self) -> None: + # Some thread pools need to be drained first before shutting down the monitor services. + # This prevents segfaults when monitor services shut down and close all the active monitoring connections. + with self._lock: + drain_names = list(self._drain_first_pools) + for name in drain_names: + pool = self._thread_pools.get(name) + if pool is not None: + try: + pool.shutdown(wait=True) + except Exception as e: + logger.debug("CoreServices.ErrorShuttingDownPool", name, e) + if self._monitor_service is not None: try: self._monitor_service.release_resources() @@ -114,6 +135,7 @@ def release_resources(self) -> None: except Exception as e: logger.debug("CoreServices.ErrorShuttingDownPool", name, e) self._thread_pools.clear() + self._drain_first_pools.clear() _instance = _ServicesContainer() @@ -132,8 +154,8 @@ def get_monitor_service() -> MonitorService: return _instance.monitor_service -def get_thread_pool(name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor: - return _instance.get_thread_pool(name, max_workers) +def get_thread_pool(name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor: + return _instance.get_thread_pool(name, max_workers, drain_first) def release_thread_pool(name: str, wait: bool = True) -> bool: diff --git a/tests/integration/container/test_aurora_failover.py b/tests/integration/container/test_aurora_failover.py index 18c25c9fa..da9345aac 100644 --- a/tests/integration/container/test_aurora_failover.py +++ b/tests/integration/container/test_aurora_failover.py @@ -75,6 +75,7 @@ def props(self): "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1" }) diff --git a/tests/integration/container/test_autoscaling.py b/tests/integration/container/test_autoscaling.py index 15c06d84e..0fec4ef31 100644 --- a/tests/integration/container/test_autoscaling.py +++ b/tests/integration/container/test_autoscaling.py @@ -51,7 +51,7 @@ def rds_utils(self): @pytest.fixture def props(self): - p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "autocommit": True}) + p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "auxiliary_query_timeout_sec": 3, "autocommit": True}) if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in TestEnvironment.get_current().get_features() \ or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in TestEnvironment.get_current().get_features(): diff --git a/tests/integration/container/test_aws_secrets_manager.py b/tests/integration/container/test_aws_secrets_manager.py index 1a26bcc8f..0416a9344 100644 --- a/tests/integration/container/test_aws_secrets_manager.py +++ b/tests/integration/container/test_aws_secrets_manager.py @@ -221,6 +221,7 @@ def test_failover_with_secrets_manager( "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "topology_refresh_ms": 10, }) diff --git a/tests/integration/container/test_basic_connectivity.py b/tests/integration/container/test_basic_connectivity.py index 5054b71ec..8aa738f25 100644 --- a/tests/integration/container/test_basic_connectivity.py +++ b/tests/integration/container/test_basic_connectivity.py @@ -54,6 +54,7 @@ def props(self): p: Properties = Properties({ WrapperProperties.PLUGINS.name: "aurora_connection_tracker,failover", "connect_timeout": 3, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1"}) @@ -143,6 +144,7 @@ def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: T "connect_timeout": 5, "monitoring-connect_timeout": 3, "monitoring-socket_timeout": 3, + "auxiliary_query_timeout_sec": 3, "autocommit": True}) target_driver_connect = DriverHelper.get_connect_func(test_driver) conn = AwsWrapperConnection.connect(target_driver_connect, **conn_utils.get_connect_params(conn_utils.reader_cluster_host), **props) diff --git a/tests/integration/container/test_basic_functionality.py b/tests/integration/container/test_basic_functionality.py index a892fffd2..28cc35da7 100644 --- a/tests/integration/container/test_basic_functionality.py +++ b/tests/integration/container/test_basic_functionality.py @@ -61,6 +61,7 @@ def props(self): p: Properties = Properties({ "plugins": "aurora_connection_tracker,failover", "connect_timeout": 10, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1"}) diff --git a/tests/integration/container/test_custom_endpoint.py b/tests/integration/container/test_custom_endpoint.py index 625e4ae86..dd6dc9f3b 100644 --- a/tests/integration/container/test_custom_endpoint.py +++ b/tests/integration/container/test_custom_endpoint.py @@ -63,7 +63,8 @@ def rds_utils(self): @pytest.fixture(scope='class') def default_props(self): p: Properties = Properties( - {"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"}) + {"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1", + "auxiliary_query_timeout_sec": 3}) features = TestEnvironment.get_current().get_features() if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \ diff --git a/tests/integration/container/test_host_monitoring_v2.py b/tests/integration/container/test_host_monitoring_v2.py index ee9ff4b68..cd517ddfc 100644 --- a/tests/integration/container/test_host_monitoring_v2.py +++ b/tests/integration/container/test_host_monitoring_v2.py @@ -59,6 +59,7 @@ def props(self): "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "failure_detection_time_ms": 5_000, "failure_detection_interval_ms": 5_000, "failure_detection_count": 1, diff --git a/tests/integration/container/test_iam_authentication.py b/tests/integration/container/test_iam_authentication.py index c83f70a2e..346b491e2 100644 --- a/tests/integration/container/test_iam_authentication.py +++ b/tests/integration/container/test_iam_authentication.py @@ -150,6 +150,7 @@ def test_failover_with_iam( "connect_timeout": 10, "monitoring-connect_timeout": 5, "monitoring-socket_timeout": 5, + "auxiliary_query_timeout_sec": 3, "topology_refresh_ms": 10, "autocommit": True }) diff --git a/tests/integration/container/test_read_write_splitting.py b/tests/integration/container/test_read_write_splitting.py index 931b17c8a..835d4565f 100644 --- a/tests/integration/container/test_read_write_splitting.py +++ b/tests/integration/container/test_read_write_splitting.py @@ -93,6 +93,7 @@ def props(self, plugin_config, conn_utils): "plugins": plugin_value, "socket_timeout": 10, "connect_timeout": 10, + "auxiliary_query_timeout_sec": 3, "autocommit": True, } ) @@ -134,6 +135,7 @@ def failover_props(self, plugin_config, conn_utils): "plugins": f"{plugin_value},failover", "socket_timeout": 10, "connect_timeout": 10, + "auxiliary_query_timeout_sec": 3, "autocommit": True, "cluster_id": "cluster1" } From 3b6e4e29d3545f8aea42f400ce1d22aa77a8c56e Mon Sep 17 00:00:00 2001 From: karezche <64801825+karenc-bq@users.noreply.github.com> Date: Wed, 10 Jun 2026 12:46:31 -0700 Subject: [PATCH 2/4] chore: temporarily remove aurora connection tracker plugin from default plugin list and integration tests --- aws_advanced_python_wrapper/utils/properties.py | 5 +++-- tests/integration/container/test_aurora_failover.py | 2 +- tests/integration/container/test_basic_connectivity.py | 2 +- tests/integration/container/test_basic_functionality.py | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/aws_advanced_python_wrapper/utils/properties.py b/aws_advanced_python_wrapper/utils/properties.py index efec458f4..d2cedf78d 100644 --- a/aws_advanced_python_wrapper/utils/properties.py +++ b/aws_advanced_python_wrapper/utils/properties.py @@ -86,8 +86,9 @@ def set(self, props: Properties, value: Any): class WrapperProperties: - DEFAULT_PLUGINS = "initial_connection,aurora_connection_tracker,failover_v2,host_monitoring_v2" - MYSQL_CONNECTOR_DEFAULT_PLUGINS = "initial_connection,aurora_connection_tracker,failover_v2" + # TEMP: aurora_connection_tracker removed from defaults to test mysql heap-corruption crash during failover + DEFAULT_PLUGINS = "initial_connection,failover_v2,host_monitoring_v2" + MYSQL_CONNECTOR_DEFAULT_PLUGINS = "initial_connection,failover_v2" _DEFAULT_TOKEN_EXPIRATION_SEC = 15 * 60 PROFILE_NAME = WrapperProperty( diff --git a/tests/integration/container/test_aurora_failover.py b/tests/integration/container/test_aurora_failover.py index da9345aac..45bc2fb8e 100644 --- a/tests/integration/container/test_aurora_failover.py +++ b/tests/integration/container/test_aurora_failover.py @@ -327,7 +327,7 @@ def test_writer_fail_within_transaction_start_transaction( cursor_3.execute("DROP TABLE IF EXISTS test3_3") conn.commit() - @pytest.mark.parametrize("plugins", ["aurora_connection_tracker,failover", "aurora_connection_tracker,failover_v2"]) + @pytest.mark.parametrize("plugins", ["failover", "failover_v2"]) # TEMP: removed aurora_connection_tracker to test mysql crash @enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED]) @pytest.mark.repeat(5) # Run this test case a few more times since it is a flakey test def test_writer_failover_in_idle_connections( diff --git a/tests/integration/container/test_basic_connectivity.py b/tests/integration/container/test_basic_connectivity.py index 8aa738f25..9155031dd 100644 --- a/tests/integration/container/test_basic_connectivity.py +++ b/tests/integration/container/test_basic_connectivity.py @@ -52,7 +52,7 @@ def rds_utils(self): def props(self): # By default, don't load the host_monitoring plugin so that the test doesn't require abort connection support p: Properties = Properties({ - WrapperProperties.PLUGINS.name: "aurora_connection_tracker,failover", + WrapperProperties.PLUGINS.name: "failover", "connect_timeout": 3, "auxiliary_query_timeout_sec": 3, "autocommit": True, diff --git a/tests/integration/container/test_basic_functionality.py b/tests/integration/container/test_basic_functionality.py index 28cc35da7..2698559bd 100644 --- a/tests/integration/container/test_basic_functionality.py +++ b/tests/integration/container/test_basic_functionality.py @@ -59,7 +59,7 @@ def rds_utils(self): @pytest.fixture(scope='class') def props(self): p: Properties = Properties({ - "plugins": "aurora_connection_tracker,failover", + "plugins": "failover", "connect_timeout": 10, "auxiliary_query_timeout_sec": 3, "autocommit": True, From d56f1095539ab932eba413fbd3de7c0dd1c4a8b4 Mon Sep 17 00:00:00 2001 From: karezche <64801825+karenc-bq@users.noreply.github.com> Date: Wed, 10 Jun 2026 12:56:51 -0700 Subject: [PATCH 3/4] chore: run queries synchronously when using psycopg --- aws_advanced_python_wrapper/utils/decorators.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/aws_advanced_python_wrapper/utils/decorators.py b/aws_advanced_python_wrapper/utils/decorators.py index 7f76fb984..261fb1216 100644 --- a/aws_advanced_python_wrapper/utils/decorators.py +++ b/aws_advanced_python_wrapper/utils/decorators.py @@ -17,6 +17,8 @@ import functools from typing import TYPE_CHECKING +from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes + if TYPE_CHECKING: from concurrent.futures import Executor @@ -35,10 +37,13 @@ def func_wrapper(*args, **kwargs): initial_transaction_status: bool = driver_dialect.is_in_transaction(conn) - future = executor.submit(func, *args, **kwargs) - - # raises TimeoutError on timeout - result = future.result(timeout=timeout_sec) + # Run the query synchronously for Psycopg since psycopg supports network timeouts. + if driver_dialect.dialect_code == DriverDialectCodes.PSYCOPG: + result = func(*args, **kwargs) + else: + future = executor.submit(func, *args, **kwargs) + # raises TimeoutError on timeout + result = future.result(timeout=timeout_sec) if not initial_transaction_status and driver_dialect.is_in_transaction(conn): # this condition is True when autocommit is False and the query started a new transaction. From 514c28195c54dbec52ad57dc6c52ec829308c82e Mon Sep 17 00:00:00 2001 From: karezche <64801825+karenc-bq@users.noreply.github.com> Date: Wed, 10 Jun 2026 18:57:52 -0700 Subject: [PATCH 4/4] chore: reenable aurora connection tracker in integration tests and invalidate connections in a thread managed by services container --- .../aurora_connection_tracker_plugin.py | 14 +++++++++----- aws_advanced_python_wrapper/driver_dialect.py | 3 +++ aws_advanced_python_wrapper/utils/properties.py | 5 ++--- .../integration/container/test_aurora_failover.py | 2 +- .../container/test_basic_connectivity.py | 2 +- .../container/test_basic_functionality.py | 2 +- 6 files changed, 17 insertions(+), 11 deletions(-) diff --git a/aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py b/aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py index 8efa67009..cbbfc7c0e 100644 --- a/aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py +++ b/aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py @@ -21,6 +21,7 @@ Optional, Set) from aws_advanced_python_wrapper.utils.notifications import HostEvent +from aws_advanced_python_wrapper.utils import services_container from aws_advanced_python_wrapper.utils.utils import Utils if TYPE_CHECKING: @@ -52,6 +53,7 @@ class OpenedConnectionTracker: _shutdown_event: ClassVar[threading.Event] = threading.Event() _safe_to_check_closed_classes: ClassVar[Set[str]] = {"psycopg"} _default_sleep_time: ClassVar[int] = 30 + _invalidate_executor_name: ClassVar[str] = "OpenedConnectionTrackerInvalidate" @classmethod def _start_prune_thread(cls): @@ -166,9 +168,12 @@ def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host: with self._lock: connection_set: Optional[WeakSet] = self._opened_connections.get(instance_endpoint) connections_list = list(connection_set) if connection_set is not None else None + if connection_set is not None: + connection_set.clear() + self._opened_connections.pop(instance_endpoint, None) - if connections_list is not None: - self._log_connection_set(instance_endpoint, connection_set) + if connections_list: + self._log_connection_set(instance_endpoint, connections_list) self._invalidate_connections(connections_list) def remove_connection_tracking(self, host_info: HostInfo, connection: Connection | None): @@ -211,9 +216,8 @@ def _task(connections_list: list): pass def _invalidate_connections(self, connections_list: list): - invalidate_connection_thread: Thread = Thread(daemon=True, target=self._task, - args=[connections_list]) # type: ignore - invalidate_connection_thread.start() + executor = services_container.get_thread_pool(self._invalidate_executor_name, drain_first=True) + executor.submit(self._task, connections_list) def log_opened_connections(self): with self._lock: diff --git a/aws_advanced_python_wrapper/driver_dialect.py b/aws_advanced_python_wrapper/driver_dialect.py index 47476fe77..acc470499 100644 --- a/aws_advanced_python_wrapper/driver_dialect.py +++ b/aws_advanced_python_wrapper/driver_dialect.py @@ -137,6 +137,9 @@ def execute( exec_timeout = WrapperProperties.SOCKET_TIMEOUT_SEC.get_float(self._props) if exec_timeout > 0: + if self.dialect_code == DriverDialectCodes.PSYCOPG: + return exec_func() + try: execute_with_timeout = timeout(self._thread_pool, exec_timeout)(exec_func) return execute_with_timeout() diff --git a/aws_advanced_python_wrapper/utils/properties.py b/aws_advanced_python_wrapper/utils/properties.py index d2cedf78d..efec458f4 100644 --- a/aws_advanced_python_wrapper/utils/properties.py +++ b/aws_advanced_python_wrapper/utils/properties.py @@ -86,9 +86,8 @@ def set(self, props: Properties, value: Any): class WrapperProperties: - # TEMP: aurora_connection_tracker removed from defaults to test mysql heap-corruption crash during failover - DEFAULT_PLUGINS = "initial_connection,failover_v2,host_monitoring_v2" - MYSQL_CONNECTOR_DEFAULT_PLUGINS = "initial_connection,failover_v2" + DEFAULT_PLUGINS = "initial_connection,aurora_connection_tracker,failover_v2,host_monitoring_v2" + MYSQL_CONNECTOR_DEFAULT_PLUGINS = "initial_connection,aurora_connection_tracker,failover_v2" _DEFAULT_TOKEN_EXPIRATION_SEC = 15 * 60 PROFILE_NAME = WrapperProperty( diff --git a/tests/integration/container/test_aurora_failover.py b/tests/integration/container/test_aurora_failover.py index 45bc2fb8e..da9345aac 100644 --- a/tests/integration/container/test_aurora_failover.py +++ b/tests/integration/container/test_aurora_failover.py @@ -327,7 +327,7 @@ def test_writer_fail_within_transaction_start_transaction( cursor_3.execute("DROP TABLE IF EXISTS test3_3") conn.commit() - @pytest.mark.parametrize("plugins", ["failover", "failover_v2"]) # TEMP: removed aurora_connection_tracker to test mysql crash + @pytest.mark.parametrize("plugins", ["aurora_connection_tracker,failover", "aurora_connection_tracker,failover_v2"]) @enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED]) @pytest.mark.repeat(5) # Run this test case a few more times since it is a flakey test def test_writer_failover_in_idle_connections( diff --git a/tests/integration/container/test_basic_connectivity.py b/tests/integration/container/test_basic_connectivity.py index 9155031dd..8aa738f25 100644 --- a/tests/integration/container/test_basic_connectivity.py +++ b/tests/integration/container/test_basic_connectivity.py @@ -52,7 +52,7 @@ def rds_utils(self): def props(self): # By default, don't load the host_monitoring plugin so that the test doesn't require abort connection support p: Properties = Properties({ - WrapperProperties.PLUGINS.name: "failover", + WrapperProperties.PLUGINS.name: "aurora_connection_tracker,failover", "connect_timeout": 3, "auxiliary_query_timeout_sec": 3, "autocommit": True, diff --git a/tests/integration/container/test_basic_functionality.py b/tests/integration/container/test_basic_functionality.py index 2698559bd..28cc35da7 100644 --- a/tests/integration/container/test_basic_functionality.py +++ b/tests/integration/container/test_basic_functionality.py @@ -59,7 +59,7 @@ def rds_utils(self): @pytest.fixture(scope='class') def props(self): p: Properties = Properties({ - "plugins": "failover", + "plugins": "aurora_connection_tracker,failover", "connect_timeout": 10, "auxiliary_query_timeout_sec": 3, "autocommit": True,