Skip to content

balance_mutex released immediately in do_onchain_rebalance allows concurrent channel-open / splice #61

@benthecarman

Description

@benthecarman

Severity: HIGH · CWE: CWE-667
Location: graduated-rebalancer/src/lib.rs:344
Scanner: llm-code-review · Fingerprint: 1483bfc2bcd221f4…

Description

`do_onchain_rebalance` (lines 343-391) is supposed to be serialized by `balance_mutex` the same way `do_trusted_rebalance` is. The trusted path correctly writes `let _lock = self.balance_mutex.lock().await;` (line 274), binding the guard to a named local so it is held until the function returns. The on-chain path instead writes `let _ = self.balance_mutex.lock().await;` (line 344). In Rust, `_` is a wildcard pattern that does not bind the value, so in `let _ = expr` the temporary produced by `expr` is dropped at the end of that statement. The `MutexGuard` is therefore released immediately: the call still waits until the lock becomes available, but the lock is no longer held by the time the function does any real work. The fix is to bind the guard to a named local, as the trusted path does (e.g. `let _lock = self.balance_mutex.lock().await;`).

Impact: two concurrent callers of `do_onchain_rebalance_if_needed` (or an on-chain caller that was waiting on a `do_trusted_rebalance` and proceeds without holding the lock once the trusted path releases it) can both pass this point and simultaneously invoke `open_channel_with_lsp` / `splice_to_lsp_channel`, both of which spend "all available on-chain funds (minus fees and anchor reserves)" per the trait docs (lines 113-114, 124-125). The result is two channel-open or splice attempts contending for the same UTXOs, leading to a double-spend race, a failed or stuck on-chain transaction, or two simultaneous `OnChainRebalanceInitiated` events for what should be one rebalance. The worst-case outcome (lost on-chain funds vs. one transaction simply erroring out) depends on how the downstream `LightningWallet` implementation behaves under concurrent calls, which we cannot read from this worktree. The locking-invariant violation itself is unambiguous.

Repro: the regression test spawns two `do_onchain_rebalance_if_needed` calls on a multi-thread runtime against a mock `LightningWallet` that records the maximum number of `open_channel_with_lsp` calls in flight at the same time (the mock sleeps for 150 ms inside the call to widen the race window). With the lock guard correctly bound, the maximum observed concurrency is 1; with the current code it is 2.

Proof of concept (regression test)

diff --git a/graduated-rebalancer/Cargo.toml b/graduated-rebalancer/Cargo.toml
--- a/graduated-rebalancer/Cargo.toml
+++ b/graduated-rebalancer/Cargo.toml
@@ -11,3 +11,6 @@
 lightning = { workspace = true }
 lightning-invoice = { workspace = true }
 tokio = { version = "1", default-features = false }
+
+[dev-dependencies]
+tokio = { version = "1", default-features = false, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
diff --git a/graduated-rebalancer/src/lib.rs b/graduated-rebalancer/src/lib.rs
--- a/graduated-rebalancer/src/lib.rs
+++ b/graduated-rebalancer/src/lib.rs
@@ -395,4 +395,160 @@
 		log_debug!(self.logger, "Waiting for balance mutex...");
 		let _ = self.balance_mutex.lock().await;
 	}
 }
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use lightning::bitcoin::hashes::Hash;
+	use lightning::bitcoin::Txid;
+	use lightning::util::logger::Record;
+	use std::sync::atomic::{AtomicUsize, Ordering};
+	use std::time::Duration;
+
+	struct TestLogger;
+	impl Logger for TestLogger {
+		fn log(&self, _record: Record) {}
+	}
+
+	#[derive(Debug)]
+	struct TestError;
+
+	struct NoopTrustedWallet;
+	impl TrustedWallet for NoopTrustedWallet {
+		type Error = TestError;
+		fn get_balance(
+			&self,
+		) -> Pin<Box<dyn Future<Output = Result<Amount, Self::Error>> + Send + '_>> {
+			Box::pin(async { Ok(Amount::from_sats(0).expect("valid")) })
+		}
+		fn get_bolt11_invoice(
+			&self, _amount: Option<Amount>,
+		) -> Pin<Box<dyn Future<Output = Result<Bolt11Invoice, Self::Error>> + Send + '_>> {
+			Box::pin(async { Err(TestError) })
+		}
+		fn pay(
+			&self, _method: PaymentMethod, _amount: Amount,
+		) -> Pin<Box<dyn Future<Output = Result<[u8; 32], Self::Error>> + Send + '_>> {
+			Box::pin(async { Err(TestError) })
+		}
+		fn await_payment_success(
+			&self, _payment_hash: [u8; 32],
+		) -> Pin<Box<dyn Future<Output = Option<ReceivedLightningPayment>> + Send + '_>> {
+			Box::pin(async { None })
+		}
+	}
+
+	struct CountingLnWallet {
+		in_flight: Arc<AtomicUsize>,
+		max_observed: Arc<AtomicUsize>,
+	}
+	impl LightningWallet for CountingLnWallet {
+		type Error = TestError;
+		fn get_balance(&self) -> LightningBalance {
+			LightningBalance {
+				lightning: Amount::from_sats(0).expect("valid"),
+				onchain: Amount::from_sats(100_000).expect("valid"),
+			}
+		}
+		fn get_bolt11_invoice(
+			&self, _amount: Option<Amount>,
+		) -> Pin<Box<dyn Future<Output = Result<Bolt11Invoice, Self::Error>> + Send + '_>> {
+			Box::pin(async { Err(TestError) })
+		}
+		fn pay(
+			&self, _method: PaymentMethod, _amount: Amount,
+		) -> Pin<Box<dyn Future<Output = Result<[u8; 32], Self::Error>> + Send + '_>> {
+			Box::pin(async { Err(TestError) })
+		}
+		fn await_payment_receipt(
+			&self, _payment_hash: [u8; 32],
+		) -> Pin<Box<dyn Future<Output = Option<ReceivedLightningPayment>> + Send + '_>> {
+			Box::pin(async { None })
+		}
+		fn has_channel_with_lsp(&self) -> bool {
+			false
+		}
+		fn open_channel_with_lsp(
+			&self,
+		) -> Pin<Box<dyn Future<Output = Result<u128, Self::Error>> + Send + '_>> {
+			let in_flight = self.in_flight.clone();
+			let max_observed = self.max_observed.clone();
+			Box::pin(async move {
+				let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
+				let mut prev = max_observed.load(Ordering::SeqCst);
+				while cur > prev {
+					match max_observed.compare_exchange(
+						prev,
+						cur,
+						Ordering::SeqCst,
+						Ordering::SeqCst,
+					) {
+						Ok(_) => break,
+						Err(v) => prev = v,
+					}
+				}
+				tokio::time::sleep(Duration::from_millis(150)).await;
+				in_flight.fetch_sub(1, Ordering::SeqCst);
+				Ok(0u128)
+			})
+		}
+		fn await_channel_pending(
+			&self, _channel_id: u128,
+		) -> Pin<Box<dyn Future<Output = OutPoint> + Send + '_>> {
+			Box::pin(async {
+				OutPoint { txid: Txid::from_byte_array([0u8; 32]), vout: 0 }
+			})
+		}
+		fn splice_to_lsp_channel(
+			&self,
+		) -> Pin<Box<dyn Future<Output = Result<u128, Self::Error>> + Send + '_>> {
+			Box::pin(async { Err(TestError) })
+		}
+		fn await_splice_pending(
+			&self, _channel_id: u128,
+		) -> Pin<Box<dyn Future<Output = OutPoint> + Send + '_>> {
+			Box::pin(async {
+				OutPoint { txid: Txid::from_byte_array([0u8; 32]), vout: 0 }
+			})
+		}
+	}
+
+	struct OnchainOnlyTrigger;
+	impl RebalanceTrigger for OnchainOnlyTrigger {
+		fn needs_trusted_rebalance(
+			&self,
+		) -> impl Future<Output = Option<TriggerParams>> + Send {
+			std::future::ready(None)
+		}
+		fn needs_onchain_rebalance(
+			&self,
+		) -> impl Future<Output = Option<TriggerParams>> + Send {
+			std::future::ready(Some(TriggerParams {
+				id: [1u8; 32],
+				amount: Amount::from_sats(50_000).expect("valid"),
+			}))
+		}
+	}
+
+	#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+	async fn onchain_rebalance_lock_must_serialize_concurrent_callers() {
+		let in_flight = Arc::new(AtomicUsize::new(0));
+		let max_observed = Arc::new(AtomicUsize::new(0));
+		let ln = Arc::new(CountingLnWallet {
+			in_flight: in_flight.clone(),
+			max_observed: max_observed.clone(),
+		});
+		let rebalancer = Arc::new(GraduatedRebalancer::new(
+			Arc::new(NoopTrustedWallet),
+			ln,
+			Arc::new(OnchainOnlyTrigger),
+			Arc::new(IgnoringEventHandler),
+			Arc::new(TestLogger),
+		));
+		let r1 = rebalancer.clone();
+		let r2 = rebalancer.clone();
+		let t1 = tokio::spawn(async move { r1.do_onchain_rebalance_if_needed().await });
+		let t2 = tokio::spawn(async move { r2.do_onchain_rebalance_if_needed().await });
+		let _ = t1.await;
+		let _ = t2.await;
+		assert_eq!(max_observed.load(Ordering::SeqCst), 1, "balance_mutex must serialize concurrent on-chain rebalances");
+	}
+}

Reported by loupe scan, finding #1 (repo 1, job 3)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions