1818 OperationType ,
1919 OperationUpdate ,
2020)
21+ from aws_durable_execution_sdk_python .threading import OrderedLock
2122
22- # Import AWS exceptions
2323from aws_durable_execution_sdk_python_testing .exceptions import (
2424 IllegalStateException ,
2525 InvalidParameterValueException ,
@@ -46,11 +46,24 @@ def __init__(
4646 self .updates : list [OperationUpdate ] = []
4747 self .used_tokens : set [str ] = set ()
4848 # TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store
49- self .token_sequence : int = 0
49+
50+ self ._token_sequence : int = 0
51+ self ._state_lock : OrderedLock = OrderedLock ()
5052 self .is_complete : bool = False
5153 self .result : DurableExecutionInvocationOutput | None = None
5254 self .consecutive_failed_invocation_attempts : int = 0
5355
56+ @property
57+ def token_sequence (self ) -> int :
58+ """Get current token sequence value."""
59+ return self ._token_sequence
60+
61+ @token_sequence .setter
62+ def token_sequence (self , value : int ) -> None :
63+ """Set token sequence value."""
64+ with self ._state_lock :
65+ self ._token_sequence = value
66+
5467 @staticmethod
5568 def new (input : StartDurableExecutionInput ) -> Execution : # noqa: A002
5669 # make a nicer arn
@@ -68,7 +81,7 @@ def to_dict(self) -> dict[str, Any]:
6881 "Operations" : [op .to_dict () for op in self .operations ],
6982 "Updates" : [update .to_dict () for update in self .updates ],
7083 "UsedTokens" : list (self .used_tokens ),
71- "TokenSequence" : self .token_sequence ,
84+ "TokenSequence" : self ._token_sequence ,
7285 "IsComplete" : self .is_complete ,
7386 "Result" : self .result .to_dict () if self .result else None ,
7487 "ConsecutiveFailedInvocationAttempts" : self .consecutive_failed_invocation_attempts ,
@@ -109,23 +122,23 @@ def from_dict(cls, data: dict[str, Any]) -> Execution:
109122 return execution
110123
111124 def start (self ) -> None :
112- # not thread safe, prob should be
113125 if self .start_input .invocation_id is None :
114126 msg : str = "invocation_id is required"
115127 raise InvalidParameterValueException (msg )
116- self .operations .append (
117- Operation (
118- operation_id = self .start_input .invocation_id ,
119- parent_id = None ,
120- name = self .start_input .execution_name ,
121- start_timestamp = datetime .now (UTC ),
122- operation_type = OperationType .EXECUTION ,
123- status = OperationStatus .STARTED ,
124- execution_details = ExecutionDetails (
125- input_payload = json .dumps (self .start_input .input )
126- ),
128+ with self ._state_lock :
129+ self .operations .append (
130+ Operation (
131+ operation_id = self .start_input .invocation_id ,
132+ parent_id = None ,
133+ name = self .start_input .execution_name ,
134+ start_timestamp = datetime .now (UTC ),
135+ operation_type = OperationType .EXECUTION ,
136+ status = OperationStatus .STARTED ,
137+ execution_details = ExecutionDetails (
138+ input_payload = json .dumps (self .start_input .input )
139+ ),
140+ )
127141 )
128- )
129142
130143 def get_operation_execution_started (self ) -> Operation :
131144 if not self .operations :
@@ -137,15 +150,16 @@ def get_operation_execution_started(self) -> Operation:
137150
138151 def get_new_checkpoint_token (self ) -> str :
139152 """Generate a new checkpoint token with incremented sequence"""
140- # TODO: not thread safe and it should be
141- self .token_sequence += 1
142- new_token_sequence = self .token_sequence
143- token = CheckpointToken (
144- execution_arn = self .durable_execution_arn , token_sequence = new_token_sequence
145- )
146- token_str = token .to_str ()
147- self .used_tokens .add (token_str )
148- return token_str
153+ with self ._state_lock :
154+ self ._token_sequence += 1
155+ new_token_sequence = self ._token_sequence
156+ token = CheckpointToken (
157+ execution_arn = self .durable_execution_arn ,
158+ token_sequence = new_token_sequence ,
159+ )
160+ token_str = token .to_str ()
161+ self .used_tokens .add (token_str )
162+ return token_str
149163
150164 def get_navigable_operations (self ) -> list [Operation ]:
151165 """Get list of operations, but exclude child operations where the parent has already completed."""
@@ -205,17 +219,16 @@ def complete_wait(self, operation_id: str) -> Operation:
205219 )
206220 raise IllegalStateException (msg_not_wait )
207221
208- # TODO: make thread-safe. Increment sequence
209- self .token_sequence += 1
210-
211- # Build and assign updated operation
212- self .operations [index ] = replace (
213- operation ,
214- status = OperationStatus .SUCCEEDED ,
215- end_timestamp = datetime .now (UTC ),
216- )
217-
218- return self .operations [index ]
222+ # Thread-safe increment sequence and operation update
223+ with self ._state_lock :
224+ self ._token_sequence += 1
225+ # Build and assign updated operation
226+ self .operations [index ] = replace (
227+ operation ,
228+ status = OperationStatus .SUCCEEDED ,
229+ end_timestamp = datetime .now (UTC ),
230+ )
231+ return self .operations [index ]
219232
220233 def complete_retry (self , operation_id : str ) -> Operation :
221234 """Complete STEP retry when timer fires."""
@@ -231,21 +244,21 @@ def complete_retry(self, operation_id: str) -> Operation:
231244 )
232245 raise IllegalStateException (msg_not_step )
233246
234- # TODO: make thread-safe. Increment sequence
235- self .token_sequence += 1
236-
237- # Build updated step_details with cleared next_attempt_timestamp
238- new_step_details = None
239- if operation .step_details :
240- new_step_details = replace (
241- operation .step_details , next_attempt_timestamp = None
247+ # Thread-safe increment sequence and operation update
248+ with self ._state_lock :
249+ self ._token_sequence += 1
250+ # Build updated step_details with cleared next_attempt_timestamp
251+ new_step_details = None
252+ if operation .step_details :
253+ new_step_details = replace (
254+ operation .step_details , next_attempt_timestamp = None
255+ )
256+
257+ # Build updated operation
258+ updated_operation = replace (
259+ operation , status = OperationStatus .READY , step_details = new_step_details
242260 )
243261
244- # Build updated operation
245- updated_operation = replace (
246- operation , status = OperationStatus .READY , step_details = new_step_details
247- )
248-
249- # Assign
250- self .operations [index ] = updated_operation
251- return updated_operation
262+ # Assign
263+ self .operations [index ] = updated_operation
264+ return updated_operation
0 commit comments