Skip to content

Commit 15c849f

Browse files
committed
feat: implement filesystem store
1 parent f0a6c08 commit 15c849f

13 files changed

Lines changed: 532 additions & 67 deletions

File tree

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
if TYPE_CHECKING:
2828
from aws_durable_execution_sdk_python_testing.execution import Execution
2929
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
30-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
30+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
3131

3232

3333
class CheckpointProcessor:

src/aws_durable_execution_sdk_python_testing/cli.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class CliConfig:
5050
local_runner_region: str = "us-west-2"
5151
local_runner_mode: str = "local"
5252

53+
# Store configuration
54+
store_type: str = "memory"
55+
store_path: str | None = None
56+
5357
@classmethod
5458
def from_environment(cls) -> CliConfig:
5559
"""Create configuration from environment variables with defaults."""
@@ -65,6 +69,8 @@ def from_environment(cls) -> CliConfig:
6569
),
6670
local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"),
6771
local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"),
72+
store_type=os.getenv("AWS_DEX_STORE_TYPE", "memory"),
73+
store_path=os.getenv("AWS_DEX_STORE_PATH"),
6874
)
6975

7076

@@ -175,6 +181,17 @@ def _create_start_server_parser(self, subparsers) -> None:
175181
default=self.config.local_runner_mode,
176182
help=f"Local Runner mode (default: {self.config.local_runner_mode}, env: AWS_DEX_LOCAL_RUNNER_MODE)",
177183
)
184+
start_server_parser.add_argument(
185+
"--store-type",
186+
choices=["memory", "filesystem"],
187+
default=self.config.store_type,
188+
help=f"Store type for execution persistence (default: {self.config.store_type}, env: AWS_DEX_STORE_TYPE)",
189+
)
190+
start_server_parser.add_argument(
191+
"--store-path",
192+
default=self.config.store_path,
193+
help=f"Path for filesystem store (default: {self.config.store_path or '.durable_executions'}, env: AWS_DEX_STORE_PATH)",
194+
)
178195
start_server_parser.set_defaults(func=self.start_server_command)
179196

180197
def _create_invoke_parser(self, subparsers) -> None:
@@ -245,6 +262,8 @@ def start_server_command(self, args: argparse.Namespace) -> int:
245262
local_runner_endpoint=args.local_runner_endpoint,
246263
local_runner_region=args.local_runner_region,
247264
local_runner_mode=args.local_runner_mode,
265+
store_type=args.store_type,
266+
store_path=args.store_path,
248267
)
249268

250269
logger.info(
@@ -260,6 +279,10 @@ def start_server_command(self, args: argparse.Namespace) -> int:
260279
logger.info(" Local Runner Endpoint: %s", args.local_runner_endpoint)
261280
logger.info(" Local Runner Region: %s", args.local_runner_region)
262281
logger.info(" Local Runner Mode: %s", args.local_runner_mode)
282+
logger.info(" Store Type: %s", args.store_type)
283+
if args.store_type == "filesystem":
284+
store_path = args.store_path or ".durable_executions"
285+
logger.info(" Store Path: %s", store_path)
263286

264287
# Use runner as context manager for proper lifecycle
265288
with WebRunner(runner_config) as runner:

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6-
from typing import TYPE_CHECKING
6+
from typing import TYPE_CHECKING, Any
77
from uuid import uuid4
88

99
from aws_durable_execution_sdk_python.execution import (
@@ -51,7 +51,7 @@ def __init__(
5151
# TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store
5252
self.token_sequence: int = 0
5353
self.is_complete: bool = False
54-
self.result: DurableExecutionInvocationOutput | None
54+
self.result: DurableExecutionInvocationOutput | None = None
5555
self.consecutive_failed_invocation_attempts: int = 0
5656

5757
@staticmethod
@@ -63,6 +63,58 @@ def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
6363
durable_execution_arn=str(uuid4()), start_input=input, operations=[]
6464
)
6565

66+
def to_dict(self) -> dict[str, Any]:
67+
"""Serialize execution to dictionary."""
68+
return {
69+
"DurableExecutionArn": self.durable_execution_arn,
70+
"StartInput": self.start_input.to_dict(),
71+
"Operations": [op.to_dict() for op in self.operations],
72+
"Updates": [update.to_dict() for update in self.updates],
73+
"UsedTokens": list(self.used_tokens),
74+
"TokenSequence": self.token_sequence,
75+
"IsComplete": self.is_complete,
76+
"Result": self.result.to_dict() if self.result else None,
77+
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
78+
}
79+
80+
@classmethod
81+
def from_dict(cls, data: dict[str, Any]) -> Execution:
82+
"""Deserialize execution from dictionary."""
83+
from aws_durable_execution_sdk_python_testing.model import (
84+
StartDurableExecutionInput,
85+
)
86+
87+
# Reconstruct start_input
88+
start_input = StartDurableExecutionInput.from_dict(data["StartInput"])
89+
90+
# Reconstruct operations
91+
operations = [Operation.from_dict(op_data) for op_data in data["Operations"]]
92+
93+
# Create execution
94+
execution = cls(
95+
durable_execution_arn=data["DurableExecutionArn"],
96+
start_input=start_input,
97+
operations=operations,
98+
)
99+
100+
# Set additional fields
101+
execution.updates = [
102+
OperationUpdate.from_dict(update_data) for update_data in data["Updates"]
103+
]
104+
execution.used_tokens = set(data["UsedTokens"])
105+
execution.token_sequence = data["TokenSequence"]
106+
execution.is_complete = data["IsComplete"]
107+
execution.result = (
108+
DurableExecutionInvocationOutput.from_dict(data["Result"])
109+
if data["Result"]
110+
else None
111+
)
112+
execution.consecutive_failed_invocation_attempts = data[
113+
"ConsecutiveFailedInvocationAttempts"
114+
]
115+
116+
return execution
117+
66118
def start(self) -> None:
67119
# not thread safe, prob should be
68120
if self.start_input.invocation_id is None:

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5050
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
5252

5353
logger = logging.getLogger(__name__)
5454

@@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
142142
durable_execution_name=execution.start_input.execution_name,
143143
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
144144
status=status,
145-
start_timestamp=execution_op.start_timestamp.isoformat()
145+
start_timestamp=execution_op.start_timestamp.timestamp()
146146
if execution_op.start_timestamp
147-
else datetime.now(UTC).isoformat(),
147+
else datetime.now(UTC).timestamp(),
148148
input_payload=execution_op.execution_details.input_payload
149149
if execution_op.execution_details
150150
else None,
151151
result=result,
152152
error=error,
153-
end_timestamp=execution_op.end_timestamp.isoformat()
153+
end_timestamp=execution_op.end_timestamp.timestamp()
154154
if execution_op.end_timestamp
155155
else None,
156156
version="1.0",
@@ -223,10 +223,10 @@ def list_executions(
223223
durable_execution_name=execution.start_input.execution_name,
224224
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225225
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.isoformat()
226+
start_timestamp=execution_op.start_timestamp.timestamp()
227227
if execution_op.start_timestamp
228-
else datetime.now(UTC).isoformat(),
229-
end_timestamp=execution_op.end_timestamp.isoformat()
228+
else datetime.now(UTC).timestamp(),
229+
end_timestamp=execution_op.end_timestamp.timestamp()
230230
if execution_op.end_timestamp
231231
else None,
232232
)
@@ -333,7 +333,7 @@ def stop_execution(
333333
# Stop the execution
334334
self.fail_execution(execution_arn, stop_error)
335335

336-
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).isoformat())
336+
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).timestamp())
337337

338338
def get_execution_state(
339339
self,

src/aws_durable_execution_sdk_python_testing/runner.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@
4848
StartDurableExecutionOutput,
4949
)
5050
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import (
52+
ExecutionStore,
53+
FileSystemExecutionStore,
54+
InMemoryExecutionStore,
55+
)
5256
from aws_durable_execution_sdk_python_testing.web.server import WebServer
5357

5458

@@ -83,6 +87,10 @@ class WebRunnerConfig:
8387
local_runner_region: str = "us-west-2"
8488
local_runner_mode: str = "local"
8589

90+
# Store configuration
91+
store_type: str = "memory" # "memory" or "filesystem"
92+
store_path: str | None = None # Path for filesystem store
93+
8694

8795
@dataclass(frozen=True)
8896
class Operation:
@@ -543,7 +551,7 @@ def __init__(self, config: WebRunnerConfig) -> None:
543551
self._config = config
544552
self._server: WebServer | None = None
545553
self._scheduler: Scheduler | None = None
546-
self._store: InMemoryExecutionStore | None = None
554+
self._store: ExecutionStore | None = None
547555
self._invoker: LambdaInvoker | None = None
548556
self._executor: Executor | None = None
549557

@@ -581,7 +589,11 @@ def start(self) -> None:
581589
raise DurableFunctionsLocalRunnerError(msg)
582590

583591
# Create dependencies and server
584-
self._store = InMemoryExecutionStore()
592+
if self._config.store_type == "filesystem":
593+
store_path = self._config.store_path or ".durable_executions"
594+
self._store = FileSystemExecutionStore(store_path)
595+
else:
596+
self._store = InMemoryExecutionStore()
585597
self._scheduler = Scheduler()
586598
self._invoker = LambdaInvoker(self._create_boto3_client())
587599

src/aws_durable_execution_sdk_python_testing/store.py

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Execution stores for persisting durable function executions."""
2+
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING, Protocol
6+
7+
from aws_durable_execution_sdk_python_testing.stores.filesystem import (
8+
FileSystemExecutionStore,
9+
)
10+
from aws_durable_execution_sdk_python_testing.stores.memory import (
11+
InMemoryExecutionStore,
12+
)
13+
14+
15+
if TYPE_CHECKING:
16+
from aws_durable_execution_sdk_python_testing.execution import Execution
17+
18+
19+
class ExecutionStore(Protocol):
20+
"""Protocol for execution storage implementations."""
21+
22+
# ignore cover because coverage doesn't understand elipses
23+
def save(self, execution: Execution) -> None: ... # pragma: no cover
24+
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
25+
def update(self, execution: Execution) -> None: ... # pragma: no cover
26+
def list_all(self) -> list[Execution]: ... # pragma: no cover
27+
28+
29+
__all__ = ["ExecutionStore", "InMemoryExecutionStore", "FileSystemExecutionStore"]

0 commit comments

Comments
 (0)