Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ jobs:
echo "Using Python Language SDK branch override: $OVERRIDE"
else
echo "Using default Python Language SDK (main branch)"
OVERRIDE=main
fi
hatch run -- test:python -m pip install -e "aws-durable-execution-sdk-python@git+https://github.com/aws/aws-durable-execution-sdk-python.git@$OVERRIDE"
- name: static analysis
run: hatch fmt --check
- name: type checking
Expand Down
23 changes: 23 additions & 0 deletions examples/src/map/map_operations_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Example demonstrating map operations for processing collections durably."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig, NestingType
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
"""Process a list of items using context.map()."""
items = [1, 2, 3, 4, 5]

# Use context.map() to process items concurrently and extract results immediately
return context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: item * 2, name=f"map_item_{index}"
),
name="map_operation",
config=MapConfig(max_concurrency=2, nesting_type=NestingType.FLAT),
).get_results()
27 changes: 27 additions & 0 deletions examples/src/parallel/parallel_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Example demonstrating parallel operations for concurrent execution."""

from typing import Any

from aws_durable_execution_sdk_python.config import ParallelConfig, NestingType
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import Duration


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
"""Execute multiple operations in parallel using context.parallel()."""

# Use context.parallel() to execute functions concurrently and extract results immediately
return context.parallel(
functions=[
lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"),
lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"),
lambda ctx: (
ctx.wait(Duration.from_seconds(1), name="wait_in_task3"),
"task 3 completed after wait",
)[1],
],
name="parallel_operation",
config=ParallelConfig(max_concurrency=2, nesting_type=NestingType.FLAT),
).get_results()
39 changes: 39 additions & 0 deletions examples/test/map/test_map_operations_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Tests for map_operations example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from src.map import map_operations_flat
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=map_operations_flat.handler,
lambda_function_name="map operations",
)
def test_map_operations(durable_runner):
"""Test map_operations example using context.map()."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10]

# Get the map operation (CONTEXT type with MAP subtype)
map_op = result.get_context("map_operation")
assert map_op is not None
assert map_op.status is OperationStatus.SUCCEEDED

# Verify all five child operations exist
assert len(map_op.child_operations) == 5

# Verify child step operation names
child_names = {op.name for op in map_op.child_operations}
expected_names = {f"map_item_{i}" for i in range(5)}
assert child_names == expected_names

# Verify all children succeeded
for child in map_op.child_operations:
assert child.status is OperationStatus.SUCCEEDED
38 changes: 38 additions & 0 deletions examples/test/parallel/test_parallel_flat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Tests for parallel example."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus

from src.parallel import parallel_flat
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=parallel_flat.handler,
lambda_function_name="Parallel Operations",
)
def test_parallel_flat(durable_runner):
"""Test parallel example using context.parallel()."""
with durable_runner:
result = durable_runner.run(input="test", timeout=100)

assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == [
"task 1 completed",
"task 2 completed",
"task 3 completed after wait",
]

# Get the parallel operation (CONTEXT type with PARALLEL subtype)
parallel_op = result.get_context("parallel_operation")
assert parallel_op is not None
assert parallel_op.status is OperationStatus.SUCCEEDED

# Verify all three child operations exist
assert len(parallel_op.child_operations) == 3

# Verify all children succeeded
for child in parallel_op.child_operations:
assert child.status is OperationStatus.SUCCEEDED
Loading