diff --git a/lightning/src/ln/async_payments_tests.rs b/lightning/src/ln/async_payments_tests.rs index 6e8f38f847a..2158430808c 100644 --- a/lightning/src/ln/async_payments_tests.rs +++ b/lightning/src/ln/async_payments_tests.rs @@ -60,7 +60,7 @@ use crate::sign::NodeSigner; use crate::sync::Mutex; use crate::types::features::Bolt12InvoiceFeatures; use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; -use crate::util::config::{HTLCInterceptionFlags, UserConfig}; +use crate::util::config::{ChannelConfigUpdate, HTLCInterceptionFlags, UserConfig}; use crate::util::ser::Writeable; use bitcoin::constants::ChainHash; use bitcoin::network::Network; @@ -416,6 +416,55 @@ fn extract_static_invoice_om<'a>( (peer_id, om, static_invoice.unwrap()) } +/// Extracts the next static invoice update while ignoring unrelated offer-path requests. +fn extract_serve_static_invoice_om<'a>( + recipient: &'a Node, next_hop_nodes: &[&'a Node], +) -> (PublicKey, msgs::OnionMessage, StaticInvoice) { + let mut static_invoice = None; + let mut expected_msg_type = |peeled_onion: &_| match peeled_onion { + PeeledOnion::AsyncPayments(AsyncPaymentsMessage::ServeStaticInvoice(msg), _, _) => { + static_invoice = Some(msg.invoice.clone()); + true + }, + _ => false, + }; + let expected_msg_type_to_ignore = |peeled_onion: &_| { + matches!( + peeled_onion, + &PeeledOnion::AsyncPayments(AsyncPaymentsMessage::OfferPathsRequest(_), _, _) + ) + }; + let (peer_id, om) = extract_expected_om( + recipient, + next_hop_nodes, + expected_msg_type, + expected_msg_type_to_ignore, + ) + .pop() + .unwrap(); + (peer_id, om, static_invoice.unwrap()) +} + +/// Delivers a static invoice update and checks that the server persists it in the expected slot. +fn expect_static_invoice_persist_event( + server: &Node, recipient: &Node, serve_static_invoice_om: &msgs::OnionMessage, + expected_invoice: &StaticInvoice, expected_invoice_slot: u16, expected_recipient_id: &[u8], +) { + server + .onion_messenger + .handle_onion_message(recipient.node.get_our_node_id(), serve_static_invoice_om); + let mut events = server.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + match events.pop().unwrap() { + Event::PersistStaticInvoice { invoice, invoice_slot, recipient_id, .. } => { + assert_eq!(&invoice, expected_invoice); + assert_eq!(invoice_slot, expected_invoice_slot); + assert_eq!(recipient_id, expected_recipient_id); + }, + _ => panic!(), + } +} + fn extract_held_htlc_available_oms<'a>( payer: &'a Node, next_hop_nodes: &[&'a Node], ) -> Vec<(PublicKey, msgs::OnionMessage)> { @@ -2507,6 +2556,135 @@ fn refresh_static_invoices_for_used_offers() { assert_eq!(res.0, Some(PaidBolt12Invoice::StaticInvoice(updated_invoice))); } +/// Checks that a used async receive offer gets a fresh server-side static invoice when a new +/// channel becomes usable. Used offers may already be published, so they should not wait for the +/// normal invoice refresh threshold after local payment paths change. +#[test] +fn refresh_static_invoices_for_used_offers_when_channel_opens() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + + let mut allow_priv_chan_fwds_cfg = test_default_channel_config(); + allow_priv_chan_fwds_cfg.accept_forwards_to_priv_channels = true; + let node_chanmgrs = + create_node_chanmgrs(3, &node_cfgs, &[None, Some(allow_priv_chan_fwds_cfg), None]); + + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 0); + create_unannounced_chan_between_nodes_with_value(&nodes, 1, 2, 1_000_000, 0); + let server = &nodes[1]; + let recipient = &nodes[2]; + + let recipient_id = vec![42; 32]; + let inv_server_paths = + server.node.blinded_paths_for_async_recipient(recipient_id.clone(), None).unwrap(); + recipient.node.set_paths_to_static_invoice_server(inv_server_paths).unwrap(); + expect_offer_paths_requests(recipient, &[&nodes[0], server]); + + let flow_res = pass_static_invoice_server_messages(server, recipient, recipient_id.clone()); + let original_invoice = flow_res.invoice; + assert_eq!(original_invoice.payment_paths().len(), 1); + + // Mark the offer as used so the cache treats it as potentially published by the application. + let _offer = recipient.node.get_async_receive_offer().unwrap(); + + // Keep onion delivery direct so the test only checks that opening a channel refreshes the + // invoice after its forwarding information is available. + server.message_router.peers_override.lock().unwrap().push(recipient.node.get_our_node_id()); + recipient.message_router.peers_override.lock().unwrap().push(server.node.get_our_node_id()); + + create_unannounced_chan_between_nodes_with_value(&nodes, 1, 2, 1_000_000, 0); + let (peer_node_id, serve_static_invoice_om, updated_invoice) = + extract_serve_static_invoice_om(recipient, &[server]); + assert_eq!(peer_node_id, server.node.get_our_node_id()); + assert_ne!(original_invoice, updated_invoice); + assert_eq!(updated_invoice.payment_paths().len(), 2); + + expect_static_invoice_persist_event( + server, + recipient, + &serve_static_invoice_om, + &updated_invoice, + flow_res.invoice_slot, + &recipient_id, + ); +} + +/// Checks that changed forwarding parameters refresh the static invoice for a used offer without +/// waiting for the normal invoice refresh threshold. +#[test] +fn refresh_static_invoices_for_used_offers_when_forwarding_fees_change() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + + let mut allow_priv_chan_fwds_cfg = test_default_channel_config(); + allow_priv_chan_fwds_cfg.accept_forwards_to_priv_channels = true; + let node_chanmgrs = + create_node_chanmgrs(3, &node_cfgs, &[None, Some(allow_priv_chan_fwds_cfg), None]); + + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 0); + create_unannounced_chan_between_nodes_with_value(&nodes, 1, 2, 1_000_000, 0); + let server = &nodes[1]; + let recipient = &nodes[2]; + + let recipient_id = vec![42; 32]; + let inv_server_paths = + server.node.blinded_paths_for_async_recipient(recipient_id.clone(), None).unwrap(); + recipient.node.set_paths_to_static_invoice_server(inv_server_paths).unwrap(); + expect_offer_paths_requests(recipient, &[&nodes[0], server]); + + let flow_res = pass_static_invoice_server_messages(server, recipient, recipient_id.clone()); + let original_invoice = flow_res.invoice; + let _offer = recipient.node.get_async_receive_offer().unwrap(); + + // Keep onion delivery direct so the test only checks the forwarding update trigger. + server.message_router.peers_override.lock().unwrap().push(recipient.node.get_our_node_id()); + recipient.message_router.peers_override.lock().unwrap().push(server.node.get_our_node_id()); + + let channel = server + .node + .list_channels() + .into_iter() + .find(|channel| channel.counterparty.node_id == recipient.node.get_our_node_id()) + .unwrap(); + let updated_fee_base_msat = channel.config.unwrap().forwarding_fee_base_msat + 10; + let config_update = ChannelConfigUpdate { + forwarding_fee_base_msat: Some(updated_fee_base_msat), + ..ChannelConfigUpdate::default() + }; + server + .node + .update_partial_channel_config( + &recipient.node.get_our_node_id(), + &[channel.channel_id], + &config_update, + ) + .unwrap(); + let channel_update = get_event_msg!( + server, + MessageSendEvent::SendChannelUpdate, + recipient.node.get_our_node_id() + ); + recipient.node.handle_channel_update(server.node.get_our_node_id(), &channel_update); + + let (peer_node_id, serve_static_invoice_om, updated_invoice) = + extract_serve_static_invoice_om(recipient, &[server]); + assert_eq!(peer_node_id, server.node.get_our_node_id()); + assert_ne!(original_invoice, updated_invoice); + assert_eq!(updated_invoice.payment_paths().len(), 1); + assert_eq!(updated_invoice.payment_paths()[0].payinfo.fee_base_msat, updated_fee_base_msat); + + expect_static_invoice_persist_event( + server, + recipient, + &serve_static_invoice_om, + &updated_invoice, + flow_res.invoice_slot, + &recipient_id, + ); +} + #[cfg_attr(feature = "std", ignore)] #[test] fn ignore_expired_static_invoice() { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2d7370bb15e..41f0447ff9a 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3006,6 +3006,8 @@ pub struct ChannelManager< funding_batch_states: Mutex>>, background_events_processed_since_startup: AtomicBool, + /// Set when a channel change may have made cached async receive static invoices stale. + async_receive_static_invoice_refresh_pending: AtomicBool, event_persist_notifier: Notifier, needs_persist_flag: AtomicBool, @@ -3766,6 +3768,7 @@ impl< pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), + async_receive_static_invoice_refresh_pending: AtomicBool::new(false), event_persist_notifier: Notifier::new(), needs_persist_flag: AtomicBool::new(false), funding_batch_states: Mutex::new(BTreeMap::new()), @@ -4564,6 +4567,8 @@ impl< )); } } + self.mark_async_receive_static_invoice_refresh_pending(); + for (err, counterparty_node_id) in shutdown_results.drain(..) { let _ = self.handle_error(err, counterparty_node_id); } @@ -4693,6 +4698,7 @@ impl< log_error!(logger, "Closing channel: {}", err_internal.err.err); self.finish_close_channel(shutdown_res); + self.process_pending_async_receive_static_invoice_refresh(); if let Some((update, node_id_1, node_id_2)) = update_option { let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); @@ -5905,6 +5911,30 @@ impl< } } + fn force_refresh_async_receive_static_invoices(&self) { + let router = &self.router; + + // Only collect peers and usable channels when async receiving is configured. This avoids reading + // channels during state transitions when there is no static invoice to refresh. + self.flow.force_refresh_async_receive_static_invoices( + || (self.get_peers_for_blinded_path(), self.list_usable_channels()), + router, + ); + } + + fn mark_async_receive_static_invoice_refresh_pending(&self) { + self.async_receive_static_invoice_refresh_pending.store(true, Ordering::Release); + } + + fn process_pending_async_receive_static_invoice_refresh(&self) { + // Channel state transitions often happen while a peer's channel lock is held. Defer the + // actual refresh until after those locks are released, because rebuilding static invoices + // needs a fresh snapshot of usable channels. + if self.async_receive_static_invoice_refresh_pending.swap(false, Ordering::AcqRel) { + self.force_refresh_async_receive_static_invoices(); + } + } + #[cfg(test)] pub(crate) fn test_check_refresh_async_receive_offers(&self) { self.check_refresh_async_receive_offer_cache(false); @@ -9130,6 +9160,7 @@ impl< .remove_stale_payments(duration_since_epoch, &self.pending_events); self.check_refresh_async_receive_offer_cache(true); + self.process_pending_async_receive_static_invoice_refresh(); if self.check_free_holding_cells() { // While we try to ensure we clear holding cells immediately, its possible we miss @@ -13750,6 +13781,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }, None, )); + self.mark_async_receive_static_invoice_refresh_pending(); splice_promotion.discarded_funding.into_iter().for_each(|funding_info| { let event = Event::DiscardFunding { channel_id: chan.context.channel_id(), @@ -16453,6 +16485,7 @@ impl< funding_txo: Some(funding_txo.into_bitcoin_outpoint()), channel_type: funded_channel.funding.get_channel_type().clone(), }, None)); + self.mark_async_receive_static_invoice_refresh_pending(); discarded_funding.into_iter().for_each(|funding_info| { let event = Event::DiscardFunding { channel_id: funded_channel.context.channel_id(), @@ -16873,16 +16906,19 @@ impl< #[rustfmt::skip] fn handle_splice_locked(&self, counterparty_node_id: PublicKey, msg: &msgs::SpliceLocked) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { - let res = self.internal_splice_locked(&counterparty_node_id, msg); - let persist = match &res { - Err(e) if e.closes_channel() => NotifyOption::DoPersist, - Err(_) => NotifyOption::SkipPersistHandleEvents, - Ok(()) => NotifyOption::DoPersist, - }; - let _ = self.handle_error(res, counterparty_node_id); - persist - }); + { + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_splice_locked(&counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::DoPersist, + }; + let _ = self.handle_error(res, counterparty_node_id); + persist + }); + } + self.process_pending_async_receive_static_invoice_refresh(); } fn handle_shutdown(&self, counterparty_node_id: PublicKey, msg: &msgs::Shutdown) { @@ -17017,14 +17053,22 @@ impl< } fn handle_channel_update(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelUpdate) { - PersistenceNotifierGuard::optionally_notify(self, || { - let res = self.internal_channel_update(&counterparty_node_id, msg); - if let Ok(persist) = self.handle_error(res, counterparty_node_id) { - persist - } else { - NotifyOption::DoPersist - } - }); + { + PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_update(&counterparty_node_id, msg); + if let Ok(persist) = self.handle_error(res, counterparty_node_id) { + if persist == NotifyOption::DoPersist { + // Static invoices encode the counterparty's forwarding parameters. Refresh + // them when an update changes those parameters for a local channel. + self.mark_async_receive_static_invoice_refresh_pending(); + } + persist + } else { + NotifyOption::DoPersist + } + }); + } + self.process_pending_async_receive_static_invoice_refresh(); } fn handle_channel_reestablish( @@ -20513,6 +20557,7 @@ impl< pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), + async_receive_static_invoice_refresh_pending: AtomicBool::new(false), event_persist_notifier: Notifier::new(), needs_persist_flag: AtomicBool::new(false), diff --git a/lightning/src/offers/async_receive_offer_cache.rs b/lightning/src/offers/async_receive_offer_cache.rs index 367cdb68fc8..02c73c2733b 100644 --- a/lightning/src/offers/async_receive_offer_cache.rs +++ b/lightning/src/offers/async_receive_offer_cache.rs @@ -469,6 +469,32 @@ impl AsyncReceiveOfferCache { }) } + /// Returns cached offers whose static invoices should be refreshed after a local channel change. + pub(super) fn offers_needing_forced_invoice_refresh( + &self, + ) -> impl Iterator { + self.offers_with_idx().filter_map(move |(_, offer)| { + let needs_invoice_update = match offer.status { + // Used offers may already be published by the application. Keep their server-side + // invoices aligned with our current channels instead of waiting for the timer + // threshold. + OfferStatus::Used { .. } => true, + // Pending offers have already been sent to the server, but are not confirmed yet. + // Re-sending them is safe and matches the normal timer retry behavior. + OfferStatus::Pending => true, + // Ready offers have not been handed to the application yet. They are rotated by the + // offer-refresh path, so forcing invoice updates for them would mostly create extra + // server churn without helping published offers. + OfferStatus::Ready { .. } => false, + }; + if needs_invoice_update { + Some((&offer.offer, offer.offer_nonce, &offer.update_static_invoice_path)) + } else { + None + } + }) + } + /// Should be called when we receive a [`StaticInvoicePersisted`] message from the static invoice /// server, which indicates that a new offer was persisted by the server and they are ready to /// serve the corresponding static invoice to payers on our behalf. diff --git a/lightning/src/offers/flow.rs b/lightning/src/offers/flow.rs index ade684e5be1..6a49224ee1f 100644 --- a/lightning/src/offers/flow.rs +++ b/lightning/src/offers/flow.rs @@ -1349,12 +1349,31 @@ impl OffersMessageFlow { self.check_refresh_async_offers(peers.clone(), timer_tick_occurred)?; if timer_tick_occurred { - self.check_refresh_static_invoices(peers, usable_channels, router); + self.check_refresh_static_invoices(peers, usable_channels, router, false); } Ok(()) } + /// Enqueues static invoice updates for cached async receive offers after local channel changes. + pub fn force_refresh_async_receive_static_invoices( + &self, get_refresh_inputs: F, router: R, + ) where + F: FnOnce() -> (Vec, Vec), + { + // A forced refresh is useful only for an async recipient already configured with a server. + let cache = self.async_receive_offer_cache.lock().unwrap(); + if cache.paths_to_static_invoice_server().is_empty() { + return; + } + core::mem::drop(cache); + + // Channel details may be in a short-lived transitional state when this refresh is requested. + // Collect them only after confirming that async receiving needs the snapshot. + let (peers, usable_channels) = get_refresh_inputs(); + self.check_refresh_static_invoices(peers, usable_channels, router, true); + } + fn check_refresh_async_offers( &self, peers: Vec, timer_tick_occurred: bool, ) -> Result<(), ()> { @@ -1408,41 +1427,59 @@ impl OffersMessageFlow { /// server, based on the offers provided by the cache. fn check_refresh_static_invoices( &self, peers: Vec, usable_channels: Vec, router: R, + force_refresh: bool, ) { let mut serve_static_invoice_msgs = Vec::new(); { let duration_since_epoch = self.duration_since_epoch(); let cache = self.async_receive_offer_cache.lock().unwrap(); - for offer_and_metadata in cache.offers_needing_invoice_refresh(duration_since_epoch) { - let (offer, offer_nonce, update_static_invoice_path) = offer_and_metadata; - - let (invoice, forward_invreq_path) = match self.create_static_invoice_for_server( - offer, - offer_nonce, - peers.clone(), - usable_channels.clone(), - &router, - ) { - Ok((invoice, path)) => (invoice, path), - Err(()) => continue, - }; - let reply_path_context = { - MessageContext::AsyncPayments(AsyncPaymentsContext::StaticInvoicePersisted { - invoice_created_at: invoice.created_at(), - offer_id: offer.id(), - }) - }; + // Both timer-driven and forced refreshes build the same update message. Keep the + // construction in one place so the only difference is which cached offers are selected. + macro_rules! build_refresh_message { + ($offer: expr, $offer_nonce: expr, $update_static_invoice_path: expr) => {{ + let (invoice, forward_invreq_path) = match self + .create_static_invoice_for_server( + $offer, + $offer_nonce, + peers.clone(), + usable_channels.clone(), + &router, + ) { + Ok((invoice, path)) => (invoice, path), + Err(()) => continue, + }; + + let reply_path_context = MessageContext::AsyncPayments( + AsyncPaymentsContext::StaticInvoicePersisted { + invoice_created_at: invoice.created_at(), + offer_id: $offer.id(), + }, + ); - let serve_invoice_message = ServeStaticInvoice { - invoice, - forward_invoice_request_path: forward_invreq_path, - }; - serve_static_invoice_msgs.push(( - serve_invoice_message, - update_static_invoice_path.clone(), - reply_path_context, - )); + let serve_invoice_message = ServeStaticInvoice { + invoice, + forward_invoice_request_path: forward_invreq_path, + }; + serve_static_invoice_msgs.push(( + serve_invoice_message, + $update_static_invoice_path.clone(), + reply_path_context, + )); + }}; + } + + if force_refresh { + for offer_and_metadata in cache.offers_needing_forced_invoice_refresh() { + let (offer, offer_nonce, update_static_invoice_path) = offer_and_metadata; + build_refresh_message!(offer, offer_nonce, update_static_invoice_path); + } + } else { + for offer_and_metadata in cache.offers_needing_invoice_refresh(duration_since_epoch) + { + let (offer, offer_nonce, update_static_invoice_path) = offer_and_metadata; + build_refresh_message!(offer, offer_nonce, update_static_invoice_path); + } } }