Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub(crate) const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
// The time in-between peer reconnection attempts.
pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(60);

// The upper bound on the per-peer exponential backoff applied to failed reconnection attempts.
pub(crate) const PEER_RECONNECTION_MAX_INTERVAL: Duration = Duration::from_secs(60 * 30);

// The time in-between RGS sync attempts.
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

Expand Down
138 changes: 135 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,51 @@
use std::collections::hash_map::{self, HashMap};
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::SocketAddress;

use crate::config::TorConfig;
use crate::config::{TorConfig, PEER_RECONNECTION_INTERVAL, PEER_RECONNECTION_MAX_INTERVAL};
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
use crate::types::{KeysManager, PeerManager};
use crate::Error;

struct PeerReconnectState {
consecutive_failures: u32,
next_retry_at: Instant,
next_backoff: Duration,
}

impl PeerReconnectState {
fn new(now: Instant) -> Self {
Self {
consecutive_failures: 0,
next_retry_at: now,
next_backoff: PEER_RECONNECTION_INTERVAL,
}
}

/// Bumps the failure count, schedules `next_retry_at` to `now + current backoff`,
/// and doubles the backoff for the following failure (capped at
/// [`PEER_RECONNECTION_MAX_INTERVAL`]). Returns the backoff that was scheduled.
fn record_failure(&mut self, now: Instant) -> Duration {
self.consecutive_failures = self.consecutive_failures.saturating_add(1);
let scheduled_backoff = self.next_backoff;
self.next_retry_at = now + scheduled_backoff;
self.next_backoff =
std::cmp::min(scheduled_backoff.saturating_mul(2), PEER_RECONNECTION_MAX_INTERVAL);
scheduled_backoff
}
}

pub(crate) struct ConnectionManager<L: Deref + Clone + Sync + Send>
where
L::Target: LdkLogger,
{
pending_connections:
Mutex<HashMap<PublicKey, Vec<tokio::sync::oneshot::Sender<Result<(), Error>>>>>,
reconnect_state: Mutex<HashMap<PublicKey, PeerReconnectState>>,
peer_manager: Arc<PeerManager>,
tor_proxy_config: Option<TorConfig>,
keys_manager: Arc<KeysManager>,
Expand All @@ -39,8 +68,60 @@ where
keys_manager: Arc<KeysManager>, logger: L,
) -> Self {
let pending_connections = Mutex::new(HashMap::new());
let reconnect_state = Mutex::new(HashMap::new());

Self {
pending_connections,
reconnect_state,
peer_manager,
tor_proxy_config,
keys_manager,
logger,
}
}

/// Returns whether the background reconnection task should attempt to reconnect
/// to `node_id` now, based on per-peer exponential backoff state.
pub(crate) fn is_reconnect_due(&self, node_id: &PublicKey) -> bool {
self.reconnect_state
.lock()
.expect("lock")
.get(node_id)
.map_or(true, |state| Instant::now() >= state.next_retry_at)
}

/// Records the outcome of a reconnection attempt and updates per-peer backoff.
///
/// On success, any existing backoff state is cleared. On failure, the per-peer
/// retry interval is doubled (up to [`PEER_RECONNECTION_MAX_INTERVAL`]) and the
/// next retry is scheduled accordingly.
pub(crate) fn record_reconnect_attempt(&self, node_id: &PublicKey, result: &Result<(), Error>) {
let mut state_lock = self.reconnect_state.lock().expect("lock");
if result.is_ok() {
if state_lock.remove(node_id).is_some() {
log_debug!(self.logger, "Cleared reconnection backoff for peer {}", node_id);
}
return;
}

Self { pending_connections, peer_manager, tor_proxy_config, keys_manager, logger }
let now = Instant::now();
let state = state_lock.entry(*node_id).or_insert_with(|| PeerReconnectState::new(now));
let scheduled_backoff = state.record_failure(now);

log_debug!(
self.logger,
"Reconnection to peer {} failed ({} consecutive failures); next retry in {}s",
node_id,
state.consecutive_failures,
scheduled_backoff.as_secs(),
);
}

/// Removes any per-peer backoff state for `node_id`, so a subsequent attempt
/// is treated as a fresh first try. Called when a peer is removed from the
/// persisted peer store.
pub(crate) fn clear_reconnect_state(&self, node_id: &PublicKey) {
self.reconnect_state.lock().expect("lock").remove(node_id);
}

pub(crate) async fn connect_peer_if_necessary(
Expand All @@ -57,6 +138,11 @@ where
&self, node_id: PublicKey, addr: SocketAddress,
) -> Result<(), Error> {
let res = self.do_connect_peer_internal(node_id, addr).await;
if res.is_ok() {
// Any successful connect (including user-initiated ones) resets backoff so the
// background reconnection loop retries promptly if the peer drops again.
self.clear_reconnect_state(&node_id);
}
self.propagate_result_to_subscribers(&node_id, res);
res
}
Expand Down Expand Up @@ -273,3 +359,49 @@ where
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn reconnect_state_doubles_until_capped() {
let start = Instant::now();
let mut state = PeerReconnectState::new(start);

let scheduled = state.record_failure(start);
assert_eq!(scheduled, PEER_RECONNECTION_INTERVAL);
assert_eq!(state.consecutive_failures, 1);
assert_eq!(state.next_retry_at, start + PEER_RECONNECTION_INTERVAL);

let mut expected = PEER_RECONNECTION_INTERVAL;
for failure_count in 2..32 {
expected = std::cmp::min(expected.saturating_mul(2), PEER_RECONNECTION_MAX_INTERVAL);
let scheduled = state.record_failure(start);
assert_eq!(scheduled, expected);
assert_eq!(state.consecutive_failures, failure_count);
assert_eq!(state.next_retry_at, start + expected);
assert!(state.next_backoff <= PEER_RECONNECTION_MAX_INTERVAL);
}

// Once capped, further failures stay at the cap.
assert_eq!(state.next_backoff, PEER_RECONNECTION_MAX_INTERVAL);
let scheduled = state.record_failure(start);
assert_eq!(scheduled, PEER_RECONNECTION_MAX_INTERVAL);
assert_eq!(state.next_backoff, PEER_RECONNECTION_MAX_INTERVAL);
}

#[test]
fn reconnect_state_schedules_relative_to_failure_time() {
let t0 = Instant::now();
let mut state = PeerReconnectState::new(t0);

let _ = state.record_failure(t0);
assert_eq!(state.next_retry_at, t0 + PEER_RECONNECTION_INTERVAL);

let t1 = t0 + Duration::from_secs(5);
let scheduled = state.record_failure(t1);
assert_eq!(scheduled, PEER_RECONNECTION_INTERVAL * 2);
assert_eq!(state.next_retry_at, t1 + PEER_RECONNECTION_INTERVAL * 2);
}
}
17 changes: 15 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,22 @@ impl Node {
.map(|peer| peer.counterparty_node_id)
.collect::<Vec<_>>();

for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
let _ = connect_cm.do_connect_peer(
for peer_info in connect_peer_store.list_peers().iter() {
if pm_peers.contains(&peer_info.node_id) {
// A connected peer (e.g., via an inbound connection) is
// proven reachable: reset any backoff so a future
// disconnect is retried promptly again.
connect_cm.clear_reconnect_state(&peer_info.node_id);
continue;
}
if !connect_cm.is_reconnect_due(&peer_info.node_id) {
continue;
}
let res = connect_cm.do_connect_peer(
peer_info.node_id,
peer_info.address.clone(),
).await;
connect_cm.record_reconnect_attempt(&peer_info.node_id, &res);
}
}
}
Expand Down Expand Up @@ -1144,6 +1155,7 @@ impl Node {
log_error!(self.logger, "Failed to remove peer {}: {}", counterparty_node_id, e)
},
}
self.connection_manager.clear_reconnect_state(&counterparty_node_id);

self.peer_manager.disconnect_by_node_id(counterparty_node_id);
Ok(())
Expand Down Expand Up @@ -1862,6 +1874,7 @@ impl Node {
// Check if this was the last open channel, if so, forget the peer.
if open_channels.len() == 1 {
self.peer_store.remove_peer(&counterparty_node_id)?;
self.connection_manager.clear_reconnect_state(&counterparty_node_id);
}
}

Expand Down