From 7827106c099f1202d12c448459a83d140f894fb9 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Wed, 25 Mar 2026 13:47:23 +0900 Subject: [PATCH 1/2] Add BIP 157 compact block filter chain source --- Cargo.toml | 1 + bindings/ldk_node.udl | 3 + src/builder.rs | 81 +++- src/chain/cbf.rs | 1031 +++++++++++++++++++++++++++++++++++++++++ src/chain/mod.rs | 78 +++- src/config.rs | 35 ++ src/ffi/types.rs | 4 +- src/wallet/mod.rs | 22 + tests/common/mod.rs | 39 +- 9 files changed, 1287 insertions(+), 7 deletions(-) create mode 100644 src/chain/cbf.rs diff --git a/Cargo.toml b/Cargo.toml index 07cabe33f..f27772031 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} +bip157 = { version = "0.4.2", default-features = false } bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]} bitreq = { version = "0.3", default-features = false, features = ["async-https", "json-using-serde"] } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 0addb1fe1..240ca6b59 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -9,6 +9,8 @@ typedef dictionary EsploraSyncConfig; typedef dictionary ElectrumSyncConfig; +typedef dictionary CbfSyncConfig; + typedef dictionary TorConfig; typedef interface NodeEntropy; @@ -38,6 +40,7 @@ interface Builder { constructor(Config config); void set_chain_source_esplora(string server_url, EsploraSyncConfig? config); void set_chain_source_electrum(string server_url, ElectrumSyncConfig? config); + void set_chain_source_cbf(sequence peers, CbfSyncConfig? sync_config); void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_chain_source_bitcoind_rest(string rest_host, u16 rest_port, string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); diff --git a/src/builder.rs b/src/builder.rs index b6e9e2c8b..a54bdf2b1 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -45,8 +45,8 @@ use vss_client::headers::VssHeaderProvider; use crate::chain::ChainSource; use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, - BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig, - DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, + TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -105,6 +105,10 @@ enum ChainDataSourceConfig { rpc_password: String, rest_client_config: Option, }, + Cbf { + peers: Vec, + sync_config: Option, + }, } #[derive(Debug, Clone)] @@ -365,6 +369,26 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its chain data via BIP 157 compact block + /// filters. + /// + /// `peers` is an optional list of peer addresses to connect to for sourcing compact block + /// filters. If empty, the node will discover peers via DNS seeds. + /// + /// If no `sync_config` is given, default values are used. See [`CbfSyncConfig`] for more + /// information. + /// + /// Note: fee rate estimation with this chain source uses block-level averages (total fees + /// divided by block weight) rather than per-transaction fee rates. This can underestimate + /// next-block inclusion rates during periods of high mempool congestion. Percentile-based + /// target selection partially mitigates this. + pub fn set_chain_source_cbf( + &mut self, peers: Vec, sync_config: Option, + ) -> &mut Self { + self.chain_data_source_config = Some(ChainDataSourceConfig::Cbf { peers, sync_config }); + self + } + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. /// /// This method establishes an RPC connection that enables all essential chain operations including @@ -892,6 +916,23 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_chain_source_electrum(server_url, sync_config); } + /// Configures the [`Node`] instance to source its chain data via BIP 157 compact block + /// filters. + /// + /// `peers` is an optional list of peer addresses to connect to for sourcing compact block + /// filters. If empty, the node will discover peers via DNS seeds. + /// + /// If no `sync_config` is given, default values are used. See [`CbfSyncConfig`] for more + /// information. + /// + /// Note: fee rate estimation with this chain source uses block-level averages (total fees + /// divided by block weight) rather than per-transaction fee rates. This can underestimate + /// next-block inclusion rates during periods of high mempool congestion. Percentile-based + /// target selection partially mitigates this. + pub fn set_chain_source_cbf(&self, peers: Vec, sync_config: Option) { + self.inner.write().unwrap().set_chain_source_cbf(peers, sync_config); + } + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. /// /// This method establishes an RPC connection that enables all essential chain operations including @@ -1364,6 +1405,20 @@ fn build_with_store_internal( }), }, + Some(ChainDataSourceConfig::Cbf { peers, sync_config }) => { + let sync_config = sync_config.clone().unwrap_or(CbfSyncConfig::default()); + ChainSource::new_cbf( + peers.clone(), + sync_config, + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + }, + None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); @@ -2085,6 +2140,9 @@ pub(crate) fn sanitize_alias(alias_str: &str) -> Result { #[cfg(test)] mod tests { + #[cfg(feature = "uniffi")] + use crate::config::CbfSyncConfig; + use super::{sanitize_alias, BuildError, NodeAlias}; #[test] @@ -2122,4 +2180,23 @@ mod tests { let node = sanitize_alias(alias); assert_eq!(node.err().unwrap(), BuildError::InvalidNodeAlias); } + + #[cfg(feature = "uniffi")] + #[test] + fn arced_builder_can_set_cbf_chain_source() { + let builder = super::ArcedNodeBuilder::new(); + let sync_config = CbfSyncConfig::default(); + + let peers = vec!["127.0.0.1:8333".to_string()]; + builder.set_chain_source_cbf(peers.clone(), Some(sync_config.clone())); + + let guard = builder.inner.read().unwrap(); + assert!(matches!( + guard.chain_data_source_config.as_ref(), + Some(super::ChainDataSourceConfig::Cbf { + peers: p, + sync_config: Some(config), + }) if config == &sync_config && p == &peers + )); + } } diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs new file mode 100644 index 000000000..83a030ade --- /dev/null +++ b/src/chain/cbf.rs @@ -0,0 +1,1031 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::collections::{BTreeMap, HashMap}; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate}; +use bdk_wallet::Update; +use bip157::chain::BlockHeaderChanges; +use bip157::{ + BlockHash, Builder, Client, Event, Info, Requester, SyncUpdate, TrustedPeer, Warning, +}; +use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; +use bitcoin::{Amount, FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; +use lightning::chain::{Confirm, WatchedOutput}; +use lightning::util::ser::Writeable; +use tokio::sync::{mpsc, oneshot}; + +use super::WalletSyncStatus; +use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; +use crate::error::Error; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + OnchainFeeEstimator, +}; +use crate::io::utils::write_node_metrics; +use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::NodeMetrics; + +/// Minimum fee rate: 1 sat/vB = 250 sat/kWU. Used as a floor for computed fee rates. +const MIN_FEERATE_SAT_PER_KWU: u64 = 250; + +/// Number of recent blocks to look back for per-target fee rate estimation. +const FEE_RATE_LOOKBACK_BLOCKS: usize = 6; + +pub(super) struct CbfChainSource { + /// Peer addresses for sourcing compact block filters via P2P. + peers: Vec, + /// User-provided sync configuration (timeouts, background sync intervals). + pub(super) sync_config: CbfSyncConfig, + /// Tracks whether the bip157 node is running and holds the command handle. + cbf_runtime_status: Mutex, + /// Latest chain tip hash, updated by the background event processing task. + latest_tip: Arc>>, + /// Scripts to match against compact block filters during a scan. + watched_scripts: Arc>>, + /// Block (height, hash) pairs where filters matched watched scripts. + matched_block_hashes: Arc>>, + /// One-shot channel sender to signal filter scan completion. + sync_completion_tx: Arc>>>, + /// Filters at or below this height are skipped during incremental scans. + filter_skip_height: Arc, + /// Serializes concurrent filter scans (on-chain and lightning). + scan_lock: tokio::sync::Mutex<()>, + /// Scripts registered by LDK's Filter trait for lightning channel monitoring. + registered_scripts: Mutex>, + /// Set when new scripts are registered; forces a full rescan on next lightning sync. + lightning_scripts_dirty: AtomicBool, + /// Last block height reached by on-chain wallet sync, used for incremental scans. + last_onchain_synced_height: Mutex>, + /// Last block height reached by lightning wallet sync, used for incremental scans. + last_lightning_synced_height: Mutex>, + /// Deduplicates concurrent on-chain wallet sync requests. + onchain_wallet_sync_status: Mutex, + /// Deduplicates concurrent lightning wallet sync requests. + lightning_wallet_sync_status: Mutex, + /// Shared fee rate estimator, updated by this chain source. + fee_estimator: Arc, + /// Persistent key-value store for node metrics. + kv_store: Arc, + /// Node configuration (network, storage path, etc.). + config: Arc, + /// Logger instance. + logger: Arc, + /// Shared node metrics (sync timestamps, etc.). + node_metrics: Arc>, +} + +enum CbfRuntimeStatus { + Started { requester: Requester }, + Stopped, +} + +/// Shared state passed to the background event processing task. +struct CbfEventState { + latest_tip: Arc>>, + watched_scripts: Arc>>, + matched_block_hashes: Arc>>, + sync_completion_tx: Arc>>>, + filter_skip_height: Arc, +} + +impl CbfChainSource { + pub(crate) fn new( + peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> Self { + let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped); + let latest_tip = Arc::new(Mutex::new(None)); + let watched_scripts = Arc::new(RwLock::new(Vec::new())); + let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); + let sync_completion_tx = Arc::new(Mutex::new(None)); + let filter_skip_height = Arc::new(AtomicU32::new(0)); + let registered_scripts = Mutex::new(Vec::new()); + let lightning_scripts_dirty = AtomicBool::new(true); + let scan_lock = tokio::sync::Mutex::new(()); + let last_onchain_synced_height = Mutex::new(None); + let last_lightning_synced_height = Mutex::new(None); + let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + Self { + peers, + sync_config, + cbf_runtime_status, + latest_tip, + watched_scripts, + matched_block_hashes, + sync_completion_tx, + filter_skip_height, + registered_scripts, + lightning_scripts_dirty, + scan_lock, + last_onchain_synced_height, + last_lightning_synced_height, + onchain_wallet_sync_status, + lightning_wallet_sync_status, + fee_estimator, + kv_store, + config, + logger, + node_metrics, + } + } + + /// Start the bip157 node and spawn background tasks for event processing. + pub(crate) fn start(&self, runtime: Arc) { + let mut status = self.cbf_runtime_status.lock().unwrap(); + if matches!(*status, CbfRuntimeStatus::Started { .. }) { + debug_assert!(false, "We shouldn't call start if we're already started"); + return; + } + + let network = self.config.network; + + let mut builder = Builder::new(network); + + // Configure data directory under the node's storage path. + let data_dir = std::path::PathBuf::from(&self.config.storage_dir_path).join("bip157_data"); + builder = builder.data_dir(data_dir); + + // Add configured peers. + let peers: Vec = self + .peers + .iter() + .filter_map(|peer_str| { + peer_str.parse::().ok().map(TrustedPeer::from_socket_addr) + }) + .collect(); + if !peers.is_empty() { + builder = builder.add_peers(peers); + } + + // Request witness data so segwit transactions include full witnesses, + // required for Lightning channel operations. + builder = builder.fetch_witness_data(); + + // Set peer response timeout from user configuration (default: 30s). + builder = + builder.response_timeout(Duration::from_secs(self.sync_config.response_timeout_secs)); + + let (node, client) = builder.build(); + + let Client { requester, info_rx, warn_rx, event_rx } = client; + + // Spawn the bip157 node in the background. + let node_logger = Arc::clone(&self.logger); + runtime.spawn_background_task(async move { + if let Err(e) = node.run().await { + log_error!(node_logger, "CBF node exited with error: {:?}", e); + } + }); + + // Spawn a task to log info messages. + let info_logger = Arc::clone(&self.logger); + runtime + .spawn_cancellable_background_task(Self::process_info_messages(info_rx, info_logger)); + + // Spawn a task to log warning messages. + let warn_logger = Arc::clone(&self.logger); + runtime + .spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger)); + + // Spawn a task to process events. + let event_state = CbfEventState { + latest_tip: Arc::clone(&self.latest_tip), + watched_scripts: Arc::clone(&self.watched_scripts), + matched_block_hashes: Arc::clone(&self.matched_block_hashes), + sync_completion_tx: Arc::clone(&self.sync_completion_tx), + filter_skip_height: Arc::clone(&self.filter_skip_height), + }; + let event_logger = Arc::clone(&self.logger); + runtime.spawn_cancellable_background_task(Self::process_events( + event_rx, + event_state, + event_logger, + )); + + log_info!(self.logger, "CBF chain source started."); + + *status = CbfRuntimeStatus::Started { requester }; + } + + /// Shut down the bip157 node and stop all background tasks. + pub(crate) fn stop(&self) { + let mut status = self.cbf_runtime_status.lock().unwrap(); + match &*status { + CbfRuntimeStatus::Started { requester } => { + let _ = requester.shutdown(); + log_info!(self.logger, "CBF chain source stopped."); + }, + CbfRuntimeStatus::Stopped => {}, + } + *status = CbfRuntimeStatus::Stopped; + } + + async fn process_info_messages(mut info_rx: mpsc::Receiver, logger: Arc) { + while let Some(info) = info_rx.recv().await { + log_debug!(logger, "CBF node info: {}", info); + } + } + + async fn process_warn_messages( + mut warn_rx: mpsc::UnboundedReceiver, logger: Arc, + ) { + while let Some(warning) = warn_rx.recv().await { + log_debug!(logger, "CBF node warning: {}", warning); + } + } + + async fn process_events( + mut event_rx: mpsc::UnboundedReceiver, state: CbfEventState, logger: Arc, + ) { + while let Some(event) = event_rx.recv().await { + match event { + Event::FiltersSynced(sync_update) => { + let tip = sync_update.tip(); + *state.latest_tip.lock().unwrap() = Some(tip.hash); + log_info!( + logger, + "CBF filters synced to tip: height={}, hash={}", + tip.height, + tip.hash, + ); + if let Some(tx) = state.sync_completion_tx.lock().unwrap().take() { + let _ = tx.send(sync_update); + } + }, + Event::Block(_) => {}, + Event::ChainUpdate(header_changes) => match header_changes { + BlockHeaderChanges::Reorganized { accepted, reorganized } => { + log_debug!( + logger, + "CBF chain reorg detected: {} blocks removed, {} blocks accepted.", + reorganized.len(), + accepted.len(), + ); + }, + BlockHeaderChanges::Connected(header) => { + log_trace!(logger, "CBF block connected at height {}", header.height,); + }, + BlockHeaderChanges::ForkAdded(header) => { + log_trace!(logger, "CBF fork block observed at height {}", header.height,); + }, + }, + Event::IndexedFilter(indexed_filter) => { + let skip_height = state.filter_skip_height.load(Ordering::Acquire); + if skip_height > 0 && indexed_filter.height() <= skip_height { + continue; + } + let scripts = state.watched_scripts.read().unwrap(); + if !scripts.is_empty() && indexed_filter.contains_any(scripts.iter()) { + state + .matched_block_hashes + .lock() + .unwrap() + .push((indexed_filter.height(), indexed_filter.block_hash())); + } + log_trace!(logger, "CBF received filter at height {}", indexed_filter.height(),); + }, + } + } + } + + fn requester(&self) -> Result { + let status = self.cbf_runtime_status.lock().unwrap(); + match &*status { + CbfRuntimeStatus::Started { requester } => Ok(requester.clone()), + CbfRuntimeStatus::Stopped => { + debug_assert!( + false, + "We should have started the chain source before using the requester" + ); + Err(Error::ConnectionFailed) + }, + } + } + + /// Register a transaction script for Lightning channel monitoring. + pub(crate) fn register_tx(&self, _txid: &Txid, script_pubkey: &Script) { + self.registered_scripts.lock().unwrap().push(script_pubkey.to_owned()); + self.lightning_scripts_dirty.store(true, Ordering::Release); + } + + /// Register a watched output script for Lightning channel monitoring. + pub(crate) fn register_output(&self, output: WatchedOutput) { + self.registered_scripts.lock().unwrap().push(output.script_pubkey.clone()); + self.lightning_scripts_dirty.store(true, Ordering::Release); + } + + /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for + /// completion, and return the sync update along with matched block hashes. + /// + /// When `skip_before_height` is `Some(h)`, filters at or below height `h` are + /// skipped, making the scan incremental. + async fn run_filter_scan( + &self, scripts: Vec, skip_before_height: Option, + ) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> { + let requester = self.requester()?; + + let _scan_guard = self.scan_lock.lock().await; + + self.filter_skip_height.store(skip_before_height.unwrap_or(0), Ordering::Release); + self.matched_block_hashes.lock().unwrap().clear(); + *self.watched_scripts.write().unwrap() = scripts; + + let (tx, rx) = oneshot::channel(); + *self.sync_completion_tx.lock().unwrap() = Some(tx); + + requester.rescan().map_err(|e| { + log_error!(self.logger, "Failed to trigger CBF rescan: {:?}", e); + Error::WalletOperationFailed + })?; + + let sync_update = rx.await.map_err(|e| { + log_error!(self.logger, "CBF sync completion channel dropped: {:?}", e); + Error::WalletOperationFailed + })?; + + self.filter_skip_height.store(0, Ordering::Release); + self.watched_scripts.write().unwrap().clear(); + let matched = std::mem::take(&mut *self.matched_block_hashes.lock().unwrap()); + + Ok((sync_update, matched)) + } + + /// Sync the on-chain wallet by scanning compact block filters for relevant transactions. + pub(crate) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_debug!(self.logger, "On-chain wallet sync already in progress, waiting."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let res = async { + let requester = self.requester()?; + let now = Instant::now(); + + let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); + if scripts.is_empty() { + log_debug!(self.logger, "No wallet scripts to sync via CBF."); + return Ok(()); + } + + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs, + ), + self.sync_onchain_wallet_op(requester, scripts), + ); + + let (tx_update, sync_update) = match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e); + return Err(Error::WalletOperationTimeout); + }, + }; + + // Build chain checkpoint extending from the wallet's current tip. + let mut cp = onchain_wallet.latest_checkpoint(); + for (height, header) in sync_update.recent_history() { + if *height > cp.height() { + let block_id = BlockId { height: *height, hash: header.block_hash() }; + cp = cp.push(block_id).unwrap_or_else(|old| old); + } + } + let tip = sync_update.tip(); + if tip.height > cp.height() { + let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; + cp = cp.push(tip_block_id).unwrap_or_else(|old| old); + } + + let update = + Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; + + onchain_wallet.apply_update(update)?; + + log_debug!( + self.logger, + "Sync of on-chain wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_onchain_wallet_sync_timestamp = t; + }, + )?; + + Ok(()) + } + .await; + + self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_onchain_wallet_op( + &self, requester: Requester, scripts: Vec, + ) -> Result<(TxUpdate, SyncUpdate), Error> { + let skip_height = *self.last_onchain_synced_height.lock().unwrap(); + let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?; + + log_debug!( + self.logger, + "CBF on-chain filter scan complete: {} matching blocks found.", + matched.len() + ); + + // Fetch matching blocks and include all their transactions. + // The compact block filter already matched our scripts (covering both + // created outputs and spent inputs), so we include every transaction + // from matched blocks and let BDK determine relevance. + let mut tx_update = TxUpdate::default(); + for (height, block_hash) in &matched { + let indexed_block = requester.get_block(*block_hash).await.map_err(|e| { + log_error!(self.logger, "Failed to fetch block {}: {:?}", block_hash, e); + Error::WalletOperationFailed + })?; + let block = indexed_block.block; + let block_id = BlockId { height: *height, hash: block.header.block_hash() }; + let conf_time = + ConfirmationBlockTime { block_id, confirmation_time: block.header.time as u64 }; + for tx in &block.txdata { + let txid = tx.compute_txid(); + tx_update.txs.push(Arc::new(tx.clone())); + tx_update.anchors.insert((conf_time, txid)); + } + } + + let tip = sync_update.tip(); + *self.last_onchain_synced_height.lock().unwrap() = Some(tip.height); + + Ok((tx_update, sync_update)) + } + + /// Sync the Lightning wallet by confirming channel transactions via compact block filters. + pub(crate) async fn sync_lightning_wallet( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_debug!(self.logger, "Lightning wallet sync already in progress, waiting."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::TxSyncFailed + })?; + } + + let res = async { + let requester = self.requester()?; + let now = Instant::now(); + + let scripts: Vec = self.registered_scripts.lock().unwrap().clone(); + if scripts.is_empty() { + log_debug!(self.logger, "No registered scripts for CBF lightning sync."); + return Ok(()); + } + + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.lightning_wallet_sync_timeout_secs, + ), + self.sync_lightning_wallet_op( + requester, + channel_manager, + chain_monitor, + output_sweeper, + scripts, + ), + ); + + match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of Lightning wallet timed out: {}", e); + return Err(Error::TxSyncTimeout); + }, + }; + + log_debug!( + self.logger, + "Sync of Lightning wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_lightning_wallet_sync_timestamp = t; + }, + )?; + + Ok(()) + } + .await; + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_lightning_wallet_op( + &self, requester: Requester, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, scripts: Vec, + ) -> Result<(), Error> { + let scripts_dirty = self.lightning_scripts_dirty.load(Ordering::Acquire); + let skip_height = + if scripts_dirty { None } else { *self.last_lightning_synced_height.lock().unwrap() }; + let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?; + + log_debug!( + self.logger, + "CBF lightning filter scan complete: {} matching blocks found.", + matched.len() + ); + + let confirmables: Vec<&(dyn Confirm + Sync + Send)> = + vec![&*channel_manager, &*chain_monitor, &*output_sweeper]; + + // Fetch matching blocks and confirm all their transactions. + // The compact block filter already matched our scripts (covering both + // created outputs and spent inputs), so we confirm every transaction + // from matched blocks and let LDK determine relevance. + for (height, block_hash) in &matched { + confirm_block_transactions( + &requester, + *block_hash, + *height, + &confirmables, + &self.logger, + ) + .await?; + } + + // Update the best block tip. + let tip = sync_update.tip(); + if let Some(tip_header) = sync_update.recent_history().get(&tip.height) { + for confirmable in &confirmables { + confirmable.best_block_updated(tip_header, tip.height); + } + } + + *self.last_lightning_synced_height.lock().unwrap() = Some(tip.height); + self.lightning_scripts_dirty.store(false, Ordering::Release); + + Ok(()) + } + + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + let requester = self.requester()?; + + let tip_hash = match *self.latest_tip.lock().unwrap() { + Some(hash) => hash, + None => { + log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping."); + return Ok(()); + }, + }; + + let now = Instant::now(); + + // Fetch fee rates from the last N blocks for per-target estimation. + // We compute fee rates ourselves rather than using Requester::average_fee_rate, + // so we can sample multiple blocks and select percentiles per confirmation target. + let mut block_fee_rates: Vec = Vec::with_capacity(FEE_RATE_LOOKBACK_BLOCKS); + let mut current_hash = tip_hash; + + let timeout = Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ); + let fetch_start = Instant::now(); + + for idx in 0..FEE_RATE_LOOKBACK_BLOCKS { + // Check if we've exceeded the overall timeout for fee estimation. + let remaining_timeout = timeout.saturating_sub(fetch_start.elapsed()); + if remaining_timeout.is_zero() { + log_error!(self.logger, "Updating fee rate estimates timed out."); + return Err(Error::FeerateEstimationUpdateTimeout); + } + + // Fetch the block via P2P. On the first iteration, a fetch failure + // likely means the cached tip is stale (initial sync or reorg), so + // we clear the tip and skip gracefully instead of returning an error. + let indexed_block = + match tokio::time::timeout(remaining_timeout, requester.get_block(current_hash)) + .await + { + Ok(Ok(indexed_block)) => indexed_block, + Ok(Err(e)) if idx == 0 => { + log_debug!( + self.logger, + "Cached CBF tip {} was unavailable during fee estimation, \ + likely due to initial sync or a reorg: {:?}", + current_hash, + e + ); + *self.latest_tip.lock().unwrap() = None; + return Ok(()); + }, + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to fetch block for fee estimation: {:?}", + e + ); + return Err(Error::FeerateEstimationUpdateFailed); + }, + Err(e) if idx == 0 => { + log_debug!( + self.logger, + "Timed out fetching cached CBF tip {} during fee estimation, \ + likely due to initial sync or a reorg: {}", + current_hash, + e + ); + *self.latest_tip.lock().unwrap() = None; + return Ok(()); + }, + Err(e) => { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + return Err(Error::FeerateEstimationUpdateTimeout); + }, + }; + + let height = indexed_block.height; + let block = &indexed_block.block; + let weight_kwu = block.weight().to_kwu_floor(); + + // Compute fee rate: (coinbase_output - subsidy) / weight. + // For blocks with zero weight (e.g. coinbase-only in regtest), use the floor rate. + let fee_rate_sat_per_kwu = if weight_kwu == 0 { + MIN_FEERATE_SAT_PER_KWU + } else { + let subsidy = block_subsidy(height); + let revenue = block + .txdata + .first() + .map(|tx| tx.output.iter().map(|o| o.value).sum()) + .unwrap_or(Amount::ZERO); + let block_fees = revenue.checked_sub(subsidy).unwrap_or(Amount::ZERO); + + if block_fees == Amount::ZERO && self.config.network == Network::Bitcoin { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: zero block fees are disallowed on Mainnet.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + (block_fees.to_sat() / weight_kwu).max(MIN_FEERATE_SAT_PER_KWU) + }; + + block_fee_rates.push(fee_rate_sat_per_kwu); + // Walk backwards through the chain via prev_blockhash. + if height == 0 { + break; + } + current_hash = block.header.prev_blockhash; + } + + if block_fee_rates.is_empty() { + log_error!(self.logger, "No blocks available for fee rate estimation."); + return Err(Error::FeerateEstimationUpdateFailed); + } + + block_fee_rates.sort(); + + let confirmation_targets = get_all_conf_targets(); + let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len()); + + for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + let base_fee_rate = select_fee_rate_for_target(&block_fee_rates, num_blocks); + let adjusted_fee_rate = apply_post_estimation_adjustments(target, base_fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + + log_debug!( + self.logger, + "Fee rate cache update finished in {}ms ({} blocks sampled).", + now.elapsed().as_millis(), + block_fee_rates.len(), + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_fee_rate_cache_update_timestamp = t; + }, + )?; + + Ok(()) + } + + /// Broadcast a package of transactions via the P2P network. + pub(crate) async fn process_broadcast_package(&self, package: Vec) { + let Ok(requester) = self.requester() else { return }; + + for tx in package { + let txid = tx.compute_txid(); + let tx_bytes = tx.encode(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), + requester.broadcast_tx(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(wtxid) => { + log_trace!( + self.logger, + "Successfully broadcast transaction {} (wtxid: {})", + txid, + wtxid + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {:?}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx_bytes) + ); + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx_bytes) + ); + }, + } + } + } +} + +/// Record the current timestamp in a `NodeMetrics` field and persist the metrics. +fn update_node_metrics_timestamp( + node_metrics: &RwLock, kv_store: &DynStore, logger: &Logger, + setter: impl FnOnce(&mut NodeMetrics, Option), +) -> Result<(), Error> { + let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + let mut locked = node_metrics.write().unwrap(); + setter(&mut locked, unix_time_secs_opt); + write_node_metrics(&*locked, kv_store, logger)?; + Ok(()) +} + +/// Fetch a block by hash and call `transactions_confirmed` on each confirmable. +async fn confirm_block_transactions( + requester: &Requester, block_hash: BlockHash, height: u32, + confirmables: &[&(dyn Confirm + Sync + Send)], logger: &Logger, +) -> Result<(), Error> { + let indexed_block = requester.get_block(block_hash).await.map_err(|e| { + log_error!(logger, "Failed to fetch block {}: {:?}", block_hash, e); + Error::TxSyncFailed + })?; + let block = &indexed_block.block; + let header = &block.header; + let txdata: Vec<(usize, &Transaction)> = block.txdata.iter().enumerate().collect(); + if !txdata.is_empty() { + for confirmable in confirmables { + confirmable.transactions_confirmed(header, &txdata, height); + } + } + Ok(()) +} + +/// Compute the block subsidy (mining reward before fees) at the given block height. +fn block_subsidy(height: u32) -> Amount { + let halvings = height / SUBSIDY_HALVING_INTERVAL; + if halvings >= 64 { + return Amount::ZERO; + } + let base = Amount::ONE_BTC.to_sat() * 50; + Amount::from_sat(base >> halvings) +} + +/// Select a fee rate from sorted block fee rates based on confirmation urgency. +/// +/// For urgent targets (1 block), uses the highest observed fee rate. +/// For medium targets (2-6 blocks), uses the 75th percentile. +/// For standard targets (7-12 blocks), uses the median. +/// For low-urgency targets (13+ blocks), uses the 25th percentile. +fn select_fee_rate_for_target(sorted_rates: &[u64], num_blocks: usize) -> FeeRate { + if sorted_rates.is_empty() { + return FeeRate::from_sat_per_kwu(MIN_FEERATE_SAT_PER_KWU); + } + + let len = sorted_rates.len(); + let idx = if num_blocks <= 1 { + len - 1 + } else if num_blocks <= 6 { + (len * 3) / 4 + } else if num_blocks <= 12 { + len / 2 + } else { + len / 4 + }; + + FeeRate::from_sat_per_kwu(sorted_rates[idx.min(len - 1)]) +} + +#[cfg(test)] +mod tests { + use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; + use bitcoin::{Amount, FeeRate}; + + use super::{block_subsidy, select_fee_rate_for_target, MIN_FEERATE_SAT_PER_KWU}; + use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + }; + + #[test] + fn select_fee_rate_empty_returns_floor() { + let rate = select_fee_rate_for_target(&[], 1); + assert_eq!(rate, FeeRate::from_sat_per_kwu(MIN_FEERATE_SAT_PER_KWU)); + } + + #[test] + fn select_fee_rate_single_element_returns_it_for_all_buckets() { + let rates = [5000u64]; + // Every urgency bucket should return the single available rate. + for num_blocks in [1, 3, 6, 12, 144, 1008] { + let rate = select_fee_rate_for_target(&rates, num_blocks); + assert_eq!( + rate, + FeeRate::from_sat_per_kwu(5000), + "num_blocks={} should return the only available rate", + num_blocks, + ); + } + } + + #[test] + fn select_fee_rate_picks_correct_percentile() { + // 6 sorted rates: indices 0..5 + let rates = [100, 200, 300, 400, 500, 600]; + // 1-block (most urgent): highest → index 5 → 600 + assert_eq!(select_fee_rate_for_target(&rates, 1), FeeRate::from_sat_per_kwu(600)); + // 6-block (medium): 75th percentile → (6*3)/4 = 4 → 500 + assert_eq!(select_fee_rate_for_target(&rates, 6), FeeRate::from_sat_per_kwu(500)); + // 12-block (standard): median → 6/2 = 3 → 400 + assert_eq!(select_fee_rate_for_target(&rates, 12), FeeRate::from_sat_per_kwu(400)); + // 144-block (low): 25th percentile → 6/4 = 1 → 200 + assert_eq!(select_fee_rate_for_target(&rates, 144), FeeRate::from_sat_per_kwu(200)); + } + + #[test] + fn select_fee_rate_monotonic_urgency() { + // More urgent targets should never produce lower rates than less urgent ones. + let rates = [250, 500, 1000, 2000, 4000, 8000]; + let urgent = select_fee_rate_for_target(&rates, 1); + let medium = select_fee_rate_for_target(&rates, 6); + let standard = select_fee_rate_for_target(&rates, 12); + let low = select_fee_rate_for_target(&rates, 144); + + assert!( + urgent >= medium, + "urgent ({}) >= medium ({})", + urgent.to_sat_per_kwu(), + medium.to_sat_per_kwu() + ); + assert!( + medium >= standard, + "medium ({}) >= standard ({})", + medium.to_sat_per_kwu(), + standard.to_sat_per_kwu() + ); + assert!( + standard >= low, + "standard ({}) >= low ({})", + standard.to_sat_per_kwu(), + low.to_sat_per_kwu() + ); + } + + #[test] + fn uniform_rates_match_naive_single_rate() { + // When all blocks have the same fee rate (like the old single-block + // approach), every target should select that same base rate. This + // proves the optimized multi-block approach is backwards-compatible. + + let uniform_rate = 3000u64; + let rates = [uniform_rate; 6]; + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + let optimized = select_fee_rate_for_target(&rates, num_blocks); + let naive = FeeRate::from_sat_per_kwu(uniform_rate); + assert_eq!( + optimized, naive, + "For target {:?} (num_blocks={}), optimized rate should match naive single-rate", + target, num_blocks, + ); + + // Also verify the post-estimation adjustments produce the same + // result for both approaches. + let adjusted_optimized = apply_post_estimation_adjustments(target, optimized); + let adjusted_naive = apply_post_estimation_adjustments(target, naive); + assert_eq!(adjusted_optimized, adjusted_naive); + } + } + + #[test] + fn block_subsidy_genesis() { + assert_eq!(block_subsidy(0), Amount::from_sat(50 * 100_000_000)); + } + + #[test] + fn block_subsidy_first_halving() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL), Amount::from_sat(25 * 100_000_000)); + } + + #[test] + fn block_subsidy_second_halving() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 2), Amount::from_sat(1_250_000_000)); + } + + #[test] + fn block_subsidy_exhausted_after_64_halvings() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 64), Amount::ZERO); + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 100), Amount::ZERO); + } + + #[test] + fn select_fee_rate_two_elements() { + let rates = [1000, 5000]; + // 1-block: index 1 (highest) → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 1), FeeRate::from_sat_per_kwu(5000)); + // 6-block: (2*3)/4 = 1 → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 6), FeeRate::from_sat_per_kwu(5000)); + // 12-block: 2/2 = 1 → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 12), FeeRate::from_sat_per_kwu(5000)); + // 144-block: 2/4 = 0 → 1000 + assert_eq!(select_fee_rate_for_target(&rates, 144), FeeRate::from_sat_per_kwu(1000)); + } + + #[test] + fn select_fee_rate_all_targets_use_valid_indices() { + for size in 1..=6 { + let rates: Vec = (1..=size).map(|i| i as u64 * 1000).collect(); + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + let _ = select_fee_rate_for_target(&rates, num_blocks); + } + } + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 49c011a78..9cfbe4abe 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. pub(crate) mod bitcoind; +mod cbf; mod electrum; mod esplora; @@ -17,11 +18,12 @@ use bitcoin::{Script, Txid}; use lightning::chain::{BestBlock, Filter}; use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; +use crate::chain::cbf::CbfChainSource; use crate::chain::electrum::ElectrumChainSource; use crate::chain::esplora::EsploraChainSource; use crate::config::{ - BackgroundSyncConfig, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, - WALLET_SYNC_INTERVAL_MINIMUM_SECS, + BackgroundSyncConfig, BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, + EsploraSyncConfig, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; @@ -93,6 +95,7 @@ enum ChainSourceKind { Esplora(EsploraChainSource), Electrum(ElectrumChainSource), Bitcoind(BitcoindChainSource), + Cbf(CbfChainSource), } impl ChainSource { @@ -184,11 +187,33 @@ impl ChainSource { (Self { kind, registered_txids, tx_broadcaster, logger }, best_block) } + pub(crate) fn new_cbf( + peers: Vec, sync_config: CbfSyncConfig, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, + ) -> (Self, Option) { + let cbf_chain_source = CbfChainSource::new( + peers, + sync_config, + fee_estimator, + kv_store, + config, + Arc::clone(&logger), + node_metrics, + ); + let kind = ChainSourceKind::Cbf(cbf_chain_source); + let registered_txids = Mutex::new(Vec::new()); + (Self { kind, registered_txids, tx_broadcaster, logger }, None) + } + pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { match &self.kind { ChainSourceKind::Electrum(electrum_chain_source) => { electrum_chain_source.start(runtime)? }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.start(runtime); + }, _ => { // Nothing to do for other chain sources. }, @@ -199,6 +224,9 @@ impl ChainSource { pub(crate) fn stop(&self) { match &self.kind { ChainSourceKind::Electrum(electrum_chain_source) => electrum_chain_source.stop(), + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.stop(); + }, _ => { // Nothing to do for other chain sources. }, @@ -210,6 +238,7 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { Some(bitcoind_chain_source.as_utxo_source()) }, + ChainSourceKind::Cbf { .. } => None, _ => None, } } @@ -223,6 +252,7 @@ impl ChainSource { ChainSourceKind::Esplora(_) => true, ChainSourceKind::Electrum { .. } => true, ChainSourceKind::Bitcoind { .. } => false, + ChainSourceKind::Cbf { .. } => true, } } @@ -289,6 +319,28 @@ impl ChainSource { ) .await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + if let Some(background_sync_config) = + cbf_chain_source.sync_config.background_sync_config.as_ref() + { + self.start_tx_based_sync_loop( + stop_sync_receiver, + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + background_sync_config, + Arc::clone(&self.logger), + ) + .await + } else { + log_info!( + self.logger, + "Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.", + ); + return; + } + }, } } @@ -368,6 +420,9 @@ impl ChainSource { // `ChainPoller`. So nothing to do here. unreachable!("Onchain wallet will be synced via chain polling") }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.sync_onchain_wallet(onchain_wallet).await + }, } } @@ -393,6 +448,11 @@ impl ChainSource { // `ChainPoller`. So nothing to do here. unreachable!("Lightning wallet will be synced via chain polling") }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source + .sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper) + .await + }, } } @@ -421,6 +481,10 @@ impl ChainSource { ) .await }, + ChainSourceKind::Cbf { .. } => { + // In CBF mode we sync wallets via compact block filters. + unreachable!("Listeners will be synced via compact block filter syncing") + }, } } @@ -435,6 +499,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.update_fee_rate_estimates().await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.update_fee_rate_estimates().await + }, } } @@ -463,6 +530,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.process_broadcast_package(next_package).await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.process_broadcast_package(next_package).await + }, } } } @@ -481,6 +551,9 @@ impl Filter for ChainSource { electrum_chain_source.register_tx(txid, script_pubkey) }, ChainSourceKind::Bitcoind { .. } => (), + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.register_tx(txid, script_pubkey) + }, } } fn register_output(&self, output: lightning::chain::WatchedOutput) { @@ -492,6 +565,7 @@ impl Filter for ChainSource { electrum_chain_source.register_output(output) }, ChainSourceKind::Bitcoind { .. } => (), + ChainSourceKind::Cbf(cbf_chain_source) => cbf_chain_source.register_output(output), } } } diff --git a/src/config.rs b/src/config.rs index 71e4d2314..7f79367d2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -482,6 +482,41 @@ impl Default for ElectrumSyncConfig { } } +/// Configuration for syncing via BIP 157 compact block filters. +/// +/// Background syncing is enabled by default, using the default values specified in +/// [`BackgroundSyncConfig`]. +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct CbfSyncConfig { + /// Background sync configuration. + /// + /// If set to `None`, background syncing will be disabled. Users will need to manually + /// sync via [`Node::sync_wallets`] for the wallets and fee rate updates. + /// + /// [`Node::sync_wallets`]: crate::Node::sync_wallets + pub background_sync_config: Option, + /// Sync timeouts configuration. + pub timeouts_config: SyncTimeoutsConfig, + /// Peer response timeout in seconds for the bip157 P2P node. + /// + /// If a peer does not respond within this duration, the connection may be dropped. + /// Higher values are recommended for slow peers or when downloading many blocks. + /// + /// Defaults to 30 seconds. + pub response_timeout_secs: u64, +} + +impl Default for CbfSyncConfig { + fn default() -> Self { + Self { + background_sync_config: Some(BackgroundSyncConfig::default()), + timeouts_config: SyncTimeoutsConfig::default(), + response_timeout_secs: 30, + } + } +} + /// Configuration for syncing with Bitcoin Core backend via REST. #[derive(Debug, Clone)] pub struct BitcoindRestClientConfig { diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 5a1420882..57d51f718 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -142,7 +142,9 @@ impl VssClientHeaderProvider for VssHeaderProviderAdapter { } use crate::builder::sanitize_alias; -pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig}; +pub use crate::config::{ + default_config, CbfSyncConfig, ElectrumSyncConfig, EsploraSyncConfig, TorConfig, +}; pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0e80a46db..d6c49274b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -122,6 +122,28 @@ impl Wallet { self.inner.lock().unwrap().start_sync_with_revealed_spks().build() } + pub(crate) fn get_spks_for_cbf_sync(&self, stop_gap: usize) -> Vec { + let wallet = self.inner.lock().unwrap(); + let mut scripts: Vec = + wallet.spk_index().revealed_spks(..).map(|((_, _), spk)| spk).collect(); + + // For first sync when no scripts have been revealed yet, generate + // lookahead scripts up to the stop gap for both keychains. + if scripts.is_empty() { + for keychain in [KeychainKind::External, KeychainKind::Internal] { + for idx in 0..stop_gap as u32 { + scripts.push(wallet.peek_address(keychain, idx).address.script_pubkey()); + } + } + } + + scripts + } + + pub(crate) fn latest_checkpoint(&self) -> bdk_chain::CheckPoint { + self.inner.lock().unwrap().latest_checkpoint() + } + pub(crate) fn get_cached_txs(&self) -> Vec> { self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7854a77f2..7c475949b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -26,7 +26,9 @@ use bitcoin::{ use electrsd::corepc_node::{Client as BitcoindClient, Node as BitcoinD}; use electrsd::{corepc_node, ElectrsD}; use electrum_client::ElectrumApi; -use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyncConfig}; +use ldk_node::config::{ + AsyncPaymentsRole, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, +}; use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; @@ -222,6 +224,11 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { let mut bitcoind_conf = corepc_node::Conf::default(); bitcoind_conf.network = "regtest"; bitcoind_conf.args.push("-rest"); + + bitcoind_conf.p2p = corepc_node::P2P::Yes; + bitcoind_conf.args.push("-blockfilterindex=1"); + bitcoind_conf.args.push("-peerblockfilters=1"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); let electrs_exe = env::var("ELECTRS_EXE") @@ -326,6 +333,7 @@ pub(crate) enum TestChainSource<'a> { Electrum(&'a ElectrsD), BitcoindRpcSync(&'a BitcoinD), BitcoindRestSync(&'a BitcoinD), + Cbf(&'a BitcoinD), } #[derive(Clone, Copy)] @@ -463,6 +471,12 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> rpc_password, ); }, + TestChainSource::Cbf(bitcoind) => { + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + let sync_config = CbfSyncConfig { background_sync_config: None, ..Default::default() }; + builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config)); + }, } match &config.log_writer { @@ -497,7 +511,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> node.start().unwrap(); assert!(node.status().is_running); - assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); + + if !matches!(chain_source, TestChainSource::Cbf(_)) { + assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); + } node } @@ -585,6 +602,24 @@ pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoin .await; } +pub(crate) async fn wait_for_cbf_sync(node: &TestNode) { + let before = node.status().latest_onchain_wallet_sync_timestamp; + let mut delay = Duration::from_millis(200); + for _ in 0..30 { + if node.sync_wallets().is_ok() { + let after = node.status().latest_onchain_wallet_sync_timestamp; + if after > before { + return; + } + } + tokio::time::sleep(delay).await; + if delay < Duration::from_secs(2) { + delay = delay.mul_f32(1.5); + } + } + panic!("wait_for_cbf_sync: timed out waiting for CBF sync to complete"); +} + pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, From 33b88f207de96450d1ecc07f24fc3802d8fcd6b2 Mon Sep 17 00:00:00 2001 From: Yeji Han Date: Wed, 25 Mar 2026 13:59:26 +0900 Subject: [PATCH 2/2] Add CBF integration tests and documentation --- .github/workflows/rust.yml | 6 +- README.md | 3 +- tests/common/mod.rs | 15 +- tests/integration_tests_rust.rs | 344 +++++++++++++++++++++++++++++++- 4 files changed, 361 insertions(+), 7 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1ccade444..fcda2c83e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -80,7 +80,11 @@ jobs: - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: | - RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test -- --skip cbf + - name: Test CBF on Rust ${{ matrix.toolchain }} + if: "matrix.platform != 'windows-latest'" + run: | + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test cbf -- --test-threads=1 - name: Test with UniFFI support on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest' && matrix.build-uniffi" run: | diff --git a/README.md b/README.md index 0068b6e07..32417242b 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ fn main() { LDK Node currently comes with a decidedly opinionated set of design choices: - On-chain data is handled by the integrated [BDK][bdk] wallet. -- Chain data may currently be sourced from the Bitcoin Core RPC interface, or from an [Electrum][electrum] or [Esplora][esplora] server. +- Chain data may currently be sourced from the Bitcoin Core RPC interface, from an [Electrum][electrum] or [Esplora][esplora] server, or via [compact block filters (BIP 157)][bip157]. - Wallet and channel state may be persisted to an [SQLite][sqlite] database, to file system, or to a custom back-end to be implemented by the user. - Gossip data may be sourced via Lightning's peer-to-peer network or the [Rapid Gossip Sync](https://docs.rs/lightning-rapid-gossip-sync/*/lightning_rapid_gossip_sync/) protocol. - Entropy for the Lightning and on-chain wallets may be sourced from raw bytes or a [BIP39](https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki) mnemonic. In addition, LDK Node offers the means to generate and persist the entropy bytes to disk. @@ -80,6 +80,7 @@ The Minimum Supported Rust Version (MSRV) is currently 1.85.0. [bdk]: https://bitcoindevkit.org/ [electrum]: https://github.com/spesmilo/electrum-protocol [esplora]: https://github.com/Blockstream/esplora +[bip157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki [sqlite]: https://sqlite.org/ [rust]: https://www.rust-lang.org/ [swift]: https://www.swift.org/ diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7c475949b..1a60493fa 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -245,7 +245,16 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { pub(crate) fn random_chain_source<'a>( bitcoind: &'a BitcoinD, electrsd: &'a ElectrsD, ) -> TestChainSource<'a> { - let r = rand::random_range(0..3); + // Allow forcing a specific backend via LDK_TEST_CHAIN_SOURCE env var. + // Valid values: "esplora", "electrum", "bitcoind-rpc", "bitcoind-rest", "cbf" + let r = match std::env::var("LDK_TEST_CHAIN_SOURCE").ok().as_deref() { + Some("esplora") => 0, + Some("electrum") => 1, + Some("bitcoind-rpc") => 2, + Some("bitcoind-rest") => 3, + Some("cbf") => 4, + _ => rand::random_range(0..5), + }; match r { 0 => { println!("Randomly setting up Esplora chain syncing..."); @@ -263,6 +272,10 @@ pub(crate) fn random_chain_source<'a>( println!("Randomly setting up Bitcoind REST chain syncing..."); TestChainSource::BitcoindRestSync(bitcoind) }, + 4 => { + println!("Randomly setting up CBF compact block filter syncing..."); + TestChainSource::Cbf(bitcoind) + }, _ => unreachable!(), } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 3fde52dc4..b294a30f9 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -21,10 +21,11 @@ use common::{ expect_channel_pending_event, expect_channel_ready_event, expect_channel_ready_events, expect_event, expect_payment_claimable_event, expect_payment_received_event, expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, - open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, - premine_blocks, prepare_rbf, random_chain_source, random_config, random_listening_addresses, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + open_channel, open_channel_push_amt, open_channel_with_all, + premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, + random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, + setup_two_nodes, splice_in_with_all, wait_for_cbf_sync, wait_for_tx, + TestChainSource, TestStoreType, TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -2805,3 +2806,338 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_cbf() { + let (bitcoind, _electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + assert!(node.status().is_running); + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + + node.start().unwrap(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + assert!(node.status().is_running); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn fee_rate_estimation_after_manual_sync_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + Amount::from_sat(100_000), + ) + .await; + + wait_for_cbf_sync(&node).await; + let first_fee_update = node.status().latest_fee_rate_cache_update_timestamp; + assert!(first_fee_update.is_some()); + + // Subsequent manual syncs should keep the fee cache populated. + node.sync_wallets().unwrap(); + let second_fee_update = node.status().latest_fee_rate_cache_update_timestamp; + assert!(second_fee_update.is_some()); + assert!(second_fee_update >= first_fee_update); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn repeated_manual_sync_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 100_000; + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Regression: the second manual sync must not block forever. + node.sync_wallets().unwrap(); + assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_reinit_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let config = random_config(true); + + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + let sync_config = ldk_node::config::CbfSyncConfig { + background_sync_config: None, + ..Default::default() + }; + + let test_sync_store = TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); + + setup_builder!(builder, config.node_config); + builder.set_chain_source_cbf(vec![peer_addr.clone()], Some(sync_config.clone())); + + let node = builder + .build_with_store(config.node_entropy.clone().into(), test_sync_store.clone()) + .unwrap(); + node.start().unwrap(); + + let expected_node_id = node.node_id(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + + let funding_address = node.onchain_payment().new_address().unwrap(); + assert_eq!(node.list_balances().total_onchain_balance_sats, 0); + + let expected_amount = Amount::from_sat(100_000); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![funding_address], + expected_amount, + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, expected_amount.to_sat()); + + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + + node.start().unwrap(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + drop(node); + + // Reinitialize from the same config and store. + setup_builder!(builder, config.node_config); + builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config)); + + let reinitialized_node = + builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + reinitialized_node.start().unwrap(); + assert_eq!(reinitialized_node.node_id(), expected_node_id); + + // Balance should be persisted from the previous run. + assert_eq!( + reinitialized_node.list_balances().spendable_onchain_balance_sats, + expected_amount.to_sat() + ); + + wait_for_cbf_sync(&reinitialized_node).await; + assert_eq!( + reinitialized_node.list_balances().spendable_onchain_balance_sats, + expected_amount.to_sat() + ); + + reinitialized_node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_recovery_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + + let original_config = random_config(true); + let original_node_entropy = original_config.node_entropy.clone(); + let original_node = setup_node(&chain_source, original_config); + + let premine_amount_sat = 100_000; + + let addr_1 = original_node.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_1], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&original_node).await; + assert_eq!(original_node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + let addr_2 = original_node.onchain_payment().new_address().unwrap(); + + let txid = bitcoind + .client + .send_to_address(&addr_2, Amount::from_sat(premine_amount_sat)) + .unwrap() + .0 + .parse() + .unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + + wait_for_cbf_sync(&original_node).await; + assert_eq!( + original_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 2 + ); + + original_node.stop().unwrap(); + drop(original_node); + + // Now we start from scratch, only the seed remains the same. + let mut recovered_config = random_config(true); + recovered_config.node_entropy = original_node_entropy; + recovered_config.recovery_mode = true; + let recovered_node = setup_node(&chain_source, recovered_config); + + wait_for_cbf_sync(&recovered_node).await; + assert_eq!( + recovered_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 2 + ); + + // Check we sync even when skipping some addresses. + let _addr_3 = recovered_node.onchain_payment().new_address().unwrap(); + let _addr_4 = recovered_node.onchain_payment().new_address().unwrap(); + let _addr_5 = recovered_node.onchain_payment().new_address().unwrap(); + let addr_6 = recovered_node.onchain_payment().new_address().unwrap(); + + let txid = bitcoind + .client + .send_to_address(&addr_6, Amount::from_sat(premine_amount_sat)) + .unwrap() + .0 + .parse() + .unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + + wait_for_cbf_sync(&recovered_node).await; + assert_eq!( + recovered_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 3 + ); + + recovered_node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_receive_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + let premine_amount_sat = 1_100_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a.clone(), addr_b.clone()], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(node_b.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Check on-chain payment tracking after premine. + let node_a_payments = node_a.list_payments(); + let node_b_payments = node_b.list_payments(); + for payments in [&node_a_payments, &node_b_payments] { + assert_eq!(payments.len(), 1); + } + for p in [node_a_payments.first().unwrap(), node_b_payments.first().unwrap()] { + assert_eq!(p.amount_msat, Some(premine_amount_sat * 1000)); + assert_eq!(p.direction, PaymentDirection::Inbound); + assert_eq!(p.status, PaymentStatus::Pending); + match p.kind { + PaymentKind::Onchain { status, .. } => { + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + } + + // Send from B to A. + let amount_to_send_sats = 54_321; + let txid = + node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + // Mine the transaction so CBF can see it. + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + + let payment_id = PaymentId(txid.to_byte_array()); + let payment_a = node_a.payment(&payment_id).unwrap(); + match payment_a.kind { + PaymentKind::Onchain { txid: tx, status } => { + assert_eq!(tx, txid); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + assert!(payment_a.fee_paid_msat > Some(0)); + assert_eq!(payment_a.amount_msat, Some(amount_to_send_sats * 1000)); + + let payment_b = node_b.payment(&payment_id).unwrap(); + match payment_b.kind { + PaymentKind::Onchain { txid: tx, status } => { + assert_eq!(tx, txid); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + assert!(payment_b.fee_paid_msat > Some(0)); + assert_eq!(payment_b.amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(payment_a.fee_paid_msat, payment_b.fee_paid_msat); + + let onchain_fee_buffer_sat = 1000; + let expected_node_a_balance = premine_amount_sat + amount_to_send_sats; + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, expected_node_a_balance); + assert!( + node_b.list_balances().spendable_onchain_balance_sats + > premine_amount_sat - amount_to_send_sats - onchain_fee_buffer_sat + ); + assert!( + node_b.list_balances().spendable_onchain_balance_sats + < premine_amount_sat - amount_to_send_sats + ); + + // Test send_all_to_address. + let addr_b2 = node_b.onchain_payment().new_address().unwrap(); + let txid = node_a.onchain_payment().send_all_to_address(&addr_b2, false, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, 0); + assert_eq!(node_a.list_balances().total_onchain_balance_sats, 0); + assert!(node_b.list_balances().spendable_onchain_balance_sats > premine_amount_sat); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +}