fix: use task-stream index instead of wall clock in get_task_stream context manager (#9253)#9282
fix: use task-stream index instead of wall clock in get_task_stream context manager (#9253)#9282MohammadYusif wants to merge 1 commit into
Conversation
…ontext manager (dask#9253) The get_task_stream context manager bounded the collected tasks with a wall-clock timestamp (time() - 0.1). collect() then bisected the buffer by comparing that boundary against each task's recorded stop time. When there was latency or clock skew between the client and the workers, a task that finished inside the block could carry a stop time earlier than the client's start boundary and be silently dropped, so get_task_stream() returned no tasks. Record the scheduler's monotonic task-stream append index on entry and collect everything appended after it on exit. This removes the dependency on synchronized clocks entirely, as the maintainers' FIXME suggested. Adds a get_task_stream_index scheduler RPC and a start_index path through collect()/get_task_stream(), with tests covering the index semantics and the clock-skew regression.
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 31 files ± 0 31 suites ±0 10h 49m 37s ⏱️ + 14m 21s For more details on these failures, see this check. Results for commit 2d46d3f. ± Comparison against base commit bcad953. This pull request skips 1 test. |
crusaderky
left a comment
There was a problem hiding this comment.
Some minor critiques
| plot=False, | ||
| filename="task-stream.html", | ||
| bokeh_resources=None, | ||
| start_index=None, |
There was a problem hiding this comment.
| start_index=None, |
this should not be exposed in the public API
| plot=plot, | ||
| filename=filename, | ||
| bokeh_resources=bokeh_resources, | ||
| start_index=start_index, |
There was a problem hiding this comment.
| start_index=start_index, |
| return self | ||
|
|
||
| async def __aexit__(self, exc_type, exc_value, traceback): | ||
| L = await self.client.get_task_stream( |
There was a problem hiding this comment.
| L = await self.client.get_task_stream( | |
| L = await self.client._get_task_stream( |
| return self | ||
|
|
||
| def __exit__(self, exc_type, exc_value, traceback): | ||
| L = self.client.get_task_stream( |
There was a problem hiding this comment.
| L = self.client.get_task_stream( | |
| L = self.client.sync(self.client._get_task_stream( |
| L = self.client.get_task_stream( | ||
| start=self.start, plot=self._plot, filename=self._filename | ||
| start_index=self._start_index, plot=self._plot, filename=self._filename | ||
| ) |
| def test_collect_start_index_ignores_clock(): | ||
| # When the worker clock lags the client clock (or there is latency), a task | ||
| # can finish with a recorded stop time that is earlier than the client's | ||
| # ``start`` boundary. The time-based collection then drops the task, which | ||
| # is the latency/clock-skew failure from the original bug report. The | ||
| # index-based path must still return it. | ||
| plugin = TaskStreamPlugin.__new__(TaskStreamPlugin) | ||
| plugin.buffer = deque() | ||
| plugin.index = 0 | ||
|
|
||
| now = time() | ||
| plugin.buffer.append({"key": "task", "startstops": [{"stop": now - 100}]}) | ||
| plugin.index += 1 | ||
|
|
||
| # Time-based collection misses the task because its stop time is in the past. | ||
| assert plugin.collect(start=now) == [] | ||
| # Index-based collection captures it regardless of the clock. | ||
| assert len(plugin.collect(start_index=0)) == 1 | ||
|
|
||
|
|
There was a problem hiding this comment.
This test is very hacky and IMHO over-engineered; I'd rather stick to the public API whenever possible.
It should just be removed.
Closes #9253
pre-commit run --all-filesThe
get_task_streamcontext manager bounded the tasks it collected with awall-clock timestamp (
time() - 0.1), andcollect()bisects the buffer bycomparing that boundary against each task's recorded
stoptime. With latencyor clock skew between the client and the workers, a task that finished inside
the block can carry a
stoptime earlier than the client'sstartboundaryand be silently dropped — so
get_task_stream()returns no tasks. This matchesthe existing
# FIXME ... We should query TaskStreamPlugin.index instead.This records the scheduler's monotonic task-stream append index on entry and
collects everything appended after it on exit, removing the dependency on
synchronized clocks.
distributed/diagnostics/task_stream.py:collect()gains astart_indexpath that selects records by append position instead of timestamp.
distributed/scheduler.py: newget_task_stream_indexRPC +start_indexpassthrough on
get_task_stream. Refactored the plugin-init guard into_task_stream_plugin()helper used by both methods.distributed/client.py:get_task_stream/_get_task_streamforwardstart_index; the context manager snapshots the index on enter and collectsfrom it on exit (sync and async), removing the FIXME comment.
distributed/diagnostics/tests/test_task_stream.py: tests for the indexsemantics and the clock-skew regression.