diff --git a/dash-spv-ffi/src/client.rs b/dash-spv-ffi/src/client.rs index faf0235f4..4513a86f8 100644 --- a/dash-spv-ffi/src/client.rs +++ b/dash-spv-ffi/src/client.rs @@ -13,7 +13,6 @@ use std::mem::forget; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; /// FFI wrapper around `DashSpvClient`. type InnerClient = DashSpvClient< @@ -26,7 +25,6 @@ pub struct FFIDashSpvClient { pub(crate) inner: InnerClient, pub(crate) runtime: Arc, run_task: Mutex>>, - shutdown_token: CancellationToken, } impl FFIDashSpvClient { @@ -113,7 +111,6 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( inner: client, runtime, run_task: Mutex::new(None), - shutdown_token: CancellationToken::new(), }; Box::into_raw(Box::new(ffi_client)) } @@ -130,9 +127,10 @@ const RUN_TASK_SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from impl FFIDashSpvClient { /// Wait for the run task to finish cooperatively, aborting only on timeout. /// - /// The caller must cancel `shutdown_token` before calling this so that - /// `DashSpvClient::run()` exits its loop and cleans up monitor tasks. - /// Only falls back to `abort()` if the task doesn't exit within the timeout. + /// `DashSpvClient::stop()` must have been called first (it cancels the + /// client's internal token, which makes `run()` exit its loop and clean + /// up monitor tasks). This only falls back to `abort()` if the task + /// doesn't exit within the timeout. fn wait_for_run_task(&self) { let task = self.run_task.lock().unwrap().take(); if let Some(mut task) = task { @@ -155,18 +153,6 @@ impl FFIDashSpvClient { } } -fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> { - client.shutdown_token.cancel(); - - client.wait_for_run_task(); - - let result = client.runtime.block_on(async { client.inner.stop().await }); - - client.shutdown_token = CancellationToken::new(); - - result -} - /// Update the running client's configuration. /// /// # Safety @@ -203,8 +189,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_update_config( pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) -> i32 { null_check!(client); - let client = &mut (*client); - match stop_client_internal(client) { + let client = &(*client); + + let result = client.runtime.block_on(async { client.inner.stop().await }); + client.wait_for_run_task(); + + match result { Ok(()) => FFIErrorCode::Success as i32, Err(e) => { set_last_error(&e.to_string()); @@ -231,13 +221,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_run(client: *mut FFIDashSpvClient) tracing::info!("dash_spv_ffi_client_run: starting sync"); - let shutdown_token = client.shutdown_token.clone(); let spv_client = client.inner.clone(); let task = client.runtime.spawn(async move { tracing::debug!("Sync task: starting run"); - if let Err(e) = spv_client.run(shutdown_token).await { + if let Err(e) = spv_client.run().await { tracing::error!("Sync task: error: {}", e); } @@ -366,18 +355,13 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie if !client.is_null() { let client = Box::from_raw(client); - // Cancel shutdown token so run() exits its loop and cleans up - client.shutdown_token.cancel(); - - // Wait for the run task to finish (cooperative, with timeout fallback) - client.wait_for_run_task(); - - // Stop the SPV client (run() calls stop() internally, but this - // handles the case where run() was never called or was aborted) client.runtime.block_on(async { let _ = client.inner.stop().await; }); + // Wait for the run task to finish (cooperative, with timeout fallback) + client.wait_for_run_task(); + tracing::info!("FFI client destroyed and all tasks cleaned up"); } } diff --git a/dash-spv/examples/filter_sync.rs b/dash-spv/examples/filter_sync.rs index e5940036e..203896ce6 100644 --- a/dash-spv/examples/filter_sync.rs +++ b/dash-spv/examples/filter_sync.rs @@ -9,7 +9,6 @@ use key_wallet_manager::WalletManager; use std::str::FromStr; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<(), Box> { @@ -42,9 +41,7 @@ async fn main() -> Result<(), Box> { println!("Starting synchronization with filter support..."); println!("Watching address: {:?}", watch_address); - let shutdown_token = CancellationToken::new(); - - client.run(shutdown_token).await?; + client.run().await?; println!("Done!"); Ok(()) diff --git a/dash-spv/examples/simple_sync.rs b/dash-spv/examples/simple_sync.rs index 63679108d..0568768fe 100644 --- a/dash-spv/examples/simple_sync.rs +++ b/dash-spv/examples/simple_sync.rs @@ -8,7 +8,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletManager; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<(), Box> { @@ -36,9 +35,7 @@ async fn main() -> Result<(), Box> { println!("Starting header synchronization..."); - let shutdown_token = CancellationToken::new(); - - client.run(shutdown_token).await?; + client.run().await?; println!("Done!"); Ok(()) diff --git a/dash-spv/examples/spv_with_wallet.rs b/dash-spv/examples/spv_with_wallet.rs index 1609bd8d7..2d2c661d8 100644 --- a/dash-spv/examples/spv_with_wallet.rs +++ b/dash-spv/examples/spv_with_wallet.rs @@ -9,7 +9,6 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletManager; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<(), Box> { @@ -40,9 +39,7 @@ async fn main() -> Result<(), Box> { // - Reorgs via handle_reorg() // - Compact filter checks via check_compact_filter() - let shutdown_token = CancellationToken::new(); - - client.run(shutdown_token).await?; + client.run().await?; println!("Done!"); Ok(()) diff --git a/dash-spv/src/client/core.rs b/dash-spv/src/client/core.rs index ff346b8d0..387edf9b0 100644 --- a/dash-spv/src/client/core.rs +++ b/dash-spv/src/client/core.rs @@ -11,6 +11,7 @@ use dashcore::sml::masternode_list_engine::MasternodeListEngine; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; +use tokio_util::sync::CancellationToken; use super::ClientConfig; use crate::error::{Result, SpvError}; @@ -113,6 +114,7 @@ pub struct DashSpvClient>>, pub(super) running: Arc>, pub(super) event_handlers: Arc>>, + pub(super) cancel_token: CancellationToken, } impl Clone for DashSpvClient { @@ -126,6 +128,7 @@ impl Clone for DashSpv sync_coordinator: Arc::clone(&self.sync_coordinator), running: Arc::clone(&self.running), event_handlers: Arc::clone(&self.event_handlers), + cancel_token: self.cancel_token.clone(), } } } diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 1e4125dc6..fef9c2818 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -26,6 +26,7 @@ use dashcore_hashes::Hash; use key_wallet_manager::WalletInterface; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; +use tokio_util::sync::CancellationToken; impl DashSpvClient { /// Create a new SPV client with the given configuration, network, storage, and wallet. @@ -145,6 +146,7 @@ impl DashSpvClient DashSpvClient Result<()> { + pub async fn run(&self) -> Result<()> { let handlers = self.event_handlers.clone(); let monitor_shutdown = CancellationToken::new(); let (monitor_failure_tx, mut monitor_failure_rx) = mpsc::channel::(1); @@ -115,7 +115,7 @@ impl DashSpvClient { self.sync_coordinator.lock().await.tick().await.err().map(Into::into) } - _ = token.cancelled() => { + _ = self.cancel_token.cancelled() => { tracing::debug!("DashSpvClient run loop cancelled"); break None } diff --git a/dash-spv/src/lib.rs b/dash-spv/src/lib.rs index cabedf76a..3e26c0c97 100644 --- a/dash-spv/src/lib.rs +++ b/dash-spv/src/lib.rs @@ -20,7 +20,6 @@ //! use key_wallet_manager::WalletManager; //! use std::sync::Arc; //! use tokio::sync::RwLock; -//! use tokio_util::sync::CancellationToken; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -41,9 +40,8 @@ //! wallet, //! vec![Arc::new(())], //! ).await?; -//! let shutdown_token = CancellationToken::new(); //! -//! client.run(shutdown_token).await?; +//! client.run().await?; //! //! Ok(()) //! } diff --git a/dash-spv/src/main.rs b/dash-spv/src/main.rs index eb95bd580..677ff274f 100644 --- a/dash-spv/src/main.rs +++ b/dash-spv/src/main.rs @@ -8,7 +8,6 @@ use clap::{Parser, ValueEnum}; use dash_spv::{ClientConfig, DashSpvClient, LevelFilter, MempoolStrategy, Network}; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::WalletManager; -use tokio_util::sync::CancellationToken; /// Network selection for CLI #[derive(Clone, Copy, Debug, ValueEnum)] @@ -315,22 +314,17 @@ async fn run_client( } }; - let shutdown_token = CancellationToken::new(); - let ctrl_c_token = shutdown_token.clone(); + let stop_client = client.clone(); tokio::spawn(async move { - tokio::select! { - result = tokio::signal::ctrl_c() => { - result.ok(); - tracing::debug!("Shutdown signal received"); - } - _ = ctrl_c_token.cancelled() => { - tracing::debug!("Shutdown token cancelled"); + if tokio::signal::ctrl_c().await.is_ok() { + tracing::debug!("Shutdown signal received"); + if let Err(e) = stop_client.stop().await { + tracing::warn!("Error during ctrl-c stop: {}", e); } } - ctrl_c_token.cancel(); }); - client.run(shutdown_token).await?; + client.run().await?; Ok(()) } diff --git a/dash-spv/tests/dashd_masternode/setup.rs b/dash-spv/tests/dashd_masternode/setup.rs index 0ac2ca457..773dd5c07 100644 --- a/dash-spv/tests/dashd_masternode/setup.rs +++ b/dash-spv/tests/dashd_masternode/setup.rs @@ -23,7 +23,6 @@ use tempfile::TempDir; use tokio::sync::{broadcast, watch, RwLock}; use tokio::task::JoinHandle; use tokio::time; -use tokio_util::sync::CancellationToken; /// Timeout for masternode sync tests (masternode sync takes longer than wallet sync). pub(super) const SYNC_TIMEOUT: u64 = 60; @@ -38,14 +37,13 @@ pub(super) struct ClientHandle { pub(super) sync_event_receiver: broadcast::Receiver, pub(super) wallet_event_receiver: broadcast::Receiver, pub(super) _network_event_receiver: broadcast::Receiver, - pub(super) cancel_token: CancellationToken, pub(super) engine: Arc>, } impl ClientHandle { pub(super) async fn stop(&mut self) { - tracing::info!("Cancelling client run loop..."); - self.cancel_token.cancel(); + tracing::info!("Stopping client run loop..."); + self.client.stop().await.expect("client stop failed"); if let Some(handle) = self.run_handle.take() { handle.await.expect("Run task panicked").expect("Run task returned error"); } @@ -185,11 +183,9 @@ pub(super) async fn create_and_start_client( let engine = client.masternode_list_engine().expect("Engine should be initialized after creation"); - let cancel_token = CancellationToken::new(); - let run_token = cancel_token.clone(); let run_client = client.clone(); - let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await }); + let run_handle = tokio::task::spawn(async move { run_client.run().await }); ClientHandle { client, @@ -198,7 +194,6 @@ pub(super) async fn create_and_start_client( sync_event_receiver, wallet_event_receiver, _network_event_receiver, - cancel_token, engine, } } diff --git a/dash-spv/tests/dashd_sync/setup.rs b/dash-spv/tests/dashd_sync/setup.rs index 16c229bb7..74af8302c 100644 --- a/dash-spv/tests/dashd_sync/setup.rs +++ b/dash-spv/tests/dashd_sync/setup.rs @@ -24,7 +24,6 @@ use std::path::PathBuf; use std::sync::Arc; use tempfile::TempDir; use tokio::sync::{broadcast, watch, RwLock}; -use tokio_util::sync::CancellationToken; /// SPV-specific test context wrapping the shared dashd infrastructure. /// @@ -231,7 +230,7 @@ pub(super) type TestClient = DashSpvClient, PeerNetworkManager, DiskStorageManager>; /// A `ClientHandle` is a utility structure that manages the state and handles for a `TestClient` -/// required to interact with the synchronization process, various event channels, and cancellation capabilities. +/// required to interact with the synchronization process and event channels. pub(super) struct ClientHandle { /// The underlying SPV client instance. pub(super) client: TestClient, @@ -245,16 +244,13 @@ pub(super) struct ClientHandle { pub(super) network_event_receiver: broadcast::Receiver, /// A channel for receiving wallet events. pub(super) wallet_event_receiver: broadcast::Receiver, - /// A cancellation token for the client's run loop. - pub(super) cancel_token: CancellationToken, } impl ClientHandle { - /// Stops the execution of the client run loop by canceling its associated token and awaiting the - /// termination of the background task. + /// Stops the SPV client and awaits the background run task. pub(super) async fn stop(&mut self) { - tracing::info!("Cancelling client run loop..."); - self.cancel_token.cancel(); + tracing::info!("Stopping client run loop..."); + self.client.stop().await.expect("client stop failed"); if let Some(handle) = self.run_handle.take() { handle.await.expect("Run task panicked").expect("Run task returned error"); } @@ -304,11 +300,9 @@ pub(super) async fn create_and_start_client( let w = client.wallet().read().await; w.subscribe_events() }; - let cancel_token = CancellationToken::new(); - let run_token = cancel_token.clone(); let run_client = client.clone(); - let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await }); + let run_handle = tokio::task::spawn(async move { run_client.run().await }); ClientHandle { client, @@ -317,7 +311,6 @@ pub(super) async fn create_and_start_client( sync_event_receiver, network_event_receiver, wallet_event_receiver, - cancel_token, } } diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index 47602c846..7b42bcdb7 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -6,7 +6,6 @@ use std::time::Duration; use tempfile::TempDir; use tokio::sync::RwLock; use tokio::time; -use tokio_util::sync::CancellationToken; use dash_spv::client::{ClientConfig, DashSpvClient}; use dash_spv::network::PeerNetworkManager; @@ -53,10 +52,8 @@ async fn test_peer_connection() { let client = DashSpvClient::new(config, network_manager, storage_manager, wallet, vec![]).await.unwrap(); - let token = CancellationToken::new(); - let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); // Give it time to connect to peers time::sleep(Duration::from_secs(5)).await; @@ -65,7 +62,7 @@ async fn test_peer_connection() { let peer_count = client.peer_count().await; assert!(peer_count > 0, "Should have connected to at least one peer"); - cancel.cancel(); + let _ = client.stop().await; let _ = handle.await; } @@ -92,17 +89,15 @@ async fn test_peer_persistence() { .await .unwrap(); - let token = CancellationToken::new(); - let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); time::sleep(Duration::from_secs(5)).await; let peer_count = client.peer_count().await; assert!(peer_count > 0, "Should have connected to peers"); - cancel.cancel(); + let _ = client.stop().await; let _ = handle.await; } @@ -122,11 +117,9 @@ async fn test_peer_persistence() { .unwrap(); // Should connect faster due to saved peers - let token = CancellationToken::new(); - let cancel = token.clone(); let run_client = client.clone(); let start = tokio::time::Instant::now(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); // Wait for connection but with shorter timeout time::sleep(Duration::from_secs(3)).await; @@ -137,7 +130,7 @@ async fn test_peer_persistence() { let elapsed = start.elapsed(); println!("Connected to {} peers in {:?} (using saved peers)", peer_count, elapsed); - cancel.cancel(); + let _ = client.stop().await; let _ = handle.await; } } diff --git a/dash-spv/tests/wallet_integration_test.rs b/dash-spv/tests/wallet_integration_test.rs index 7ba508b7e..bdefbbe3d 100644 --- a/dash-spv/tests/wallet_integration_test.rs +++ b/dash-spv/tests/wallet_integration_test.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; use dash_spv::network::PeerNetworkManager; use dash_spv::storage::DiskStorageManager; @@ -49,11 +48,8 @@ async fn test_spv_client_creation() { async fn test_spv_client_run_stop() { let client = create_test_client().await; - let token = CancellationToken::new(); - let cancel = token.clone(); - let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run().await }); tokio::time::timeout(Duration::from_secs(5), async { while !client.is_running().await { @@ -63,7 +59,7 @@ async fn test_spv_client_run_stop() { .await .expect("client failed to start"); - cancel.cancel(); + client.stop().await.unwrap(); handle.await.unwrap().unwrap(); assert!(!client.is_running().await);