Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 14 additions & 30 deletions dash-spv-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::mem::forget;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// FFI wrapper around `DashSpvClient`.
type InnerClient = DashSpvClient<
Expand All @@ -26,7 +25,6 @@ pub struct FFIDashSpvClient {
pub(crate) inner: InnerClient,
pub(crate) runtime: Arc<Runtime>,
run_task: Mutex<Option<JoinHandle<()>>>,
shutdown_token: CancellationToken,
}

impl FFIDashSpvClient {
Expand Down Expand Up @@ -113,7 +111,6 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new(
inner: client,
runtime,
run_task: Mutex::new(None),
shutdown_token: CancellationToken::new(),
};
Box::into_raw(Box::new(ffi_client))
}
Expand All @@ -130,9 +127,10 @@ const RUN_TASK_SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from
impl FFIDashSpvClient {
/// Wait for the run task to finish cooperatively, aborting only on timeout.
///
/// The caller must cancel `shutdown_token` before calling this so that
/// `DashSpvClient::run()` exits its loop and cleans up monitor tasks.
/// Only falls back to `abort()` if the task doesn't exit within the timeout.
/// `DashSpvClient::stop()` must have been called first (it cancels the
/// client's internal token, which makes `run()` exit its loop and clean
/// up monitor tasks). This only falls back to `abort()` if the task
/// doesn't exit within the timeout.
fn wait_for_run_task(&self) {
let task = self.run_task.lock().unwrap().take();
if let Some(mut task) = task {
Expand All @@ -155,18 +153,6 @@ impl FFIDashSpvClient {
}
}

fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> {
client.shutdown_token.cancel();

client.wait_for_run_task();

let result = client.runtime.block_on(async { client.inner.stop().await });

client.shutdown_token = CancellationToken::new();

result
}

/// Update the running client's configuration.
///
/// # Safety
Expand Down Expand Up @@ -203,8 +189,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_update_config(
pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) -> i32 {
null_check!(client);

let client = &mut (*client);
match stop_client_internal(client) {
let client = &(*client);

let result = client.runtime.block_on(async { client.inner.stop().await });
client.wait_for_run_task();

match result {
Ok(()) => FFIErrorCode::Success as i32,
Err(e) => {
set_last_error(&e.to_string());
Expand All @@ -231,13 +221,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_run(client: *mut FFIDashSpvClient)

tracing::info!("dash_spv_ffi_client_run: starting sync");

let shutdown_token = client.shutdown_token.clone();
let spv_client = client.inner.clone();

let task = client.runtime.spawn(async move {
tracing::debug!("Sync task: starting run");

if let Err(e) = spv_client.run(shutdown_token).await {
if let Err(e) = spv_client.run().await {
tracing::error!("Sync task: error: {}", e);
}

Expand Down Expand Up @@ -366,18 +355,13 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie
if !client.is_null() {
let client = Box::from_raw(client);

// Cancel shutdown token so run() exits its loop and cleans up
client.shutdown_token.cancel();

// Wait for the run task to finish (cooperative, with timeout fallback)
client.wait_for_run_task();

// Stop the SPV client (run() calls stop() internally, but this
// handles the case where run() was never called or was aborted)
client.runtime.block_on(async {
let _ = client.inner.stop().await;
});

// Wait for the run task to finish (cooperative, with timeout fallback)
client.wait_for_run_task();

tracing::info!("FFI client destroyed and all tasks cleaned up");
}
}
Expand Down
5 changes: 1 addition & 4 deletions dash-spv/examples/filter_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use key_wallet_manager::WalletManager;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -42,9 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Starting synchronization with filter support...");
println!("Watching address: {:?}", watch_address);

let shutdown_token = CancellationToken::new();

client.run(shutdown_token).await?;
client.run().await?;

println!("Done!");
Ok(())
Expand Down
5 changes: 1 addition & 4 deletions dash-spv/examples/simple_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_manager::WalletManager;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -36,9 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Starting header synchronization...");

let shutdown_token = CancellationToken::new();

client.run(shutdown_token).await?;
client.run().await?;

println!("Done!");
Ok(())
Expand Down
5 changes: 1 addition & 4 deletions dash-spv/examples/spv_with_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_manager::WalletManager;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -40,9 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// - Reorgs via handle_reorg()
// - Compact filter checks via check_compact_filter()

let shutdown_token = CancellationToken::new();

client.run(shutdown_token).await?;
client.run().await?;

println!("Done!");
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions dash-spv/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use dashcore::sml::masternode_list_engine::MasternodeListEngine;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;

use super::ClientConfig;
use crate::error::{Result, SpvError};
Expand Down Expand Up @@ -113,6 +114,7 @@ pub struct DashSpvClient<W: WalletInterface, N: NetworkManager, S: StorageManage
pub(super) sync_coordinator: Arc<Mutex<PersistentSyncCoordinator<W>>>,
pub(super) running: Arc<RwLock<bool>>,
pub(super) event_handlers: Arc<Vec<Arc<dyn super::EventHandler>>>,
pub(super) cancel_token: CancellationToken,
}

impl<W: WalletInterface, N: NetworkManager, S: StorageManager> Clone for DashSpvClient<W, N, S> {
Expand All @@ -126,6 +128,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> Clone for DashSpv
sync_coordinator: Arc::clone(&self.sync_coordinator),
running: Arc::clone(&self.running),
event_handlers: Arc::clone(&self.event_handlers),
cancel_token: self.cancel_token.clone(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use dashcore_hashes::Hash;
use key_wallet_manager::WalletInterface;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;

impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W, N, S> {
/// Create a new SPV client with the given configuration, network, storage, and wallet.
Expand Down Expand Up @@ -145,6 +146,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
sync_coordinator: Arc::new(Mutex::new(sync_coordinator)),
running: Arc::new(RwLock::new(false)),
event_handlers: Arc::new(event_handlers),
cancel_token: CancellationToken::new(),
};

// Load wallet data from storage
Expand Down
6 changes: 3 additions & 3 deletions dash-spv/src/client/sync_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
self.sync_coordinator.lock().await.progress().clone()
}

/// Start the client and run the sync loop until the token is cancelled.
/// Start the client and run the sync loop until `stop()` is called.
///
/// Subscribes to all event channels internally and dispatches events to the
/// event handler provided at construction. Calls `start()` internally, runs
/// continuous network monitoring, and calls `stop()` before returning.
pub async fn run(&self, token: CancellationToken) -> Result<()> {
pub async fn run(&self) -> Result<()> {
let handlers = self.event_handlers.clone();
let monitor_shutdown = CancellationToken::new();
let (monitor_failure_tx, mut monitor_failure_rx) = mpsc::channel::<String>(1);
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
_ = sync_coordinator_tick_interval.tick() => {
self.sync_coordinator.lock().await.tick().await.err().map(Into::into)
}
_ = token.cancelled() => {
_ = self.cancel_token.cancelled() => {
tracing::debug!("DashSpvClient run loop cancelled");
break None
}
Comment on lines +118 to 121
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Wire stop() to the internal token to prevent shutdown races.

At Line 118, run() waits on self.cancel_token.cancelled(), but in this layer stop() never cancels that token and only flips running at the end of teardown. This allows extra tick() work while shutdown is already in progress.

Suggested fix
diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs
@@
     pub async fn stop(&self) -> Result<()> {
         // Check if already stopped
         {
             let running = self.running.read().await;
             if !*running {
                 return Ok(());
             }
         }
+
+        // Wake run loop immediately.
+        self.cancel_token.cancel();
+
+        // Prevent further coordinator ticks during shutdown.
+        {
+            let mut running = self.running.write().await;
+            *running = false;
+        }
@@
-        // Mark as stopped
-        let mut running = self.running.write().await;
-        *running = false;
-
         Ok(())
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@dash-spv/src/client/sync_coordinator.rs` around lines 118 - 121, The run loop
is waiting on self.cancel_token.cancelled() but stop() only flips running later,
allowing extra tick() work during shutdown; update stop() to call
self.cancel_token.cancel() immediately (before waiting on running or doing
teardown) so the run() branch listening on cancel_token will wake and exit
promptly, and keep existing running flag/teardown logic intact; locate methods
named run(), stop(), and the field cancel_token and ensure stop() invokes the
token's cancel() (or equivalent) prior to the rest of the shutdown sequence.

Expand Down
4 changes: 1 addition & 3 deletions dash-spv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
//! use key_wallet_manager::WalletManager;
//! use std::sync::Arc;
//! use tokio::sync::RwLock;
//! use tokio_util::sync::CancellationToken;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -41,9 +40,8 @@
//! wallet,
//! vec![Arc::new(())],
//! ).await?;
//! let shutdown_token = CancellationToken::new();
//!
//! client.run(shutdown_token).await?;
//! client.run().await?;
//!
//! Ok(())
//! }
Expand Down
18 changes: 6 additions & 12 deletions dash-spv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use clap::{Parser, ValueEnum};
use dash_spv::{ClientConfig, DashSpvClient, LevelFilter, MempoolStrategy, Network};
use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_manager::WalletManager;
use tokio_util::sync::CancellationToken;

/// Network selection for CLI
#[derive(Clone, Copy, Debug, ValueEnum)]
Expand Down Expand Up @@ -315,22 +314,17 @@ async fn run_client<S: dash_spv::storage::StorageManager>(
}
};

let shutdown_token = CancellationToken::new();
let ctrl_c_token = shutdown_token.clone();
let stop_client = client.clone();
tokio::spawn(async move {
tokio::select! {
result = tokio::signal::ctrl_c() => {
result.ok();
tracing::debug!("Shutdown signal received");
}
_ = ctrl_c_token.cancelled() => {
tracing::debug!("Shutdown token cancelled");
if tokio::signal::ctrl_c().await.is_ok() {
tracing::debug!("Shutdown signal received");
if let Err(e) = stop_client.stop().await {
tracing::warn!("Error during ctrl-c stop: {}", e);
}
}
ctrl_c_token.cancel();
});

client.run(shutdown_token).await?;
client.run().await?;

Ok(())
}
11 changes: 3 additions & 8 deletions dash-spv/tests/dashd_masternode/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use tempfile::TempDir;
use tokio::sync::{broadcast, watch, RwLock};
use tokio::task::JoinHandle;
use tokio::time;
use tokio_util::sync::CancellationToken;

/// Timeout for masternode sync tests (masternode sync takes longer than wallet sync).
pub(super) const SYNC_TIMEOUT: u64 = 60;
Expand All @@ -38,14 +37,13 @@ pub(super) struct ClientHandle {
pub(super) sync_event_receiver: broadcast::Receiver<SyncEvent>,
pub(super) wallet_event_receiver: broadcast::Receiver<WalletEvent>,
pub(super) _network_event_receiver: broadcast::Receiver<NetworkEvent>,
pub(super) cancel_token: CancellationToken,
pub(super) engine: Arc<RwLock<MasternodeListEngine>>,
}

impl ClientHandle {
pub(super) async fn stop(&mut self) {
tracing::info!("Cancelling client run loop...");
self.cancel_token.cancel();
tracing::info!("Stopping client run loop...");
self.client.stop().await.expect("client stop failed");
if let Some(handle) = self.run_handle.take() {
handle.await.expect("Run task panicked").expect("Run task returned error");
}
Expand Down Expand Up @@ -185,11 +183,9 @@ pub(super) async fn create_and_start_client(

let engine =
client.masternode_list_engine().expect("Engine should be initialized after creation");
let cancel_token = CancellationToken::new();
let run_token = cancel_token.clone();
let run_client = client.clone();

let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await });
let run_handle = tokio::task::spawn(async move { run_client.run().await });

ClientHandle {
client,
Expand All @@ -198,7 +194,6 @@ pub(super) async fn create_and_start_client(
sync_event_receiver,
wallet_event_receiver,
_network_event_receiver,
cancel_token,
engine,
}
}
17 changes: 5 additions & 12 deletions dash-spv/tests/dashd_sync/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::sync::{broadcast, watch, RwLock};
use tokio_util::sync::CancellationToken;

/// SPV-specific test context wrapping the shared dashd infrastructure.
///
Expand Down Expand Up @@ -231,7 +230,7 @@ pub(super) type TestClient =
DashSpvClient<WalletManager<ManagedWalletInfo>, PeerNetworkManager, DiskStorageManager>;

/// A `ClientHandle` is a utility structure that manages the state and handles for a `TestClient`
/// required to interact with the synchronization process, various event channels, and cancellation capabilities.
/// required to interact with the synchronization process and event channels.
pub(super) struct ClientHandle {
/// The underlying SPV client instance.
pub(super) client: TestClient,
Expand All @@ -245,16 +244,13 @@ pub(super) struct ClientHandle {
pub(super) network_event_receiver: broadcast::Receiver<NetworkEvent>,
/// A channel for receiving wallet events.
pub(super) wallet_event_receiver: broadcast::Receiver<WalletEvent>,
/// A cancellation token for the client's run loop.
pub(super) cancel_token: CancellationToken,
}

impl ClientHandle {
/// Stops the execution of the client run loop by canceling its associated token and awaiting the
/// termination of the background task.
/// Stops the SPV client and awaits the background run task.
pub(super) async fn stop(&mut self) {
tracing::info!("Cancelling client run loop...");
self.cancel_token.cancel();
tracing::info!("Stopping client run loop...");
self.client.stop().await.expect("client stop failed");
if let Some(handle) = self.run_handle.take() {
handle.await.expect("Run task panicked").expect("Run task returned error");
}
Expand Down Expand Up @@ -304,11 +300,9 @@ pub(super) async fn create_and_start_client(
let w = client.wallet().read().await;
w.subscribe_events()
};
let cancel_token = CancellationToken::new();
let run_token = cancel_token.clone();
let run_client = client.clone();

let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await });
let run_handle = tokio::task::spawn(async move { run_client.run().await });

ClientHandle {
client,
Expand All @@ -317,7 +311,6 @@ pub(super) async fn create_and_start_client(
sync_event_receiver,
network_event_receiver,
wallet_event_receiver,
cancel_token,
}
}

Expand Down
Loading
Loading