Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- main
- feat/gdb-rw # temporary

permissions:
id-token: write # This is required for requesting the JWT
Expand All @@ -17,7 +18,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [ "3.11", "3.12", "3.13" ]
python-version: [ "3.11", "3.12", "3.13" ]
environment: [ "mysql", "pg" ]

steps:
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ The following table lists the connection properties used with the AWS Advanced P
| `secrets_manager_secret_username_key` | [Secrets Manager Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheAwsSecretsManagerPlugin.md) |
| `secrets_manager_secret_password_key` | [Secrets Manager Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheAwsSecretsManagerPlugin.md) |
| `reader_host_selector_strategy` | [Connection Strategy](docs/using-the-python-wrapper/using-plugins/UsingTheReadWriteSplittingPlugin.md#connection-strategies) |
| `gdb_rw_home_region` | [GDB Read/Write Splitting Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheGdbReadWriteSplittingPlugin.md) |
| `gdb_rw_restrict_writer_to_home_region` | [GDB Read/Write Splitting Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheGdbReadWriteSplittingPlugin.md) |
| `gdb_rw_restrict_reader_to_home_region` | [GDB Read/Write Splitting Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheGdbReadWriteSplittingPlugin.md) |
| `gdb_enable_global_write_forwarding` | [GDB Read/Write Splitting Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheGdbReadWriteSplittingPlugin.md) |
| `db_user` | [Federated Authentication Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheFederatedAuthenticationPlugin.md) |
| `idp_username` | [Federated Authentication Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheFederatedAuthenticationPlugin.md) |
| `idp_password` | [Federated Authentication Plugin](docs/using-the-python-wrapper/using-plugins/UsingTheFederatedAuthenticationPlugin.md) |
Expand Down
29 changes: 16 additions & 13 deletions aws_advanced_python_wrapper/failover_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,24 @@ def notify_host_list_changed(self, changes: Dict[str, Set[HostEvent]]):
if not self._enable_failover_setting:
return

msg = ""
for key in changes:
msg += f"\n\tHost '{key}': {changes[key]}"
logger.debug("FailoverPlugin.Changes", msg)

current_host = self._plugin_service.current_host_info
if current_host is not None:
if self._is_host_still_valid(current_host.url, changes):
return

for alias in current_host.aliases:
if self._is_host_still_valid(alias + '/', changes):
try:
msg = ""
for key in changes:
msg += f"\n\tHost '{key}': {changes[key]}"
logger.debug("FailoverPlugin.Changes", msg)

current_host = self._plugin_service.current_host_info
if current_host is not None:
if self._is_host_still_valid(current_host.url, changes):
return

logger.debug("FailoverPlugin.InvalidHost", current_host)
for alias in current_host.aliases:
if self._is_host_still_valid(alias + '/', changes):
return

logger.debug("FailoverPlugin.InvalidHost", current_host)
finally:
self._stale_dns_helper.notify_host_list_changed(changes)

def connect(
self,
Expand Down
181 changes: 181 additions & 0 deletions aws_advanced_python_wrapper/gdb_read_write_splitting_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Callable, List, Optional

from aws_advanced_python_wrapper.errors import ReadWriteSplittingError
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
from aws_advanced_python_wrapper.read_write_splitting_plugin import \
ReadWriteSplittingPlugin
from aws_advanced_python_wrapper.utils.log import Logger
from aws_advanced_python_wrapper.utils.messages import Messages
from aws_advanced_python_wrapper.utils.properties import (Properties,
WrapperProperties)
from aws_advanced_python_wrapper.utils.rds_utils import RdsUtils

if TYPE_CHECKING:
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
from aws_advanced_python_wrapper.hostinfo import HostInfo
from aws_advanced_python_wrapper.pep249 import Connection
from aws_advanced_python_wrapper.plugin_service import PluginService

logger = Logger(__name__)


class GdbReadWriteSplittingPlugin(ReadWriteSplittingPlugin):
"""Read/write splitting plugin for Aurora Global Database.

Extends the topology-based :class:`ReadWriteSplittingPlugin` to keep
reader and writer connections inside a configured home region. When
enabled, the plugin will refuse to switch to a writer or reader instance
that lives outside that region. Optionally, when Global Write
Forwarding is enabled, the plugin will keep the existing reader
connection in a secondary region instead of failing.
"""

def __init__(self, plugin_service: PluginService, props: Properties):
super().__init__(plugin_service, props)

self._rds_utils: RdsUtils = RdsUtils()
self._restrict_writer_to_home_region: bool = (
WrapperProperties.GDB_RW_RESTRICT_WRITER_TO_HOME_REGION.get_bool(props)
)
self._restrict_reader_to_home_region: bool = (
WrapperProperties.GDB_RW_RESTRICT_READER_TO_HOME_REGION.get_bool(props)
)
self._enable_global_write_forwarding: bool = (
WrapperProperties.GDB_ENABLE_GLOBAL_WRITE_FORWARDING.get_bool(props)
)
self._home_region: Optional[str] = None
self._initialized: bool = False

def connect(
self,
target_driver_func: Callable,
driver_dialect: DriverDialect,
host_info: HostInfo,
props: Properties,
is_initial_connection: bool,
connect_func: Callable,
) -> Connection:
self._init_settings(host_info, props)
return super().connect(
target_driver_func,
driver_dialect,
host_info,
props,
is_initial_connection,
connect_func,
)

def _init_settings(self, init_host_info: HostInfo, props: Properties) -> None:
if self._initialized:
return
self._initialized = True

home_region = WrapperProperties.GDB_RW_HOME_REGION.get(props)
if not home_region:
url_type = self._rds_utils.identify_rds_type(init_host_info.host)
if url_type is not None and url_type.has_region:
home_region = self._rds_utils.get_rds_region(init_host_info.host)

if not home_region:
raise ReadWriteSplittingError(
Messages.get_formatted(
"GdbReadWriteSplittingPlugin.MissingHomeRegion",
init_host_info.host,
)
)

self._home_region = home_region
logger.debug(
"GdbReadWriteSplittingPlugin.ParameterValue",
WrapperProperties.GDB_RW_HOME_REGION.name,
self._home_region,
)

def _initialize_writer_connection(self) -> None:
writer_host = self._get_writer_host_info()
if writer_host is not None and self._is_writer_outside_home_region(writer_host):
if self._enable_global_write_forwarding:
logger.debug(
"GdbReadWriteSplittingPlugin.EnabledGwf",
self._rds_utils.get_rds_region(writer_host.host),
)
return

raise ReadWriteSplittingError(
Messages.get_formatted(
"GdbReadWriteSplittingPlugin.CantConnectWriterOutOfHomeRegion",
writer_host.host,
self._home_region,
)
)

super()._initialize_writer_connection()

def _set_writer_connection(
self, writer_conn: Connection, writer_host_info: HostInfo
) -> None:
if self._is_writer_outside_home_region(writer_host_info):
raise ReadWriteSplittingError(
Messages.get_formatted(
"GdbReadWriteSplittingPlugin.CantConnectWriterOutOfHomeRegion",
writer_host_info.host,
self._home_region,
)
)
super()._set_writer_connection(writer_conn, writer_host_info)

def _get_reader_host_candidates(self) -> List[HostInfo]:
if not self._restrict_reader_to_home_region:
return super()._get_reader_host_candidates()

hosts_in_region = [
host
for host in self._plugin_service.hosts
if self._is_in_home_region(host)
]

if not hosts_in_region:
raise ReadWriteSplittingError(
Messages.get_formatted(
"GdbReadWriteSplittingPlugin.NoAvailableReadersInHomeRegion",
self._home_region,
)
)

return hosts_in_region

def _is_writer_outside_home_region(self, host_info: HostInfo) -> bool:
return (
self._restrict_writer_to_home_region
and not self._is_in_home_region(host_info)
)

def _is_in_home_region(self, host_info: HostInfo) -> bool:
if self._home_region is None:
return True
host_region = self._rds_utils.get_rds_region(host_info.host)
if host_region is None:
return False
return host_region.casefold() == self._home_region.casefold()


class GdbReadWriteSplittingPluginFactory(PluginFactory):
@staticmethod
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
return GdbReadWriteSplittingPlugin(plugin_service, props)
4 changes: 4 additions & 0 deletions aws_advanced_python_wrapper/plugin_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
from aws_advanced_python_wrapper.execute_time_plugin import \
ExecuteTimePluginFactory
from aws_advanced_python_wrapper.failover_plugin import FailoverPluginFactory
from aws_advanced_python_wrapper.gdb_read_write_splitting_plugin import \
GdbReadWriteSplittingPluginFactory
from aws_advanced_python_wrapper.host_availability import HostAvailability
from aws_advanced_python_wrapper.host_list_provider import (
ConnectionStringHostListProvider, HostListProvider,
Expand Down Expand Up @@ -830,6 +832,7 @@ class PluginManager(CanReleaseResources):
"failover_v2": FailoverV2PluginFactory,
"read_write_splitting": ReadWriteSplittingPluginFactory,
"srw": SimpleReadWriteSplittingPluginFactory,
"gdb_rw": GdbReadWriteSplittingPluginFactory,
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
"stale_dns": StaleDnsPluginFactory,
"custom_endpoint": CustomEndpointPluginFactory,
Expand All @@ -855,6 +858,7 @@ class PluginManager(CanReleaseResources):
StaleDnsPluginFactory: 200,
ReadWriteSplittingPluginFactory: 300,
SimpleReadWriteSplittingPluginFactory: 310,
GdbReadWriteSplittingPluginFactory: 320,
FailoverPluginFactory: 400,
FailoverV2PluginFactory: 410,
HostMonitoringPluginFactory: 500,
Expand Down
26 changes: 18 additions & 8 deletions aws_advanced_python_wrapper/read_write_splitting_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from abc import abstractmethod
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Callable, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set, Tuple

if TYPE_CHECKING:
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
Expand Down Expand Up @@ -344,9 +344,10 @@ def _close_connections(self, close_only_if_idle: bool = True):
self._close_connection(self._writer_connection, close_only_if_idle)

@staticmethod
def log_and_raise_exception(log_msg: str):
logger.error(log_msg)
raise ReadWriteSplittingError(Messages.get(log_msg))
def log_and_raise_exception(log_msg: str, *args):
msg = Messages.get_formatted(log_msg, *args) if args else Messages.get(log_msg)
logger.error(log_msg, *args)
raise ReadWriteSplittingError(msg)

@staticmethod
def _is_connection_usable(conn: Optional[Connection], driver_dialect: DriverDialect):
Expand Down Expand Up @@ -509,14 +510,14 @@ def _initialize_writer_connection(self):
writer_host = self._get_writer_host_info()
if writer_host is None:
self.log_and_raise_exception(
"ReadWriteSplittingPlugin.FailedToConnectToWriter"
"ReadWriteSplittingPlugin.NoWriterFound"
)
return

conn = self._plugin_service.connect(writer_host, self._properties, self)
if conn is None:
self.log_and_raise_exception(
"ReadWriteSplittingPlugin.FailedToConnectToWriter"
"ReadWriteSplittingPlugin.FailedToConnectToWriter", writer_host.url
)
return

Expand Down Expand Up @@ -595,16 +596,25 @@ def _can_host_be_used(self, host_info: HostInfo) -> bool:
hosts = [h.get_host_and_port() for h in self._hosts]
return host_info.get_host_and_port() in hosts

def _get_reader_host_candidates(self) -> List[HostInfo]:
"""Return the list of host candidates used when selecting a reader.

Subclasses can override this method to filter the candidate list,
for example to restrict readers to a specific region.
"""
return list(self._plugin_service.hosts)

def _open_new_reader_connection(
self,
) -> tuple[Optional[Connection], Optional[HostInfo]]:
conn: Optional[Connection] = None
reader_host: Optional[HostInfo] = None

conn_attempts = len(self._plugin_service.hosts) * 2
host_candidates = self._get_reader_host_candidates()
conn_attempts = len(host_candidates) * 2
for _ in range(conn_attempts):
host = self._plugin_service.get_host_info_by_strategy(
HostRole.READER, self._reader_selector_strategy
HostRole.READER, self._reader_selector_strategy, host_candidates
)
if host is not None:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight=[WeightedRandomHos
SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter=[SimpleReadWriteSplittingPlugin] Configuration parameter {} is required.
SimpleReadWriteSplittingPlugin.IncorrectConfiguration=[SimpleReadWriteSplittingPlugin] Unable to verify connections with this current configuration. Ensure a correct value is provided to the configuration parameter {}.

GdbReadWriteSplittingPlugin.MissingHomeRegion=[GdbReadWriteSplittingPlugin] Unable to parse home region from endpoint '{}'. Please ensure you have set the 'gdb_rw_home_region' connection parameter.
GdbReadWriteSplittingPlugin.CantConnectWriterOutOfHomeRegion=[GdbReadWriteSplittingPlugin] Writer connection to '{}' is not allowed since it is out of home region '{}'.
GdbReadWriteSplittingPlugin.NoAvailableReadersInHomeRegion=[GdbReadWriteSplittingPlugin] No available reader nodes in home region '{}'.
GdbReadWriteSplittingPlugin.ParameterValue=[GdbReadWriteSplittingPlugin] {}={}
GdbReadWriteSplittingPlugin.EnabledGwf=[GdbReadWriteSplittingPlugin] The current primary writer region is '{}' and is not within the home region. Keeping the current connection and letting Global Write Forwarding redirect writes to the primary region.

SqlAlchemyPooledConnectionProvider.PoolNone=[SqlAlchemyPooledConnectionProvider] Attempted to find or create a pool for '{}' but the result of the attempt evaluated to None.
SqlAlchemyPooledConnectionProvider.UnableToCreateDefaultKey=[SqlAlchemyPooledConnectionProvider] Unable to create a default key for internal connection pools. By default, the user parameter is used, but the given user evaluated to None or the empty string (""). Please ensure you have passed a valid user in the connection properties.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def _initialize_writer_connection(self):

if conn is None:
self.log_and_raise_exception(
"ReadWriteSplittingPlugin.FailedToConnectToWriter"
"ReadWriteSplittingPlugin.FailedToConnectToWriter",
self._write_endpoint_host_info.url
)
return

Expand Down
26 changes: 26 additions & 0 deletions aws_advanced_python_wrapper/utils/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,32 @@ class WrapperProperties:
1000,
)

# Global Database Read/Write Splitting
GDB_RW_HOME_REGION = WrapperProperty(
"gdb_rw_home_region",
"Specifies the home region for read/write splitting in a Global Database setup.",
None,
)

GDB_RW_RESTRICT_WRITER_TO_HOME_REGION = WrapperProperty(
"gdb_rw_restrict_writer_to_home_region",
"Prevents connections to a writer instance outside of the defined home region.",
True,
)

GDB_RW_RESTRICT_READER_TO_HOME_REGION = WrapperProperty(
"gdb_rw_restrict_reader_to_home_region",
"Prevents connections to a reader instance outside of the defined home region.",
True,
)

GDB_ENABLE_GLOBAL_WRITE_FORWARDING = WrapperProperty(
"gdb_enable_global_write_forwarding",
"Set to True to enable Global Write Forwarding when connected to a "
"reader connection in a secondary global region.",
False,
)


class PropertiesUtils:
_MONITORING_PROPERTY_PREFIX = "monitoring-"
Expand Down
Loading
Loading