Merged
27 commits
- 872e75c Add support for async activities (seherv, May 22, 2026)
- 413f16f Merge branch 'main' into async-compat (seherv, May 27, 2026)
- b52139c Address Copilot feedback (1) (seherv, Jun 1, 2026)
- 0c64d1a Address Copilot feedback (2) (seherv, Jun 1, 2026)
- 69ea96e Address Copilot feedback (3) (seherv, Jun 1, 2026)
- 81fa323 Address Copilot feedback (4) (seherv, Jun 1, 2026)
- dadaa9a Fix linter (seherv, Jun 1, 2026)
- 8c4ce88 Merge branch 'main' into async-compat (seherv, Jun 1, 2026)
- e8c4c05 Address Copilot feedback (5) (seherv, Jun 1, 2026)
- 7ec820e Address Copilot feedback (6) (seherv, Jun 2, 2026)
- 9185482 Merge branch 'main' into async-compat (seherv, Jun 2, 2026)
- 73add2e Reword warning (seherv, Jun 2, 2026)
- 5709bcd Cleanup (seherv, Jun 2, 2026)
- e68141c Remove strands-agents-tools dependency (seherv, Jun 2, 2026)
- feb60db Redo benchmarks and add performance regression tests (seherv, Jun 2, 2026)
- 70c6fad Merge branch 'main' into async-compat (seherv, Jun 2, 2026)
- 5fb88e6 Relax performance thresholds for CI (seherv, Jun 2, 2026)
- 6eb9ce0 More async detection tests (seherv, Jun 3, 2026)
- 8cf248b Create gRPC channel in the caller's event loop (seherv, Jun 9, 2026)
- a73e994 Silence gRPC error spam on EAGAIN (seherv, Jun 9, 2026)
- a2d4ad8 Merge branch 'main' into async-compat (sicoyle, Jun 9, 2026)
- b1e0c3f Remove async benchmark code (seherv, Jun 10, 2026)
- a4d23de Address PR feedback (2) (seherv, Jun 11, 2026)
- 3f80ed6 Merge branch 'main' into async-compat (seherv, Jun 11, 2026)
- 7f86c10 Merge branch 'main' into async-compat (sicoyle, Jun 11, 2026)
- 67915a2 Merge origin/main into async-compat (seherv, Jun 12, 2026)
- 152f058 Update docs to match new dapr[ext] structure (seherv, Jun 12, 2026)
23 changes: 22 additions & 1 deletion dapr/ext/workflow/AGENTS.md
@@ -107,6 +107,26 @@ The entry point for registration and lifecycle:

 Internally wraps user functions: workflow functions get a `DaprWorkflowContext`, activity functions get a `WorkflowActivityContext`. Tracks registration state via `_workflow_registered` / `_activity_registered` attributes on functions to prevent double registration.

+#### Sync and async activities
+
+Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry.
+
+At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `is_async_callable(activity_fn)` on the wrapper selects between two handlers.
+
+- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. The gRPC response is delivered via `loop.run_in_executor(self._async_worker_manager.thread_pool, stub.CompleteActivityTask, ...)` — the same pool sync activities use, sized by `maximum_thread_pool_workers`.
+- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread.
+
+Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async.
+
+**Decorator ordering gotcha.** Wrapping `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.
+
+**`maximum_thread_pool_workers` covers both paths.** This knob sizes the worker thread pool used for sync-activity bodies and for async-activity gRPC response sends. Mixed workloads with long-running sync activities can starve async response delivery (and vice versa) since they share the pool — size to the sum of peak sync activity concurrency and peak in-flight async response sends.
+
+**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`) and an async-vs-sync decision tree. `tests/ext/workflow/durabletask/test_async_dispatch_regression.py` (marked `perf`) guards the core invariant: a batch of async activities overlaps on the event loop instead of serializing through the thread pool.
+
+**grpc.aio poller log noise.** The async client can emit benign `BlockingIOError: [Errno 11]` ERROR lines from `grpc.aio`'s `PollerCompletionQueue` under load. It is harmless and retried. `get_grpc_aio_channel` installs an internal `asyncio`-logger filter (`_silence_grpc_aio_poller_noise`) that drops only those records, so the SDK suppresses it automatically with no user action.

Contributor:
do we need this comment?

Contributor Author:
Honestly, I like using AGENTS.md as a development log of sorts, so we don't waste time rediscovering old issues and rereading the codebase. That's just a personal preference, though; I'm not sure how much we should let this file grow.



 ### DaprWorkflowClient (`dapr_workflow_client.py`)

 Client for workflow lifecycle management:
@@ -165,7 +185,7 @@ Retry configuration for activities and child workflows:
 1. **Registration**: User decorates functions with `@wfr.workflow` / `@wfr.activity`. The runtime wraps them and stores them in the durabletask worker's registry.
 2. **Startup**: `wfr.start()` opens a gRPC stream to the Dapr sidecar. The worker polls for work items.
 3. **Scheduling**: Client calls `schedule_new_workflow(fn, input=...)`. The function's name (or `_dapr_alternate_name`) is sent to the backend.
-4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). The engine records history; on replay, yielded tasks return cached results without re-executing.
+4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). Activity functions are either sync (dispatched to the worker's thread pool) or `async def` (awaited directly on the worker's event loop). The engine records history; on replay, yielded tasks return cached results without re-executing.
 5. **Determinism**: Workflows must be deterministic — no random, no wall-clock time, no I/O. Use `ctx.current_utc_datetime` instead of `datetime.now()`. Use `ctx.is_replaying` to guard side effects like logging.
 6. **Completion**: Client polls via `wait_for_workflow_completion()` or `get_workflow_state()`.

@@ -193,6 +213,7 @@ Two example directories exercise workflows:
 - `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls
 - `versioning.py` — workflow versioning with `is_patched()`
 - `simple_aio_client.py` — async client variant
+- `async_activities.py` — `async def` activities (fan-out/fan-in with simulated I/O, configurable payload sizes)

 ## Testing
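The sync/async dispatch split described in the AGENTS.md hunk can be modelled with plain `asyncio` and a `ThreadPoolExecutor`. This is a minimal sketch, not the durabletask worker: `dispatch`, `send_response`, `fetch`, and `compute` are hypothetical names, `inspect.iscoroutinefunction` stands in for the SDK's `is_async_callable`, and the executor's `max_workers=2` plays the role of `maximum_thread_pool_workers`. Async bodies are awaited on the loop thread, and only the blocking response send goes to the shared pool. Sync bodies and their sends both run on a pool thread.

```python
import asyncio
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor


def send_response(log, name, value):
    """Hypothetical stand-in for the blocking stub.CompleteActivityTask call."""
    log.append((name, value, threading.current_thread().name))


async def dispatch(fn, name, inp, pool, log):
    """Route one activity by async-ness (simplified model of the worker)."""
    loop = asyncio.get_running_loop()
    if inspect.iscoroutinefunction(fn):
        # Async path: the body is awaited on the event loop thread...
        value = await fn(None, inp)
        # ...and only the blocking response send is pushed to the pool.
        await loop.run_in_executor(pool, send_response, log, name, value)
    else:
        # Sync path: body and response send both run on one pool thread.
        def run_sync():
            send_response(log, name, fn(None, inp))

        await loop.run_in_executor(pool, run_sync)


async def fetch(ctx, inp):
    await asyncio.sleep(0.01)  # simulated I/O: yields the loop to other work
    return inp, threading.current_thread().name


def compute(ctx, inp):
    return inp * 2, threading.current_thread().name


async def main():
    log = []
    # One pool shared by sync bodies and async response sends.
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="pool") as pool:
        await asyncio.gather(
            dispatch(fetch, "fetch-1", 1, pool, log),
            dispatch(fetch, "fetch-2", 2, pool, log),
            dispatch(compute, "compute", 21, pool, log),
        )
    return threading.current_thread().name, log


if __name__ == "__main__":
    loop_thread, log = asyncio.run(main())
    for name, (value, body_thread), sender in sorted(log):
        print(f"{name}: value={value} body_on={body_thread} sent_from={sender}")
```

Because `fetch` awaits rather than blocks, both async activities overlap on the loop and occupy a pool thread only for the short response send. A long-running `compute` would instead hold a pool thread for its whole duration, which is why the notes size the pool for both kinds of work together.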

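The notes say orchestrators must stay synchronous generators and that, on replay, yielded tasks return cached results without re-executing. A toy driver makes that concrete. This is a sketch under stated assumptions, not durabletask's engine: `CallActivity`, `order_workflow`, and `replay` are hypothetical. The driver steps the generator one `yield` at a time with `send()`, feeds recorded results back in, and only runs activities past the end of the recorded history. It also shows why determinism matters: if the code yields a different task than the history recorded, replay cannot continue.

```python
from dataclasses import dataclass


@dataclass
class CallActivity:
    """Hypothetical task object an orchestrator yields."""
    name: str
    inp: object


def order_workflow(ctx, order_id):
    # Orchestrator as a plain generator: each yield hands a task to the driver.
    price = yield CallActivity("get_price", order_id)
    charged = yield CallActivity("charge", price)
    return {"order": order_id, "charged": charged}


def replay(orchestrator, inp, history, activities):
    """Drive a generator orchestrator, reusing (name, result) pairs in ``history``.

    New results are appended to ``history``. Returns (output, executed), where
    ``executed`` lists the activities that actually ran on this pass.
    """
    gen = orchestrator(None, inp)
    executed = []
    step = 0
    try:
        task = gen.send(None)  # run to the first yield
        while True:
            if step < len(history):
                recorded_name, result = history[step]
                if recorded_name != task.name:
                    # The code took a different path than the recorded run.
                    raise RuntimeError(
                        f"non-deterministic replay at step {step}: "
                        f"expected {recorded_name!r}, got {task.name!r}"
                    )
            else:
                result = activities[task.name](task.inp)  # first execution
                history.append((task.name, result))
                executed.append(task.name)
            step += 1
            task = gen.send(result)
    except StopIteration as stop:
        return stop.value, executed


if __name__ == "__main__":
    calls = {"get_price": lambda oid: 10 * oid, "charge": lambda amount: amount + 1}
    history = []
    print(replay(order_workflow, 3, history, calls))  # runs both activities
    print(replay(order_workflow, 3, history, calls))  # replays both from history
```

The second pass returns the same output with an empty `executed` list, because every `yield` is answered from history.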
50 changes: 32 additions & 18 deletions dapr/ext/workflow/_durabletask/aio/client.py
@@ -71,18 +71,32 @@ def __init__(
         else:
             interceptors = None

-        channel = get_grpc_aio_channel(
-            host_address=host_address,
-            secure_channel=secure_channel,
-            interceptors=interceptors,
-            options=channel_options,
-        )
-        self._channel = channel
-        self._stub = stubs.TaskHubSidecarServiceStub(channel)
+        self._host_address = host_address
+        self._secure_channel = secure_channel
+        self._interceptors = interceptors
+        self._channel_options = channel_options
+        self._channel: grpc.aio.Channel | None = None
+        self._stub: stubs.TaskHubSidecarServiceStub | None = None
         self._logger = shared.get_logger('client', log_handler, log_formatter)

+    def _get_stub(self) -> stubs.TaskHubSidecarServiceStub:
+        """Lazily create the channel and stub on first use.
+
+        Async gRPC binds a channel to the event loop active at creation; deferring creation avoids binding to the wrong loop.
+        """
+        if self._stub is None:
+            self._channel = get_grpc_aio_channel(
+                host_address=self._host_address,
+                secure_channel=self._secure_channel,
+                interceptors=self._interceptors,
+                options=self._channel_options,
+            )
+            self._stub = stubs.TaskHubSidecarServiceStub(self._channel)
+        return self._stub
+
     async def aclose(self):
-        await self._channel.close()
+        if self._channel is not None:
+            await self._channel.close()

     async def __aenter__(self):
         return self
@@ -113,14 +127,14 @@ async def schedule_new_orchestration(
         )

         self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
-        res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
+        res: pb.CreateInstanceResponse = await self._get_stub().StartInstance(req)
         return res.instanceId

     async def get_orchestration_state(
         self, instance_id: str, *, fetch_payloads: bool = True
     ) -> Optional[WorkflowState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        res: pb.GetInstanceResponse = await self._stub.GetInstance(req)
+        res: pb.GetInstanceResponse = await self._get_stub().GetInstance(req)
         return new_orchestration_state(req.instanceId, res)

     async def wait_for_orchestration_start(
@@ -132,7 +146,7 @@ async def wait_for_orchestration_start(
         )

         async def _call(grpc_timeout):
-            res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
+            res: pb.GetInstanceResponse = await self._get_stub().WaitForInstanceStart(
                 req, timeout=grpc_timeout
             )
             return new_orchestration_state(req.instanceId, res)
@@ -151,7 +165,7 @@ async def wait_for_orchestration_completion(
         )

         async def _call(grpc_timeout):
-            res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
+            res: pb.GetInstanceResponse = await self._get_stub().WaitForInstanceCompletion(
                 req, timeout=grpc_timeout
             )
             state = new_orchestration_state(req.instanceId, res)
@@ -262,7 +276,7 @@ async def raise_orchestration_event(
         )

         self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
-        await self._stub.RaiseEvent(req)
+        await self._get_stub().RaiseEvent(req)

     async def terminate_orchestration(
         self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
@@ -274,19 +288,19 @@ async def terminate_orchestration(
         )

         self._logger.info(f"Terminating instance '{instance_id}'.")
-        await self._stub.TerminateInstance(req)
+        await self._get_stub().TerminateInstance(req)

     async def suspend_orchestration(self, instance_id: str):
         req = pb.SuspendRequest(instanceId=instance_id)
         self._logger.info(f"Suspending instance '{instance_id}'.")
-        await self._stub.SuspendInstance(req)
+        await self._get_stub().SuspendInstance(req)

     async def resume_orchestration(self, instance_id: str):
         req = pb.ResumeRequest(instanceId=instance_id)
         self._logger.info(f"Resuming instance '{instance_id}'.")
-        await self._stub.ResumeInstance(req)
+        await self._get_stub().ResumeInstance(req)

     async def purge_orchestration(self, instance_id: str, recursive: bool = True):
         req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
         self._logger.info(f"Purging instance '{instance_id}'.")
-        await self._stub.PurgeInstances(req)
+        await self._get_stub().PurgeInstances(req)
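The lazy `_get_stub()` pattern in this diff can be illustrated without gRPC. In this minimal sketch, `FakeChannel` and `LazyClient` are hypothetical stand-ins: following the diff's docstring, the fake channel records the event loop that is running when it is created (and, like `asyncio.get_running_loop()`, fails if none is). Constructing the client outside any loop is then harmless, because the channel only comes into existence on first use inside the caller's loop. `aclose()` mirrors the `if self._channel is not None` guard, so a never-used client closes cleanly.

```python
import asyncio
from typing import Optional


class FakeChannel:
    """Hypothetical stand-in for a loop-bound async channel."""

    def __init__(self) -> None:
        # Bind to the loop running right now (raises RuntimeError if there is none).
        self.loop = asyncio.get_running_loop()
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class LazyClient:
    def __init__(self) -> None:
        # No channel yet: __init__ may run where no event loop is active.
        self._channel: Optional[FakeChannel] = None

    def _get_channel(self) -> FakeChannel:
        if self._channel is None:
            self._channel = FakeChannel()  # created inside the caller's loop
        return self._channel

    async def ping(self) -> asyncio.AbstractEventLoop:
        return self._get_channel().loop

    async def aclose(self) -> None:
        if self._channel is not None:  # a never-used client has nothing to close
            await self._channel.close()


async def use(client: LazyClient) -> bool:
    try:
        # True when the channel is bound to the loop asyncio.run() started.
        return (await client.ping()) is asyncio.get_running_loop()
    finally:
        await client.aclose()


def demo():
    client = LazyClient()  # no running loop here, and no channel is created yet
    return asyncio.run(use(client)), client


if __name__ == "__main__":
    print(demo()[0])
```

Laziness defers the binding but does not re-bind: once created, the channel is cached, so reusing one client instance across two separate `asyncio.run()` calls would still hand the second loop the first loop's channel. The same holds for `_get_stub()` as written in the diff.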
27 changes: 27 additions & 0 deletions dapr/ext/workflow/_durabletask/aio/internal/shared.py
@@ -9,6 +9,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import logging
 from typing import Optional, Sequence, Union

 import grpc
@@ -28,6 +29,30 @@
     grpc_aio.StreamStreamClientInterceptor,
 ]

+_POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events'
+
+
+class _GrpcAioPollerNoiseFilter(logging.Filter):
+    """Drops the harmless grpc.aio poller BlockingIOError (EAGAIN) records.
+
+    The poller does a non-blocking read on its wake-up fd and can get EAGAIN, which
+    asyncio logs at ERROR even though the read is retried and nothing is lost.
+    """
+
+    def filter(self, record: logging.LogRecord) -> bool:
+        exc = record.exc_info[1] if record.exc_info else None
+        is_poller_noise = isinstance(exc, BlockingIOError) and (
+            _POLLER_NOISE_MARKER in record.getMessage()
+        )
+        return not is_poller_noise
+
+
+def _silence_grpc_aio_poller_noise() -> None:
+    """Install the poller-noise filter on the asyncio logger if not already present."""
+    asyncio_logger = logging.getLogger('asyncio')
+    if not any(isinstance(f, _GrpcAioPollerNoiseFilter) for f in asyncio_logger.filters):
+        asyncio_logger.addFilter(_GrpcAioPollerNoiseFilter())
+

 def get_grpc_aio_channel(
     host_address: Optional[str],
@@ -43,6 +68,8 @@ def get_grpc_aio_channel(
         interceptors: Optional sequence of client interceptors to apply to the channel.
         options: Optional sequence of gRPC channel options as (key, value) tuples. Keys defined in https://grpc.github.io/grpc/core/group__grpc__arg__keys.html
     """
+    _silence_grpc_aio_poller_noise()
+
Contributor:
is this only on the asyncio side of things?

Contributor Author:
Yup. grpc.aio spams the error logs when its client is used from multiple event loops, which was the case for FastAPI applications using this SDK. Nothing was actually an error, but the logs got extremely noisy on Linux.

It got fixed in their 1.80.0 release. As soon as we update to that dep (in a separate PR, of course) we can delete this.


     if host_address is None:
         host_address = get_default_host_address()

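The filter's predicate can be checked in isolation. This sketch copies the filter logic from the diff onto a private logger, `demo.asyncio` (a hypothetical name chosen so the real `asyncio` logger is left untouched), with a list-collecting handler. It then emits one record shaped like the poller noise, a `BlockingIOError` plus the `PollerCompletionQueue._handle_events` marker in the message, and two records that must survive. The message texts are illustrative, not copied from a real asyncio log.

```python
import errno
import logging

_POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events'


class GrpcAioPollerNoiseFilter(logging.Filter):
    """Same predicate as the diff: drop only BlockingIOError records carrying the marker."""

    def filter(self, record: logging.LogRecord) -> bool:
        exc = record.exc_info[1] if record.exc_info else None
        is_noise = isinstance(exc, BlockingIOError) and _POLLER_NOISE_MARKER in record.getMessage()
        return not is_noise


class ListHandler(logging.Handler):
    """Collects records so the demo can inspect what got through."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def run_demo() -> list[str]:
    logger = logging.getLogger('demo.asyncio')
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep the demo out of the root logger's output
    handler = ListHandler()
    noise_filter = GrpcAioPollerNoiseFilter()
    logger.addHandler(handler)
    logger.addFilter(noise_filter)
    eagain = BlockingIOError(errno.EAGAIN, 'Resource temporarily unavailable')
    try:
        # Dropped: BlockingIOError and the poller marker in the message.
        logger.error('Exception in callback %s()', _POLLER_NOISE_MARKER + '(<fd>)', exc_info=eagain)
        # Kept: same exception type, but not from the poller.
        logger.error('Exception in callback some_other_callback()', exc_info=eagain)
        # Kept: marker present, but a different exception type.
        logger.error('Exception in callback %s()', _POLLER_NOISE_MARKER, exc_info=ValueError('boom'))
    finally:
        logger.removeHandler(handler)
        logger.removeFilter(noise_filter)
    return [r.getMessage() for r in handler.records]


if __name__ == '__main__':
    for message in run_demo():
        print(message)
```

Two details of the diff are worth noting. `_silence_grpc_aio_poller_noise` checks for an existing `_GrpcAioPollerNoiseFilter` before adding one, so creating many channels does not stack duplicate filters. And the filter is attached to the `asyncio` logger itself because logger-level filters are only consulted for records logged on that exact logger, not for records propagated up from descendant loggers.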
28 changes: 28 additions & 0 deletions dapr/ext/workflow/_durabletask/internal/shared.py
@@ -10,6 +10,8 @@
 # limitations under the License.

 import dataclasses
+import functools
+import inspect
 import json
 import logging
 import os
@@ -20,6 +22,32 @@

 from dapr.ext.workflow import _model_protocol

+logger = logging.getLogger(__name__)
+
+
+def is_async_callable(fn: Any) -> bool:
+    """Return True if ``fn`` is async. Catches ``functools.partial`` of coroutines,
+    sync decorators that wrap async functions, and callable instances with ``async __call__``.
+    """
+    candidate = fn
+    while isinstance(candidate, functools.partial):
+        candidate = candidate.func
+    if callable(candidate):
+        try:
+            candidate = inspect.unwrap(candidate)
+        except ValueError:
+            # Cyclic ``__wrapped__`` chain from a malformed decorator. Fall back to the
+            # outermost callable; misclassification is preferable to crashing dispatch.
+            logger.warning(
+                f'Cyclic __wrapped__ on {fn!r}, using outermost callable for async detection.'
+            )
+    if inspect.iscoroutinefunction(candidate):
+        return True
+    if not inspect.isfunction(candidate) and hasattr(candidate, '__call__'):
+        return inspect.iscoroutinefunction(candidate.__call__)
+    return False
+
+

 ClientInterceptor = Union[
     grpc.UnaryUnaryClientInterceptor,
     grpc.UnaryStreamClientInterceptor,
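A quick way to see which shapes `is_async_callable` accepts, including the decorator-ordering gotcha described in AGENTS.md. The helper below reproduces the diff's logic (minus the warning log) so the snippet runs on its own. The activities, `AsyncActivity`, `good_decorator`, and `bad_decorator` are hypothetical examples.

```python
import functools
import inspect
from typing import Any


def is_async_callable(fn: Any) -> bool:
    """Copy of the diff's detection logic, without the cyclic-__wrapped__ warning."""
    candidate = fn
    while isinstance(candidate, functools.partial):
        candidate = candidate.func
    if callable(candidate):
        try:
            candidate = inspect.unwrap(candidate)
        except ValueError:
            pass  # cyclic __wrapped__: keep the outermost callable
    if inspect.iscoroutinefunction(candidate):
        return True
    if not inspect.isfunction(candidate) and hasattr(candidate, '__call__'):
        return inspect.iscoroutinefunction(candidate.__call__)
    return False


async def fetch(ctx, inp):
    return inp


def compute(ctx, inp):
    return inp


class AsyncActivity:
    async def __call__(self, ctx, inp):
        return inp


def good_decorator(fn):
    @functools.wraps(fn)  # sets __wrapped__, so unwrap reaches the async def
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)  # hands the coroutine back to the caller
    return wrapper


def bad_decorator(fn):
    def wrapper(*args, **kwargs):  # no functools.wraps: the async def is hidden
        return fn(*args, **kwargs)
    return wrapper


CASES = {
    'async def': fetch,
    'sync def': compute,
    'partial(async def)': functools.partial(fetch, None),
    'callable instance': AsyncActivity(),
    'wraps-decorated async': good_decorator(fetch),
    'undecorated-wrapper async': bad_decorator(fetch),
}

if __name__ == '__main__':
    for label, fn in CASES.items():
        print(f'{label:28} -> {is_async_callable(fn)}')
```

The last two rows are the gotcha: both wrappers return the coroutine produced by `fetch`, but only the `functools.wraps` version exposes `__wrapped__`. `bad_decorator(fetch)` is therefore classified as sync, and calling it yields a coroutine object that a sync dispatch path would never await.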