Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ jobs:
with:
path: language-sdk

- name: Checkout the latest Testing SDK
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: aws/aws-durable-execution-sdk-python-testing
path: testing-sdk

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
Expand All @@ -34,6 +40,7 @@ jobs:
working-directory: language-sdk
run: |
echo "Running SDK tests..."
hatch run -- test:pip install -e ../testing-sdk
hatch run -- test:pip install -e ../language-sdk
hatch fmt --check
hatch run types:check
Expand All @@ -54,6 +61,12 @@ jobs:
with:
path: language-sdk

- name: Checkout the latest Testing SDK
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
repository: aws/aws-durable-execution-sdk-python-testing
path: testing-sdk

- name: Set up Python 3.13
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
Expand Down Expand Up @@ -92,6 +105,7 @@ jobs:
KMS_KEY_ARN: ${{ secrets.KMS_KEY_ARN }}
run: |
echo "Building examples..."
hatch run -- examples:pip install -e ../testing-sdk
hatch run examples:build

# Get first integration example for testing
Expand Down
22 changes: 22 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@
},
"path": "./src/parallel/parallel.py"
},
{
"name": "Parallel Operations",
"description": "Executing multiple durable operations in parallel",
"handler": "parallel.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel.py"
},
{
"name": "Map Operations",
"description": "Processing collections using map-like durable operations",
Expand All @@ -232,6 +243,17 @@
},
"path": "./src/map/map_operations.py"
},
{
"name": "Map Operations Flat",
"description": "Processing collections using map-like durable operations in FLAT mode",
"handler": "map_operations_flat.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_operations_flat.py"
},
{
"name": "Map Large Scale",
"description": "Processing collections using map-like durable operations in large scale",
Expand Down
23 changes: 23 additions & 0 deletions examples/src/map/map_operations_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Example demonstrating map operations for processing collections durably."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig, NestingType
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
"""Process a list of items using context.map()."""
items = [1, 2, 3, 4, 5]

# Use context.map() to process items concurrently and extract results immediately
return context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: item * 2, name=f"map_item_{index}"
),
name="map_operation",
config=MapConfig(max_concurrency=2, nesting_type=NestingType.FLAT),
).get_results()
27 changes: 27 additions & 0 deletions examples/src/parallel/parallel_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Example demonstrating parallel operations for concurrent execution."""

from typing import Any

from aws_durable_execution_sdk_python.config import ParallelConfig, NestingType
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import Duration


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Execute multiple operations in parallel using context.parallel()."""

# Use context.parallel() to execute functions concurrently and extract results immediately
return context.parallel(
functions=[
lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"),
lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"),
lambda ctx: (
ctx.wait(Duration.from_seconds(1), name="wait_in_task3"),
"task 3 completed after wait",
)[1],
],
name="parallel_operation",
config=ParallelConfig(max_concurrency=2, nesting_type=NestingType.FLAT),
).get_results()
18 changes: 18 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,24 @@
}
}
},
"MapOperationsFlat": {
"Type": "AWS::Serverless::Function",
"Properties": {
"CodeUri": "build/",
"Handler": "map_operations_flat.handler",
"Description": "Processing collections using map-like durable operations in FLAT mode",
"Role": {
"Fn::GetAtt": [
"DurableFunctionRole",
"Arn"
]
},
"DurableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
}
}
},
"MapWithLargeScale": {
"Type": "AWS::Serverless::Function",
"Properties": {
Expand Down
39 changes: 39 additions & 0 deletions examples/test/map/test_map_operations_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Tests for map_operations example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from src.map import map_operations_flat
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=map_operations_flat.handler,
lambda_function_name="Map Operations Flat",
)
def test_map_operations_flat(durable_runner):
"""Test map_operations example using context.map()."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10]

# Get the map operation (CONTEXT type with MAP subtype)
map_op = result.get_context("map_operation")
assert map_op is not None
assert map_op.status is OperationStatus.SUCCEEDED

# Verify all five child operations exist
assert len(map_op.child_operations) == 5

# Verify child step operation names
child_names = {op.name for op in map_op.child_operations}
expected_names = {f"map_item_{i}" for i in range(5)}
assert child_names == expected_names

# Verify all children succeeded
for child in map_op.child_operations:
assert child.status is OperationStatus.SUCCEEDED
6 changes: 5 additions & 1 deletion examples/test/parallel/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from aws_durable_execution_sdk_python.lambda_service import (
OperationStatus,
OperationType,
)

from src.parallel import parallel
from test.conftest import deserialize_operation_payload
Expand Down Expand Up @@ -35,4 +38,5 @@ def test_parallel(durable_runner):

# Verify all children succeeded
for child in parallel_op.child_operations:
assert child.operation_type == OperationType.CONTEXT
assert child.status is OperationStatus.SUCCEEDED
42 changes: 42 additions & 0 deletions examples/test/parallel/test_parallel_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Tests for parallel example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import (
OperationStatus,
OperationType,
)

from src.parallel import parallel_flat
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=parallel_flat.handler,
lambda_function_name="Parallel Operations Flat",
)
def test_parallel_flat(durable_runner):
"""Test parallel example using context.parallel()."""
with durable_runner:
result = durable_runner.run(input="test", timeout=100)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [
"task 1 completed",
"task 2 completed",
"task 3 completed after wait",
]

# Get the parallel operation (CONTEXT type with PARALLEL subtype)
parallel_op = result.get_context("parallel_operation")
assert parallel_op is not None
assert parallel_op.status is OperationStatus.SUCCEEDED

# Verify all three child operations exist
assert len(parallel_op.child_operations) == 3

# Verify all children succeeded
for child in parallel_op.child_operations:
assert child.operation_type != OperationType.CONTEXT
assert child.status is OperationStatus.SUCCEEDED
Loading