fix: handle and propagate errors in event channel monitors #573
xdustinface merged 3 commits into v0.42-dev
Conversation
📝 Walkthrough
Monitor tasks now take an `on_failure` failure channel so that fatal monitor errors propagate to the coordinator.
Sequence Diagram
```mermaid
sequenceDiagram
    participant Coordinator as Coordinator
    participant BroadcastMonitor as Broadcast Monitor
    participant ProgressMonitor as Progress Monitor
    participant Handler as Event Handler
    participant FailureChan as Failure mpsc
    BroadcastMonitor->>BroadcastMonitor: detect RecvError::Closed / RecvError::Lagged
    BroadcastMonitor->>Handler: log / (no immediate on_error)
    BroadcastMonitor->>FailureChan: try_send("broadcast monitor: ...")
    BroadcastMonitor->>BroadcastMonitor: break loop
    ProgressMonitor->>ProgressMonitor: detect watch changed() closure/error
    ProgressMonitor->>Handler: log / (no immediate on_error)
    ProgressMonitor->>FailureChan: try_send("progress monitor channel closed unexpectedly")
    ProgressMonitor->>ProgressMonitor: break loop
    Coordinator->>FailureChan: await monitor_failure_rx.recv()
    FailureChan-->>Coordinator: deliver failure message
    Coordinator->>Coordinator: return SpvError::ChannelFailure(...), join monitors, call handler.on_error afterwards
```
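The failure-propagation shape in the diagram can be sketched with std primitives (the real code uses tokio channels and async tasks; `SpvError`, `run_monitor`, and `coordinator` here are illustrative stand-ins, not the actual dash-spv items):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for SpvError::ChannelFailure(component, reason).
#[derive(Debug, PartialEq)]
enum SpvError {
    ChannelFailure(String, String),
}

// A monitor that hits a fatal receive error reports it over the failure
// channel; try_send never blocks, and a full or closed channel is ignored
// because some other monitor has already reported.
fn run_monitor(failure_tx: mpsc::SyncSender<String>) {
    let _ = failure_tx.try_send("broadcast monitor: receiver lagged".to_string());
    // ...the monitor loop would `break` here
}

// The coordinator blocks on the failure channel, joins the monitor,
// and wraps the concrete message into the error it returns.
fn coordinator() -> SpvError {
    let (failure_tx, failure_rx) = mpsc::sync_channel::<String>(4);
    let handle = thread::spawn(move || run_monitor(failure_tx));
    let msg = failure_rx.recv().expect("monitor exited without reporting");
    handle.join().expect("monitor panicked");
    SpvError::ChannelFailure("event monitor".to_string(), msg)
}

fn main() {
    println!("{:?}", coordinator());
}
```

The key property is that the concrete message written by the monitor reaches the coordinator unchanged, rather than being replaced by a synthesized static string.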
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
@CodeRabbit review

✅ Actions performed: review triggered.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dash-spv/src/client/sync_coordinator.rs (1)
53-85: ⚠️ Potential issue | 🟠 Major: Return the real monitor failure instead of synthesizing "broadcast receiver lagged".
All four monitors cancel the same token, but this branch always constructs `ChannelFailure("event monitor", "broadcast receiver lagged")`. That misreports progress-channel failures and unexpected broadcast closes, and it drops the more specific failure message already produced by the monitor task. Please propagate the concrete failure back into `run()` and return that here. As per coding guidelines, "Use proper error types (thiserror) and propagate errors appropriately in Rust code". Also applies to: 106-111
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/client/sync_coordinator.rs` around lines 53 - 85, The code in run() currently maps any monitor cancellation to a synthesized ChannelFailure("event monitor", "broadcast receiver lagged") which hides the real cause; change the logic that handles monitor_shutdown/monitor_failure so it returns the concrete failure produced by the monitor tasks (propagate the actual error from monitor_failure) instead of constructing the static "broadcast receiver lagged" message. Locate the monitor spawns (spawn_broadcast_monitor and spawn_progress_monitor) and the branch that creates ChannelFailure("event monitor", "broadcast receiver lagged") and replace it so run() awaits or inspects monitor_failure (the propagated error type) and returns that error (preserving its thiserror type) when cancelling the token; ensure the same fix is applied for the second occurrence referenced (lines ~106-111).
🧹 Nitpick comments (1)
dash-spv/src/client/event_handler.rs (1)
242-263: Add a teardown test that closes the channel after `shutdown.cancel()`.
The new guards on Line 55 and Line 98 only matter when the channel closes after shutdown has been requested. The added tests cover unexpected drops and explicit shutdown separately, but they still don't exercise that path, so a regression there would go unnoticed. As per coding guidelines, "Write unit tests for new functionality in Rust code" and "Place unit tests alongside code with the `#[cfg(test)]` attribute". Also applies to: 319-337
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/client/event_handler.rs` around lines 242 - 263, Add a new unit test placed alongside the existing tests (under #[cfg(test)]) that exercises the path where the broadcast channel is closed after shutdown has been requested: create a broadcast::channel::<SyncEvent>, spawn the same spawn_broadcast_monitor using the RecordingHandler, call shutdown.cancel() first, then drop the sender to close the channel, await the task, and assert the handler/error and on_failure behavior matches expectations; reference the existing test broadcast_monitor_fails_on_unexpected_channel_close and reuse its setup (RecordingHandler, shutdown CancellationToken, on_failure CancellationToken, and spawn_broadcast_monitor invocation) but invert the order of shutdown.cancel() and drop(tx) so the guards on lines 55 and 98 are exercised.
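The teardown ordering the prompt describes can be sketched with std primitives (the real tests use tokio broadcast channels and `CancellationToken`; `monitor` and the channel types here are illustrative):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

// A closed event channel is fatal only when shutdown was *not* requested
// first; this mirrors the guards the review asks to exercise.
fn monitor(
    events: mpsc::Receiver<u32>,
    shutdown: Arc<AtomicBool>,
    failure_tx: mpsc::Sender<String>,
) {
    while let Ok(_event) = events.recv() {}
    // Channel closed: report only if this was not an expected shutdown.
    if !shutdown.load(Ordering::SeqCst) {
        let _ = failure_tx.send("monitor channel closed unexpectedly".into());
    }
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let (event_tx, event_rx) = mpsc::channel::<u32>();
    let (failure_tx, failure_rx) = mpsc::channel::<String>();
    let handle = thread::spawn({
        let shutdown = Arc::clone(&shutdown);
        move || monitor(event_rx, shutdown, failure_tx)
    });
    shutdown.store(true, Ordering::SeqCst); // request shutdown first...
    drop(event_tx); // ...then close the channel
    handle.join().expect("monitor panicked");
    // For this ordering, no failure may be reported.
    assert!(failure_rx.try_recv().is_err());
    println!("clean shutdown: no failure reported");
}
```

Inverting the two marked lines in `main` is exactly the ordering difference the requested test would pin down.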
ℹ️ Review info: configuration from .coderabbit.yaml, review profile CHILL, plan Pro.
📒 Files selected for processing (2): dash-spv/src/client/event_handler.rs, dash-spv/src/client/sync_coordinator.rs
Codecov Report: ❌ Patch coverage is …
Additional details and impacted files:
```
@@            Coverage Diff             @@
##           v0.42-dev     #573   +/-   ##
=============================================
+ Coverage      67.02%   67.06%   +0.03%
=============================================
  Files            320      320
  Lines          67249    67293      +44
=============================================
+ Hits           45073    45128      +55
+ Misses         22176    22165      -11
=============================================
```
Force-pushed 31549f0 → 1f3f4c0
Force-pushed becff66 → 9ec7541
This PR has merge conflicts with the base branch. Please rebase or merge the base branch into your branch to resolve them.
Force-pushed aaddece → 23accc0
The base branch was changed.
- Add `on_failure` cancellation token to `spawn_broadcast_monitor` and `spawn_progress_monitor` to signal the run loop on fatal monitor errors
- Treat `Lagged` broadcast errors as fatal; lost events are unrecoverable at the moment
- Distinguish expected vs. unexpected channel close by checking the shutdown token
- Run loop watches `monitor_failure.cancelled()` and exits with a `ChannelFailure` error
Force-pushed 1f3f4c0 → f5d1e71
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dash-spv/src/client/event_handler.rs`:
- Around line 68-74: The Lagged arm handling (matching
broadcast::error::RecvError::Lagged) currently unconditionally logs, calls
handler.on_error and on_failure.cancel(), which can spuriously trigger a
ChannelFailure during shutdown; wrap the Lagged branch with the same shutdown
check used in the Closed handler (i.e., skip the error handling and do not call
on_failure.cancel() when shutdown has already been requested) so that when
shutdown is active the code no-ops instead of propagating failure.
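The guard the comment asks for can be expressed as a pure function over the receive outcome (`RecvOutcome` and `classify` are hypothetical stand-ins for tokio's `broadcast::error::RecvError` handling, written out so the arm logic is testable in isolation):

```rust
// Pure-function sketch of the guarded match arms described above.
#[derive(Debug)]
enum RecvOutcome {
    Event,
    Closed,
    Lagged(u64),
}

// Returns the fatal-failure message, if any. Both fatal arms no-op once
// shutdown has been requested, so teardown cannot spuriously report a
// ChannelFailure.
fn classify(outcome: RecvOutcome, shutdown_requested: bool) -> Option<String> {
    match outcome {
        RecvOutcome::Event => None,
        RecvOutcome::Closed | RecvOutcome::Lagged(_) if shutdown_requested => None,
        RecvOutcome::Closed => Some("broadcast channel closed unexpectedly".to_string()),
        RecvOutcome::Lagged(n) => Some(format!("broadcast receiver lagged by {n} events")),
    }
}

fn main() {
    assert_eq!(classify(RecvOutcome::Lagged(7), true), None);
    assert!(classify(RecvOutcome::Lagged(7), false).is_some());
    assert_eq!(classify(RecvOutcome::Closed, true), None);
    println!("lagged-during-shutdown is a no-op");
}
```

The point of the or-pattern with a guard is that `Closed` and `Lagged` share one shutdown check, so the two arms cannot drift apart again.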
🚧 Files skipped from review as they are similar to previous changes (1): dash-spv/src/client/sync_coordinator.rs
Addresses CodeRabbit review comment on PR #573 (comment)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dash-spv/src/client/event_handler.rs (1)
271-308: ⚠️ Potential issue | 🟡 Minor: Add a regression test for the shutdown+lagged path.
This test covers the fatal lagged case, but the new guard at Line 68 is the subtle part of the fix. Please add a case that cancels `shutdown` before the lagged recv and asserts `error_count == 0` and `on_failure` stays unset. As per coding guidelines, "Write unit tests for new functionality".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/client/event_handler.rs` around lines 271 - 308, Add a new tokio test similar to broadcast_monitor_exits_on_lagged_receiver that exercises the shutdown+lagged path: create a broadcast channel and RecordingHandler, send enough SyncEvent messages to overflow the buffer, but before awaiting the monitor spawn_broadcast_monitor call cancel the shutdown CancellationToken (shutdown.cancel()), then await the task and assert that handler.sync_count is 0, handler.error_count is 0, and on_failure.is_cancelled() is false; use the same symbols used in the existing test (spawn_broadcast_monitor, RecordingHandler, CancellationToken, SyncEvent, handler.error_count) so the guard at line 68 is covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dash-spv/src/client/event_handler.rs`:
- Around line 60-74: The monitor error branches call on_failure.cancel() even
during a normal shutdown because they only check shutdown.is_cancelled(); update
the logic so Closed/Lagged arms treat a cancelled shutdown as a normal
termination and do not call on_failure.cancel(): in event_handler.rs adjust the
guards around Err(broadcast::error::RecvError::Closed) and Err(...::Lagged) (the
uses of shutdown.is_cancelled(), handler.on_error(), and on_failure.cancel()) to
skip on_failure.cancel() when the monitor shutdown token is cancelled or when
the outer coordinator run token has been cancelled; alternatively, ensure the
coordinator (sync_coordinator.rs run loop) cancels monitor_shutdown before
exiting so the monitor sees shutdown.is_cancelled() and does not call
on_failure.cancel().
- Line 45: Replace the bare CancellationToken "on_failure" with a channel that
carries the concrete failure (e.g., oneshot::Sender<SpvError> or a watch/tx of
Option<SpvError>) and stop calling on_failure.cancel(); instead send the
SpvError (or monitor name+reason) through that channel where monitors currently
call on_failure.cancel(). Update the consumers (the coordinator path that
currently maps a cancellation into ChannelFailure and then calls
handler.on_error) to take the received SpvError directly and invoke the single
error callback (handler.on_error) with that error, removing the
duplicate/mislabeled ChannelFailure branch; change references to on_failure, the
monitor failure sites, and the coordinator error handling to use the new
send/receive symbol names.
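The channel-instead-of-token suggestion can be sketched as follows (a minimal sketch with std `mpsc` and a plain `String` error; the real code would carry `SpvError` over a tokio channel, and `RecordingHandler`/`run` are illustrative names):

```rust
use std::sync::mpsc;

// Monitors send the concrete failure over a channel, and the coordinator
// is the single caller of the handler's error callback.
trait EventHandler {
    fn on_error(&mut self, msg: &str);
}

struct RecordingHandler {
    errors: Vec<String>,
}

impl EventHandler for RecordingHandler {
    fn on_error(&mut self, msg: &str) {
        self.errors.push(msg.to_string());
    }
}

// run() consumes the first monitor failure, reports it exactly once,
// and returns it; a closed channel means all monitors exited cleanly.
fn run(handler: &mut dyn EventHandler, failure_rx: mpsc::Receiver<String>) -> Result<(), String> {
    match failure_rx.recv() {
        Ok(msg) => {
            handler.on_error(&msg);
            Err(msg)
        }
        Err(_) => Ok(()),
    }
}

fn main() {
    let (failure_tx, failure_rx) = mpsc::channel();
    failure_tx
        .send("progress monitor channel closed unexpectedly".to_string())
        .unwrap();
    drop(failure_tx);
    let mut handler = RecordingHandler { errors: Vec::new() };
    let result = run(&mut handler, failure_rx);
    assert!(result.is_err());
    assert_eq!(handler.errors.len(), 1); // on_error called exactly once
    println!("{:?}", handler.errors);
}
```

Because the error travels through the channel rather than a bare cancellation signal, the coordinator never has to fabricate a `ChannelFailure` message, which removes the double-reporting path.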
📒 Files selected for processing (1): dash-spv/src/client/event_handler.rs
…r propagation
Use an `mpsc::channel` instead of a `CancellationToken` for `on_failure` so monitors propagate the actual error message to the coordinator. The coordinator is now the single source of `handler.on_error()` calls, eliminating double-reporting. Also cancel `monitor_shutdown` immediately after the loop exits to close the shutdown race window.
Addresses CodeRabbit review comments on PR #573: #573 (comment), #573 (comment)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dash-spv/src/client/sync_coordinator.rs (1)
52-84: ⚠️ Potential issue | 🟠 Major: Monitor task panics bypass the failure channel and are silently dropped.
The `monitor_failure_rx` channel only catches explicit `try_send()` failures from within the monitor helpers. If a handler callback panics inside a spawned monitor task, that task terminates with a `JoinError`, but line 120 discards all `JoinHandle` results with `let _ = tokio::join!(...)`, so the panic goes unobserved. Additionally, if all sender clones are dropped, `recv()` returns `None`, which the `select!` block at lines 104-114 ignores entirely.
At minimum, capture join results and propagate any task failures:
Example fix
```diff
- let _ = tokio::join!(sync_task, network_task, wallet_task, progress_task);
+ let (sync_result, network_result, wallet_result, progress_result) =
+     tokio::join!(sync_task, network_task, wallet_task, progress_task);
+
+ let join_error = [sync_result, network_result, wallet_result, progress_result]
+     .into_iter()
+     .find_map(Result::err)
+     .map(|join_err| {
+         SpvError::ChannelFailure(
+             "event monitor".into(),
+             format!("monitor task exited unexpectedly: {join_err}"),
+         )
+     });
+
+ let error = error.or(join_error);
```
Violates guideline: "Use proper error types with thiserror crate and propagate errors appropriately."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/client/sync_coordinator.rs` around lines 52 - 84, The spawned monitor tasks created by spawn_broadcast_monitor and spawn_progress_monitor can panic or be cancelled and currently their JoinHandle results are ignored by let _ = tokio::join!(...), so capture each JoinHandle result, check for panics/JoinError and forward a descriptive error via monitor_failure_tx (or monitor_failure_rx consumer) using the crate's error type (or wrap with thiserror) so failures propagate; also handle the case where recv() returns None inside the monitors by treating it as a shutdown/error condition and sending a failure notification instead of silently ignoring it; ensure monitor_shutdown is used to gracefully stop tasks but any unexpected task termination is sent through monitor_failure_tx to be handled upstream.
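The join-result handling the prompt describes can be exercised with std threads (the real code joins tokio tasks; `first_join_error` is an illustrative helper, and a real fix would map the panic into `SpvError::ChannelFailure`):

```rust
use std::thread;

// Collect join results instead of discarding them, and surface the first
// panic as a descriptive failure message.
fn first_join_error(handles: Vec<thread::JoinHandle<()>>) -> Option<String> {
    handles
        .into_iter()
        .map(|h| h.join())
        .find_map(Result::err)
        .map(|_panic| "event monitor: monitor task exited unexpectedly".to_string())
}

fn main() {
    let handles = vec![
        thread::spawn(|| {}),                 // completes normally
        thread::spawn(|| panic!("boom")),     // simulated handler panic
    ];
    let err = first_join_error(handles);
    assert!(err.is_some());
    println!("{}", err.unwrap());
}
```

This is the same shape as the suggested `tokio::join!` fix: the panic is no longer swallowed by a `let _ =` binding but becomes an error the caller must handle.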
🧹 Nitpick comments (1)
dash-spv/src/client/sync_coordinator.rs (1)
97-124: Please add a coordinator-level regression test for this branch.
The added tests stop at `event_handler.rs`, but the contract this PR is fixing is finalized here: `run()` should exit with `SpvError::ChannelFailure`, cancel `monitor_shutdown` before teardown, and call `on_error` exactly once from the coordinator. A focused `run()` test that forces one monitor failure would pin that behavior.
As per coding guidelines, "Write unit tests for new functionality" and "Implement comprehensive unit tests in-module for individual components using #[cfg(test)] and integration tests in the tests/ directory".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/client/sync_coordinator.rs` around lines 97 - 124, Add a coordinator-level regression test that invokes the coordinator's run() loop and forces a single monitor failure to verify the fixed contract: the test should arrange for monitor_failure_rx to deliver a message so run() returns Some(SpvError::ChannelFailure(...)), verify monitor_shutdown.cancel() is called (i.e., monitors observe cancellation) before teardown completes, and assert handler.on_error(...) is called exactly once with the error string; use the actual coordinator struct and run() method, inject a mock/fake handler that records on_error invocations, and trigger the monitor failure path to exercise and assert these behaviors.
📒 Files selected for processing (2): dash-spv/src/client/event_handler.rs, dash-spv/src/client/sync_coordinator.rs
- `on_failure` cancellation token passed to `spawn_broadcast_monitor` and `spawn_progress_monitor` to signal the run loop on fatal monitor errors
- Run loop watches `monitor_failure.cancelled()` and exits with a `ChannelFailure` error

Summary by CodeRabbit
Bug Fixes
Tests