From 1edf8b2b3bb8d9a010db6f53f2679553560d3f6e Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 22 Apr 2026 15:11:18 -0700 Subject: [PATCH 1/2] [feature]: add nesting_type to concurrency operations --- examples/src/map/map_operations_flat.py | 23 +++++++++++ examples/src/parallel/parallel_flat.py | 27 +++++++++++++ examples/test/map/test_map_operations_flat.py | 39 +++++++++++++++++++ examples/test/parallel/test_parallel_flat.py | 38 ++++++++++++++++++ 4 files changed, 127 insertions(+) create mode 100644 examples/src/map/map_operations_flat.py create mode 100644 examples/src/parallel/parallel_flat.py create mode 100644 examples/test/map/test_map_operations_flat.py create mode 100644 examples/test/parallel/test_parallel_flat.py diff --git a/examples/src/map/map_operations_flat.py b/examples/src/map/map_operations_flat.py new file mode 100644 index 00000000..272afb58 --- /dev/null +++ b/examples/src/map/map_operations_flat.py @@ -0,0 +1,23 @@ +"""Example demonstrating map operations for processing collections durably.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import MapConfig, NestingType +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[int]: + """Process a list of items using context.map().""" + items = [1, 2, 3, 4, 5] + + # Use context.map() to process items concurrently and extract results immediately + return context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item * 2, name=f"map_item_{index}" + ), + name="map_operation", + config=MapConfig(max_concurrency=2, nesting_type=NestingType.FLAT), + ).get_results() diff --git a/examples/src/parallel/parallel_flat.py b/examples/src/parallel/parallel_flat.py new file mode 100644 index 00000000..5ae26ebc --- /dev/null +++ b/examples/src/parallel/parallel_flat.py @@ -0,0 +1,27 @@ +"""Example demonstrating parallel operations for concurrent execution.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import ParallelConfig, NestingType +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[str]: + """Execute multiple operations in parallel using context.parallel().""" + + # Use context.parallel() to execute functions concurrently and extract results immediately + return context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"), + lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"), + lambda ctx: ( + ctx.wait(Duration.from_seconds(1), name="wait_in_task3"), + "task 3 completed after wait", + )[1], + ], + name="parallel_operation", + config=ParallelConfig(max_concurrency=2, nesting_type=NestingType.FLAT), + ).get_results() diff --git a/examples/test/map/test_map_operations_flat.py b/examples/test/map/test_map_operations_flat.py new file mode 100644 index 00000000..0f10ca4d --- /dev/null +++ b/examples/test/map/test_map_operations_flat.py @@ -0,0 +1,39 @@ +"""Tests for map_operations example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus + +from src.map import map_operations_flat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_operations_flat.handler, + lambda_function_name="map operations", +) +def test_map_operations(durable_runner): + """Test map_operations example using context.map().""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10] + + # Get the map operation (CONTEXT type with MAP subtype) + map_op = result.get_context("map_operation") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all five child operations exist + assert len(map_op.child_operations) == 5 + + # Verify child step operation names + child_names = {op.name for op in map_op.child_operations} + expected_names = {f"map_item_{i}" for i in range(5)} + assert child_names == expected_names + + # Verify all children succeeded + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_flat.py b/examples/test/parallel/test_parallel_flat.py new file mode 100644 index 00000000..d12b26ee --- /dev/null +++ b/examples/test/parallel/test_parallel_flat.py @@ -0,0 +1,38 @@ +"""Tests for parallel example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus + +from src.parallel import parallel_flat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_flat.handler, + lambda_function_name="Parallel Operations", +) +def test_parallel_flat(durable_runner): + """Test parallel example using context.parallel().""" + with durable_runner: + result = durable_runner.run(input="test", timeout=100) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == [ + "task 1 completed", + "task 2 completed", + "task 3 completed after wait", + ] + + # Get the parallel operation (CONTEXT type with PARALLEL subtype) + parallel_op = result.get_context("parallel_operation") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all three child operations exist + assert len(parallel_op.child_operations) == 3 + + # Verify all children succeeded + for child in parallel_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED From 477da63def6d21b58f8361b4d83780c96e84901f Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Thu, 23 Apr 2026 10:19:50 -0700 Subject: [PATCH 2/2] attempt to fix integration tests --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e099009f..3315abcd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,9 @@ jobs: echo "Using Python Language SDK branch override: $OVERRIDE" else echo "Using default Python Language SDK (main branch)" + OVERRIDE=main fi + hatch run -- test:python -m pip install -e "aws-durable-execution-sdk-python@git+https://github.com/aws/aws-durable-execution-sdk-python.git@$OVERRIDE" - name: static analysis run: hatch fmt --check - name: type checking