diff --git a/.gitignore b/.gitignore index ef2cd267b..5dadf6d5b 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,9 @@ coverage.lcov # macOS specific files .DS_Store + +# Integration test scratch dirs +tests/integration/.binding-data/ + +# Crypto test material generated at test time (see tests/crypto_utils.py) +tests/integration/keys/ diff --git a/CLAUDE.md b/CLAUDE.md index 490328fe9..b57ac6daf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,14 +1,20 @@ @AGENTS.md -Use pathlib instead of os.path. -Use httpx instead of urllib. -subprocess(`shell=True`) is used only when it makes the code more readable. Use either shlex or args lists. -subprocess calls should have a reasonable timeout. -Use modern Python (3.10+) features. Make all code strongly typed. Keep conditional nesting to a minimum, and use guard clauses when possible. -Aim for medium "visual complexity": use intermediate variables to store results of nested/complex function calls, but don't create a new variable for everything. -Avoid comments unless there is a gotcha, a complex algorithm or anything an experienced code reviewer needs to be aware of. Focus on making better Google-style docstrings instead. +Aim for medium visual complexity: use intermediate variables to store results of nested/complex function calls. A complex function call could be: +- `f(Object(a=1, b=2, c=3))`, the inner object has more than 2 meaningful args +- `f(Object((a, b)))`, 2 levels of nesting or anything with a long chain of closing parens +- `small_transformation(ImportantObject())`, the object itself is the main subject of the function but the transformation steals the focus +Use descriptive, self-documenting names for these intermediate variables. +Closely related variable names should share a root and use different suffixes. For example, `request_original` and `request_clean`, but not `clean_request`. +Avoid comments unless there is a gotcha, a complex algorithm or anything an experienced code reviewer needs to be aware of. Focus on making short but descriptive Google-style docstrings instead. -The user is not always right. Be skeptical and do not blindly comply if something doesn't make sense. +Use modern Python (3.10+) features. +Use pathlib instead of os.path. +Use httpx instead of urllib. +`subprocess(shell=True)` is used only when it makes the code more readable. Use either shlex or args lists. +Anything that can have an explicit timeout should have one. Code should be cross-platform and production ready. + +The user is not always right. Be skeptical and do not blindly comply if something doesn't make sense. 
diff --git a/dev-requirements.txt b/dev-requirements.txt index d5ea78f85..0e2df4c3d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -4,12 +4,17 @@ mypy-protobuf>=2.9 tox>=4.3.0 coverage>=5.3 pytest>=7.0 +pytest-asyncio>=0.23 wheel # used in unit test only opentelemetry-sdk opentelemetry-instrumentation-grpc httpx>=0.28.1 pyOpenSSL>=26.0.0 +# used by tests to generate crypto keys at runtime +cryptography>=42.0.0 +# needed for direct Redis access in integration tests +redis>=7.4.0 # needed for type checking Flask>=1.1 # needed for auto fix diff --git a/examples/pubsub-streaming-async/publisher.py b/examples/pubsub-streaming-async/publisher.py index e4abf3593..5f0445ca2 100644 --- a/examples/pubsub-streaming-async/publisher.py +++ b/examples/pubsub-streaming-async/publisher.py @@ -44,7 +44,7 @@ async def publish_events(): ) # Print the request - print(req_data, flush=True) + print(req_data) await asyncio.sleep(1) diff --git a/examples/pubsub-streaming-async/subscriber.py b/examples/pubsub-streaming-async/subscriber.py index de51a797e..02fd2bd20 100644 --- a/examples/pubsub-streaming-async/subscriber.py +++ b/examples/pubsub-streaming-async/subscriber.py @@ -19,7 +19,7 @@ def process_message(message): global counter counter += 1 # Process the message here - print(f'Processing message: {message.data()} from {message.topic()}...', flush=True) + print(f'Processing message: {message.data()} from {message.topic()}...') return 'success' diff --git a/pyproject.toml b/pyproject.toml index 7f0d3cbb1..8870d6a5f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,3 +28,4 @@ markers = [ 'example_dir(name): set the example directory for the dapr fixture', ] pythonpath = ["."] +asyncio_mode = "auto" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 000000000..d4accdfc3 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,40 @@ +import subprocess +from typing import Callable + +import pytest + +REDIS_CONTAINER = 'dapr_redis' + + +@pytest.fixture(scope='session') +def flush_redis() -> None: + """Flush the ``dapr_redis`` container once per session.""" + subprocess.run( + args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'FLUSHDB'), + check=True, + capture_output=True, + timeout=10, + ) + + +@pytest.fixture(scope='session') +def redis_set_config() -> Callable[..., None]: + """Dapr encodes values in the config store as ``value||version``.""" + + def _set(key: str, value: str, version: int = 1) -> None: + subprocess.run( + args=( + 'docker', + 'exec', + REDIS_CONTAINER, + 'redis-cli', + 'SET', + key, + f'{value}||{version}', + ), + check=True, + capture_output=True, + timeout=10, + ) + + return _set diff --git a/tests/crypto_utils.py b/tests/crypto_utils.py new file mode 100644 index 000000000..d9f674863 --- /dev/null +++ b/tests/crypto_utils.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import secrets +from pathlib import Path + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +RSA_KEY_FILENAME = 'rsa-private-key.pem' +SYMMETRIC_KEY_FILENAME = 'symmetric-key-256' + +_RSA_KEY_SIZE = 4096 +_SYMMETRIC_KEY_BYTES = 32 + + +def write_test_keys(target_dir: Path) -> None: + """Write a fresh 4096-bit RSA private key (PKCS8 PEM) and a 256-bit AES key.
+ + File names match those expected by ``examples/crypto/crypto.py`` and the + ``cryptostore.yaml`` component used by the integration tests. + """ + target_dir.mkdir(parents=True, exist_ok=True) + + rsa_key = rsa.generate_private_key(public_exponent=65537, key_size=_RSA_KEY_SIZE) + rsa_pem = rsa_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + (target_dir / RSA_KEY_FILENAME).write_bytes(rsa_pem) + (target_dir / SYMMETRIC_KEY_FILENAME).write_bytes(secrets.token_bytes(_SYMMETRIC_KEY_BYTES)) + + +def remove_test_keys(target_dir: Path) -> None: + for name in (RSA_KEY_FILENAME, SYMMETRIC_KEY_FILENAME): + (target_dir / name).unlink(missing_ok=True) diff --git a/tests/examples/conftest.py b/tests/examples/conftest.py index 33be85c66..50c381117 100644 --- a/tests/examples/conftest.py +++ b/tests/examples/conftest.py @@ -8,7 +8,7 @@ import pytest -from tests._process_utils import get_kwargs_for_process_group, terminate_process_group +from tests.process_utils import get_kwargs_for_process_group, terminate_process_group REPO_ROOT = Path(__file__).resolve().parent.parent.parent EXAMPLES_DIR = REPO_ROOT / 'examples' diff --git a/tests/examples/test_configuration.py b/tests/examples/test_configuration.py index 45f76624a..0d11eb966 100644 --- a/tests/examples/test_configuration.py +++ b/tests/examples/test_configuration.py @@ -1,10 +1,7 @@ -import subprocess import time import pytest -REDIS_CONTAINER = 'dapr_redis' - EXPECTED_LINES = [ 'Got key=orderId1 value=100 version=1 metadata={}', 'Got key=orderId2 value=200 version=1 metadata={}', @@ -13,33 +10,17 @@ ] -@pytest.fixture() -def redis_config(): - """Seed configuration values in Redis before the test.""" - subprocess.run( - ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId1', '100||1'), - check=True, - capture_output=True, - ) - subprocess.run( - ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId2', '200||1'), - check=True, - capture_output=True, - ) - - @pytest.mark.example_dir('configuration') -def test_configuration(dapr, redis_config): +def test_configuration(dapr, redis_set_config): + redis_set_config('orderId1', '100') + redis_set_config('orderId2', '200') + dapr.start( '--app-id configexample --resources-path components/ -- python3 configuration.py', wait=5, ) # Update Redis to trigger the subscription notification - subprocess.run( - ('docker', 'exec', 'dapr_redis', 'redis-cli', 'SET', 'orderId2', '210||2'), - check=True, - capture_output=True, - ) + redis_set_config('orderId2', '210', version=2) # configuration.py sleeps 10s after subscribing before it unsubscribes. # Wait long enough for the full script to finish. time.sleep(10) diff --git a/tests/examples/test_crypto.py b/tests/examples/test_crypto.py index 881159aa2..3de996467 100644 --- a/tests/examples/test_crypto.py +++ b/tests/examples/test_crypto.py @@ -16,7 +16,7 @@ @pytest.fixture() -def crypto_artifacts(): +def cleanup_crypto_outputs(): """Clean up output files written by the crypto example on teardown. Example RSA and AES keys are in ``examples/crypto/keys/``. 
@@ -27,7 +27,7 @@ def crypto_artifacts(): @pytest.mark.example_dir('crypto') -def test_crypto(dapr, crypto_artifacts): +def test_crypto(dapr, cleanup_crypto_outputs): output = dapr.run( '--app-id crypto --resources-path ./components/ -- python3 crypto.py', timeout=30, @@ -38,7 +38,7 @@ def test_crypto(dapr, crypto_artifacts): @pytest.mark.example_dir('crypto') -def test_crypto_async(dapr, crypto_artifacts): +def test_crypto_async(dapr, cleanup_crypto_outputs): output = dapr.run( '--app-id crypto-async --resources-path ./components/ -- python3 crypto-async.py', timeout=30, diff --git a/tests/examples/test_langgraph_checkpointer.py b/tests/examples/test_langgraph_checkpointer.py index 07f58788f..8c57e6250 100644 --- a/tests/examples/test_langgraph_checkpointer.py +++ b/tests/examples/test_langgraph_checkpointer.py @@ -1,9 +1,10 @@ import subprocess -import time import httpx import pytest +from tests.wait_utils import wait_until + OLLAMA_URL = 'http://localhost:11434' MODEL = 'llama3.2:latest' @@ -31,15 +32,6 @@ def _model_available() -> bool: return any(m['name'] == MODEL for m in resp.json().get('models', [])) -def _wait_for_ollama(timeout: float = 30.0, interval: float = 0.5) -> None: - deadline = time.monotonic() + timeout - while time.monotonic() < deadline: - if _ollama_ready(): - return - time.sleep(interval) - raise TimeoutError(f'ollama serve did not become ready within {timeout}s') - - @pytest.fixture() def ollama(): """Ensure Ollama is running and the required model is pulled. @@ -57,7 +49,7 @@ def ollama(): ) except FileNotFoundError: pytest.skip('ollama is not installed') - _wait_for_ollama() + wait_until(_ollama_ready, timeout=30.0, interval=0.5) if not _model_available(): subprocess.run(['ollama', 'pull', MODEL], check=True, capture_output=True) @@ -69,17 +61,6 @@ def ollama(): started.wait(timeout=10) -@pytest.fixture() -def flush_redis(): - """This test is not replayable if the checkpointer state store is not clean.""" - subprocess.run( - ['docker', 'exec', 'dapr_redis', 'redis-cli', 'FLUSHDB'], - capture_output=True, - check=True, - timeout=10, - ) - - @pytest.mark.example_dir('langgraph-checkpointer') def test_langgraph_checkpointer(dapr, ollama, flush_redis): output = dapr.run( diff --git a/tests/examples/test_pubsub_streaming_async.py b/tests/examples/test_pubsub_streaming_async.py index 4ea446968..56edda4dc 100644 --- a/tests/examples/test_pubsub_streaming_async.py +++ b/tests/examples/test_pubsub_streaming_async.py @@ -6,7 +6,7 @@ "Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_B1...", "Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_B1...", "Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_B1...", - 'Closing subscription...', + "Closing subscription...", ] EXPECTED_HANDLER_SUBSCRIBER = [ @@ -15,7 +15,7 @@ "Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_B2...", "Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_B2...", "Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_B2...", - 'Closing subscription...', + "Closing subscription...", ] EXPECTED_PUBLISHER = [ @@ -30,12 +30,12 @@ @pytest.mark.example_dir('pubsub-streaming-async') def test_pubsub_streaming_async(dapr): dapr.start( - '--app-id python-subscriber --app-protocol grpc -- python3 subscriber.py --topic=TOPIC_B1', + '--app-id python-subscriber --app-protocol grpc -- python3 -u subscriber.py --topic=TOPIC_B1', wait=5, ) publisher_output = dapr.run( '--app-id python-publisher --app-protocol grpc 
--dapr-grpc-port=3500 ' - '--enable-app-health-check -- python3 publisher.py --topic=TOPIC_B1', + '--enable-app-health-check -- python3 -u publisher.py --topic=TOPIC_B1', timeout=30, ) for line in EXPECTED_PUBLISHER: @@ -49,12 +49,12 @@ def test_pubsub_streaming_async(dapr): @pytest.mark.example_dir('pubsub-streaming-async') def test_pubsub_streaming_async_handler(dapr): dapr.start( - '--app-id python-subscriber --app-protocol grpc -- python3 subscriber-handler.py --topic=TOPIC_B2', + '--app-id python-subscriber --app-protocol grpc -- python3 -u subscriber-handler.py --topic=TOPIC_B2', wait=5, ) publisher_output = dapr.run( '--app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 ' - '--enable-app-health-check -- python3 publisher.py --topic=TOPIC_B2', + '--enable-app-health-check -- python3 -u publisher.py --topic=TOPIC_B2', timeout=30, ) for line in EXPECTED_PUBLISHER: diff --git a/tests/integration/AGENTS.md b/tests/integration/AGENTS.md index 2f40750f8..862a3b383 100644 --- a/tests/integration/AGENTS.md +++ b/tests/integration/AGENTS.md @@ -27,18 +27,23 @@ tox -e integration -- test_state_store.py -k test_save_and_get ``` tests/integration/ -├── conftest.py # DaprTestEnvironment + fixtures (dapr_env, apps_dir, components_dir) +├── conftest.py # DaprTestEnvironment + fixtures (dapr_env, apps_dir, resources_dir, crypto_keys) ├── test_*.py # Test files (one per building block) ├── apps/ # Helper apps started alongside sidecars │ ├── invoke_receiver.py # gRPC method handler for invoke tests │ └── pubsub_subscriber.py # Subscriber that persists messages to state store -├── components/ # Dapr component YAMLs loaded by all sidecars -│ ├── statestore.yaml # state.redis +├── resources/ # Dapr component YAMLs loaded by all sidecars +│ ├── statestore.yaml # state.redis (also configured as actor state store) │ ├── pubsub.yaml # pubsub.redis │ ├── lockstore.yaml # lock.redis │ ├── configurationstore.yaml # configuration.redis -│ └── localsecretstore.yaml # secretstores.local.file -└── secrets.json # Secrets file for localsecretstore component +│ ├── localsecretstore.yaml # secretstores.local.file +│ ├── localbinding.yaml # bindings.localstorage (rootPath=./.binding-data) +│ ├── cryptostore.yaml # crypto.dapr.localstorage (path=./keys) +│ └── conversation.yaml # conversation.echo +├── keys/ # RSA + symmetric keys for cryptostore (generated at test time, gitignored) +├── secrets.json # Secrets file for localsecretstore component +└── .binding-data/ # Created on demand for localbinding rootPath (gitignored) ``` ## Fixtures @@ -49,10 +54,22 @@ Sidecar and client fixtures are **module-scoped** — one sidecar per test file. 
|---------|-------|------|-------------| | `dapr_env` | module | `DaprTestEnvironment` | Manages sidecar lifecycle; call `start_sidecar()` to get a client | | `apps_dir` | module | `Path` | Path to `tests/integration/apps/` | -| `components_dir` | module | `Path` | Path to `tests/integration/components/` | -| `wait_until` | function | `Callable` | Polling helper `(predicate, timeout=10, interval=0.1)` for eventual-consistency assertions | +| `resources_dir` | module | `Path` | Path to `tests/integration/resources/` | +| `crypto_keys` | session | `Path` | Generates ephemeral RSA + AES keys under `tests/integration/keys/` for the cryptostore component (see `tests/crypto_utils.py`) | +| `flush_redis` | session | `None` | Side-effect fixture that clears the `dapr_redis` container once per session | +| `redis_set_config` | session | `Callable` | Returns `_set(key, value, version=1)` that seeds a Dapr configuration value into Redis (`value||version`) | -Each test file defines its own module-scoped `client` fixture that calls `dapr_env.start_sidecar(...)`. +`flush_redis` and `redis_set_config` are session-scoped (defined in `tests/conftest.py`) so module-scoped fixtures can depend on them. + +Polling helpers are **plain functions**, not fixtures — import them directly: + +```python +from tests.wait_utils import wait_until, wait_until_async +``` + +Both have signature `(condition, timeout=10.0, interval=0.1)` and raise `TimeoutError` if the deadline elapses. `wait_until_async` awaits an awaitable condition. + +Each test file defines its own module-scoped fixture (`client` or `sidecar`) that calls `dapr_env.start_sidecar(...)`. ## Building blocks covered @@ -60,11 +77,36 @@ Each test file defines its own module-scoped `client` fixture that calls `dapr_e |-----------|---------------|-------------------| | `test_state_store.py` | State management | `save_state`, `get_state`, `save_bulk_state`, `get_bulk_state`, `execute_state_transaction`, `delete_state` | | `test_invoke.py` | Service invocation | `invoke_method` | -| `test_pubsub.py` | Pub/sub | `publish_event`, `get_state` (to verify delivery) | +| `test_pubsub.py` | Pub/sub | `publish_event`, `publish_events`, `get_state` (to verify delivery) | | `test_secret_store.py` | Secrets | `get_secret`, `get_bulk_secret` | | `test_metadata.py` | Metadata | `get_metadata`, `set_metadata` | | `test_distributed_lock.py` | Distributed lock | `try_lock`, `unlock`, context manager | | `test_configuration.py` | Configuration | `get_configuration`, `subscribe_configuration`, `unsubscribe_configuration` | +| `test_jobs.py` | Jobs scheduler | `schedule_job_alpha1`, `get_job_alpha1`, `delete_job_alpha1` | +| `test_invoke_binding.py` | Output bindings | `invoke_binding` (create/get/delete against `bindings.localstorage`) | +| `test_crypto.py` | Cryptography | `encrypt`, `decrypt` (RSA + AES round-trips against `crypto.dapr.localstorage`) | +| `test_conversation.py` | Conversation | `converse_alpha1`, `converse_alpha2` against `conversation.echo` | +| `test_workflow.py` | Workflow (`dapr-ext-workflow`) | `WorkflowRuntime`, `DaprWorkflowClient.schedule_new_workflow`, `wait_for_workflow_start`, `wait_for_workflow_completion`, `raise_workflow_event`, `pause_workflow`, `resume_workflow`, `terminate_workflow`, `purge_workflow`, `get_workflow_state` | + +### Async client coverage + +Async counterparts exercise `dapr.aio.clients.DaprClient` (the gRPC async client). 
Each file mirrors its sync sibling with smoke tests — the sync suite validates SDK logic end-to-end, the async suite verifies the `aio` transport. + +| File | Covers | +|------|--------| +| `test_state_store_async.py` | `save_state`, `get_state`, `delete_state`, `execute_state_transaction` | +| `test_invoke_async.py` | `invoke_method` | +| `test_invoke_binding_async.py` | `invoke_binding` (create/get) | +| `test_pubsub_async.py` | `publish_event`, `publish_events` | +| `test_secret_store_async.py` | `get_secret`, `get_bulk_secret` | +| `test_configuration_async.py` | `get_configuration` | +| `test_distributed_lock_async.py` | `try_lock`, `unlock` | +| `test_metadata_async.py` | `get_metadata`, `set_metadata` | +| `test_jobs_async.py` | `schedule_job_alpha1`, `get_job_alpha1`, `delete_job_alpha1` | +| `test_crypto_async.py` | `encrypt`, `decrypt` | +| `test_conversation_async.py` | `converse_alpha1`, `converse_alpha2` | + +Async tests use `pytest-asyncio` in auto mode (configured in `pyproject.toml`). Any `async def test_*` is run as a coroutine — no decorator required. The sidecar fixture stays sync (it just starts `dapr run`); each test creates a short-lived `async with AsyncDaprClient(address='127.0.0.1:50001') as d:` block. ## Port allocation @@ -81,7 +123,7 @@ Some building blocks (invoke, pubsub) require an app process running alongside t 1. Create `test_.py` 2. Add a module-scoped `client` fixture that calls `dapr_env.start_sidecar(app_id='test-')` -3. If the building block needs a new Dapr component, add a YAML to `components/` +3. If the building block needs a new Dapr component, add a YAML to `resources/` 4. If the building block needs a running app, add it to `apps/` and pass `app_cmd` / `app_port` to `start_sidecar()` 5. Use unique keys/resource IDs per test to avoid interference (the sidecar is shared within a module) 6. Assert on SDK return types and gRPC status codes, not on string output @@ -90,6 +132,11 @@ - **Requires `dapr init`** — the tests assume a local Dapr runtime with Redis (`dapr_redis` container on `localhost:6379`), which `dapr init` sets up automatically. - **Configuration tests seed Redis directly** via `docker exec dapr_redis redis-cli`. -- **Lock and configuration APIs are alpha** and emit `UserWarning` on every call. Tests suppress these with `pytestmark = pytest.mark.filterwarnings('ignore::UserWarning')`. -- **`localsecretstore.yaml` uses a relative path** (`secrets.json`) resolved against `cwd=INTEGRATION_DIR`. +- **Alpha-API tests suppress `UserWarning`** via +  `pytestmark = pytest.mark.filterwarnings('ignore::UserWarning')`. The SDK's alpha APIs (lock, crypto, jobs) emit a `UserWarning` per call. In production this is dedup'd to one emission per call site by Python's default warning filter (`__warningregistry__`), but pytest resets that registry between tests via its per-test `catch_warnings` context, so the warning re-fires in every test. The suppression is a pytest workaround, not a sign of a bug in the SDK. +- **`localsecretstore.yaml` uses a relative path** (`secrets.json`) resolved against `cwd=INTEGRATION_DIR`. Same pattern applies to `localbinding.yaml` (`./.binding-data`) and `cryptostore.yaml` (`./keys`). +- **`bindings.localstorage` refuses to initialize if `rootPath` does not exist** — a session-scoped autouse fixture in `conftest.py` creates `.binding-data/` before the first sidecar starts, so every sidecar can load the component. 
+- **`statestore.yaml` has `actorStateStore: "true"`** because workflow uses the actor runtime. The flag is additive — regular state tests are unaffected. +- **Workflow tests run the `WorkflowRuntime` in-process** and connect to the sidecar's gRPC port (default 50001). No external app is needed. - **Dapr may normalize response fields** — e.g., `content_type` may lose charset parameters when proxied through gRPC. Assert on the media type prefix, not the full string. +- **Error shapes vary** — `invoke_binding` surfaces sidecar errors as raw `grpc.RpcError`, while other APIs (jobs, state) wrap them in `DaprGrpcError`. Match what the method actually raises. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index bd54b86c1..0d1a79643 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,24 +1,27 @@ import shlex +import shutil import subprocess import tempfile -import time from contextlib import contextmanager from pathlib import Path -from typing import Any, Callable, Generator, Iterator, TypeVar +from typing import IO, Any, Generator, Iterator import httpx import pytest from dapr.clients import DaprClient from dapr.conf import settings -from tests._process_utils import get_kwargs_for_process_group, terminate_process_group - -T = TypeVar('T') +from tests.crypto_utils import remove_test_keys, write_test_keys +from tests.process_utils import get_kwargs_for_process_group, terminate_process_group +from tests.wait_utils import wait_until INTEGRATION_DIR = Path(__file__).resolve().parent RESOURCES_DIR = INTEGRATION_DIR / 'resources' APPS_DIR = INTEGRATION_DIR / 'apps' +BINDING_DATA_DIR = INTEGRATION_DIR / '.binding-data' +CRYPTO_KEYS_DIR = INTEGRATION_DIR / 'keys' + class DaprTestEnvironment: """Manages Dapr sidecars and returns SDK clients for programmatic testing. @@ -30,6 +33,7 @@ class returns real DaprClient instances so tests can make assertions against SDK def __init__(self, default_resources: Path = RESOURCES_DIR) -> None: self._default_resources = default_resources self._processes: list[subprocess.Popen[str]] = [] + self._log_files: list[IO[str]] = [] self._clients: list[DaprClient] = [] def start_sidecar( @@ -50,7 +54,7 @@ def start_sidecar( http_port: Sidecar HTTP port (also used for the SDK health check). app_port: Port the app listens on (implies ``--app-protocol grpc``). app_cmd: Shell command to start alongside the sidecar. - resources: Path to resource YAML directory. Defaults to + resources: Path to resources YAML directory. Defaults to ``tests/integration/resources/``. """ resources = resources or self._default_resources @@ -72,16 +76,18 @@ def start_sidecar( if app_cmd is not None: cmd.extend(['--', *shlex.split(app_cmd)]) - with tempfile.NamedTemporaryFile(mode='w', suffix=f'-{app_id}.log') as log: - proc = subprocess.Popen( - cmd, - cwd=INTEGRATION_DIR, - stdout=log, - stderr=subprocess.STDOUT, - text=True, - **get_kwargs_for_process_group(), - ) + # Keep the log file handle alive for the lifetime of the sidecar + log_file = tempfile.NamedTemporaryFile(mode='w', suffix=f'-{app_id}.log') + proc = subprocess.Popen( + cmd, + cwd=INTEGRATION_DIR, + stdout=log_file, + stderr=subprocess.STDOUT, + text=True, + **get_kwargs_for_process_group(), + ) self._processes.append(proc) + self._log_files.append(log_file) # Point the SDK health check at the actual sidecar HTTP port. 
# DaprHealth.wait_for_sidecar() reads settings.DAPR_HTTP_PORT, which @@ -116,22 +122,9 @@ def cleanup(self) -> None: proc.wait() self._processes.clear() - -def _wait_until( - condition: Callable[[], T | None], - timeout: float = 10.0, - interval: float = 0.1, -) -> T: - """Poll `predicate` until it returns a truthy value. - Raises `TimeoutError` if it never returns.""" - deadline = time.monotonic() + timeout - while True: - result = condition() - if result: - return result - if time.monotonic() >= deadline: - raise TimeoutError(f'wait_until timed out after {timeout}s') - time.sleep(interval) + for log_file in self._log_files: + log_file.close() + self._log_files.clear() def _wait_for_app_health(http_port: int, timeout: float = 30.0) -> None: @@ -144,12 +137,11 @@ def _wait_for_app_health(http_port: int, timeout: float = 30.0) -> None: def _check() -> bool: try: - response = httpx.get(url, timeout=2.0) + return httpx.get(url, timeout=2.0).is_success except httpx.HTTPError: return False - return response.is_success - _wait_until(_check, timeout=timeout, interval=0.2) + wait_until(_check, timeout=timeout, interval=0.2) @contextmanager @@ -160,8 +152,8 @@ def _isolate_dapr_settings() -> Iterator[None]: ``dapr/clients/http/helpers.py``): - ``DAPR_HTTP_ENDPOINT``, if set, wins and bypasses host/port entirely. - - ``DAPR_RUNTIME_HOST`` is the host resource of the fallback URL. - - ``DAPR_HTTP_PORT`` is the port resource of the fallback URL. + - ``DAPR_RUNTIME_HOST`` is the host component of the fallback URL. + - ``DAPR_HTTP_PORT`` is the port component of the fallback URL. Any of these may be populated from the developer's environment (the Dapr CLI sets them); without an override the SDK health check could target the @@ -187,8 +179,7 @@ def dapr_env() -> Generator[DaprTestEnvironment, Any, None]: """Provides a DaprTestEnvironment for programmatic SDK testing. Module-scoped so that all tests in a file share a single Dapr sidecar, - avoiding port conflicts from rapid start/stop cycles and cutting total - test time significantly. + avoiding port conflicts from rapid start/stop cycles. """ with _isolate_dapr_settings(): env = DaprTestEnvironment() @@ -198,10 +189,18 @@ def dapr_env() -> Generator[DaprTestEnvironment, Any, None]: env.cleanup() -@pytest.fixture -def wait_until() -> Callable[..., Any]: - """Returns the ``_wait_until(condition, timeout=10, interval=0.1)`` helper.""" - return _wait_until +@pytest.fixture(autouse=True) +def fail_if_dead_sidecars(dapr_env: DaprTestEnvironment) -> None: + """Fail the next test cleanly if a managed sidecar has died. + + Without this, a crashed sidecar produces a cascade of gRPC connection + timeouts on every subsequent test in the module. 
+ """ + dead = [proc for proc in dapr_env._processes if proc.poll() is not None] + if not dead: + return + details = ', '.join(f'pid={p.pid} exit={p.returncode}' for p in dead) + raise RuntimeError(f'Dapr sidecar exited unexpectedly: {details}') @pytest.fixture(scope='module') @@ -212,3 +211,24 @@ def apps_dir() -> Path: @pytest.fixture(scope='module') def resources_dir() -> Path: return RESOURCES_DIR + + +@pytest.fixture(scope='session', autouse=True) +def _binding_data_dir() -> Generator[None, None, None]: + """Provide a fresh ``.binding-data/`` for the localbinding component""" + shutil.rmtree(BINDING_DATA_DIR, ignore_errors=True) + BINDING_DATA_DIR.mkdir() + try: + yield + finally: + shutil.rmtree(BINDING_DATA_DIR, ignore_errors=True) + + +@pytest.fixture(scope='session') +def crypto_keys() -> Generator[Path, None, None]: + """Generate temporary RSA + AES keys for ``cryptostore.yaml``.""" + write_test_keys(CRYPTO_KEYS_DIR) + try: + yield CRYPTO_KEYS_DIR + finally: + remove_test_keys(CRYPTO_KEYS_DIR) diff --git a/tests/integration/resources/conversation.yaml b/tests/integration/resources/conversation.yaml new file mode 100644 index 000000000..efb651fef --- /dev/null +++ b/tests/integration/resources/conversation.yaml @@ -0,0 +1,7 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: echo +spec: + type: conversation.echo + version: v1 diff --git a/tests/integration/resources/cryptostore.yaml b/tests/integration/resources/cryptostore.yaml new file mode 100644 index 000000000..5926ca65d --- /dev/null +++ b/tests/integration/resources/cryptostore.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: cryptostore +spec: + type: crypto.dapr.localstorage + version: v1 + metadata: + - name: path + value: ./keys diff --git a/tests/integration/resources/localbinding.yaml b/tests/integration/resources/localbinding.yaml new file mode 100644 index 000000000..ae0ee7603 --- /dev/null +++ b/tests/integration/resources/localbinding.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: localbinding +spec: + type: bindings.localstorage + version: v1 + metadata: + - name: rootPath + value: ./.binding-data diff --git a/tests/integration/resources/statestore.yaml b/tests/integration/resources/statestore.yaml index a0c53bc40..2f676bff8 100644 --- a/tests/integration/resources/statestore.yaml +++ b/tests/integration/resources/statestore.yaml @@ -10,3 +10,5 @@ spec: value: localhost:6379 - name: redisPassword value: "" + - name: actorStateStore + value: "true" diff --git a/tests/integration/test_configuration.py b/tests/integration/test_configuration.py index e73f1a16a..f1fe35f92 100644 --- a/tests/integration/test_configuration.py +++ b/tests/integration/test_configuration.py @@ -1,90 +1,95 @@ -import subprocess import threading -import time import pytest +import redis from dapr.clients.grpc._response import ConfigurationResponse +from tests.wait_utils import wait_until STORE = 'configurationstore' -REDIS_CONTAINER = 'dapr_redis' -def _redis_set(key: str, value: str, version: int = 1) -> None: - """Seed a configuration value directly in Redis. +@pytest.fixture(scope='module') +def client(dapr_env, redis_set_config): + redis_set_config('cfg-key-1', 'val-1') + redis_set_config('cfg-key-2', 'val-2') + return dapr_env.start_sidecar(app_id='test-config') + - Dapr's Redis configuration store encodes values as ``value||version``. 
- """ - subprocess.run( - args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'SET', key, f'{value}||{version}'), - check=True, - capture_output=True, - timeout=10, +@pytest.mark.xfail( + reason='The sidecar returns the subscription ID before the subscription is active', +) +def test_subscribe_first_update_race(client): + r = redis.Redis(host='127.0.0.1', port=6379) + r.ping() + event = threading.Event() + sub_id = client.subscribe_configuration( + store_name=STORE, + keys=['cfg-race-key'], + handler=lambda _id, _resp: event.set(), ) + assert sub_id + r.set('cfg-race-key', 'val||1') + assert event.wait(timeout=2) -@pytest.fixture(scope='module') -def client(dapr_env): - _redis_set('cfg-key-1', 'val-1') - _redis_set('cfg-key-2', 'val-2') - return dapr_env.start_sidecar(app_id='test-config') +def test_get_single_key(client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) + assert 'cfg-key-1' in resp.items + assert resp.items['cfg-key-1'].value == 'val-1' + + +def test_get_multiple_keys(client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1', 'cfg-key-2']) + assert resp.items['cfg-key-1'].value == 'val-1' + assert resp.items['cfg-key-2'].value == 'val-2' + + +def test_get_missing_key_returns_empty_items(client): + resp = client.get_configuration(store_name=STORE, keys=['nonexistent-cfg-key']) + # Dapr omits keys that don't exist from the response. + assert 'nonexistent-cfg-key' not in resp.items + +def test_items_have_version(client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) + item = resp.items['cfg-key-1'] + assert item.version -class TestGetConfiguration: - def test_get_single_key(self, client): - resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) - assert 'cfg-key-1' in resp.items - assert resp.items['cfg-key-1'].value == 'val-1' - - def test_get_multiple_keys(self, client): - resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1', 'cfg-key-2']) - assert resp.items['cfg-key-1'].value == 'val-1' - assert resp.items['cfg-key-2'].value == 'val-2' - - def test_get_missing_key_returns_empty_items(self, client): - resp = client.get_configuration(store_name=STORE, keys=['nonexistent-cfg-key']) - # Dapr omits keys that don't exist from the response. - assert 'nonexistent-cfg-key' not in resp.items - - def test_items_have_version(self, client): - resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) - item = resp.items['cfg-key-1'] - assert item.version - - -class TestSubscribeConfiguration: - def test_subscribe_receives_update(self, client): - received: list[ConfigurationResponse] = [] - event = threading.Event() - - def handler(_id: str, resp: ConfigurationResponse) -> None: - received.append(resp) - event.set() - - sub_id = client.subscribe_configuration( - store_name=STORE, keys=['cfg-sub-key'], handler=handler - ) - assert sub_id - - # Give the subscription watcher thread time to establish its gRPC - # stream before pushing the update, otherwise the notification is missed. 
- time.sleep(1) - _redis_set('cfg-sub-key', 'updated-val', version=2) - event.wait(timeout=10) - - assert len(received) >= 1 - last = received[-1] - assert 'cfg-sub-key' in last.items - assert last.items['cfg-sub-key'].value == 'updated-val' - - ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) - assert ok - - def test_unsubscribe_returns_true(self, client): - sub_id = client.subscribe_configuration( - store_name=STORE, - keys=['cfg-unsub-key'], - handler=lambda _id, _resp: None, - ) - ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) - assert ok + +def test_subscribe_receives_update(client, redis_set_config): + received: list[ConfigurationResponse] = [] + event = threading.Event() + + def handler(_id: str, resp: ConfigurationResponse) -> None: + received.append(resp) + event.set() + + sub_id = client.subscribe_configuration(store_name=STORE, keys=['cfg-sub-key'], handler=handler) + assert sub_id + + # This is necessary because the Dapr runtime returns the subscription ID before the Redis + # configuration component finishes registering the subscription + def _set_and_check() -> bool: + redis_set_config('cfg-sub-key', 'updated-val', version=2) + return event.is_set() + + wait_until(_set_and_check, timeout=10, interval=0.2) + + assert len(received) >= 1 + last = received[-1] + assert 'cfg-sub-key' in last.items + assert last.items['cfg-sub-key'].value == 'updated-val' + + ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) + assert ok + + +def test_unsubscribe_returns_true(client): + sub_id = client.subscribe_configuration( + store_name=STORE, + keys=['cfg-unsub-key'], + handler=lambda _id, _resp: None, + ) + ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) + assert ok diff --git a/tests/integration/test_configuration_async.py b/tests/integration/test_configuration_async.py new file mode 100644 index 000000000..1d0fbc7f4 --- /dev/null +++ b/tests/integration/test_configuration_async.py @@ -0,0 +1,27 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient + +STORE = 'configurationstore' +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env, redis_set_config): + redis_set_config('async-cfg-key-1', 'async-val-1') + dapr_env.start_sidecar(app_id='test-config-async') + + +async def test_get_configuration_single_key(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.get_configuration(store_name=STORE, keys=['async-cfg-key-1']) + + assert 'async-cfg-key-1' in resp.items + assert resp.items['async-cfg-key-1'].value == 'async-val-1' + + +async def test_get_configuration_missing_key_returns_empty_items(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.get_configuration(store_name=STORE, keys=['nonexistent-async-cfg-key']) + + assert 'nonexistent-async-cfg-key' not in resp.items diff --git a/tests/integration/test_conversation.py b/tests/integration/test_conversation.py new file mode 100644 index 000000000..b56f3de99 --- /dev/null +++ b/tests/integration/test_conversation.py @@ -0,0 +1,71 @@ +import pytest + +from dapr.clients.grpc.conversation import ( + ConversationInput, + ConversationInputAlpha2, + create_assistant_message, + create_system_message, + create_user_message, +) + +COMPONENT = 'echo' + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-conversation') + + +def test_converse_alpha1_echoes_input(client): + response = client.converse_alpha1( + 
name=COMPONENT, + inputs=[ConversationInput(content='sync hello', role='user')], + ) + assert response.outputs[0].result == 'sync hello' + + +def test_converse_alpha1_with_multiple_inputs(client): + response = client.converse_alpha1( + name=COMPONENT, + inputs=[ + ConversationInput(content='one', role='user'), + ConversationInput(content='two', role='user'), + ], + ) + results = [out.result for out in response.outputs] + # The echo component concatenates all inputs into a single newline-joined output + # rather than echoing each input individually. + assert results == ['one\ntwo'] + + +def test_converse_alpha1_with_temperature(client): + response = client.converse_alpha1( + name=COMPONENT, + inputs=[ConversationInput(content='warm', role='user')], + temperature=0.7, + ) + assert response.outputs[0].result == 'warm' + + +def test_converse_alpha2_echoes_user_message(client): + response = client.converse_alpha2( + name=COMPONENT, + inputs=[ConversationInputAlpha2(messages=[create_user_message('sync world')])], + ) + assert response.outputs[0].choices[0].message.content == 'sync world' + + +def test_converse_alpha2_with_mixed_messages(client): + response = client.converse_alpha2( + name=COMPONENT, + inputs=[ + ConversationInputAlpha2( + messages=[ + create_system_message('be brief'), + create_user_message('hi'), + create_assistant_message('hello'), + ] + ) + ], + ) + assert response.outputs[0].choices[0].message.content diff --git a/tests/integration/test_conversation_async.py b/tests/integration/test_conversation_async.py new file mode 100644 index 000000000..993683c4f --- /dev/null +++ b/tests/integration/test_conversation_async.py @@ -0,0 +1,36 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient +from dapr.clients.grpc.conversation import ( + ConversationInput, + ConversationInputAlpha2, + create_user_message, +) + +COMPONENT = 'echo' +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-conversation-async') + + +async def test_converse_alpha1_echoes_input(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + response = await d.converse_alpha1( + name=COMPONENT, + inputs=[ConversationInput(content='async hello', role='user')], + ) + + assert response.outputs[0].result == 'async hello' + + +async def test_converse_alpha2_echoes_user_message(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + response = await d.converse_alpha2( + name=COMPONENT, + inputs=[ConversationInputAlpha2(messages=[create_user_message('async world')])], + ) + + assert response.outputs[0].choices[0].message.content == 'async world' diff --git a/tests/integration/test_crypto.py b/tests/integration/test_crypto.py new file mode 100644 index 000000000..ba42b9262 --- /dev/null +++ b/tests/integration/test_crypto.py @@ -0,0 +1,96 @@ +import pytest + +from dapr.clients.grpc._crypto import DecryptOptions, EncryptOptions + +CRYPTO_COMPONENT = 'cryptostore' +RSA_KEY = 'rsa-private-key.pem' +SYMMETRIC_KEY = 'symmetric-key-256' + +# The crypto API re-emits the alpha warnings on every test run. 
+pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +@pytest.fixture(scope='module') +def client(dapr_env, crypto_keys): + return dapr_env.start_sidecar(app_id='test-crypto') + + +def test_rsa_round_trip(client): + plaintext = b'sync crypto secret' + + encrypted_stream = client.encrypt( + data=plaintext, + options=EncryptOptions( + component_name=CRYPTO_COMPONENT, + key_name=RSA_KEY, + key_wrap_algorithm='RSA', + ), + ) + encrypted = encrypted_stream.read() + assert encrypted != plaintext + + decrypted_stream = client.decrypt( + data=encrypted, + options=DecryptOptions(component_name=CRYPTO_COMPONENT, key_name=RSA_KEY), + ) + assert decrypted_stream.read() == plaintext + + +def test_aes_round_trip(client): + plaintext = b'A' * (32 * 1024) + + encrypted_stream = client.encrypt( + data=plaintext, + options=EncryptOptions( + component_name=CRYPTO_COMPONENT, + key_name=SYMMETRIC_KEY, + key_wrap_algorithm='AES', + ), + ) + encrypted = encrypted_stream.read() + + decrypted_stream = client.decrypt( + data=encrypted, + options=DecryptOptions(component_name=CRYPTO_COMPONENT, key_name=SYMMETRIC_KEY), + ) + assert decrypted_stream.read() == plaintext + + +def test_string_input_round_trip(client): + plaintext = 'hello dapr crypto' + + encrypted_stream = client.encrypt( + data=plaintext, + options=EncryptOptions( + component_name=CRYPTO_COMPONENT, + key_name=RSA_KEY, + key_wrap_algorithm='RSA', + ), + ) + encrypted = encrypted_stream.read() + + decrypted_stream = client.decrypt( + data=encrypted, + options=DecryptOptions(component_name=CRYPTO_COMPONENT, key_name=RSA_KEY), + ) + assert decrypted_stream.read().decode() == plaintext + + +def test_encrypt_with_blank_component_raises(client): + with pytest.raises(ValueError): + client.encrypt( + data=b'payload', + options=EncryptOptions( + component_name='', + key_name=RSA_KEY, + key_wrap_algorithm='RSA', + ), + ) + + +def test_decrypt_with_blank_component_raises(client): + with pytest.raises(ValueError): + client.decrypt( + data=b'payload', + options=DecryptOptions(component_name='', key_name=RSA_KEY), + ) diff --git a/tests/integration/test_crypto_async.py b/tests/integration/test_crypto_async.py new file mode 100644 index 000000000..fe259a39a --- /dev/null +++ b/tests/integration/test_crypto_async.py @@ -0,0 +1,63 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient +from dapr.clients.grpc._crypto import DecryptOptions, EncryptOptions + +GRPC_ADDRESS = '127.0.0.1:50001' +CRYPTO_COMPONENT = 'cryptostore' +RSA_KEY = 'rsa-private-key.pem' +SYMMETRIC_KEY = 'symmetric-key-256' + +# The crypto API re-emits the alpha warnings on every test run. 
+pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +@pytest.fixture(scope='module') +def sidecar(dapr_env, crypto_keys): + dapr_env.start_sidecar(app_id='test-crypto-async') + + +async def test_rsa_round_trip(sidecar): + plaintext = b'async crypto secret' + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + encrypted_stream = await d.encrypt( + data=plaintext, + options=EncryptOptions( + component_name=CRYPTO_COMPONENT, + key_name=RSA_KEY, + key_wrap_algorithm='RSA', + ), + ) + encrypted = await encrypted_stream.read() + + decrypted_stream = await d.decrypt( + data=encrypted, + options=DecryptOptions(component_name=CRYPTO_COMPONENT, key_name=RSA_KEY), + ) + decrypted = await decrypted_stream.read() + + assert decrypted == plaintext + + +async def test_aes_round_trip(sidecar): + plaintext = b'A' * (32 * 1024) + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + encrypted_stream = await d.encrypt( + data=plaintext, + options=EncryptOptions( + component_name=CRYPTO_COMPONENT, + key_name=SYMMETRIC_KEY, + key_wrap_algorithm='AES', + ), + ) + encrypted = await encrypted_stream.read() + + decrypted_stream = await d.decrypt( + data=encrypted, + options=DecryptOptions(component_name=CRYPTO_COMPONENT, key_name=SYMMETRIC_KEY), + ) + decrypted = await decrypted_stream.read() + + assert decrypted == plaintext diff --git a/tests/integration/test_distributed_lock.py b/tests/integration/test_distributed_lock.py index 68362c296..44ee73da4 100644 --- a/tests/integration/test_distributed_lock.py +++ b/tests/integration/test_distributed_lock.py @@ -4,7 +4,7 @@ STORE = 'lockstore' -# The distributed lock API emits alpha warnings on every call. +# The distributed lock API re-emits the alpha warnings on every test run. pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') @@ -13,54 +13,56 @@ def client(dapr_env): return dapr_env.start_sidecar(app_id='test-lock') -class TestTryLock: - def test_acquire_lock(self, client): - lock = client.try_lock(STORE, 'res-acquire', 'owner-a', expiry_in_seconds=10) - assert lock.success +def test_try_lock_acquires(client): + lock = client.try_lock(STORE, 'res-acquire', 'owner-a', expiry_in_seconds=10) + assert lock.success - def test_second_owner_is_rejected(self, client): - first = client.try_lock(STORE, 'res-contention', 'owner-a', expiry_in_seconds=10) - second = client.try_lock(STORE, 'res-contention', 'owner-b', expiry_in_seconds=10) - assert first.success - assert not second.success - def test_lock_is_truthy_on_success(self, client): - lock = client.try_lock(STORE, 'res-truthy', 'owner-a', expiry_in_seconds=10) - assert bool(lock) is True +def test_try_lock_second_owner_is_rejected(client): + first = client.try_lock(STORE, 'res-contention', 'owner-a', expiry_in_seconds=10) + second = client.try_lock(STORE, 'res-contention', 'owner-b', expiry_in_seconds=10) + assert first.success + assert not second.success - def test_failed_lock_is_falsy(self, client): - client.try_lock(STORE, 'res-falsy', 'owner-a', expiry_in_seconds=10) - contested = client.try_lock(STORE, 'res-falsy', 'owner-b', expiry_in_seconds=10) - assert bool(contested) is False +def test_try_lock_is_truthy_on_success(client): + lock = client.try_lock(STORE, 'res-truthy', 'owner-a', expiry_in_seconds=10) + assert bool(lock) is True -class TestUnlock: - def test_unlock_own_lock(self, client): - client.try_lock(STORE, 'res-unlock', 'owner-a', expiry_in_seconds=10) - resp = client.unlock(STORE, 'res-unlock', 'owner-a') - assert resp.status == UnlockResponseStatus.success - def 
test_unlock_wrong_owner(self, client): - client.try_lock(STORE, 'res-wrong-owner', 'owner-a', expiry_in_seconds=10) - resp = client.unlock(STORE, 'res-wrong-owner', 'owner-b') - assert resp.status == UnlockResponseStatus.lock_belongs_to_others +def test_try_lock_failed_lock_is_falsy(client): + client.try_lock(STORE, 'res-falsy', 'owner-a', expiry_in_seconds=10) + contested = client.try_lock(STORE, 'res-falsy', 'owner-b', expiry_in_seconds=10) + assert bool(contested) is False - def test_unlock_nonexistent(self, client): - resp = client.unlock(STORE, 'res-does-not-exist', 'owner-a') - assert resp.status == UnlockResponseStatus.lock_does_not_exist - def test_unlock_frees_resource_for_others(self, client): - client.try_lock(STORE, 'res-release', 'owner-a', expiry_in_seconds=10) - client.unlock(STORE, 'res-release', 'owner-a') - second = client.try_lock(STORE, 'res-release', 'owner-b', expiry_in_seconds=10) - assert second.success +def test_unlock_own_lock(client): + client.try_lock(STORE, 'res-unlock', 'owner-a', expiry_in_seconds=10) + resp = client.unlock(STORE, 'res-unlock', 'owner-a') + assert resp.status == UnlockResponseStatus.success -class TestLockContextManager: - def test_context_manager_auto_unlocks(self, client): - with client.try_lock(STORE, 'res-ctx', 'owner-a', expiry_in_seconds=10) as lock: - assert lock +def test_unlock_wrong_owner(client): + client.try_lock(STORE, 'res-wrong-owner', 'owner-a', expiry_in_seconds=10) + resp = client.unlock(STORE, 'res-wrong-owner', 'owner-b') + assert resp.status == UnlockResponseStatus.lock_belongs_to_others - # After the context manager exits, another owner should be able to acquire. - second = client.try_lock(STORE, 'res-ctx', 'owner-b', expiry_in_seconds=10) - assert second.success + +def test_unlock_nonexistent(client): + resp = client.unlock(STORE, 'res-does-not-exist', 'owner-a') + assert resp.status == UnlockResponseStatus.lock_does_not_exist + + +def test_unlock_frees_resource_for_others(client): + client.try_lock(STORE, 'res-release', 'owner-a', expiry_in_seconds=10) + client.unlock(STORE, 'res-release', 'owner-a') + second = client.try_lock(STORE, 'res-release', 'owner-b', expiry_in_seconds=10) + assert second.success + + +def test_context_manager_auto_unlocks(client): + with client.try_lock(STORE, 'res-ctx', 'owner-a', expiry_in_seconds=10) as lock: + assert lock + + second = client.try_lock(STORE, 'res-ctx', 'owner-b', expiry_in_seconds=10) + assert second.success diff --git a/tests/integration/test_distributed_lock_async.py b/tests/integration/test_distributed_lock_async.py new file mode 100644 index 000000000..ed4cf1370 --- /dev/null +++ b/tests/integration/test_distributed_lock_async.py @@ -0,0 +1,40 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient +from dapr.clients.grpc._response import UnlockResponseStatus + +STORE = 'lockstore' +GRPC_ADDRESS = '127.0.0.1:50001' + +# The distributed lock API re-emits the alpha warnings on every test run. 
+pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-lock-async') + + +async def test_acquire_and_release_lock(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + lock = await d.try_lock(STORE, 'res-async-acquire', 'owner-a', expiry_in_seconds=10) + assert lock.success + + resp = await d.unlock(STORE, 'res-async-acquire', 'owner-a') + assert resp.status == UnlockResponseStatus.success + + +async def test_second_owner_is_rejected(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + first = await d.try_lock(STORE, 'res-async-contention', 'owner-a', expiry_in_seconds=10) + second = await d.try_lock(STORE, 'res-async-contention', 'owner-b', expiry_in_seconds=10) + + assert first.success + assert not second.success + + +async def test_unlock_nonexistent_returns_not_found(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.unlock(STORE, 'res-async-missing', 'owner-a') + + assert resp.status == UnlockResponseStatus.lock_does_not_exist diff --git a/tests/integration/test_invoke_async.py b/tests/integration/test_invoke_async.py new file mode 100644 index 000000000..ca53662f5 --- /dev/null +++ b/tests/integration/test_invoke_async.py @@ -0,0 +1,39 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient + +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env, apps_dir): + dapr_env.start_sidecar( + app_id='invoke-receiver-async', + app_port=50051, + app_cmd=f'python3 {apps_dir / "invoke_receiver.py"}', + ) + + +async def test_invoke_method_returns_expected_response(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.invoke_method( + app_id='invoke-receiver-async', + method_name='my-method', + data=b'{"id": 1, "message": "async hello"}', + content_type='application/json', + ) + + assert resp.content_type.startswith('text/plain') + assert resp.data == b'INVOKE_RECEIVED' + + +async def test_invoke_method_with_text_data(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.invoke_method( + app_id='invoke-receiver-async', + method_name='my-method', + data=b'plain text', + content_type='text/plain', + ) + + assert resp.data == b'INVOKE_RECEIVED' diff --git a/tests/integration/test_invoke_binding.py b/tests/integration/test_invoke_binding.py new file mode 100644 index 000000000..44158e407 --- /dev/null +++ b/tests/integration/test_invoke_binding.py @@ -0,0 +1,92 @@ +import uuid +from pathlib import Path + +import pytest + +BINDING = 'localbinding' +BINDING_ROOT = Path(__file__).resolve().parent / '.binding-data' + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-invoke-binding') + + +def test_create_writes_file_to_disk(client): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + payload = b'hello from sync invoke_binding' + + client.invoke_binding( + binding_name=BINDING, + operation='create', + data=payload, + binding_metadata={'fileName': file_name}, + ) + + assert (BINDING_ROOT / file_name).read_bytes() == payload + + +def test_create_then_get_round_trip(client): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + payload = b'sync round-trip payload' + + client.invoke_binding( + binding_name=BINDING, + operation='create', + data=payload, + binding_metadata={'fileName': file_name}, + ) + response = client.invoke_binding( + binding_name=BINDING, + 
operation='get', + binding_metadata={'fileName': file_name}, + ) + + assert response.data == payload + + +def test_create_with_string_payload(client): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + payload = 'sync string payload' + + client.invoke_binding( + binding_name=BINDING, + operation='create', + data=payload, + binding_metadata={'fileName': file_name}, + ) + + assert (BINDING_ROOT / file_name).read_text() == payload + + +def test_delete_removes_file(client): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + file_path = BINDING_ROOT / file_name + + client.invoke_binding( + binding_name=BINDING, + operation='create', + data=b'to be deleted', + binding_metadata={'fileName': file_name}, + ) + assert file_path.exists() + + client.invoke_binding( + binding_name=BINDING, + operation='delete', + binding_metadata={'fileName': file_name}, + ) + assert not file_path.exists() + + +def test_list_includes_created_file(client): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + client.invoke_binding( + binding_name=BINDING, + operation='create', + data=b'listed', + binding_metadata={'fileName': file_name}, + ) + + response = client.invoke_binding(binding_name=BINDING, operation='list') + assert file_name in response.data.decode() diff --git a/tests/integration/test_invoke_binding_async.py b/tests/integration/test_invoke_binding_async.py new file mode 100644 index 000000000..9dd001132 --- /dev/null +++ b/tests/integration/test_invoke_binding_async.py @@ -0,0 +1,50 @@ +import uuid +from pathlib import Path + +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient + +BINDING = 'localbinding' +BINDING_ROOT = Path(__file__).resolve().parent / '.binding-data' +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-invoke-binding-async') + + +async def test_create_writes_file_to_disk(sidecar): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + payload = b'hello from async invoke_binding' + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.invoke_binding( + binding_name=BINDING, + operation='create', + data=payload, + binding_metadata={'fileName': file_name}, + ) + + assert (BINDING_ROOT / file_name).read_bytes() == payload + + +async def test_create_then_get_round_trip(sidecar): + file_name = f'binding-{uuid.uuid4().hex[:8]}.txt' + payload = b'async round-trip payload' + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.invoke_binding( + binding_name=BINDING, + operation='create', + data=payload, + binding_metadata={'fileName': file_name}, + ) + response = await d.invoke_binding( + binding_name=BINDING, + operation='get', + binding_metadata={'fileName': file_name}, + ) + + assert response.data == payload diff --git a/tests/integration/test_jobs.py b/tests/integration/test_jobs.py new file mode 100644 index 000000000..b335498fb --- /dev/null +++ b/tests/integration/test_jobs.py @@ -0,0 +1,84 @@ +import uuid +from datetime import datetime, timedelta, timezone + +import pytest + +from dapr.clients import Job +from dapr.clients.exceptions import DaprGrpcError + +# The jobs API re-emits the alpha warnings on every test run. 
+pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +def _future(days: int) -> str: + return (datetime.now(timezone.utc) + timedelta(days=days)).strftime('%Y-%m-%dT%H:%M:%SZ') + + +def _unique_name(prefix: str) -> str: + return f'{prefix}-{uuid.uuid4().hex[:8]}' + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-jobs') + + +def test_schedule_then_get_returns_job(client): + name = _unique_name('sync-job') + due = _future(days=365) + + client.schedule_job_alpha1(Job(name=name, due_time=due)) + try: + retrieved = client.get_job_alpha1(name=name) + assert retrieved.name == name + assert retrieved.due_time == due + finally: + client.delete_job_alpha1(name=name) + + +def test_delete_removes_job(client): + name = _unique_name('sync-job-del') + due = _future(days=365) + + client.schedule_job_alpha1(Job(name=name, due_time=due)) + client.delete_job_alpha1(name=name) + + with pytest.raises(DaprGrpcError): + client.get_job_alpha1(name=name) + + +def test_schedule_with_recurring_schedule(client): + name = _unique_name('sync-job-recurring') + schedule = '@every 1h' + + client.schedule_job_alpha1(Job(name=name, schedule=schedule, repeats=10)) + try: + retrieved = client.get_job_alpha1(name=name) + assert retrieved.schedule == schedule + assert retrieved.repeats == 10 + finally: + client.delete_job_alpha1(name=name) + + +def test_schedule_without_schedule_or_due_time_raises(client): + with pytest.raises(ValueError): + client.schedule_job_alpha1(Job(name=_unique_name('sync-job-bad'))) + + +def test_schedule_with_blank_name_raises(client): + with pytest.raises(ValueError): + client.schedule_job_alpha1(Job(name='', due_time=_future(days=1))) + + +def test_overwrite_replaces_existing_job(client): + name = _unique_name('sync-job-overwrite') + initial_due = _future(days=30) + updated_due = _future(days=60) + + client.schedule_job_alpha1(Job(name=name, due_time=initial_due)) + try: + client.schedule_job_alpha1(Job(name=name, due_time=updated_due), overwrite=True) + retrieved = client.get_job_alpha1(name=name) + assert retrieved.due_time == updated_due + finally: + client.delete_job_alpha1(name=name) diff --git a/tests/integration/test_jobs_async.py b/tests/integration/test_jobs_async.py new file mode 100644 index 000000000..71ff91db8 --- /dev/null +++ b/tests/integration/test_jobs_async.py @@ -0,0 +1,48 @@ +import uuid +from datetime import datetime, timedelta, timezone + +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient +from dapr.clients import Job +from dapr.clients.exceptions import DaprGrpcError + +GRPC_ADDRESS = '127.0.0.1:50001' + +# The jobs API re-emits the alpha warnings on every test run. 
+pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +def _future(days: int) -> str: + return (datetime.now(timezone.utc) + timedelta(days=days)).strftime('%Y-%m-%dT%H:%M:%SZ') + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-jobs-async') + + +async def test_schedule_then_get_returns_job(sidecar): + name = f'async-job-{uuid.uuid4().hex[:8]}' + due = _future(days=365) + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.schedule_job_alpha1(Job(name=name, due_time=due)) + try: + retrieved = await d.get_job_alpha1(name=name) + assert retrieved.name == name + assert retrieved.due_time == due + finally: + await d.delete_job_alpha1(name=name) + + +async def test_delete_removes_job(sidecar): + name = f'async-job-del-{uuid.uuid4().hex[:8]}' + due = _future(days=365) + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.schedule_job_alpha1(Job(name=name, due_time=due)) + await d.delete_job_alpha1(name=name) + + with pytest.raises(DaprGrpcError): + await d.get_job_alpha1(name=name) diff --git a/tests/integration/test_metadata.py b/tests/integration/test_metadata.py index 88430ebbb..19126d6cf 100644 --- a/tests/integration/test_metadata.py +++ b/tests/integration/test_metadata.py @@ -6,37 +6,39 @@ def client(dapr_env): return dapr_env.start_sidecar(app_id='test-metadata') -class TestGetMetadata: - def test_application_id_matches(self, client): - meta = client.get_metadata() - assert meta.application_id == 'test-metadata' - - def test_registered_components_present(self, client): - meta = client.get_metadata() - component_types = {c.type for c in meta.registered_components} - assert any(t.startswith('state.') for t in component_types) - - def test_registered_components_have_names(self, client): - meta = client.get_metadata() - for comp in meta.registered_components: - assert comp.name - assert comp.type - - -class TestSetMetadata: - def test_set_and_get_roundtrip(self, client): - client.set_metadata('test-key', 'test-value') - meta = client.get_metadata() - assert meta.extended_metadata.get('test-key') == 'test-value' - - def test_overwrite_existing_key(self, client): - client.set_metadata('overwrite-key', 'first') - client.set_metadata('overwrite-key', 'second') - meta = client.get_metadata() - assert meta.extended_metadata['overwrite-key'] == 'second' - - def test_empty_value_is_allowed(self, client): - client.set_metadata('empty-key', '') - meta = client.get_metadata() - assert 'empty-key' in meta.extended_metadata - assert meta.extended_metadata['empty-key'] == '' +def test_application_id_matches(client): + meta = client.get_metadata() + assert meta.application_id == 'test-metadata' + + +def test_registered_components_present(client): + meta = client.get_metadata() + component_types = {c.type for c in meta.registered_components} + assert any(t.startswith('state.') for t in component_types) + + +def test_registered_components_have_names(client): + meta = client.get_metadata() + for comp in meta.registered_components: + assert comp.name + assert comp.type + + +def test_set_and_get_roundtrip(client): + client.set_metadata('test-key', 'test-value') + meta = client.get_metadata() + assert meta.extended_metadata.get('test-key') == 'test-value' + + +def test_overwrite_existing_key(client): + client.set_metadata('overwrite-key', 'first') + client.set_metadata('overwrite-key', 'second') + meta = client.get_metadata() + assert meta.extended_metadata['overwrite-key'] == 'second' + + +def 
test_empty_value_is_allowed(client): + client.set_metadata('empty-key', '') + meta = client.get_metadata() + assert 'empty-key' in meta.extended_metadata + assert meta.extended_metadata['empty-key'] == '' diff --git a/tests/integration/test_metadata_async.py b/tests/integration/test_metadata_async.py new file mode 100644 index 000000000..739496c9b --- /dev/null +++ b/tests/integration/test_metadata_async.py @@ -0,0 +1,25 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient + +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-metadata-async') + + +async def test_get_metadata_application_id_matches(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + meta = await d.get_metadata() + + assert meta.application_id == 'test-metadata-async' + + +async def test_set_and_get_metadata_round_trip(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.set_metadata('async-key', 'async-value') + meta = await d.get_metadata() + + assert meta.extended_metadata.get('async-key') == 'async-value' diff --git a/tests/integration/test_pubsub.py b/tests/integration/test_pubsub.py index c25a419d8..c5ced6636 100644 --- a/tests/integration/test_pubsub.py +++ b/tests/integration/test_pubsub.py @@ -1,34 +1,22 @@ import json -import subprocess +import threading import uuid +from concurrent.futures import Future import pytest +from dapr.clients.grpc._response import TopicEventResponse +from tests.wait_utils import wait_until + STORE = 'statestore' PUBSUB = 'pubsub' TOPIC = 'TOPIC_A' -REDIS_CONTAINER = 'dapr_redis' - - -def _flush_redis() -> None: - """Flush the Dapr Redis instance to prevent state leaking between runs. - - Both the state store and the pubsub component point at the same - ``dapr_redis`` container (see ``tests/integration/resources/``), so a - previous run's ``received-*`` keys could otherwise satisfy this test's - assertions even if no new message was delivered. 
- """ - subprocess.run( - args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'FLUSHDB'), - check=True, - capture_output=True, - timeout=10, - ) +TOPIC_STREAM = 'TOPIC_STREAM' +TOPIC_HANDLER = 'TOPIC_HANDLER' @pytest.fixture(scope='module') -def client(dapr_env, apps_dir): - _flush_redis() +def client(dapr_env, apps_dir, flush_redis): return dapr_env.start_sidecar( app_id='test-subscriber', grpc_port=50001, @@ -37,7 +25,7 @@ def client(dapr_env, apps_dir): ) -def test_published_messages_are_received_by_subscriber(client, wait_until): +def test_published_messages_are_received_by_subscriber(client): run_id = uuid.uuid4().hex for n in range(1, 4): client.publish_event( @@ -58,7 +46,7 @@ def test_published_messages_are_received_by_subscriber(client, wait_until): assert msg['message'] == 'hello world' -def test_publish_event_succeeds(client, wait_until): +def test_publish_event_succeeds(client): run_id = uuid.uuid4().hex client.publish_event( pubsub_name=PUBSUB, @@ -75,3 +63,91 @@ def test_publish_event_succeeds(client, wait_until): msg = json.loads(data) assert msg['id'] == 99 assert msg['message'] == 'smoke test' + + +def test_bulk_publish_delivers_all_messages(client): + run_id = uuid.uuid4().hex + payloads = [ + json.dumps({'run_id': run_id, 'id': n, 'message': f'bulk-{n}'}) for n in range(1, 4) + ] + + response = client.publish_events( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=payloads, + data_content_type='application/json', + ) + assert response.failed_entries == [] + + for n in range(1, 4): + key = f'received-{run_id}-{n}' + data = wait_until( + lambda k=key: client.get_state(store_name=STORE, key=k).data or None, + timeout=10, + ) + msg = json.loads(data) + assert msg['id'] == n + assert msg['message'] == f'bulk-{n}' + + +def test_streaming_subscribe_receives_published_message(client): + subscription = client.subscribe(pubsub_name=PUBSUB, topic=TOPIC_STREAM) + try: + run_id = uuid.uuid4().hex + client.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC_STREAM, + data=json.dumps({'run_id': run_id, 'message': 'streaming hello'}), + data_content_type='application/json', + ) + + next_message_future: Future = Future() + + def read_next_message() -> None: + try: + next_message_future.set_result(subscription.next_message()) + except Exception as exc: + next_message_future.set_exception(exc) + + threading.Thread(target=read_next_message, daemon=True).start() + + message = next_message_future.result(timeout=10) + subscription.respond_success(message) + + payload = message.data() + assert payload['run_id'] == run_id + assert payload['message'] == 'streaming hello' + finally: + subscription.close() + + +def test_subscribe_with_handler_invokes_callback(client): + received: list[dict] = [] + handler_done = threading.Event() + + def handler(message) -> TopicEventResponse: + received.append(message.data()) + if len(received) >= 2: + handler_done.set() + return TopicEventResponse('success') + + close_fn = client.subscribe_with_handler( + pubsub_name=PUBSUB, + topic=TOPIC_HANDLER, + handler_fn=handler, + ) + try: + run_id = uuid.uuid4().hex + for n in range(1, 3): + client.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC_HANDLER, + data=json.dumps({'run_id': run_id, 'id': n}), + data_content_type='application/json', + ) + + assert handler_done.wait(timeout=10), 'handler was not invoked' + ids = sorted(msg['id'] for msg in received if msg['run_id'] == run_id) + assert ids == [1, 2] + finally: + close_fn() diff --git a/tests/integration/test_pubsub_async.py 
b/tests/integration/test_pubsub_async.py new file mode 100644 index 000000000..87e852a6d --- /dev/null +++ b/tests/integration/test_pubsub_async.py @@ -0,0 +1,69 @@ +import json +import uuid + +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient +from tests.wait_utils import wait_until_async + +STORE = 'statestore' +PUBSUB = 'pubsub' +TOPIC = 'TOPIC_A' +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env, apps_dir, flush_redis): + dapr_env.start_sidecar( + app_id='test-subscriber-async', + app_port=50051, + app_cmd=f'python3 {apps_dir / "pubsub_subscriber.py"}', + ) + + +async def test_publish_event_delivers_to_subscriber(sidecar): + run_id = uuid.uuid4().hex + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=json.dumps({'run_id': run_id, 'id': 1, 'message': 'async hello'}), + data_content_type='application/json', + ) + + async def _fetch() -> bytes | None: + resp = await d.get_state(store_name=STORE, key=f'received-{run_id}-1') + return resp.data or None + + data = await wait_until_async(_fetch, timeout=10) + + msg = json.loads(data) + assert msg['message'] == 'async hello' + + +async def test_publish_events_bulk_delivery(sidecar): + run_id = uuid.uuid4().hex + payloads = [ + json.dumps({'run_id': run_id, 'id': n, 'message': f'bulk-async-{n}'}) for n in range(1, 3) + ] + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + response = await d.publish_events( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=payloads, + data_content_type='application/json', + ) + assert response.failed_entries == [] + + for n in range(1, 3): + key = f'received-{run_id}-{n}' + + async def _fetch(k: str = key) -> bytes | None: + resp = await d.get_state(store_name=STORE, key=k) + return resp.data or None + + data = await wait_until_async(_fetch, timeout=10) + msg = json.loads(data) + assert msg['message'] == f'bulk-async-{n}' diff --git a/tests/integration/test_secret_store_async.py b/tests/integration/test_secret_store_async.py new file mode 100644 index 000000000..07959c8d3 --- /dev/null +++ b/tests/integration/test_secret_store_async.py @@ -0,0 +1,26 @@ +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient + +STORE = 'localsecretstore' +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-secrets-async') + + +async def test_get_secret_returns_expected_value(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.get_secret(store_name=STORE, key='secretKey') + + assert resp.secret == {'secretKey': 'secretValue'} + + +async def test_get_bulk_secret_returns_all(sidecar): + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + resp = await d.get_bulk_secret(store_name=STORE) + + assert 'secretKey' in resp.secrets + assert resp.secrets['secretKey'] == {'secretKey': 'secretValue'} diff --git a/tests/integration/test_state_store.py b/tests/integration/test_state_store.py index 26ef51cad..bc094e19d 100644 --- a/tests/integration/test_state_store.py +++ b/tests/integration/test_state_store.py @@ -12,91 +12,91 @@ def client(dapr_env): return dapr_env.start_sidecar(app_id='test-state') -class TestSaveAndGetState: - def test_save_and_get(self, client): - client.save_state(store_name=STORE, key='k1', value='v1') - state = client.get_state(store_name=STORE, key='k1') - assert state.data == b'v1' - assert state.etag - - def 
test_save_with_wrong_etag_fails(self, client): - client.save_state(store_name=STORE, key='etag-test', value='original') - with pytest.raises(grpc.RpcError) as exc_info: - client.save_state(store_name=STORE, key='etag-test', value='bad', etag='9999') - assert exc_info.value.code() == grpc.StatusCode.ABORTED - - def test_get_missing_key_returns_empty(self, client): - state = client.get_state(store_name=STORE, key='nonexistent-key') - assert state.data == b'' - - -class TestBulkState: - def test_save_and_get_bulk(self, client): +def test_save_and_get(client): + client.save_state(store_name=STORE, key='k1', value='v1') + state = client.get_state(store_name=STORE, key='k1') + assert state.data == b'v1' + assert state.etag + + +def test_save_with_wrong_etag_fails(client): + client.save_state(store_name=STORE, key='etag-test', value='original') + with pytest.raises(grpc.RpcError) as exc_info: + client.save_state(store_name=STORE, key='etag-test', value='bad', etag='9999') + assert exc_info.value.code() == grpc.StatusCode.ABORTED + + +def test_get_missing_key_returns_empty(client): + state = client.get_state(store_name=STORE, key='nonexistent-key') + assert state.data == b'' + + +def test_save_and_get_bulk(client): + client.save_bulk_state( + store_name=STORE, + states=[ + StateItem(key='bulk-1', value='v1'), + StateItem(key='bulk-2', value='v2'), + ], + ) + items = client.get_bulk_state(store_name=STORE, keys=['bulk-1', 'bulk-2']).items + by_key = {i.key: i.data for i in items} + assert by_key['bulk-1'] == b'v1' + assert by_key['bulk-2'] == b'v2' + + +def test_save_bulk_with_wrong_etag_fails(client): + client.save_state(store_name=STORE, key='bulk-etag-1', value='original') + with pytest.raises(grpc.RpcError) as exc_info: client.save_bulk_state( store_name=STORE, - states=[ - StateItem(key='bulk-1', value='v1'), - StateItem(key='bulk-2', value='v2'), - ], + states=[StateItem(key='bulk-etag-1', value='updated', etag='9999')], ) - items = client.get_bulk_state(store_name=STORE, keys=['bulk-1', 'bulk-2']).items - by_key = {i.key: i.data for i in items} - assert by_key['bulk-1'] == b'v1' - assert by_key['bulk-2'] == b'v2' - - def test_save_bulk_with_wrong_etag_fails(self, client): - client.save_state(store_name=STORE, key='bulk-etag-1', value='original') - with pytest.raises(grpc.RpcError) as exc_info: - client.save_bulk_state( - store_name=STORE, - states=[StateItem(key='bulk-etag-1', value='updated', etag='9999')], - ) - assert exc_info.value.code() == grpc.StatusCode.ABORTED - - -class TestStateTransactions: - def test_transaction_upsert(self, client): - client.save_state(store_name=STORE, key='tx-1', value='original') - etag = client.get_state(store_name=STORE, key='tx-1').etag - - client.execute_state_transaction( - store_name=STORE, - operations=[ - TransactionalStateOperation( - operation_type=TransactionOperationType.upsert, - key='tx-1', - data='updated', - etag=etag, - ), - TransactionalStateOperation(key='tx-2', data='new'), - ], - ) - - assert client.get_state(store_name=STORE, key='tx-1').data == b'updated' - assert client.get_state(store_name=STORE, key='tx-2').data == b'new' - - def test_transaction_delete(self, client): - client.save_state(store_name=STORE, key='tx-del-1', value='v1') - client.save_state(store_name=STORE, key='tx-del-2', value='v2') - - client.execute_state_transaction( - store_name=STORE, - operations=[ - TransactionalStateOperation( - operation_type=TransactionOperationType.delete, key='tx-del-1' - ), - TransactionalStateOperation( - 
operation_type=TransactionOperationType.delete, key='tx-del-2' - ), - ], - ) - - assert client.get_state(store_name=STORE, key='tx-del-1').data == b'' - assert client.get_state(store_name=STORE, key='tx-del-2').data == b'' - - -class TestDeleteState: - def test_delete_single(self, client): - client.save_state(store_name=STORE, key='del-1', value='v1') - client.delete_state(store_name=STORE, key='del-1') - assert client.get_state(store_name=STORE, key='del-1').data == b'' + assert exc_info.value.code() == grpc.StatusCode.ABORTED + + +def test_transaction_upsert(client): + client.save_state(store_name=STORE, key='tx-1', value='original') + etag = client.get_state(store_name=STORE, key='tx-1').etag + + client.execute_state_transaction( + store_name=STORE, + operations=[ + TransactionalStateOperation( + operation_type=TransactionOperationType.upsert, + key='tx-1', + data='updated', + etag=etag, + ), + TransactionalStateOperation(key='tx-2', data='new'), + ], + ) + + assert client.get_state(store_name=STORE, key='tx-1').data == b'updated' + assert client.get_state(store_name=STORE, key='tx-2').data == b'new' + + +def test_transaction_delete(client): + client.save_state(store_name=STORE, key='tx-del-1', value='v1') + client.save_state(store_name=STORE, key='tx-del-2', value='v2') + + client.execute_state_transaction( + store_name=STORE, + operations=[ + TransactionalStateOperation( + operation_type=TransactionOperationType.delete, key='tx-del-1' + ), + TransactionalStateOperation( + operation_type=TransactionOperationType.delete, key='tx-del-2' + ), + ], + ) + + assert client.get_state(store_name=STORE, key='tx-del-1').data == b'' + assert client.get_state(store_name=STORE, key='tx-del-2').data == b'' + + +def test_delete_single(client): + client.save_state(store_name=STORE, key='del-1', value='v1') + client.delete_state(store_name=STORE, key='del-1') + assert client.get_state(store_name=STORE, key='del-1').data == b'' diff --git a/tests/integration/test_state_store_async.py b/tests/integration/test_state_store_async.py new file mode 100644 index 000000000..0795ced1f --- /dev/null +++ b/tests/integration/test_state_store_async.py @@ -0,0 +1,49 @@ +import uuid + +import pytest + +from dapr.aio.clients import DaprClient as AsyncDaprClient +from dapr.clients.grpc._request import TransactionalStateOperation + +STORE = 'statestore' +GRPC_ADDRESS = '127.0.0.1:50001' + + +@pytest.fixture(scope='module') +def sidecar(dapr_env): + dapr_env.start_sidecar(app_id='test-state-async') + + +async def test_save_and_get_round_trip(sidecar): + key = f'async-key-{uuid.uuid4().hex[:8]}' + value = b'async-value' + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.save_state(store_name=STORE, key=key, value=value) + resp = await d.get_state(store_name=STORE, key=key) + + assert resp.data == value + + +async def test_delete_state_removes_key(sidecar): + key = f'async-del-{uuid.uuid4().hex[:8]}' + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.save_state(store_name=STORE, key=key, value=b'bye') + await d.delete_state(store_name=STORE, key=key) + resp = await d.get_state(store_name=STORE, key=key) + + assert resp.data == b'' + + +async def test_transaction_upsert_then_get(sidecar): + key = f'async-txn-{uuid.uuid4().hex[:8]}' + + async with AsyncDaprClient(address=GRPC_ADDRESS) as d: + await d.execute_state_transaction( + store_name=STORE, + operations=[TransactionalStateOperation(key=key, data=b'txn-value')], + ) + resp = await d.get_state(store_name=STORE, key=key) + + assert 
resp.data == b'txn-value' diff --git a/tests/_process_utils.py b/tests/process_utils.py similarity index 70% rename from tests/_process_utils.py rename to tests/process_utils.py index 0fb6fd483..7aa693989 100644 --- a/tests/_process_utils.py +++ b/tests/process_utils.py @@ -1,9 +1,8 @@ -"""Cross-platform helpers for managing subprocess trees in tests. - -``dapr run`` spawns ``daprd`` and the user's app as siblings; signaling only -the immediate process can orphan them if the signal isn't forwarded, which -leaves stale listeners on the test ports across runs. Putting the whole -subtree in its own group lets cleanup take them all down together. +""" +``dapr run`` spawns ``daprd`` and the user's app as siblings, not as children. +Terminating only the immediate process can orphan them if the signal isn't forwarded, +leaving stale listeners on the test ports across runs. +Putting all the processes in the same group lets cleanup take them all down together. """ from __future__ import annotations diff --git a/tests/wait_utils.py b/tests/wait_utils.py new file mode 100644 index 000000000..0996561ac --- /dev/null +++ b/tests/wait_utils.py @@ -0,0 +1,40 @@ +import asyncio +import time +from typing import Awaitable, Callable, TypeVar + +T = TypeVar('T') + + +def wait_until( + condition: Callable[[], T | None], + timeout: float = 10.0, + interval: float = 0.1, +) -> T: + """Poll ``condition`` until it returns a truthy value. + + Raises ``TimeoutError`` if the deadline elapses first. + """ + deadline = time.monotonic() + timeout + while True: + result = condition() + if result: + return result + if time.monotonic() >= deadline: + raise TimeoutError(f'wait_until timed out after {timeout}s') + time.sleep(interval) + + +async def wait_until_async( + condition: Callable[[], Awaitable[T | None]], + timeout: float = 10.0, + interval: float = 0.1, +) -> T: + """Async counterpart to `wait_until`: polls an awaitable condition.""" + deadline = time.monotonic() + timeout + while True: + result = await condition() + if result: + return result + if time.monotonic() >= deadline: + raise TimeoutError(f'wait_until_async timed out after {timeout}s') + await asyncio.sleep(interval)
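
The rewritten tests/process_utils.py docstring above describes the cleanup approach: start the whole `dapr run` subtree in its own process group so that terminating the group also takes down daprd and the app instead of orphaning them. A minimal cross-platform sketch of that pattern is below; the function names (`start_in_own_group`, `terminate_group`) are illustrative assumptions and not the actual helpers in tests/process_utils.py.

# Illustrative sketch only; the real helpers live in tests/process_utils.py and
# their names/signatures may differ. This shows the process-group pattern the
# module docstring describes.
from __future__ import annotations

import os
import signal
import subprocess
import sys


def start_in_own_group(args: list[str]) -> subprocess.Popen:
    """Start ``args`` so the child and everything it spawns share one process group."""
    if sys.platform == 'win32':
        # CREATE_NEW_PROCESS_GROUP lets cleanup later target the whole tree
        # with CTRL_BREAK_EVENT.
        return subprocess.Popen(args, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    # start_new_session gives the child its own session (and process group) on POSIX.
    return subprocess.Popen(args, start_new_session=True)


def terminate_group(proc: subprocess.Popen, timeout: float = 10.0) -> None:
    """Signal the whole group so sibling processes are not orphaned."""
    if sys.platform == 'win32':
        proc.send_signal(signal.CTRL_BREAK_EVENT)
    else:
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
    proc.wait(timeout=timeout)

Signalling the group rather than only the immediate process is what avoids the stale listeners on the test ports that the docstring warns about when the signal is not forwarded to daprd and the app.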