66
77use std:: sync:: Arc ;
88
9- use tokio:: sync:: { broadcast, watch} ;
9+ use tokio:: sync:: { broadcast, mpsc , watch} ;
1010use tokio:: task:: JoinHandle ;
1111use tokio_util:: sync:: CancellationToken ;
1212
@@ -35,11 +35,15 @@ pub trait EventHandler: Send + Sync + 'static {
3535impl EventHandler for ( ) { }
3636
3737/// Spawns a task that monitors a broadcast channel and dispatches events to the handler.
38+ ///
39+ /// On failure, the error message is sent via `on_failure` so the coordinator can report
40+ /// it as the single source of error handling.
3841pub ( crate ) fn spawn_broadcast_monitor < E , H , F > (
3942 name : & ' static str ,
4043 mut receiver : broadcast:: Receiver < E > ,
4144 handler : Arc < H > ,
4245 shutdown : CancellationToken ,
46+ on_failure : mpsc:: Sender < String > ,
4347 dispatch_fn : F ,
4448) -> JoinHandle < ( ) >
4549where
5458 result = receiver. recv( ) => {
5559 match result {
5660 Ok ( event) => dispatch_fn( & handler, & event) ,
57- Err ( broadcast:: error:: RecvError :: Closed ) => break ,
58- Err ( broadcast:: error:: RecvError :: Lagged ( _) ) => continue ,
61+ Err ( broadcast:: error:: RecvError :: Closed ) if shutdown. is_cancelled( ) => break ,
62+ Err ( broadcast:: error:: RecvError :: Closed ) => {
63+ let msg = format!( "{} monitor channel closed unexpectedly" , name) ;
64+ tracing:: error!( "{}" , msg) ;
65+ let _ = on_failure. try_send( msg) ;
66+ break ;
67+ }
68+ Err ( broadcast:: error:: RecvError :: Lagged ( _) ) if shutdown. is_cancelled( ) => break ,
69+ Err ( broadcast:: error:: RecvError :: Lagged ( n) ) => {
70+ let msg = format!( "{} monitor lagged, missed {} events" , name, n) ;
71+ tracing:: error!( "{}" , msg) ;
72+ let _ = on_failure. try_send( msg) ;
73+ break ;
74+ }
5975 }
6076 }
6177 _ = shutdown. cancelled( ) => break ,
@@ -72,6 +88,7 @@ pub(crate) fn spawn_progress_monitor<H: EventHandler>(
7288 mut receiver : watch:: Receiver < SyncProgress > ,
7389 handler : Arc < H > ,
7490 shutdown : CancellationToken ,
91+ on_failure : mpsc:: Sender < String > ,
7592) -> JoinHandle < ( ) > {
7693 tokio:: spawn ( async move {
7794 tracing:: debug!( "Progress monitoring task started" ) ;
@@ -83,7 +100,13 @@ pub(crate) fn spawn_progress_monitor<H: EventHandler>(
83100 result = receiver. changed( ) => {
84101 match result {
85102 Ok ( ( ) ) => handler. on_progress( & receiver. borrow_and_update( ) ) ,
86- Err ( _) => break ,
103+ Err ( _) if shutdown. is_cancelled( ) => break ,
104+ Err ( _) => {
105+ let msg = "Progress monitor channel closed unexpectedly" . to_string( ) ;
106+ tracing:: error!( "{}" , msg) ;
107+ let _ = on_failure. try_send( msg) ;
108+ break ;
109+ }
87110 }
88111 }
89112 _ = shutdown. cancelled( ) => break ,
@@ -99,7 +122,7 @@ mod tests {
99122 use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
100123 use std:: sync:: Arc ;
101124
102- use tokio:: sync:: { broadcast, watch} ;
125+ use tokio:: sync:: { broadcast, mpsc , watch} ;
103126 use tokio_util:: sync:: CancellationToken ;
104127
105128 use super :: { spawn_broadcast_monitor, spawn_progress_monitor, EventHandler } ;
@@ -166,12 +189,14 @@ mod tests {
166189 let ( tx, rx) = broadcast:: channel ( 16 ) ;
167190 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
168191 let shutdown = CancellationToken :: new ( ) ;
192+ let ( failure_tx, _failure_rx) = mpsc:: channel ( 1 ) ;
169193
170194 let task = spawn_broadcast_monitor (
171195 "test" ,
172196 rx,
173197 handler. clone ( ) ,
174198 shutdown. clone ( ) ,
199+ failure_tx,
175200 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
176201 ) ;
177202
@@ -202,12 +227,14 @@ mod tests {
202227 let ( _tx, rx) = broadcast:: channel :: < SyncEvent > ( 16 ) ;
203228 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
204229 let shutdown = CancellationToken :: new ( ) ;
230+ let ( failure_tx, _failure_rx) = mpsc:: channel ( 1 ) ;
205231
206232 let task = spawn_broadcast_monitor (
207233 "test" ,
208234 rx,
209235 handler. clone ( ) ,
210236 shutdown. clone ( ) ,
237+ failure_tx,
211238 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
212239 ) ;
213240
@@ -218,28 +245,36 @@ mod tests {
218245 }
219246
220247 #[ tokio:: test]
221- async fn broadcast_monitor_exits_on_channel_close ( ) {
248+ async fn broadcast_monitor_fails_on_unexpected_channel_close ( ) {
222249 let ( tx, rx) = broadcast:: channel :: < SyncEvent > ( 16 ) ;
223250 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
224251 let shutdown = CancellationToken :: new ( ) ;
252+ let ( failure_tx, mut failure_rx) = mpsc:: channel ( 1 ) ;
225253
226254 let task = spawn_broadcast_monitor (
227255 "test" ,
228256 rx,
229257 handler. clone ( ) ,
230258 shutdown. clone ( ) ,
259+ failure_tx,
231260 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
232261 ) ;
233262
263+ // Drop sender without cancelling shutdown — this is unexpected
234264 drop ( tx) ;
235265 task. await . unwrap ( ) ;
266+
267+ let msg = failure_rx. try_recv ( ) . expect ( "should have received failure message" ) ;
268+ assert ! ( msg. contains( "closed unexpectedly" ) ) ;
269+ assert_eq ! ( handler. error_count. load( Ordering :: SeqCst ) , 0 ) ;
236270 }
237271
238272 #[ tokio:: test]
239- async fn broadcast_monitor_handles_lagged_receiver ( ) {
273+ async fn broadcast_monitor_exits_on_lagged_receiver ( ) {
240274 let ( tx, rx) = broadcast:: channel ( 2 ) ;
241275 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
242276 let shutdown = CancellationToken :: new ( ) ;
277+ let ( failure_tx, mut failure_rx) = mpsc:: channel ( 1 ) ;
243278
244279 // Send more messages than the buffer can hold before spawning the monitor
245280 tx. send ( SyncEvent :: BlockHeadersStored {
@@ -260,31 +295,28 @@ mod tests {
260295 rx,
261296 handler. clone ( ) ,
262297 shutdown. clone ( ) ,
298+ failure_tx,
263299 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
264300 ) ;
265301
266- // Send one more after the monitor starts
267- tx. send ( SyncEvent :: BlockHeadersStored {
268- tip_height : 4 ,
269- } )
270- . unwrap ( ) ;
271- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 50 ) ) . await ;
272-
273- shutdown. cancel ( ) ;
302+ // The monitor should exit on its own due to the lagged error
274303 task. await . unwrap ( ) ;
275304
276- // The monitor should have received at least the last message (and possibly
277- // one from the lagged recovery). The key thing is it doesn't crash.
278- assert ! ( handler. sync_count. load( Ordering :: SeqCst ) >= 1 ) ;
305+ // No sync events should have been dispatched, failure sent via channel
306+ assert_eq ! ( handler. sync_count. load( Ordering :: SeqCst ) , 0 ) ;
307+ assert_eq ! ( handler. error_count. load( Ordering :: SeqCst ) , 0 ) ;
308+ let msg = failure_rx. try_recv ( ) . expect ( "should have received failure message" ) ;
309+ assert ! ( msg. contains( "lagged" ) ) ;
279310 }
280311
281312 #[ tokio:: test]
282313 async fn progress_monitor_sends_initial_and_updates ( ) {
283314 let ( tx, rx) = watch:: channel ( SyncProgress :: default ( ) ) ;
284315 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
285316 let shutdown = CancellationToken :: new ( ) ;
317+ let ( failure_tx, _failure_rx) = mpsc:: channel ( 1 ) ;
286318
287- let task = spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) ) ;
319+ let task = spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) , failure_tx ) ;
288320
289321 // Give the task time to send initial progress
290322 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 50 ) ) . await ;
@@ -302,34 +334,41 @@ mod tests {
302334 }
303335
304336 #[ tokio:: test]
305- async fn progress_monitor_exits_on_sender_drop ( ) {
337+ async fn progress_monitor_fails_on_unexpected_sender_drop ( ) {
306338 let ( tx, rx) = watch:: channel ( SyncProgress :: default ( ) ) ;
307339 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
308340 let shutdown = CancellationToken :: new ( ) ;
341+ let ( failure_tx, mut failure_rx) = mpsc:: channel ( 1 ) ;
309342
310- let task = spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) ) ;
343+ let task = spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) , failure_tx ) ;
311344
312345 // Give it time to send initial
313346 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 50 ) ) . await ;
314347
348+ // Drop sender without cancelling shutdown — this is unexpected
315349 drop ( tx) ;
316350 task. await . unwrap ( ) ;
317351
318- // At least the initial progress was sent
352+ // At least the initial progress was sent, failure sent via channel
319353 assert ! ( handler. progress_count. load( Ordering :: SeqCst ) >= 1 ) ;
354+ assert_eq ! ( handler. error_count. load( Ordering :: SeqCst ) , 0 ) ;
355+ let msg = failure_rx. try_recv ( ) . expect ( "should have received failure message" ) ;
356+ assert ! ( msg. contains( "closed unexpectedly" ) ) ;
320357 }
321358
322359 #[ tokio:: test]
323360 async fn network_event_dispatch ( ) {
324361 let ( tx, rx) = broadcast:: channel ( 16 ) ;
325362 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
326363 let shutdown = CancellationToken :: new ( ) ;
364+ let ( failure_tx, _failure_rx) = mpsc:: channel ( 1 ) ;
327365
328366 let task = spawn_broadcast_monitor (
329367 "network" ,
330368 rx,
331369 handler. clone ( ) ,
332370 shutdown. clone ( ) ,
371+ failure_tx,
333372 |h : & RecordingHandler , event : & NetworkEvent | h. on_network_event ( event) ,
334373 ) ;
335374
0 commit comments