From fa3151f82b885672a35973c68713929a25bc28bb Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 28 Apr 2026 11:06:31 -0700 Subject: [PATCH 1/6] add tests for nest_type --- examples/src/map/map_operation_flat.py | 23 +++++++++++ examples/src/parallel/parallel_flat.py | 27 +++++++++++++ examples/test/map/test_map_operations_flat.py | 39 +++++++++++++++++++ examples/test/parallel/test_parallel_flat.py | 38 ++++++++++++++++++ 4 files changed, 127 insertions(+) create mode 100644 examples/src/map/map_operation_flat.py create mode 100644 examples/src/parallel/parallel_flat.py create mode 100644 examples/test/map/test_map_operations_flat.py create mode 100644 examples/test/parallel/test_parallel_flat.py diff --git a/examples/src/map/map_operation_flat.py b/examples/src/map/map_operation_flat.py new file mode 100644 index 0000000..272afb5 --- /dev/null +++ b/examples/src/map/map_operation_flat.py @@ -0,0 +1,23 @@ +"""Example demonstrating map operations for processing collections durably.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import MapConfig, NestingType +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[int]: + """Process a list of items using context.map().""" + items = [1, 2, 3, 4, 5] + + # Use context.map() to process items concurrently and extract results immediately + return context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item * 2, name=f"map_item_{index}" + ), + name="map_operation", + config=MapConfig(max_concurrency=2, nesting_type=NestingType.FLAT), + ).get_results() diff --git a/examples/src/parallel/parallel_flat.py b/examples/src/parallel/parallel_flat.py new file mode 100644 index 0000000..5ae26eb --- /dev/null +++ b/examples/src/parallel/parallel_flat.py @@ -0,0 +1,27 @@ +"""Example demonstrating parallel operations for concurrent execution.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import ParallelConfig, NestingType +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[str]: + """Execute multiple operations in parallel using context.parallel().""" + + # Use context.parallel() to execute functions concurrently and extract results immediately + return context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"), + lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"), + lambda ctx: ( + ctx.wait(Duration.from_seconds(1), name="wait_in_task3"), + "task 3 completed after wait", + )[1], + ], + name="parallel_operation", + config=ParallelConfig(max_concurrency=2, nesting_type=NestingType.FLAT), + ).get_results() diff --git a/examples/test/map/test_map_operations_flat.py b/examples/test/map/test_map_operations_flat.py new file mode 100644 index 0000000..0f10ca4 --- /dev/null +++ b/examples/test/map/test_map_operations_flat.py @@ -0,0 +1,39 @@ +"""Tests for map_operations example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus + +from src.map import map_operations_flat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_operations_flat.handler, + lambda_function_name="map operations", +) +def test_map_operations(durable_runner): + """Test map_operations example using context.map().""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10] + + # Get the map operation (CONTEXT type with MAP subtype) + map_op = result.get_context("map_operation") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all five child operations exist + assert len(map_op.child_operations) == 5 + + # Verify child step operation names + child_names = {op.name for op in map_op.child_operations} + expected_names = {f"map_item_{i}" for i in range(5)} + assert child_names == expected_names + + # Verify all children succeeded + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_flat.py b/examples/test/parallel/test_parallel_flat.py new file mode 100644 index 0000000..d12b26e --- /dev/null +++ b/examples/test/parallel/test_parallel_flat.py @@ -0,0 +1,38 @@ +"""Tests for parallel example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus + +from src.parallel import parallel_flat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_flat.handler, + lambda_function_name="Parallel Operations", +) +def test_parallel_flat(durable_runner): + """Test parallel example using context.parallel().""" + with durable_runner: + result = durable_runner.run(input="test", timeout=100) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == [ + "task 1 completed", + "task 2 completed", + "task 3 completed after wait", + ] + + # Get the parallel operation (CONTEXT type with PARALLEL subtype) + parallel_op = result.get_context("parallel_operation") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all three child operations exist + assert len(parallel_op.child_operations) == 3 + + # Verify all children succeeded + for child in parallel_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED From 52c365a26867ca691526ef421f16b83d40f60e03 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 28 Apr 2026 11:13:23 -0700 Subject: [PATCH 2/6] fix name typo --- .../src/map/{map_operation_flat.py => map_operations_flat.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename examples/src/map/{map_operation_flat.py => map_operations_flat.py} (100%) diff --git a/examples/src/map/map_operation_flat.py b/examples/src/map/map_operations_flat.py similarity index 100% rename from examples/src/map/map_operation_flat.py rename to examples/src/map/map_operations_flat.py From c72c3e6ad4306cce740d69a3c98ff15d4159d4be Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 28 Apr 2026 11:28:24 -0700 Subject: [PATCH 3/6] rename test case for flat --- examples/test/map/test_map_operations_flat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/test/map/test_map_operations_flat.py b/examples/test/map/test_map_operations_flat.py index 0f10ca4..8263157 100644 --- a/examples/test/map/test_map_operations_flat.py +++ b/examples/test/map/test_map_operations_flat.py @@ -13,7 +13,7 @@ handler=map_operations_flat.handler, lambda_function_name="map operations", ) -def test_map_operations(durable_runner): +def test_map_operations_flat(durable_runner): """Test map_operations example using context.map().""" with durable_runner: result = durable_runner.run(input="test", timeout=10) From 9cbdb1fde6e285684577d9ca5742e259a296efe5 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 28 Apr 2026 11:44:22 -0700 Subject: [PATCH 4/6] update workflow to install the latest testing lib --- .github/workflows/integration-tests.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index ba1f6ba..13441ab 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -22,6 +22,12 @@ jobs: with: path: language-sdk + - name: Checkout the latest Testing SDK + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + repository: aws/aws-durable-execution-sdk-python-testing + path: testing-sdk + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 with: @@ -34,6 +40,7 @@ jobs: working-directory: language-sdk run: | echo "Running SDK tests..." + hatch run -- test:pip install -e ../testing-sdk hatch run -- test:pip install -e ../language-sdk hatch fmt --check hatch run types:check @@ -54,6 +61,12 @@ jobs: with: path: language-sdk + - name: Checkout the latest Testing SDK + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + repository: aws/aws-durable-execution-sdk-python-testing + path: testing-sdk + - name: Set up Python 3.13 uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 with: @@ -92,6 +105,7 @@ jobs: KMS_KEY_ARN: ${{ secrets.KMS_KEY_ARN }} run: | echo "Building examples..." + hatch run -- examples:pip install -e ../testing-sdk hatch run examples:build # Get first integration example for testing From 56bc54109d389d67aee2324c930b6d9a18622be8 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 28 Apr 2026 11:51:24 -0700 Subject: [PATCH 5/6] fix cloud tests --- examples/examples-catalog.json | 22 +++++++++++++++++++ examples/template.yaml | 18 +++++++++++++++ examples/test/map/test_map_operations_flat.py | 2 +- examples/test/parallel/test_parallel.py | 3 ++- examples/test/parallel/test_parallel_flat.py | 5 +++-- 5 files changed, 46 insertions(+), 4 deletions(-) diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index df8ea36..e80e3ba 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -221,6 +221,17 @@ }, "path": "./src/parallel/parallel.py" }, + { + "name": "Parallel Operations", + "description": "Executing multiple durable operations in parallel", + "handler": "parallel.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel.py" + }, { "name": "Map Operations", "description": "Processing collections using map-like durable operations", @@ -232,6 +243,17 @@ }, "path": "./src/map/map_operations.py" }, + { + "name": "Map Operations Flat", + "description": "Processing collections using map-like durable operations in FLAT mode", + "handler": "map_operations_flat.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_operations_flat.py" + }, { "name": "Map Large Scale", "description": "Processing collections using map-like durable operations in large scale", diff --git a/examples/template.yaml b/examples/template.yaml index 5e2d5ae..0a9dcb9 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -420,6 +420,24 @@ } } }, + "MapOperationsFlat": { + "Type": "AWS::Serverless::Function", + "Properties": { + "CodeUri": "build/", + "Handler": "map_operations_flat.handler", + "Description": "Processing collections using map-like durable operations in FLAT mode", + "Role": { + "Fn::GetAtt": [ + "DurableFunctionRole", + "Arn" + ] + }, + "DurableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + } + } + }, "MapWithLargeScale": { "Type": "AWS::Serverless::Function", "Properties": { diff --git a/examples/test/map/test_map_operations_flat.py b/examples/test/map/test_map_operations_flat.py index 8263157..2bc909e 100644 --- a/examples/test/map/test_map_operations_flat.py +++ b/examples/test/map/test_map_operations_flat.py @@ -11,7 +11,7 @@ @pytest.mark.example @pytest.mark.durable_execution( handler=map_operations_flat.handler, - lambda_function_name="map operations", + lambda_function_name="Map Operations Flat", ) def test_map_operations_flat(durable_runner): """Test map_operations example using context.map().""" diff --git a/examples/test/parallel/test_parallel.py b/examples/test/parallel/test_parallel.py index 184e854..e4191d0 100644 --- a/examples/test/parallel/test_parallel.py +++ b/examples/test/parallel/test_parallel.py @@ -2,7 +2,7 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus, OperationType from src.parallel import parallel from test.conftest import deserialize_operation_payload @@ -35,4 +35,5 @@ def test_parallel(durable_runner): # Verify all children succeeded for child in parallel_op.child_operations: + assert child.operation_type == OperationType.CONTEXT assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_flat.py b/examples/test/parallel/test_parallel_flat.py index d12b26e..d62e080 100644 --- a/examples/test/parallel/test_parallel_flat.py +++ b/examples/test/parallel/test_parallel_flat.py @@ -2,7 +2,7 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus, OperationType from src.parallel import parallel_flat from test.conftest import deserialize_operation_payload @@ -11,7 +11,7 @@ @pytest.mark.example @pytest.mark.durable_execution( handler=parallel_flat.handler, - lambda_function_name="Parallel Operations", + lambda_function_name="Parallel Operations Flat", ) def test_parallel_flat(durable_runner): """Test parallel example using context.parallel().""" @@ -35,4 +35,5 @@ def test_parallel_flat(durable_runner): # Verify all children succeeded for child in parallel_op.child_operations: + assert child.operation_type != OperationType.CONTEXT assert child.status is OperationStatus.SUCCEEDED From a2945d09b5dcaf2d60e0c7f8e6dbc6b9fd892f72 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 28 Apr 2026 11:52:19 -0700 Subject: [PATCH 6/6] fix format --- examples/test/parallel/test_parallel.py | 5 ++++- examples/test/parallel/test_parallel_flat.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/test/parallel/test_parallel.py b/examples/test/parallel/test_parallel.py index e4191d0..67a1b9f 100644 --- a/examples/test/parallel/test_parallel.py +++ b/examples/test/parallel/test_parallel.py @@ -2,7 +2,10 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationStatus, OperationType +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) from src.parallel import parallel from test.conftest import deserialize_operation_payload diff --git a/examples/test/parallel/test_parallel_flat.py b/examples/test/parallel/test_parallel_flat.py index d62e080..b666d0b 100644 --- a/examples/test/parallel/test_parallel_flat.py +++ b/examples/test/parallel/test_parallel_flat.py @@ -2,7 +2,10 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationStatus, OperationType +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) from src.parallel import parallel_flat from test.conftest import deserialize_operation_payload