Skip to content

Commit f4fdc51

Browse files
authored
Merge branch 'main' into andystaples/update-compatible-python-versions
2 parents 5f6824e + 0e95aa8 commit f4fdc51

File tree

7 files changed

+110
-11
lines changed

7 files changed

+110
-11
lines changed

docs/features.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and
148148

149149
### Retry policies
150150

151-
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
151+
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
152+
153+
### Logging configuration
154+
155+
Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept log_handler and log_formatter objects from `logging`. These can be used to customize the verbosity, output location, and format of logs emitted by these sources.
156+
157+
For example, to output logs to a file called `durable.log` at level `DEBUG`, the following syntax can be used:
158+
159+
```python
160+
log_handler = logging.FileHandler('durable.log', encoding='utf-8')
161+
log_handler.setLevel(logging.DEBUG)
162+
163+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
164+
taskhub=taskhub_name, token_credential=credential, log_handler=log_handler) as w:
165+
```
166+
167+
**NOTE**
168+
The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -18,7 +20,9 @@ def __init__(self, *,
1820
taskhub: str,
1921
token_credential: Optional[TokenCredential],
2022
secure_channel: bool = True,
21-
default_version: Optional[str] = None):
23+
default_version: Optional[str] = None,
24+
log_handler: Optional[logging.Handler] = None,
25+
log_formatter: Optional[logging.Formatter] = None):
2226

2327
if not taskhub:
2428
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
@@ -31,5 +35,7 @@ def __init__(self, *,
3135
host_address=host_address,
3236
secure_channel=secure_channel,
3337
metadata=None,
38+
log_handler=log_handler,
39+
log_formatter=log_formatter,
3440
interceptors=interceptors,
3541
default_version=default_version)

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -28,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
2830
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
2931
for controlling worker concurrency limits. If None, default concurrency
3032
settings will be used.
33+
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs.
34+
log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs.
3135
3236
Raises:
3337
ValueError: If taskhub is empty or None.
@@ -52,12 +56,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
5256
parameter is set to None since authentication is handled by the
5357
DTS interceptor.
5458
"""
59+
5560
def __init__(self, *,
5661
host_address: str,
5762
taskhub: str,
5863
token_credential: Optional[TokenCredential],
5964
secure_channel: bool = True,
60-
concurrency_options: Optional[ConcurrencyOptions] = None):
65+
concurrency_options: Optional[ConcurrencyOptions] = None,
66+
log_handler: Optional[logging.Handler] = None,
67+
log_formatter: Optional[logging.Formatter] = None):
6168

6269
if not taskhub:
6370
raise ValueError("The taskhub value cannot be empty.")
@@ -70,5 +77,7 @@ def __init__(self, *,
7077
host_address=host_address,
7178
secure_channel=secure_channel,
7279
metadata=None,
80+
log_handler=log_handler,
81+
log_formatter=log_formatter,
7382
interceptors=interceptors,
7483
concurrency_options=concurrency_options)

durabletask/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def lock_entities(self, entities: list[EntityInstanceId]) -> Task[EntityLock]:
201201
pass
202202

203203
@abstractmethod
204-
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
204+
def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput], str], *,
205205
input: Optional[TInput] = None,
206206
instance_id: Optional[str] = None,
207207
retry_policy: Optional[RetryPolicy] = None,

durabletask/worker.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class TaskHubGrpcWorker:
246246
Defaults to the value from environment variables or localhost.
247247
metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with
248248
requests. Used for authentication and routing. Defaults to None.
249-
log_handler (optional): Custom logging handler for worker logs. Defaults to None.
249+
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs. Defaults to None.
250250
log_formatter (Optional[logging.Formatter], optional): Custom log formatter.
251251
Defaults to None.
252252
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
@@ -314,7 +314,7 @@ def __init__(
314314
*,
315315
host_address: Optional[str] = None,
316316
metadata: Optional[list[tuple[str, str]]] = None,
317-
log_handler=None,
317+
log_handler: Optional[logging.Handler] = None,
318318
log_formatter: Optional[logging.Formatter] = None,
319319
secure_channel: bool = False,
320320
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
@@ -1029,15 +1029,18 @@ def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLoc
10291029

10301030
def call_sub_orchestrator(
10311031
self,
1032-
orchestrator: task.Orchestrator[TInput, TOutput],
1032+
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
10331033
*,
10341034
input: Optional[TInput] = None,
10351035
instance_id: Optional[str] = None,
10361036
retry_policy: Optional[task.RetryPolicy] = None,
10371037
version: Optional[str] = None,
10381038
) -> task.Task[TOutput]:
10391039
id = self.next_sequence_number()
1040-
orchestrator_name = task.get_name(orchestrator)
1040+
if isinstance(orchestrator, str):
1041+
orchestrator_name = orchestrator
1042+
else:
1043+
orchestrator_name = task.get_name(orchestrator)
10411044
default_version = self._registry.versioning.default_version if self._registry.versioning else None
10421045
orchestrator_version = version if version else default_version
10431046
self.call_activity_function_helper(
@@ -1233,13 +1236,21 @@ def execute(
12331236
old_events: Sequence[pb.HistoryEvent],
12341237
new_events: Sequence[pb.HistoryEvent],
12351238
) -> ExecutionResults:
1239+
orchestration_name = "<unknown>"
1240+
orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")]
1241+
if len(orchestration_started_events) >= 1:
1242+
orchestration_name = orchestration_started_events[0].executionStarted.name
1243+
1244+
self._logger.debug(
1245+
f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..."
1246+
)
1247+
12361248
self._entity_state = OrchestrationEntityContext(instance_id)
12371249

12381250
if not new_events:
12391251
raise task.OrchestrationStateError(
12401252
"The new history event list must have at least one event in it."
12411253
)
1242-
12431254
ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state)
12441255
try:
12451256
# Rebuild local state by replaying old history into the orchestrator function
@@ -1271,13 +1282,15 @@ def execute(
12711282

12721283
except Exception as ex:
12731284
# Unhandled exceptions fail the orchestration
1285+
self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed")
12741286
ctx.set_failed(ex)
12751287

12761288
if not ctx._is_complete:
12771289
task_count = len(ctx._pending_tasks)
12781290
event_count = len(ctx._pending_events)
12791291
self._logger.info(
1280-
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
1292+
f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) "
1293+
f"and {event_count} event(s) outstanding."
12811294
)
12821295
elif (
12831296
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
@@ -1286,7 +1299,7 @@ def execute(
12861299
ctx._completion_status
12871300
)
12881301
self._logger.info(
1289-
f"{instance_id}: Orchestration completed with status: {completion_status_str}"
1302+
f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}"
12901303
)
12911304

12921305
actions = ctx.get_actions()

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
175175
assert activity_counter == 30
176176

177177

178+
def test_sub_orchestrator_by_name():
179+
sub_orchestrator_counter = 0
180+
181+
def orchestrator_child(ctx: task.OrchestrationContext, _):
182+
nonlocal sub_orchestrator_counter
183+
sub_orchestrator_counter += 1
184+
185+
def parent_orchestrator(ctx: task.OrchestrationContext, _):
186+
yield ctx.call_sub_orchestrator("orchestrator_child")
187+
188+
# Start a worker, which will connect to the sidecar in a background thread
189+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
190+
taskhub=taskhub_name, token_credential=None) as w:
191+
w.add_orchestrator(orchestrator_child)
192+
w.add_orchestrator(parent_orchestrator)
193+
w.start()
194+
195+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
196+
taskhub=taskhub_name, token_credential=None)
197+
id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=None)
198+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
199+
200+
assert state is not None
201+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
202+
assert state.failure_details is None
203+
assert sub_orchestrator_counter == 1
204+
205+
178206
def test_wait_for_multiple_external_events():
179207
def orchestrator(ctx: task.OrchestrationContext, _):
180208
a = yield ctx.wait_for_external_event('A')

tests/durabletask/test_orchestration_e2e.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,32 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
164164
assert activity_counter == 30
165165

166166

167+
def test_sub_orchestrator_by_name():
168+
sub_orchestrator_counter = 0
169+
170+
def orchestrator_child(ctx: task.OrchestrationContext, _):
171+
nonlocal sub_orchestrator_counter
172+
sub_orchestrator_counter += 1
173+
174+
def parent_orchestrator(ctx: task.OrchestrationContext, _):
175+
yield ctx.call_sub_orchestrator("orchestrator_child")
176+
177+
# Start a worker, which will connect to the sidecar in a background thread
178+
with worker.TaskHubGrpcWorker() as w:
179+
w.add_orchestrator(orchestrator_child)
180+
w.add_orchestrator(parent_orchestrator)
181+
w.start()
182+
183+
task_hub_client = client.TaskHubGrpcClient()
184+
id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=None)
185+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
186+
187+
assert state is not None
188+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
189+
assert state.failure_details is None
190+
assert sub_orchestrator_counter == 1
191+
192+
167193
def test_wait_for_multiple_external_events():
168194
def orchestrator(ctx: task.OrchestrationContext, _):
169195
a = yield ctx.wait_for_external_event('A')

0 commit comments

Comments
 (0)