Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 82 additions & 6 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
@@ -1,40 +1,105 @@
name: Integration Tests

on:
workflow_dispatch:
push:
branches:
- main
- chore/run-integration-tests-on-codebuild
paths-ignore:
- '**/*.md'
- '**/*.jpg'
- '**/README.txt'
- '**/LICENSE.txt'
- 'docs/**'
- 'ISSUE_TEMPLATE/**'
- '**/remove-old-artifacts.yml'
pull_request_target:
branches:
- main
paths-ignore:
- '**/*.md'
- '**/*.jpg'
- '**/README.txt'
- '**/LICENSE.txt'
- 'docs/**'
- 'ISSUE_TEMPLATE/**'
- '**/remove-old-artifacts.yml'
workflow_dispatch:
inputs:
ref:
description: 'Git ref (branch, tag, or SHA) to test'
required: false
type: string
repository:
description: 'Repository to test in the form owner/repo (use for fork branches)'
required: false
type: string

permissions:
id-token: write # This is required for requesting the JWT
contents: read # This is required for actions/checkout

jobs:
approve:
# Auto-approve for non-fork scenarios (push, manual dispatch, or PR from the same repo)
if: >
github.event_name == 'push' ||
github.event_name == 'workflow_dispatch' ||
github.event.pull_request.head.repo.full_name == github.repository
runs-on: ubuntu-latest
steps:
- run: echo "Approved - not a fork PR"

approve-fork:
# Require manual maintainer approval (via the integration-tests environment) for fork PRs
if: >
github.event_name == 'pull_request_target' &&
github.event.pull_request.head.repo.full_name != github.repository
runs-on: ubuntu-latest
environment: integration-tests
steps:
- run: echo "Fork PR approved by maintainer"

lts-integration-tests:
name: Run LTS Integration Tests
needs: [ approve, approve-fork ]
if: |
always() &&
(needs.approve.result == 'success' || needs.approve-fork.result == 'success') &&
!(needs.approve.result == 'failure' || needs.approve-fork.result == 'failure')
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: [ "3.11", "3.12", "3.13" ]
python-version: [ "3.11", "3.12", "3.13" ]
environment: [ "mysql", "pg" ]

steps:
- name: 'Clone repository'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}

- name: 'Set up JDK 8'
uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5
with:
distribution: 'corretto'
java-version: 8

- name: 'Set up Python ${{ matrix.python-version }}'
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: ${{ matrix.python-version }}

- name: Install poetry
shell: bash
run: |
pipx install poetry==1.8.2
poetry config virtualenvs.prefer-active-python true
python -m pip install --upgrade pip
python -m pip install pipx
python -m pipx install poetry==1.8.2
PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR)
echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH"
"$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true

- name: Install dependencies
run: poetry install
Expand Down Expand Up @@ -77,18 +142,29 @@ jobs:
steps:
- name: 'Clone repository'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}

- name: 'Set up JDK 8'
uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5
with:
distribution: 'corretto'
java-version: 8

- name: 'Set up Python ${{ matrix.python-version }}'
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: ${{ matrix.python-version }}

- name: Install poetry
shell: bash
run: |
pipx install poetry==1.8.2
poetry config virtualenvs.prefer-active-python true
python -m pip install --upgrade pip
python -m pip install pipx
python -m pipx install poetry==1.8.2
PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR)
echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH"
"$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true

- name: Install dependencies
run: poetry install
Expand Down
10 changes: 8 additions & 2 deletions aws_advanced_python_wrapper/host_list_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,12 @@ def __init__(self, dialect: db_dialect.TopologyAwareDatabaseDialect, props: Prop

self.instance_template: HostInfo = instance_template
self._max_timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get_int(props)
self._thread_pool = services_container.get_thread_pool(self._executor_name)
# Need to drain topology query pools before monitor services shut down.
self._thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True)

@classmethod
def release_resources(cls, wait: bool = True) -> bool:
return services_container.release_thread_pool(cls._executor_name, wait=wait)

def _validate_host_pattern(self, host: str):
if not self._rds_utils.is_dns_pattern_valid(host):
Expand Down Expand Up @@ -495,8 +500,9 @@ def query_for_topology(
:return: a tuple of :py:class:`HostInfo` objects representing the database topology. If the query results did not include a writer instance,
an empty tuple will be returned.
"""
thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True)
query_for_topology_func_with_timeout = preserve_transaction_status_with_timeout(
self._thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology)
thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology)
x = query_for_topology_func_with_timeout(conn)
return x

Expand Down
30 changes: 26 additions & 4 deletions aws_advanced_python_wrapper/utils/services_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import Dict, Optional
from typing import Dict, Optional, Set

from aws_advanced_python_wrapper.allowed_and_blocked_hosts import \
AllowedAndBlockedHosts
Expand All @@ -37,6 +37,10 @@ def __init__(self) -> None:
self._storage_service: Optional[StorageService] = None
self._monitor_service: Optional[MonitorService] = None
self._thread_pools: Dict[str, ThreadPoolExecutor] = {}
# Some service pools must be drained BEFORE the monitor service is released and connections are closed.
# This prevents worker threads like the topology util threads from continuing to use connections
# after they are closed, causing segfaults.
self._drain_first_pools: Set[str] = set()
self._lock = threading.Lock()

def _ensure_initialized(self) -> None:
Expand All @@ -63,19 +67,24 @@ def monitor_service(self) -> MonitorService:
self._ensure_initialized()
return self._monitor_service # type: ignore

def get_thread_pool(self, name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
def get_thread_pool(self, name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor:
pool = self._thread_pools.get(name)
if pool is not None:
if drain_first:
self._drain_first_pools.add(name)
return pool
with self._lock:
if name not in self._thread_pools:
self._thread_pools[name] = ThreadPoolExecutor(
max_workers=max_workers, thread_name_prefix=name)
if drain_first:
self._drain_first_pools.add(name)
return self._thread_pools[name]

def release_thread_pool(self, name: str, wait: bool = True) -> bool:
with self._lock:
pool = self._thread_pools.pop(name, None)
self._drain_first_pools.discard(name)
if pool is not None:
try:
pool.shutdown(wait=wait)
Expand All @@ -85,6 +94,18 @@ def release_thread_pool(self, name: str, wait: bool = True) -> bool:
return False

def release_resources(self) -> None:
# Some thread pools need to be drained first before shutting down the monitor services.
# This prevents segfaults when monitor services shut down and close all the active monitoring connections.
with self._lock:
drain_names = list(self._drain_first_pools)
for name in drain_names:
pool = self._thread_pools.get(name)
if pool is not None:
try:
pool.shutdown(wait=True)
except Exception as e:
logger.debug("CoreServices.ErrorShuttingDownPool", name, e)

if self._monitor_service is not None:
try:
self._monitor_service.release_resources()
Expand Down Expand Up @@ -114,6 +135,7 @@ def release_resources(self) -> None:
except Exception as e:
logger.debug("CoreServices.ErrorShuttingDownPool", name, e)
self._thread_pools.clear()
self._drain_first_pools.clear()


_instance = _ServicesContainer()
Expand All @@ -132,8 +154,8 @@ def get_monitor_service() -> MonitorService:
return _instance.monitor_service


def get_thread_pool(name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
return _instance.get_thread_pool(name, max_workers)
def get_thread_pool(name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor:
return _instance.get_thread_pool(name, max_workers, drain_first)


def release_thread_pool(name: str, wait: bool = True) -> bool:
Expand Down
1 change: 1 addition & 0 deletions tests/integration/container/test_aurora_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def props(self):
"connect_timeout": 10,
"monitoring-connect_timeout": 5,
"monitoring-socket_timeout": 5,
"auxiliary_query_timeout_sec": 3,
"autocommit": True,
"cluster_id": "cluster1"
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/container/test_autoscaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def rds_utils(self):

@pytest.fixture
def props(self):
p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "autocommit": True})
p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "auxiliary_query_timeout_sec": 3, "autocommit": True})

if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in TestEnvironment.get_current().get_features() \
or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in TestEnvironment.get_current().get_features():
Expand Down
1 change: 1 addition & 0 deletions tests/integration/container/test_aws_secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def test_failover_with_secrets_manager(
"connect_timeout": 10,
"monitoring-connect_timeout": 5,
"monitoring-socket_timeout": 5,
"auxiliary_query_timeout_sec": 3,
"topology_refresh_ms": 10,
})

Expand Down
2 changes: 2 additions & 0 deletions tests/integration/container/test_basic_connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def props(self):
p: Properties = Properties({
WrapperProperties.PLUGINS.name: "aurora_connection_tracker,failover",
"connect_timeout": 3,
"auxiliary_query_timeout_sec": 3,
"autocommit": True,
"cluster_id": "cluster1"})

Expand Down Expand Up @@ -143,6 +144,7 @@ def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: T
"connect_timeout": 5,
"monitoring-connect_timeout": 3,
"monitoring-socket_timeout": 3,
"auxiliary_query_timeout_sec": 3,
"autocommit": True})
target_driver_connect = DriverHelper.get_connect_func(test_driver)
conn = AwsWrapperConnection.connect(target_driver_connect, **conn_utils.get_connect_params(conn_utils.reader_cluster_host), **props)
Expand Down
1 change: 1 addition & 0 deletions tests/integration/container/test_basic_functionality.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def props(self):
p: Properties = Properties({
"plugins": "aurora_connection_tracker,failover",
"connect_timeout": 10,
"auxiliary_query_timeout_sec": 3,
"autocommit": True,
"cluster_id": "cluster1"})

Expand Down
3 changes: 2 additions & 1 deletion tests/integration/container/test_custom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def rds_utils(self):
@pytest.fixture(scope='class')
def default_props(self):
p: Properties = Properties(
{"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"})
{"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1",
"auxiliary_query_timeout_sec": 3})

features = TestEnvironment.get_current().get_features()
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \
Expand Down
1 change: 1 addition & 0 deletions tests/integration/container/test_host_monitoring_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def props(self):
"connect_timeout": 10,
"monitoring-connect_timeout": 5,
"monitoring-socket_timeout": 5,
"auxiliary_query_timeout_sec": 3,
"failure_detection_time_ms": 5_000,
"failure_detection_interval_ms": 5_000,
"failure_detection_count": 1,
Expand Down
1 change: 1 addition & 0 deletions tests/integration/container/test_iam_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def test_failover_with_iam(
"connect_timeout": 10,
"monitoring-connect_timeout": 5,
"monitoring-socket_timeout": 5,
"auxiliary_query_timeout_sec": 3,
"topology_refresh_ms": 10,
"autocommit": True
})
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/container/test_read_write_splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def props(self, plugin_config, conn_utils):
"plugins": plugin_value,
"socket_timeout": 10,
"connect_timeout": 10,
"auxiliary_query_timeout_sec": 3,
"autocommit": True,
}
)
Expand Down Expand Up @@ -134,6 +135,7 @@ def failover_props(self, plugin_config, conn_utils):
"plugins": f"{plugin_value},failover",
"socket_timeout": 10,
"connect_timeout": 10,
"auxiliary_query_timeout_sec": 3,
"autocommit": True,
"cluster_id": "cluster1"
}
Expand Down
Loading