Skip to content

Commit da925c6

Browse files
committed
plugins(lsps2): route recovered sessions through forward_event path
After restart, recovered session actors were stored in a separate recovery_handles Vec, unreachable by the forward_event notification path that routes via the sessions HashMap. This caused intermittent CI failures where on_payment_settled could not find the session and the internal forward-monitoring loop failed to detect settlement. Register recovered sessions in the sessions HashMap keyed by payment_hash so forward_event notifications reach them directly. For already-settled forwards, recover into Broadcasting state so the actor self-drives to completion without needing forward_event re-delivery. Remove the now-redundant internal polling loop (get_forward_activity + wait_for_forward_resolution).
1 parent 72194d2 commit da925c6

4 files changed

Lines changed: 187 additions & 172 deletions

File tree

plugins/lsps-plugin/src/cln_adapters/rpc.rs

Lines changed: 1 addition & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ use cln_rpc::{
2626
FundchannelCompleteRequest, FundchannelStartRequest, FundpsbtRequest, GetinfoRequest,
2727
ListdatastoreRequest, ListforwardsIndex, ListforwardsRequest, ListpeerchannelsRequest,
2828
SendpsbtRequest, SignpsbtRequest, UnreserveinputsRequest,
29-
WaitIndexname, WaitRequest, WaitSubsystem,
3029
},
31-
responses::{ListdatastoreResponse, ListforwardsForwardsStatus, WaitForwardsStatus},
30+
responses::{ListdatastoreResponse, ListforwardsForwardsStatus},
3231
},
3332
primitives::{Amount, AmountOrAll, ChannelState, Feerate, Sha256},
3433
ClnRpc,
@@ -845,63 +844,6 @@ impl RecoveryProvider for ClnRecoveryProvider {
845844
}
846845
}
847846

848-
async fn wait_for_forward_resolution(
849-
&self,
850-
channel_id: &str,
851-
from_index: u64,
852-
) -> Result<(ForwardActivity, u64)> {
853-
// Get the scid for this channel so we can match wait responses.
854-
let scid = self.rpc.get_channel_scid(channel_id).await?;
855-
856-
let mut next_index = from_index + 1;
857-
loop {
858-
let mut rpc = self.rpc.create_rpc().await?;
859-
let wait_res = rpc
860-
.call_typed(&WaitRequest {
861-
subsystem: WaitSubsystem::FORWARDS,
862-
indexname: WaitIndexname::UPDATED,
863-
nextvalue: next_index,
864-
})
865-
.await
866-
.with_context(|| {
867-
format!("calling wait for channel_id={channel_id} at index={next_index}")
868-
})?;
869-
870-
let new_index = wait_res.updated.unwrap_or(next_index);
871-
872-
// Check if this update is for our channel.
873-
let is_our_channel = match (&scid, &wait_res.forwards) {
874-
(Some(our_scid), Some(fwd)) => fwd
875-
.out_channel
876-
.as_ref()
877-
.map(|c| c == our_scid)
878-
.unwrap_or(false),
879-
_ => false,
880-
};
881-
882-
if is_our_channel {
883-
if let Some(fwd) = &wait_res.forwards {
884-
match fwd.status {
885-
Some(WaitForwardsStatus::SETTLED) => {
886-
return Ok((ForwardActivity::Settled, new_index));
887-
}
888-
Some(WaitForwardsStatus::OFFERED) => {
889-
return Ok((ForwardActivity::Offered, new_index));
890-
}
891-
Some(WaitForwardsStatus::FAILED)
892-
| Some(WaitForwardsStatus::LOCAL_FAILED) => {
893-
// Check full history to decide AllFailed vs Active.
894-
let activity = self.get_forward_activity(channel_id).await?;
895-
return Ok((activity, new_index));
896-
}
897-
None => {}
898-
}
899-
}
900-
}
901-
902-
next_index = new_index + 1;
903-
}
904-
}
905847
}
906848

907849
// ---------------------------------------------------------------------------

plugins/lsps-plugin/src/core/lsps2/actor.rs

Lines changed: 4 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
core::lsps2::{
33
event_sink::{EventSink, SessionEventEnvelope},
4-
provider::{DatastoreProvider, ForwardActivity, RecoveryProvider},
4+
provider::DatastoreProvider,
55
session::{PaymentPart, Session, SessionAction, SessionEvent, SessionInput},
66
},
77
proto::{
@@ -173,12 +173,9 @@ impl<A: ActionExecutor + Clone + Send + 'static, D: DatastoreProvider + Clone +
173173
session: Session,
174174
entry: DatastoreEntry,
175175
initial_actions: Vec<SessionAction>,
176-
channel_id: String,
177176
executor: A,
178177
scid: ShortChannelId,
179178
datastore: D,
180-
recovery: Arc<dyn RecoveryProvider>,
181-
forwards_updated_index: Option<u64>,
182179
event_sink: Arc<dyn EventSink>,
183180
) -> ActorInboxHandle {
184181
let (tx, inbox) = mpsc::channel(128);
@@ -200,12 +197,7 @@ impl<A: ActionExecutor + Clone + Send + 'static, D: DatastoreProvider + Clone +
200197
event_sink,
201198
};
202199

203-
tokio::spawn(actor.run_recovered(
204-
initial_actions,
205-
channel_id,
206-
recovery,
207-
forwards_updated_index,
208-
));
200+
tokio::spawn(actor.run_recovered(initial_actions));
209201
handle
210202
}
211203

@@ -376,9 +368,6 @@ impl<A: ActionExecutor + Clone + Send + 'static, D: DatastoreProvider + Clone +
376368
async fn run_recovered(
377369
mut self,
378370
initial_actions: Vec<SessionAction>,
379-
channel_id: String,
380-
recovery: Arc<dyn RecoveryProvider>,
381-
forwards_updated_index: Option<u64>,
382371
) {
383372
// Execute initial actions (e.g., BroadcastFundingTx for Broadcasting state)
384373
for action in initial_actions {
@@ -390,83 +379,11 @@ impl<A: ActionExecutor + Clone + Send + 'static, D: DatastoreProvider + Clone +
390379
return;
391380
}
392381

393-
// Start forward monitoring
394-
let from_index = forwards_updated_index.unwrap_or(0);
395-
let self_tx = self.self_send.clone();
396-
let monitor_handle = {
397-
let recovery = recovery.clone();
398-
let channel_id = channel_id.clone();
399-
let scid = self.scid;
400-
401-
tokio::spawn(async move {
402-
// First: check listforwards for already-settled forwards
403-
match recovery.get_forward_activity(&channel_id).await {
404-
Ok(ForwardActivity::Settled) => {
405-
let _ = self_tx
406-
.send(ActorInput::PaymentSettled {
407-
preimage: None,
408-
updated_index: None,
409-
})
410-
.await;
411-
return;
412-
}
413-
Ok(ForwardActivity::AllFailed) => {
414-
let _ = self_tx
415-
.send(ActorInput::PaymentFailed { updated_index: None })
416-
.await;
417-
return;
418-
}
419-
Ok(ForwardActivity::Offered)
420-
| Ok(ForwardActivity::NoForwards)
421-
| Err(_) => {
422-
// Fall through to wait loop
423-
}
424-
}
425-
426-
// Poll using wait subsystem
427-
let mut current_index = from_index;
428-
loop {
429-
match recovery
430-
.wait_for_forward_resolution(&channel_id, current_index)
431-
.await
432-
{
433-
Ok((ForwardActivity::Settled, new_index)) => {
434-
let _ = self_tx
435-
.send(ActorInput::PaymentSettled {
436-
preimage: None,
437-
updated_index: Some(new_index),
438-
})
439-
.await;
440-
return;
441-
}
442-
Ok((ForwardActivity::AllFailed, new_index)) => {
443-
let _ = self_tx
444-
.send(ActorInput::PaymentFailed {
445-
updated_index: Some(new_index),
446-
})
447-
.await;
448-
return;
449-
}
450-
Ok((ForwardActivity::Offered, new_index))
451-
| Ok((ForwardActivity::NoForwards, new_index)) => {
452-
current_index = new_index;
453-
continue;
454-
}
455-
Err(e) => {
456-
warn!("forward monitoring error for scid={scid}: {e}");
457-
tokio::time::sleep(Duration::from_secs(5)).await;
458-
continue;
459-
}
460-
}
461-
}
462-
})
463-
};
464-
465-
// Main loop: process inbox events
382+
// Main loop: process inbox events from forward_event notifications
466383
loop {
467384
match self.inbox.recv().await {
468385
Some(actor_input) => {
469-
// Only process the three shared persistence arms
386+
// Only process settlement/failure/broadcast events
470387
let session_input = match &actor_input {
471388
ActorInput::PaymentSettled { .. }
472389
| ActorInput::PaymentFailed { .. }
@@ -486,7 +403,6 @@ impl<A: ActionExecutor + Clone + Send + 'static, D: DatastoreProvider + Clone +
486403
}
487404
}
488405

489-
monitor_handle.abort();
490406
Self::finalize(&self.session, &self.datastore, self.scid).await;
491407
}
492408

0 commit comments

Comments
 (0)