diff --git a/src/builder.rs b/src/builder.rs
index ae115bde0..5328b368c 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -1944,6 +1944,7 @@ fn build_with_store_internal(
 		node_metrics,
 		om_mailbox,
 		async_payments_role,
+		forward_counters: Arc::new(crate::ForwardCounters::new()),
 	})
 }
 
diff --git a/src/event.rs b/src/event.rs
index ec11731f2..618c4ae89 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -40,6 +40,7 @@ use crate::io::{
 	EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
 	EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
 };
+use crate::forward_metrics::ForwardCounters;
 use crate::liquidity::LiquiditySource;
 use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
 use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
@@ -505,6 +506,7 @@ where
 	static_invoice_store: Option<StaticInvoiceStore>,
 	onion_messenger: Arc<OnionMessenger>,
 	om_mailbox: Option<Arc<OnionMessageMailbox>>,
+	forward_counters: Arc<ForwardCounters>,
 }
 
 impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -520,7 +522,7 @@ where
 		payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
 		static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
 		om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
-		config: Arc<Config>,
+		config: Arc<Config>, forward_counters: Arc<ForwardCounters>,
 	) -> Self {
 		Self {
 			event_queue,
@@ -539,6 +541,7 @@ where
 			static_invoice_store,
 			onion_messenger,
 			om_mailbox,
+			forward_counters,
 		}
 	}
 
@@ -1125,9 +1128,36 @@ where
 			LdkEvent::PaymentPathFailed { .. } => {},
 			LdkEvent::ProbeSuccessful { .. } => {},
 			LdkEvent::ProbeFailed { .. } => {},
-			LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
+			LdkEvent::HTLCHandlingFailed { prev_channel_id, failure_type, failure_reason } => {
+				// Record forward metrics while `failure_type` is only borrowed, so it can be
+				// moved into the liquidity source below without cloning.
+				match &failure_type {
+					lightning::events::HTLCHandlingFailureType::Forward {
+						channel_id: next_channel_id,
+						..
+					} => {
+						let channels = self.channel_manager.list_channels();
+						if let Some(dir) = ForwardCounters::classify(
+							&channels,
+							&prev_channel_id,
+							next_channel_id,
+						) {
+							let is_downstream = matches!(
+								failure_reason,
+								Some(lightning::events::HTLCHandlingFailureReason::Downstream)
+							);
+							self.forward_counters.record_failure(dir, is_downstream);
+						}
+					},
+					lightning::events::HTLCHandlingFailureType::UnknownNextHop { .. }
+					| lightning::events::HTLCHandlingFailureType::InvalidForward { .. } => {
+						self.forward_counters.record_invalid_scid();
+					},
+					_ => {},
+				}
+
 				if let Some(liquidity_source) = self.liquidity_source.as_ref() {
 					liquidity_source.handle_htlc_handling_failed(failure_type).await;
 				}
 			},
 			LdkEvent::SpendableOutputs { outputs, channel_id } => {
@@ -1308,10 +1338,12 @@ where
 				claim_from_onchain_tx,
 				outbound_amount_forwarded_msat,
 			} => {
+				// Fetched once: used inside the block below and again for the forward counters.
+				let channels = self.channel_manager.list_channels();
+
 				{
 					let read_only_network_graph = self.network_graph.read_only();
 					let nodes = read_only_network_graph.nodes();
-					let channels = self.channel_manager.list_channels();
 
 					let node_str = |channel_id: &Option<ChannelId>| {
 						channel_id
@@ -1374,6 +1406,15 @@ where
 						.await;
 				}
 
+				// Count the forward only if `classify` reports a client (unannounced) channel.
+				if let (Some(prev_cid), Some(next_cid)) = (prev_channel_id, next_channel_id) {
+					if let Some(dir) =
+						ForwardCounters::classify(&channels, &prev_cid, &next_cid)
+					{
+						self.forward_counters.record_success(dir);
+					}
+				}
+
 				let event = Event::PaymentForwarded {
 					prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."),
 					next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."),
diff --git a/src/forward_metrics.rs b/src/forward_metrics.rs
new file mode 100644
index 000000000..033d9155d
--- /dev/null
+++ b/src/forward_metrics.rs
@@ -0,0 +1,305 @@
+//! Monotonic counters for HTLC forward outcomes involving LSP client channels.
+//!
+//! Only forwards where at least one side is a private (unannounced) channel are counted.
+//! Network-to-network routing traffic is ignored entirely.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+/// The direction of a forward relative to the LSP's client.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum ForwardDirection {
+	/// Incoming payment to a client (network → private channel).
+	ToClient,
+	/// Outgoing payment from a client (private channel → network).
+	FromClient,
+}
+
+/// Monotonic counters for HTLC forward outcomes involving client channels.
+///
+/// Dimensions:
+/// - **Direction**: `to_client` (inbound to client) vs `from_client` (client sending out).
+/// - **Outcome**: `success` vs `failure`.
+/// - **Failure source** (failures only): `downstream` (peer failed it back) vs `local` (we rejected).
+///
+/// Network-to-network forwards (both channels public) are not counted.
+///
+/// These are plain atomics so ldk-node stays free of any metrics framework dependency.
+/// The application layer reads them via [`ForwardCounters::load_all`] and maps to prometheus.
+#[derive(Default, Debug)]
+pub struct ForwardCounters {
+	/// Successful forwards to a client's private channel.
+	pub success_to_client: AtomicU64,
+	/// Successful forwards from a client's private channel to the network.
+	pub success_from_client: AtomicU64,
+	/// Failed forwards to a client's private channel (downstream failure).
+	pub failure_to_client_downstream: AtomicU64,
+	/// Failed forwards to a client's private channel (local failure).
+	pub failure_to_client_local: AtomicU64,
+	/// Failed forwards from a client's private channel to the network (downstream failure).
+	pub failure_from_client_downstream: AtomicU64,
+	/// Failed forwards from a client's private channel to the network (local failure).
+	pub failure_from_client_local: AtomicU64,
+	/// HTLC referenced an SCID we don't recognize (UnknownNextHop / InvalidForward).
+	/// Likely a bug in private channel alias tracking.
+	pub failure_invalid_forward_scid: AtomicU64,
+}
+
+impl ForwardCounters {
+	/// Creates a new set of zeroed counters.
+	pub fn new() -> Self {
+		Self::default()
+	}
+
+	/// Classifies a forward by whether it involves a client channel and in which direction.
+	/// Payments between clients are classified as toward the client.
+	///
+	/// Returns `None` for network-to-network forwards (both channels public).
+	/// Defaults to private if a channel is not found (e.g. already closed).
+	pub(crate) fn classify(
+		channels: &[lightning::ln::channel_state::ChannelDetails],
+		prev_channel_id: &lightning::ln::types::ChannelId,
+		next_channel_id: &lightning::ln::types::ChannelId,
+	) -> Option<ForwardDirection> {
+		// A channel missing from `channels` is treated as private (see the doc comment above).
+		let is_private = |id: &lightning::ln::types::ChannelId| {
+			channels.iter().find(|c| c.channel_id == *id).map_or(true, |c| !c.is_announced)
+		};
+
+		match (is_private(prev_channel_id), is_private(next_channel_id)) {
+			(_, true) => Some(ForwardDirection::ToClient),
+			(true, false) => Some(ForwardDirection::FromClient),
+			(false, false) => None,
+		}
+	}
+
+	/// Increments the success counter for `direction`.
+	pub(crate) fn record_success(&self, direction: ForwardDirection) {
+		match direction {
+			ForwardDirection::ToClient => {
+				self.success_to_client.fetch_add(1, Ordering::Relaxed);
+			},
+			ForwardDirection::FromClient => {
+				self.success_from_client.fetch_add(1, Ordering::Relaxed);
+			},
+		}
+	}
+
+	/// Increments the failure counter for `direction`.
+	///
+	/// `is_downstream` selects the `*_downstream` counter; otherwise the `*_local` one is used.
+	pub(crate) fn record_failure(&self, direction: ForwardDirection, is_downstream: bool) {
+		match (direction, is_downstream) {
+			(ForwardDirection::ToClient, true) => {
+				self.failure_to_client_downstream.fetch_add(1, Ordering::Relaxed);
+			},
+			(ForwardDirection::ToClient, false) => {
+				self.failure_to_client_local.fetch_add(1, Ordering::Relaxed);
+			},
+			(ForwardDirection::FromClient, true) => {
+				self.failure_from_client_downstream.fetch_add(1, Ordering::Relaxed);
+			},
+			(ForwardDirection::FromClient, false) => {
+				self.failure_from_client_local.fetch_add(1, Ordering::Relaxed);
+			},
+		}
+	}
+
+	/// Increments the counter for HTLCs that referenced an SCID we don't recognize
+	/// (the `UnknownNextHop` / `InvalidForward` cases).
+	pub(crate) fn record_invalid_scid(&self) {
+		self.failure_invalid_forward_scid.fetch_add(1, Ordering::Relaxed);
+	}
+
+	/// Takes a point-in-time snapshot of all counter values.
+	pub fn load_all(&self) -> ForwardSnapshot {
+		ForwardSnapshot {
+			success_to_client: self.success_to_client.load(Ordering::Relaxed),
+			success_from_client: self.success_from_client.load(Ordering::Relaxed),
+			failure_to_client_downstream: self.failure_to_client_downstream.load(Ordering::Relaxed),
+			failure_to_client_local: self.failure_to_client_local.load(Ordering::Relaxed),
+			failure_from_client_downstream: self
+				.failure_from_client_downstream
+				.load(Ordering::Relaxed),
+			failure_from_client_local: self.failure_from_client_local.load(Ordering::Relaxed),
+			failure_invalid_forward_scid: self
+				.failure_invalid_forward_scid
+				.load(Ordering::Relaxed),
+		}
+	}
+}
+
+/// Point-in-time snapshot of all forward counters.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct ForwardSnapshot {
+	/// Successful forwards to a client's private channel.
+	pub success_to_client: u64,
+	/// Successful forwards from a client's private channel to the network.
+	pub success_from_client: u64,
+	/// Failed forwards to a client's private channel (downstream failure).
+	pub failure_to_client_downstream: u64,
+	/// Failed forwards to a client's private channel (local failure).
+	pub failure_to_client_local: u64,
+	/// Failed forwards from a client's private channel to the network (downstream failure).
+	pub failure_from_client_downstream: u64,
+	/// Failed forwards from a client's private channel to the network (local failure).
+	pub failure_from_client_local: u64,
+	/// HTLC referenced an SCID we don't recognize.
+	pub failure_invalid_forward_scid: u64,
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
+	use lightning::ln::channel_state::{ChannelCounterparty, ChannelDetails};
+	use lightning::ln::types::ChannelId;
+	use lightning_types::features::InitFeatures;
+
+	fn dummy_pubkey() -> PublicKey {
+		let secp = Secp256k1::new();
+		PublicKey::from_secret_key(&secp, &SecretKey::from_slice(&[1u8; 32]).unwrap())
+	}
+
+	fn make_channel(id: [u8; 32], announced: bool) -> ChannelDetails {
+		ChannelDetails {
+			channel_id: ChannelId::from_bytes(id),
+			counterparty: ChannelCounterparty {
+				node_id: dummy_pubkey(),
+				features: InitFeatures::empty(),
+				unspendable_punishment_reserve: 0,
+				forwarding_info: None,
+				outbound_htlc_minimum_msat: None,
+				outbound_htlc_maximum_msat: None,
+			},
+			funding_txo: None,
+			channel_type: None,
+			short_channel_id: None,
+			outbound_scid_alias: None,
+			inbound_scid_alias: None,
+			channel_value_satoshis: 1_000_000,
+			unspendable_punishment_reserve: None,
+			user_channel_id: 0,
+			feerate_sat_per_1000_weight: None,
+			outbound_capacity_msat: 0,
+			next_outbound_htlc_limit_msat: 0,
+			next_outbound_htlc_minimum_msat: 0,
+			inbound_capacity_msat: 0,
+			confirmations_required: None,
+			confirmations: None,
+			force_close_spend_delay: None,
+			is_outbound: false,
+			is_channel_ready: true,
+			channel_shutdown_state: None,
+			is_usable: true,
+			is_announced: announced,
+			inbound_htlc_minimum_msat: None,
+			inbound_htlc_maximum_msat: None,
+			config: None,
+			pending_inbound_htlcs: vec![],
+			pending_outbound_htlcs: vec![],
+			funding_redeem_script: None,
+		}
+	}
+
+	fn cid(b: u8) -> ChannelId {
+		ChannelId::from_bytes([b; 32])
+	}
+
+	#[test]
+	fn classify_network_to_client() {
+		let channels = vec![
+			make_channel([1; 32], true),
+			make_channel([2; 32], false),
+		];
+		assert_eq!(
+			ForwardCounters::classify(&channels, &cid(1), &cid(2)),
+			Some(ForwardDirection::ToClient),
+		);
+	}
+
+	#[test]
+	fn classify_client_to_network() {
+		let channels = vec![
+			make_channel([1; 32], false),
+			make_channel([2; 32], true),
+		];
+		assert_eq!(
+			ForwardCounters::classify(&channels, &cid(1), &cid(2)),
+			Some(ForwardDirection::FromClient),
+		);
+	}
+
+	#[test]
+	fn classify_network_to_network_ignored() {
+		let channels = vec![
+			make_channel([1; 32], true),
+			make_channel([2; 32], true),
+		];
+		assert_eq!(ForwardCounters::classify(&channels, &cid(1), &cid(2)), None);
+	}
+
+	#[test]
+	fn classify_client_to_client_is_to_client() {
+		let channels = vec![
+			make_channel([1; 32], false),
+			make_channel([2; 32], false),
+		];
+		assert_eq!(
+			ForwardCounters::classify(&channels, &cid(1), &cid(2)),
+			Some(ForwardDirection::ToClient),
+		);
+	}
+
+	#[test]
+	fn classify_unknown_channel_defaults_to_private() {
+		// next_channel_id not in list -> treated as private -> ToClient
+		let channels = vec![make_channel([1; 32], true)];
+		assert_eq!(
+			ForwardCounters::classify(&channels, &cid(1), &cid(99)),
+			Some(ForwardDirection::ToClient),
+		);
+	}
+
+	#[test]
+	fn record_success_increments_correct_counter() {
+		let c = ForwardCounters::new();
+		c.record_success(ForwardDirection::ToClient);
+		c.record_success(ForwardDirection::ToClient);
+		c.record_success(ForwardDirection::FromClient);
+
+		let snap = c.load_all();
+		assert_eq!(snap.success_to_client, 2);
+		assert_eq!(snap.success_from_client, 1);
+	}
+
+	#[test]
+	fn record_failure_increments_correct_counter() {
+		let c = ForwardCounters::new();
+		c.record_failure(ForwardDirection::ToClient, true);
+		c.record_failure(ForwardDirection::ToClient, false);
+		c.record_failure(ForwardDirection::FromClient, true);
+		c.record_failure(ForwardDirection::FromClient, false);
+		c.record_failure(ForwardDirection::ToClient, true);
+
+		let snap = c.load_all();
+		assert_eq!(snap.failure_to_client_downstream, 2);
+		assert_eq!(snap.failure_to_client_local, 1);
+		assert_eq!(snap.failure_from_client_downstream, 1);
+		assert_eq!(snap.failure_from_client_local, 1);
+		// successes untouched
+		assert_eq!(snap.success_to_client, 0);
+	}
+
+	#[test]
+	fn record_invalid_scid_increments() {
+		let c = ForwardCounters::new();
+		c.record_invalid_scid();
+		c.record_invalid_scid();
+		c.record_invalid_scid();
+
+		let snap = c.load_all();
+		assert_eq!(snap.failure_invalid_forward_scid, 3);
+		assert_eq!(snap.success_to_client, 0);
+	}
+}
+
diff --git a/src/lib.rs b/src/lib.rs
index 945889321..ea17ffb66 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -86,6 +86,7 @@ mod data_store;
 mod error;
 mod event;
 mod fee_estimator;
 mod ffi;
+pub mod forward_metrics;
 mod gossip;
 pub mod graph;
@@ -124,6 +125,7 @@ use connection::ConnectionManager;
 pub use error::Error as NodeError;
 use error::Error;
 pub use event::Event;
 use event::{EventHandler, EventQueue};
 use fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator};
+pub use forward_metrics::ForwardCounters;
 #[cfg(feature = "uniffi")]
@@ -221,6 +223,7 @@ pub struct Node {
 	node_metrics: Arc<RwLock<NodeMetrics>>,
 	om_mailbox: Option<Arc<OnionMessageMailbox>>,
 	async_payments_role: Option<AsyncPaymentsRole>,
+	forward_counters: Arc<ForwardCounters>,
 }
 
 impl Node {
@@ -572,6 +575,7 @@ impl Node {
 			Arc::clone(&self.runtime),
 			Arc::clone(&self.logger),
 			Arc::clone(&self.config),
+			Arc::clone(&self.forward_counters),
 		));
 
 		// Setup background processing
@@ -739,6 +743,14 @@ impl Node {
 		Ok(())
 	}
 
+	/// Returns the HTLC forward counters for metrics collection.
+	///
+	/// These are monotonic atomic counters incremented inside the event handler for
+	/// `PaymentForwarded` (success) and `HTLCHandlingFailed` (failure) events.
+	pub fn forward_counters(&self) -> &ForwardCounters {
+		&self.forward_counters
+	}
+
 	/// Returns the status of the [`Node`].
 	pub fn status(&self) -> NodeStatus {
 		let is_running = *self.is_running.read().unwrap();