Skip to content

Commit 0cbcf48

Browse files
andystaples and Copilot committed
Add replay-safe logger for orchestrations
Adds ReplaySafeLogger class and OrchestrationContext.create_replay_safe_logger() so orchestrators can log without generating duplicate messages during replay. The logger wraps a standard logging.Logger and suppresses all log calls when the orchestrator is replaying from history. All standard log levels are supported: debug, info, warning, error, critical, and exception. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 76052ea commit 0cbcf48

File tree

2 files changed

+186
-0
lines changed

2 files changed

+186
-0
lines changed

durabletask/task.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# See https://peps.python.org/pep-0563/
55
from __future__ import annotations
66

7+
import logging
78
import math
89
from abc import ABC, abstractmethod
910
from datetime import datetime, timedelta, timezone
@@ -279,6 +280,74 @@ def new_uuid(self) -> str:
279280
def _exit_critical_section(self) -> None:
280281
pass
281282

283+
def create_replay_safe_logger(self, logger: logging.Logger) -> ReplaySafeLogger:
    """Wrap ``logger`` so that it stays silent while this orchestration replays.

    Replaying an orchestration re-runs code that has already executed, so any
    logging done through the plain logger would be repeated. The wrapper
    returned here forwards a message to ``logger`` only when this context
    reports that it is not replaying.

    Parameters
    ----------
    logger : logging.Logger
        The logger that receives the messages that are not suppressed.

    Returns
    -------
    ReplaySafeLogger
        A wrapper around ``logger`` bound to this context's replay state.
    """
    def _currently_replaying() -> bool:
        # Evaluate ``self.is_replaying`` on every call instead of capturing
        # its value now, so the wrapper follows the context as it changes.
        return self.is_replaying

    return ReplaySafeLogger(logger, _currently_replaying)
301+
302+
303+
class ReplaySafeLogger:
    """A logger wrapper that suppresses log messages during orchestration replay.

    This class wraps a standard :class:`logging.Logger` and only emits log
    messages when the orchestrator is *not* replaying. Use this to avoid
    duplicate log entries that would otherwise appear every time the
    orchestrator replays its history.

    Each forwarding method raises the ``stacklevel`` passed to the wrapped
    logger by one, so the caller information stored on the log record
    (``funcName``, ``lineno``, ``pathname``) refers to the code that called
    this wrapper rather than to the wrapper method itself.

    Obtain an instance by calling :meth:`OrchestrationContext.create_replay_safe_logger`.

    Parameters
    ----------
    logger : logging.Logger
        The underlying logger that receives non-suppressed messages.
    is_replaying : Callable[[], bool]
        Zero-argument callable evaluated on every log call; messages are
        dropped while it returns ``True``.
    """

    def __init__(self, logger: logging.Logger, is_replaying: Callable[[], bool]) -> None:
        self._logger = logger
        self._is_replaying = is_replaying

    def _should_log(self) -> bool:
        """Return ``True`` when messages should be forwarded to the wrapped logger."""
        return not self._is_replaying()

    @staticmethod
    def _skip_wrapper_frame(kwargs: dict[str, Any]) -> dict[str, Any]:
        """Return ``kwargs`` with ``stacklevel`` raised by one.

        ``kwargs`` is the fresh dict built for the ``**kwargs`` of the calling
        method, so updating it in place cannot affect the caller's objects.
        A ``stacklevel`` supplied by the caller is honoured and offset by one
        to account for the extra frame this wrapper adds.
        """
        kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
        return kwargs

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log a DEBUG-level message if the orchestrator is not replaying."""
        if self._should_log():
            self._logger.debug(msg, *args, **self._skip_wrapper_frame(kwargs))

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log an INFO-level message if the orchestrator is not replaying."""
        if self._should_log():
            self._logger.info(msg, *args, **self._skip_wrapper_frame(kwargs))

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log a WARNING-level message if the orchestrator is not replaying."""
        if self._should_log():
            self._logger.warning(msg, *args, **self._skip_wrapper_frame(kwargs))

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log an ERROR-level message if the orchestrator is not replaying."""
        if self._should_log():
            self._logger.error(msg, *args, **self._skip_wrapper_frame(kwargs))

    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log a CRITICAL-level message if the orchestrator is not replaying."""
        if self._should_log():
            self._logger.critical(msg, *args, **self._skip_wrapper_frame(kwargs))

    def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log an ERROR-level message with exception info if the orchestrator is not replaying."""
        if self._should_log():
            self._logger.exception(msg, *args, **self._skip_wrapper_frame(kwargs))
350+
282351

283352
class FailureDetails:
284353
def __init__(self, message: str, error_type: str, stack_trace: Optional[str]):

tests/durabletask/test_orchestration_executor.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,6 +1152,123 @@ def orchestrator(ctx: task.OrchestrationContext, _):
11521152
assert complete_action.result.value == encoded_output
11531153

11541154

1155+
def test_replay_safe_logger_suppresses_during_replay():
    """Validates that the replay-safe logger suppresses log messages during replay."""
    log_calls: list[str] = []

    class _RecordingHandler(logging.Handler):
        """Collects the formatted message of every record it receives."""

        def emit(self, record: logging.LogRecord) -> None:
            log_calls.append(record.getMessage())

    inner_logger = logging.getLogger("test_replay_safe_logger")
    inner_logger.setLevel(logging.DEBUG)
    handler = _RecordingHandler()
    inner_logger.addHandler(handler)

    def say_hello(_, name: str) -> str:
        return f"Hello, {name}!"

    def orchestrator(ctx: task.OrchestrationContext, _):
        replay_logger = ctx.create_replay_safe_logger(inner_logger)
        replay_logger.info("Starting orchestration")
        result = yield ctx.call_activity(say_hello, input="World")
        replay_logger.info("Activity completed: %s", result)
        return result

    registry = worker._Registry()
    activity_name = registry.add_activity(say_hello)
    orchestrator_name = registry.add_orchestrator(orchestrator)

    # logging.getLogger() returns a process-wide object, so the handler is
    # detached in ``finally`` to keep it from outliving this test.
    try:
        # First execution: starts the orchestration. Nothing is replayed, so the
        # first log call is emitted before the orchestrator yields on the activity.
        new_events = [
            helpers.new_orchestrator_started_event(datetime.now()),
            helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
        ]
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, [], new_events)
        assert result.actions  # should have scheduled the activity

        assert log_calls == ["Starting orchestration"]
        log_calls.clear()

        # Second execution: the orchestrator replays from history and then processes the
        # activity completion. The "Starting orchestration" message is emitted during
        # replay and should be suppressed; "Activity completed" is emitted after replay
        # ends and should appear exactly once.
        old_events = new_events + [
            helpers.new_task_scheduled_event(1, activity_name),
        ]
        encoded_output = json.dumps(say_hello(None, "World"))
        new_events = [helpers.new_task_completed_event(1, encoded_output)]
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
        complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
        assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED

        assert log_calls == ["Activity completed: Hello, World!"]
    finally:
        inner_logger.removeHandler(handler)
1211+
1212+
1213+
def test_replay_safe_logger_all_levels():
    """Validates that every log method emits at the expected level when not replaying.

    The orchestration is executed once with no history, so nothing is replayed
    and each call made through the replay-safe logger must reach the handler.
    """
    log_levels: list[str] = []

    class _LevelRecorder(logging.Handler):
        """Collects the level name of every record it receives."""

        def emit(self, record: logging.LogRecord) -> None:
            log_levels.append(record.levelname)

    inner_logger = logging.getLogger("test_replay_safe_logger_levels")
    inner_logger.setLevel(logging.DEBUG)
    handler = _LevelRecorder()
    inner_logger.addHandler(handler)

    def orchestrator(ctx: task.OrchestrationContext, _):
        replay_logger = ctx.create_replay_safe_logger(inner_logger)
        replay_logger.debug("debug msg")
        replay_logger.info("info msg")
        replay_logger.warning("warning msg")
        replay_logger.error("error msg")
        replay_logger.critical("critical msg")
        # exception() logs at ERROR level with exception info attached.
        replay_logger.exception("exception msg")
        return "done"

    registry = worker._Registry()
    orchestrator_name = registry.add_orchestrator(orchestrator)

    # logging.getLogger() returns a process-wide object, so the handler is
    # detached in ``finally`` to keep it from outliving this test.
    try:
        new_events = [
            helpers.new_orchestrator_started_event(datetime.now()),
            helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
        ]
        executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
        result = executor.execute(TEST_INSTANCE_ID, [], new_events)
        complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
        assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED

        assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "ERROR"]
    finally:
        inner_logger.removeHandler(handler)
1247+
1248+
1249+
def test_replay_safe_logger_direct():
    """Unit test for ReplaySafeLogger — verifies suppression based on is_replaying flag."""
    log_calls: list[str] = []

    class _RecordingHandler(logging.Handler):
        """Collects the formatted message of every record it receives."""

        def emit(self, record: logging.LogRecord) -> None:
            log_calls.append(record.getMessage())

    inner_logger = logging.getLogger("test_replay_safe_logger_direct")
    inner_logger.setLevel(logging.DEBUG)
    handler = _RecordingHandler()
    inner_logger.addHandler(handler)

    # logging.getLogger() returns a process-wide object, so the handler is
    # detached in ``finally`` to keep it from outliving this test.
    try:
        # The lambda reads ``replaying`` each time it is called, so rebinding
        # the local below flips the logger from suppressing to emitting.
        replaying = True
        replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)

        replay_logger.info("should be suppressed")
        assert log_calls == []

        replaying = False
        replay_logger.info("should appear")
        assert log_calls == ["should appear"]
    finally:
        inner_logger.removeHandler(handler)
1270+
1271+
11551272
def test_when_any_with_retry():
11561273
"""Tests that a when_any pattern works correctly with retries"""
11571274
def dummy_activity(_, inp: str):

0 commit comments

Comments
 (0)