diff --git a/lightning-liquidity/tests/common/mod.rs b/lightning-liquidity/tests/common/mod.rs index dea987527ad..08305d236a2 100644 --- a/lightning-liquidity/tests/common/mod.rs +++ b/lightning-liquidity/tests/common/mod.rs @@ -6,7 +6,7 @@ use lightning_liquidity::{LiquidityClientConfig, LiquidityManagerSync, Liquidity use lightning::chain::{BestBlock, Filter}; use lightning::ln::channelmanager::ChainParameters; use lightning::ln::functional_test_utils::{Node, TestChannelManager}; -use lightning::util::test_utils::{TestBroadcaster, TestKeysInterface, TestStore}; +use lightning::util::test_utils::{TestBroadcaster, TestKeysInterface, TestLogger, TestStore}; use bitcoin::Network; @@ -47,6 +47,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>( Some(service_config), None, Arc::clone(&time_provider), + service_inner.logger, ) .unwrap(); @@ -61,6 +62,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>( None, Some(client_config), time_provider, + client_inner.logger, ) .unwrap(); @@ -141,6 +143,7 @@ pub(crate) struct LiquidityNode<'a, 'b, 'c> { Arc, Arc, &'c TestBroadcaster, + &'c TestLogger, >, } @@ -155,6 +158,7 @@ impl<'a, 'b, 'c> LiquidityNode<'a, 'b, 'c> { Arc, Arc, &'c TestBroadcaster, + &'c TestLogger, >, ) -> Self { Self { inner: node, liquidity_manager } diff --git a/lightning-net-tokio/Cargo.toml b/lightning-net-tokio/Cargo.toml index a911b3d22be..1d3fbf2a551 100644 --- a/lightning-net-tokio/Cargo.toml +++ b/lightning-net-tokio/Cargo.toml @@ -15,13 +15,17 @@ edition = "2021" all-features = true rustdoc-args = ["--cfg", "docsrs"] +[features] +socks = ["tokio-socks"] + [dependencies] bitcoin = "0.32.2" lightning = { version = "0.2.0", path = "../lightning" } tokio = { version = "1.35", features = [ "rt", "sync", "net", "time" ] } +tokio-socks = { version = "0.5", optional = true } [dev-dependencies] -tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] } +tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time", "io-util" ] } lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] } [lints] diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 068f77a84bb..bf5a539e0a2 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -470,6 +470,33 @@ where } } +/// Like [`connect_outbound`], but routes the TCP connection through a SOCKS5 proxy. +/// +/// `proxy_addr` is the address of the SOCKS5 proxy (e.g. `127.0.0.1:1080`). +/// The proxy connects to `addr` on our behalf, then we hand the resulting stream to +/// [`setup_outbound`]. +/// +/// Available only when the `socks` feature is enabled. +#[cfg(feature = "socks")] +#[cfg_attr(docsrs, doc(cfg(feature = "socks")))] +pub async fn connect_outbound_via_socks5( + peer_manager: PM, their_node_id: PublicKey, addr: SocketAddr, proxy_addr: SocketAddr, +) -> Option> +where + PM::Target: APeerManager, +{ + let connect_fut = async { + tokio_socks::tcp::Socks5Stream::connect(proxy_addr, addr) + .await + .map(|s| s.into_inner().into_std().unwrap()) + }; + if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await { + Some(setup_outbound(peer_manager, their_node_id, stream)) + } else { + None + } +} + const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( clone_socket_waker, wake_socket_waker, @@ -949,4 +976,204 @@ mod tests { async fn unthreaded_race_disconnect_accept() { race_disconnect_accept().await; } + + #[cfg(feature = "socks")] + mod socks_tests { + use super::*; + + /// Minimal SOCKS5 proxy: no auth, CONNECT to IPv4 only. Good enough for tests. + async fn run_socks5_proxy(listener: tokio::net::TcpListener) { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; + + loop { + let (mut client, _) = match listener.accept().await { + Ok(c) => c, + Err(_) => return, + }; + tokio::spawn(async move { + // Greeting: client sends [version, nmethods, methods...] + let mut buf = [0u8; 258]; + client.read_exact(&mut buf[..2]).await.ok()?; + let nmethods = buf[1] as usize; + client.read_exact(&mut buf[..nmethods]).await.ok()?; + // Reply: no auth + client.write_all(&[0x05, 0x00]).await.ok()?; + + // Connect request: [version, cmd, rsv, atype, addr..., port] + client.read_exact(&mut buf[..4]).await.ok()?; + let atype = buf[3]; + let target: std::net::SocketAddr = match atype { + 0x01 => { + // IPv4 + client.read_exact(&mut buf[..6]).await.ok()?; + let ip = std::net::Ipv4Addr::new(buf[0], buf[1], buf[2], buf[3]); + let port = u16::from_be_bytes([buf[4], buf[5]]); + (ip, port).into() + }, + _ => return None, // unsupported + }; + + let mut upstream = TcpStream::connect(target).await.ok()?; + // Reply: success, bound addr 0.0.0.0:0 + client.write_all(&[0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0]).await.ok()?; + // Shovel bytes + tokio::io::copy_bidirectional(&mut client, &mut upstream).await.ok()?; + Some(()) + }); + } + } + + async fn do_socks5_connection_test() { + let secp_ctx = Secp256k1::new(); + let a_key = SecretKey::from_slice(&[1; 32]).unwrap(); + let b_key = SecretKey::from_slice(&[1; 32]).unwrap(); + let a_pub = PublicKey::from_secret_key(&secp_ctx, &a_key); + let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key); + + let (a_connected_sender, mut a_connected) = mpsc::channel(1); + let (a_disconnected_sender, mut a_disconnected) = mpsc::channel(1); + let a_handler = Arc::new(MsgHandler { + expected_pubkey: b_pub, + pubkey_connected: a_connected_sender, + pubkey_disconnected: a_disconnected_sender, + disconnected_flag: AtomicBool::new(false), + msg_events: Mutex::new(Vec::new()), + }); + let a_manager = Arc::new(PeerManager::new( + MessageHandler { + chan_handler: Arc::clone(&a_handler), + route_handler: Arc::clone(&a_handler), + onion_message_handler: Arc::new(IgnoringMessageHandler {}), + custom_message_handler: Arc::new(IgnoringMessageHandler {}), + send_only_message_handler: Arc::new(IgnoringMessageHandler {}), + }, + 0, + &[1; 32], + Arc::new(TestLogger()), + Arc::new(TestNodeSigner::new(a_key)), + )); + + let (b_connected_sender, mut b_connected) = mpsc::channel(1); + let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); + let b_handler = Arc::new(MsgHandler { + expected_pubkey: a_pub, + pubkey_connected: b_connected_sender, + pubkey_disconnected: b_disconnected_sender, + disconnected_flag: AtomicBool::new(false), + msg_events: Mutex::new(Vec::new()), + }); + let b_manager = Arc::new(PeerManager::new( + MessageHandler { + chan_handler: Arc::clone(&b_handler), + route_handler: Arc::clone(&b_handler), + onion_message_handler: Arc::new(IgnoringMessageHandler {}), + custom_message_handler: Arc::new(IgnoringMessageHandler {}), + send_only_message_handler: Arc::new(IgnoringMessageHandler {}), + }, + 0, + &[2; 32], + Arc::new(TestLogger()), + Arc::new(TestNodeSigner::new(b_key)), + )); + + // Start SOCKS5 proxy on a random port. + let proxy_listener = + tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let proxy_addr = proxy_listener.local_addr().unwrap(); + tokio::spawn(run_socks5_proxy(proxy_listener)); + + // Listener for the inbound side of the LN connection. + let inbound_listener = + tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let inbound_addr = inbound_listener.local_addr().unwrap(); + + // Outbound: connect through the proxy. Spawned because we need to + // concurrently accept on the inbound listener. + let a_mgr = Arc::clone(&a_manager); + let outbound_handle = tokio::spawn(async move { + super::super::connect_outbound_via_socks5( + a_mgr, b_pub, inbound_addr, proxy_addr, + ) + .await + }); + + // Accept the proxied TCP connection and hand it to setup_inbound. + let (inbound_stream, _) = inbound_listener.accept().await.unwrap(); + let inbound_std = inbound_stream.into_std().unwrap(); + let fut_b = super::super::setup_inbound(b_manager, inbound_std); + + let outbound_result = outbound_handle.await.unwrap(); + assert!(outbound_result.is_some(), "SOCKS5 outbound connection failed"); + let fut_a = outbound_result.unwrap(); + + // Both peers should see each other. + tokio::time::timeout(Duration::from_secs(10), a_connected.recv()) + .await + .unwrap(); + tokio::time::timeout(Duration::from_secs(1), b_connected.recv()) + .await + .unwrap(); + + // Disconnect. + a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError { + node_id: b_pub, + action: ErrorAction::DisconnectPeer { msg: None }, + }); + a_manager.process_events(); + tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()) + .await + .unwrap(); + tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()) + .await + .unwrap(); + assert!(a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(b_handler.disconnected_flag.load(Ordering::SeqCst)); + + fut_a.await; + fut_b.await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn socks5_connection_test() { + do_socks5_connection_test().await; + } + + /// Connecting through a dead proxy must return None, not silently + /// bypass the proxy. This catches regressions where the proxy + /// config is accidentally ignored. + #[tokio::test(flavor = "multi_thread")] + async fn socks5_dead_proxy_returns_none() { + let secp_ctx = Secp256k1::new(); + let a_key = SecretKey::from_slice(&[1; 32]).unwrap(); + let b_key = SecretKey::from_slice(&[2; 32]).unwrap(); + let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key); + + let a_manager = Arc::new(PeerManager::new( + MessageHandler { + chan_handler: Arc::new( + lightning::ln::peer_handler::ErroringMessageHandler::new(), + ), + route_handler: Arc::new(IgnoringMessageHandler {}), + onion_message_handler: Arc::new(IgnoringMessageHandler {}), + custom_message_handler: Arc::new(IgnoringMessageHandler {}), + send_only_message_handler: Arc::new(IgnoringMessageHandler {}), + }, + 0, + &[1; 32], + Arc::new(TestLogger()), + Arc::new(TestNodeSigner::new(a_key)), + )); + + // Port 1 on loopback: nothing should be listening there. + let dead_proxy: std::net::SocketAddr = "127.0.0.1:1".parse().unwrap(); + let target: std::net::SocketAddr = "127.0.0.1:9735".parse().unwrap(); + + let result = super::super::connect_outbound_via_socks5( + a_manager, b_pub, target, dead_proxy, + ) + .await; + assert!(result.is_none(), "dead proxy should yield None"); + } + } }