diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index ba1f6ba..13441ab 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -22,6 +22,12 @@ jobs: with: path: language-sdk + - name: Checkout the latest Testing SDK + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + repository: aws/aws-durable-execution-sdk-python-testing + path: testing-sdk + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 with: @@ -34,6 +40,7 @@ jobs: working-directory: language-sdk run: | echo "Running SDK tests..." + hatch run -- test:pip install -e ../testing-sdk hatch run -- test:pip install -e ../language-sdk hatch fmt --check hatch run types:check @@ -54,6 +61,12 @@ jobs: with: path: language-sdk + - name: Checkout the latest Testing SDK + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + repository: aws/aws-durable-execution-sdk-python-testing + path: testing-sdk + - name: Set up Python 3.13 uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 with: @@ -92,6 +105,7 @@ jobs: KMS_KEY_ARN: ${{ secrets.KMS_KEY_ARN }} run: | echo "Building examples..." + hatch run -- examples:pip install -e ../testing-sdk hatch run examples:build # Get first integration example for testing diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index df8ea36..e80e3ba 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -221,6 +221,17 @@ }, "path": "./src/parallel/parallel.py" }, + { + "name": "Parallel Operations", + "description": "Executing multiple durable operations in parallel", + "handler": "parallel.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel.py" + }, { "name": "Map Operations", "description": "Processing collections using map-like durable operations", @@ -232,6 +243,17 @@ }, "path": "./src/map/map_operations.py" }, + { + "name": "Map Operations Flat", + "description": "Processing collections using map-like durable operations in FLAT mode", + "handler": "map_operations_flat.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_operations_flat.py" + }, { "name": "Map Large Scale", "description": "Processing collections using map-like durable operations in large scale", diff --git a/examples/src/map/map_operations_flat.py b/examples/src/map/map_operations_flat.py new file mode 100644 index 0000000..272afb5 --- /dev/null +++ b/examples/src/map/map_operations_flat.py @@ -0,0 +1,23 @@ +"""Example demonstrating map operations for processing collections durably.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import MapConfig, NestingType +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[int]: + """Process a list of items using context.map().""" + items = [1, 2, 3, 4, 5] + + # Use context.map() to process items concurrently and extract results immediately + return context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item * 2, name=f"map_item_{index}" + ), + name="map_operation", + config=MapConfig(max_concurrency=2, nesting_type=NestingType.FLAT), + ).get_results() diff --git a/examples/src/parallel/parallel_flat.py b/examples/src/parallel/parallel_flat.py new file mode 100644 index 0000000..5ae26eb --- /dev/null +++ b/examples/src/parallel/parallel_flat.py @@ -0,0 +1,27 @@ +"""Example demonstrating parallel operations for concurrent execution.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import ParallelConfig, NestingType +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[str]: + """Execute multiple operations in parallel using context.parallel().""" + + # Use context.parallel() to execute functions concurrently and extract results immediately + return context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"), + lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"), + lambda ctx: ( + ctx.wait(Duration.from_seconds(1), name="wait_in_task3"), + "task 3 completed after wait", + )[1], + ], + name="parallel_operation", + config=ParallelConfig(max_concurrency=2, nesting_type=NestingType.FLAT), + ).get_results() diff --git a/examples/template.yaml b/examples/template.yaml index 5e2d5ae..0a9dcb9 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -420,6 +420,24 @@ } } }, + "MapOperationsFlat": { + "Type": "AWS::Serverless::Function", + "Properties": { + "CodeUri": "build/", + "Handler": "map_operations_flat.handler", + "Description": "Processing collections using map-like durable operations in FLAT mode", + "Role": { + "Fn::GetAtt": [ + "DurableFunctionRole", + "Arn" + ] + }, + "DurableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + } + } + }, "MapWithLargeScale": { "Type": "AWS::Serverless::Function", "Properties": { diff --git a/examples/test/map/test_map_operations_flat.py b/examples/test/map/test_map_operations_flat.py new file mode 100644 index 0000000..2bc909e --- /dev/null +++ b/examples/test/map/test_map_operations_flat.py @@ -0,0 +1,39 @@ +"""Tests for map_operations example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus + +from src.map import map_operations_flat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_operations_flat.handler, + lambda_function_name="Map Operations Flat", +) +def test_map_operations_flat(durable_runner): + """Test map_operations example using context.map().""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10] + + # Get the map operation (CONTEXT type with MAP subtype) + map_op = result.get_context("map_operation") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all five child operations exist + assert len(map_op.child_operations) == 5 + + # Verify child step operation names + child_names = {op.name for op in map_op.child_operations} + expected_names = {f"map_item_{i}" for i in range(5)} + assert child_names == expected_names + + # Verify all children succeeded + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel.py b/examples/test/parallel/test_parallel.py index 184e854..67a1b9f 100644 --- a/examples/test/parallel/test_parallel.py +++ b/examples/test/parallel/test_parallel.py @@ -2,7 +2,10 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) from src.parallel import parallel from test.conftest import deserialize_operation_payload @@ -35,4 +38,5 @@ def test_parallel(durable_runner): # Verify all children succeeded for child in parallel_op.child_operations: + assert child.operation_type == OperationType.CONTEXT assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_flat.py b/examples/test/parallel/test_parallel_flat.py new file mode 100644 index 0000000..b666d0b --- /dev/null +++ b/examples/test/parallel/test_parallel_flat.py @@ -0,0 +1,42 @@ +"""Tests for parallel example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) + +from src.parallel import parallel_flat +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_flat.handler, + lambda_function_name="Parallel Operations Flat", +) +def test_parallel_flat(durable_runner): + """Test parallel example using context.parallel().""" + with durable_runner: + result = durable_runner.run(input="test", timeout=100) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == [ + "task 1 completed", + "task 2 completed", + "task 3 completed after wait", + ] + + # Get the parallel operation (CONTEXT type with PARALLEL subtype) + parallel_op = result.get_context("parallel_operation") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all three child operations exist + assert len(parallel_op.child_operations) == 3 + + # Verify all children succeeded + for child in parallel_op.child_operations: + assert child.operation_type != OperationType.CONTEXT + assert child.status is OperationStatus.SUCCEEDED