@@ -30,11 +30,14 @@ pub trait EventHandler: Send + Sync + 'static {
3030impl EventHandler for ( ) { }
3131
3232/// Spawns a task that monitors a broadcast channel and dispatches events to the handler.
33+ ///
34+ /// `on_failure` is cancelled if the receiver lags, signalling the run loop to stop.
3335pub ( crate ) fn spawn_broadcast_monitor < E , H , F > (
3436 name : & ' static str ,
3537 mut receiver : broadcast:: Receiver < E > ,
3638 handler : Arc < H > ,
3739 shutdown : CancellationToken ,
40+ on_failure : CancellationToken ,
3841 dispatch_fn : F ,
3942) -> JoinHandle < ( ) >
4043where
4952 result = receiver. recv( ) => {
5053 match result {
5154 Ok ( event) => dispatch_fn( & handler, & event) ,
52- Err ( broadcast:: error:: RecvError :: Closed ) => break ,
53- Err ( broadcast:: error:: RecvError :: Lagged ( _) ) => continue ,
55+ Err ( broadcast:: error:: RecvError :: Closed ) if shutdown. is_cancelled( ) => break ,
56+ Err ( broadcast:: error:: RecvError :: Closed ) => {
57+ let msg = format!( "{} monitor channel closed unexpectedly" , name) ;
58+ tracing:: error!( "{}" , msg) ;
59+ handler. on_error( & msg) ;
60+ on_failure. cancel( ) ;
61+ break ;
62+ }
63+ Err ( broadcast:: error:: RecvError :: Lagged ( n) ) => {
64+ let msg = format!( "{} monitor lagged, missed {} events" , name, n) ;
65+ tracing:: error!( "{}" , msg) ;
66+ handler. on_error( & msg) ;
67+ on_failure. cancel( ) ;
68+ break ;
69+ }
5470 }
5571 }
5672 _ = shutdown. cancelled( ) => break ,
@@ -67,6 +83,7 @@ pub(crate) fn spawn_progress_monitor<H: EventHandler>(
6783 mut receiver : watch:: Receiver < SyncProgress > ,
6884 handler : Arc < H > ,
6985 shutdown : CancellationToken ,
86+ on_failure : CancellationToken ,
7087) -> JoinHandle < ( ) > {
7188 tokio:: spawn ( async move {
7289 tracing:: debug!( "Progress monitoring task started" ) ;
@@ -78,7 +95,14 @@ pub(crate) fn spawn_progress_monitor<H: EventHandler>(
7895 result = receiver. changed( ) => {
7996 match result {
8097 Ok ( ( ) ) => handler. on_progress( & receiver. borrow_and_update( ) ) ,
81- Err ( _) => break ,
98+ Err ( _) if shutdown. is_cancelled( ) => break ,
99+ Err ( _) => {
100+ let msg = "Progress monitor channel closed unexpectedly" ;
101+ tracing:: error!( "{}" , msg) ;
102+ handler. on_error( msg) ;
103+ on_failure. cancel( ) ;
104+ break ;
105+ }
82106 }
83107 }
84108 _ = shutdown. cancelled( ) => break ,
@@ -167,6 +191,7 @@ mod tests {
167191 rx,
168192 handler. clone ( ) ,
169193 shutdown. clone ( ) ,
194+ CancellationToken :: new ( ) ,
170195 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
171196 ) ;
172197
@@ -203,6 +228,7 @@ mod tests {
203228 rx,
204229 handler. clone ( ) ,
205230 shutdown. clone ( ) ,
231+ CancellationToken :: new ( ) ,
206232 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
207233 ) ;
208234
@@ -213,28 +239,35 @@ mod tests {
213239 }
214240
215241 #[ tokio:: test]
216- async fn broadcast_monitor_exits_on_channel_close ( ) {
242+ async fn broadcast_monitor_fails_on_unexpected_channel_close ( ) {
217243 let ( tx, rx) = broadcast:: channel :: < SyncEvent > ( 16 ) ;
218244 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
219245 let shutdown = CancellationToken :: new ( ) ;
246+ let on_failure = CancellationToken :: new ( ) ;
220247
221248 let task = spawn_broadcast_monitor (
222249 "test" ,
223250 rx,
224251 handler. clone ( ) ,
225252 shutdown. clone ( ) ,
253+ on_failure. clone ( ) ,
226254 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
227255 ) ;
228256
257+ // Drop sender without cancelling shutdown — this is unexpected
229258 drop ( tx) ;
230259 task. await . unwrap ( ) ;
260+
261+ assert_eq ! ( handler. error_count. load( Ordering :: SeqCst ) , 1 ) ;
262+ assert ! ( on_failure. is_cancelled( ) ) ;
231263 }
232264
233265 #[ tokio:: test]
234- async fn broadcast_monitor_handles_lagged_receiver ( ) {
266+ async fn broadcast_monitor_exits_on_lagged_receiver ( ) {
235267 let ( tx, rx) = broadcast:: channel ( 2 ) ;
236268 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
237269 let shutdown = CancellationToken :: new ( ) ;
270+ let on_failure = CancellationToken :: new ( ) ;
238271
239272 // Send more messages than the buffer can hold before spawning the monitor
240273 tx. send ( SyncEvent :: BlockHeadersStored {
@@ -255,22 +288,17 @@ mod tests {
255288 rx,
256289 handler. clone ( ) ,
257290 shutdown. clone ( ) ,
291+ on_failure. clone ( ) ,
258292 |h : & RecordingHandler , event : & SyncEvent | h. on_sync_event ( event) ,
259293 ) ;
260294
261- // Send one more after the monitor starts
262- tx. send ( SyncEvent :: BlockHeadersStored {
263- tip_height : 4 ,
264- } )
265- . unwrap ( ) ;
266- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 50 ) ) . await ;
267-
268- shutdown. cancel ( ) ;
295+ // The monitor should exit on its own due to the lagged error
269296 task. await . unwrap ( ) ;
270297
271- // The monitor should have received at least the last message (and possibly
272- // one from the lagged recovery). The key thing is it doesn't crash.
273- assert ! ( handler. sync_count. load( Ordering :: SeqCst ) >= 1 ) ;
298+ // No sync events should have been dispatched, but on_error must have fired
299+ assert_eq ! ( handler. sync_count. load( Ordering :: SeqCst ) , 0 ) ;
300+ assert_eq ! ( handler. error_count. load( Ordering :: SeqCst ) , 1 ) ;
301+ assert ! ( on_failure. is_cancelled( ) ) ;
274302 }
275303
276304 #[ tokio:: test]
@@ -279,7 +307,8 @@ mod tests {
279307 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
280308 let shutdown = CancellationToken :: new ( ) ;
281309
282- let task = spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) ) ;
310+ let task =
311+ spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) , CancellationToken :: new ( ) ) ;
283312
284313 // Give the task time to send initial progress
285314 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 50 ) ) . await ;
@@ -297,21 +326,26 @@ mod tests {
297326 }
298327
299328 #[ tokio:: test]
300- async fn progress_monitor_exits_on_sender_drop ( ) {
329+ async fn progress_monitor_fails_on_unexpected_sender_drop ( ) {
301330 let ( tx, rx) = watch:: channel ( SyncProgress :: default ( ) ) ;
302331 let handler = Arc :: new ( RecordingHandler :: new ( ) ) ;
303332 let shutdown = CancellationToken :: new ( ) ;
333+ let on_failure = CancellationToken :: new ( ) ;
304334
305- let task = spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) ) ;
335+ let task =
336+ spawn_progress_monitor ( rx, handler. clone ( ) , shutdown. clone ( ) , on_failure. clone ( ) ) ;
306337
307338 // Give it time to send initial
308339 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 50 ) ) . await ;
309340
341+ // Drop sender without cancelling shutdown — this is unexpected
310342 drop ( tx) ;
311343 task. await . unwrap ( ) ;
312344
313- // At least the initial progress was sent
345+ // At least the initial progress was sent, plus on_error fired
314346 assert ! ( handler. progress_count. load( Ordering :: SeqCst ) >= 1 ) ;
347+ assert_eq ! ( handler. error_count. load( Ordering :: SeqCst ) , 1 ) ;
348+ assert ! ( on_failure. is_cancelled( ) ) ;
315349 }
316350
317351 #[ tokio:: test]
@@ -325,6 +359,7 @@ mod tests {
325359 rx,
326360 handler. clone ( ) ,
327361 shutdown. clone ( ) ,
362+ CancellationToken :: new ( ) ,
328363 |h : & RecordingHandler , event : & NetworkEvent | h. on_network_event ( event) ,
329364 ) ;
330365
0 commit comments