diff --git a/crates/cli/src/commands/admin/decommission.rs b/crates/cli/src/commands/admin/decommission.rs index 96097fc..9dfeed3 100644 --- a/crates/cli/src/commands/admin/decommission.rs +++ b/crates/cli/src/commands/admin/decommission.rs @@ -6,7 +6,7 @@ use serde::Serialize; use super::{get_admin_client, pool}; use crate::exit_code::ExitCode; use crate::output::Formatter; -use rc_core::admin::{AdminApi, PoolStatus, PoolTarget}; +use rc_core::admin::{AdminApi, DecommissionPoolStatus, PoolDecommissionInfo, PoolTarget}; /// Decommission subcommands #[derive(Subcommand, Debug)] @@ -83,11 +83,6 @@ struct DecommissionOperationOutput { pool: String, } -#[derive(Serialize)] -struct DecommissionStatusListOutput { - pools: Vec, -} - /// Execute a decommission subcommand pub async fn execute(cmd: DecommissionCommands, formatter: &Formatter) -> ExitCode { match cmd { @@ -138,43 +133,104 @@ async fn execute_status(args: StatusArgs, formatter: &Formatter) -> ExitCode { Err(code) => return code, }; - if let Some(pool_name) = args.pool { - let target = PoolTarget { - pool: pool_name, - by_id: args.by_id, - }; - match client.pool_status(target).await { - Ok(status) => { - if formatter.is_json() { - formatter.json(&status); - } else { - pool::print_pool_status(&status, formatter); - } - ExitCode::Success - } - Err(e) => { - formatter.error(&format!("Failed to get decommission status: {e}")); - ExitCode::GeneralError + let target = args.pool.map(|pool| PoolTarget { + pool, + by_id: args.by_id, + }); + + match client.decommission_status(target).await { + Ok(status) => { + if formatter.is_json() { + formatter.json(&status); + } else { + print_decommission_status(&status.pools, formatter); } + ExitCode::Success } - } else { - match client.list_pools().await { - Ok(pools) => { - if formatter.is_json() { - formatter.json(&DecommissionStatusListOutput { pools }); - } else { - pool::print_pool_list(&pools, formatter); - } - ExitCode::Success + Err(e) => { + formatter.error(&format!("Failed to get decommission status: {e}")); + ExitCode::GeneralError + } + } +} + +fn print_decommission_status(pools: &[DecommissionPoolStatus], formatter: &Formatter) { + formatter.println(&formatter.style_name("Decommission:")); + if pools.is_empty() { + formatter.println(" No decommission status found."); + return; + } + + for item in pools { + let decommission = item.decommission.as_ref(); + let operation_status = status_or_default(&item.status, decommission_state(decommission)); + let pool_status = status_or_default(&item.pool_status, "unknown"); + + formatter.println(&format!( + " Pool {}: {}", + item.id, + formatter.style_url(&item.cmd_line) + )); + formatter.println(&format!( + " Status: {}", + pool::style_state(operation_status, formatter) + )); + formatter.println(&format!( + " Pool status: {}", + pool::style_state(pool_status, formatter) + )); + + if let Some(info) = decommission { + let used = info.total_size.saturating_sub(info.current_size); + if info.total_size > 0 { + formatter.println(&format!( + " Usage: {} / {}", + formatter.style_size(&pool::format_bytes(used)), + pool::format_bytes(info.total_size) + )); } - Err(e) => { - formatter.error(&format!("Failed to get decommission status: {e}")); - ExitCode::GeneralError + + if info.objects_decommissioned > 0 + || info.objects_decommissioned_failed > 0 + || info.bytes_decommissioned > 0 + || info.bytes_decommissioned_failed > 0 + { + formatter.println(&format!( + " Progress: {}, {} failed; {}, {} failed bytes", + info.objects_decommissioned, + info.objects_decommissioned_failed, + pool::format_bytes(info.bytes_decommissioned), + pool::format_bytes(info.bytes_decommissioned_failed) + )); } } } } +fn status_or_default<'a>(status: &'a str, default: &'a str) -> &'a str { + if status.is_empty() { default } else { status } +} + +fn decommission_state(decommission: Option<&PoolDecommissionInfo>) -> &'static str { + let Some(info) = decommission else { + return "none"; + }; + + if info.complete { + "complete" + } else if info.failed { + "failed" + } else if info.canceled { + "canceled" + } else if info.queued { + "queued" + } else if info.start_time.is_some() { + "running" + } else { + "none" + } +} + async fn execute_cancel(args: CancelArgs, formatter: &Formatter) -> ExitCode { let client = match get_admin_client(&args.alias, formatter) { Ok(c) => c, diff --git a/crates/cli/src/commands/admin/pool.rs b/crates/cli/src/commands/admin/pool.rs index 1195ca4..53c6674 100644 --- a/crates/cli/src/commands/admin/pool.rs +++ b/crates/cli/src/commands/admin/pool.rs @@ -129,20 +129,34 @@ pub(super) fn print_pool_list(pools: &[PoolStatus], formatter: &Formatter) { pub(super) fn print_pool_status(pool: &PoolStatus, formatter: &Formatter) { let decommission = pool.decommission.as_ref(); - let state = decommission_state(decommission); + let lifecycle_state = pool_lifecycle_state(pool); + let decommission_state = pool_decommission_state(pool); + let rebalance_state = status_or_default(&pool.rebalance_status, "none"); let used = decommission .map(|info| info.total_size.saturating_sub(info.current_size)) + .or_else(|| (pool.total_size > 0).then_some(pool.used_size)) + .unwrap_or_default(); + let total = decommission + .map(|info| info.total_size) + .or_else(|| (pool.total_size > 0).then_some(pool.total_size)) .unwrap_or_default(); - let total = decommission.map(|info| info.total_size).unwrap_or_default(); formatter.println(&format!( " Pool {}: {}", pool.id, formatter.style_url(&pool.cmd_line) )); + formatter.println(&format!( + " Status: {}", + style_state(lifecycle_state, formatter) + )); formatter.println(&format!( " Decommission: {}", - style_state(state, formatter) + style_state(decommission_state, formatter) + )); + formatter.println(&format!( + " Rebalance: {}", + style_state(rebalance_state, formatter) )); if total > 0 { @@ -169,9 +183,29 @@ pub(super) fn print_pool_status(pool: &PoolStatus, formatter: &Formatter) { } } +fn pool_lifecycle_state(pool: &PoolStatus) -> &str { + if !pool.status.is_empty() { + return pool.status.as_str(); + } + + match decommission_state(pool.decommission.as_ref()) { + "complete" => "decommissioned", + "failed" | "canceled" => "blocked", + _ => "active", + } +} + +fn pool_decommission_state(pool: &PoolStatus) -> &str { + if !pool.decommission_status.is_empty() { + return pool.decommission_status.as_str(); + } + + decommission_state(pool.decommission.as_ref()) +} + fn decommission_state(decommission: Option<&PoolDecommissionInfo>) -> &'static str { let Some(info) = decommission else { - return "not started"; + return "none"; }; if info.complete { @@ -180,24 +214,32 @@ fn decommission_state(decommission: Option<&PoolDecommissionInfo>) -> &'static s "failed" } else if info.canceled { "canceled" + } else if info.queued { + "queued" } else if info.start_time.is_some() { "running" } else { - "not started" + "none" } } -fn style_state(state: &str, formatter: &Formatter) -> String { +pub(super) fn style_state(state: &str, formatter: &Formatter) -> String { match state { - "complete" => formatter.style_size(state), - "failed" => formatter.theme().error.apply_to(state).to_string(), - "canceled" => formatter.theme().warning.apply_to(state).to_string(), - "running" => formatter.style_name(state), + "active" | "complete" | "completed" | "decommissioned" => formatter.style_size(state), + "blocked" | "failed" => formatter.theme().error.apply_to(state).to_string(), + "canceled" | "queued" | "stopping" | "stopped" => { + formatter.theme().warning.apply_to(state).to_string() + } + "decommissioning" | "running" | "started" => formatter.style_name(state), _ => formatter.style_date(state), } } -fn format_bytes(bytes: u64) -> String { +fn status_or_default<'a>(status: &'a str, default: &'static str) -> &'a str { + if status.is_empty() { default } else { status } +} + +pub(super) fn format_bytes(bytes: u64) -> String { const KB: u64 = 1024; const MB: u64 = KB * 1024; const GB: u64 = MB * 1024; @@ -222,10 +264,17 @@ mod tests { #[test] fn test_decommission_state() { - assert_eq!(decommission_state(None), "not started"); + assert_eq!(decommission_state(None), "none"); assert_eq!( decommission_state(Some(&PoolDecommissionInfo::default())), - "not started" + "none" + ); + assert_eq!( + decommission_state(Some(&PoolDecommissionInfo { + queued: true, + ..Default::default() + })), + "queued" ); assert_eq!( decommission_state(Some(&PoolDecommissionInfo { diff --git a/crates/cli/tests/admin_decommission.rs b/crates/cli/tests/admin_decommission.rs index 504263f..e6530a5 100644 --- a/crates/cli/tests/admin_decommission.rs +++ b/crates/cli/tests/admin_decommission.rs @@ -52,10 +52,10 @@ fn decommission_start_dispatches_by_id_pool_json() { } #[test] -fn decommission_status_without_pool_dispatches_to_pool_list_json() { +fn decommission_status_without_pool_dispatches_to_decommission_status_json() { let config_dir = tempfile::tempdir().expect("create config dir"); let (endpoint, receiver, handle) = start_admin_test_server( - r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#, + r#"{"pools":[{"id":0,"cmdline":"/data/pool0/disk{1...4}","status":"none","poolStatus":"active","decommissionInfo":null}]}"#, ); let output = Command::new(rc_binary()) @@ -77,12 +77,62 @@ fn decommission_status_without_pool_dispatches_to_pool_list_json() { assert_eq!(pools.len(), 1); assert_eq!(pools[0]["id"], 0); assert_eq!(pools[0]["cmdline"], "/data/pool0/disk{1...4}"); + assert_eq!(pools[0]["status"], "none"); + assert_eq!(pools[0]["poolStatus"], "active"); let request = receiver .recv_timeout(Duration::from_secs(5)) .expect("captured admin request"); assert_eq!(request.method, "GET"); - assert_eq!(request.target, "/rustfs/admin/v3/pools/list"); + assert_eq!(request.target, "/rustfs/admin/v3/decommission/status"); + + handle.join().expect("admin test server finished"); +} + +#[test] +fn decommission_status_with_pool_dispatches_to_decommission_status_json() { + let config_dir = tempfile::tempdir().expect("create config dir"); + let (endpoint, receiver, handle) = start_admin_test_server( + r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","status":"failed","poolStatus":"blocked","decommissionInfo":null}"#, + ); + + let output = Command::new(rc_binary()) + .args([ + "--json", + "admin", + "decommission", + "status", + "myalias", + "1", + "--by-id", + ]) + .env("RC_CONFIG_DIR", config_dir.path()) + .env("RC_HOST_myalias", rc_host_alias(&endpoint)) + .output() + .expect("run rc command"); + + assert!( + output.status.success(), + "stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8"); + let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output"); + let pools = payload["pools"].as_array().expect("pools array"); + assert_eq!(pools.len(), 1); + assert_eq!(pools[0]["id"], 1); + assert_eq!(pools[0]["status"], "failed"); + assert_eq!(pools[0]["poolStatus"], "blocked"); + + let request = receiver + .recv_timeout(Duration::from_secs(5)) + .expect("captured admin request"); + assert_eq!(request.method, "GET"); + assert_eq!( + request.target, + "/rustfs/admin/v3/decommission/status?pool=1&by-id=true" + ); handle.join().expect("admin test server finished"); } diff --git a/crates/cli/tests/admin_pool.rs b/crates/cli/tests/admin_pool.rs index 2bcd64c..0efbb40 100644 --- a/crates/cli/tests/admin_pool.rs +++ b/crates/cli/tests/admin_pool.rs @@ -11,7 +11,7 @@ use admin_support::{rc_binary, rc_host_alias, start_admin_test_server}; fn pool_list_dispatches_to_pool_list_json() { let config_dir = tempfile::tempdir().expect("create config dir"); let (endpoint, receiver, handle) = start_admin_test_server( - r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-10T00:00:00Z","decommissionInfo":null}]"#, + r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-10T00:00:00Z","status":"active","decommissionStatus":"none","rebalanceStatus":"completed","totalSize":100,"currentSize":80,"usedSize":20,"used":0.2,"decommissionInfo":null}]"#, ); let output = Command::new(rc_binary()) @@ -33,6 +33,10 @@ fn pool_list_dispatches_to_pool_list_json() { assert_eq!(pools.len(), 1); assert_eq!(pools[0]["id"], 0); assert_eq!(pools[0]["cmdline"], "/data/pool0/disk{1...4}"); + assert_eq!(pools[0]["status"], "active"); + assert_eq!(pools[0]["decommissionStatus"], "none"); + assert_eq!(pools[0]["rebalanceStatus"], "completed"); + assert_eq!(pools[0]["usedSize"], 20); let request = receiver .recv_timeout(Duration::from_secs(5)) @@ -47,7 +51,7 @@ fn pool_list_dispatches_to_pool_list_json() { fn pool_status_without_target_dispatches_to_pool_list_json() { let config_dir = tempfile::tempdir().expect("create config dir"); let (endpoint, receiver, handle) = start_admin_test_server( - r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-10T00:00:00Z","decommissionInfo":{"totalSize":100,"currentSize":90}}]"#, + r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-10T00:00:00Z","status":"decommissioning","decommissionStatus":"running","rebalanceStatus":"none","decommissionInfo":{"totalSize":100,"currentSize":90}}]"#, ); let output = Command::new(rc_binary()) @@ -69,6 +73,9 @@ fn pool_status_without_target_dispatches_to_pool_list_json() { assert_eq!(pools.len(), 1); assert_eq!(pools[0]["id"], 0); assert_eq!(pools[0]["cmdline"], "/data/pool0/disk{1...4}"); + assert_eq!(pools[0]["status"], "decommissioning"); + assert_eq!(pools[0]["decommissionStatus"], "running"); + assert_eq!(pools[0]["rebalanceStatus"], "none"); assert_eq!(pools[0]["decommissionInfo"]["totalSize"], 100); assert_eq!(pools[0]["decommissionInfo"]["currentSize"], 90); @@ -85,7 +92,7 @@ fn pool_status_without_target_dispatches_to_pool_list_json() { fn pool_status_dispatches_by_id_pool_json() { let config_dir = tempfile::tempdir().expect("create config dir"); let (endpoint, receiver, handle) = start_admin_test_server( - r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-10T00:00:00Z","decommissionInfo":null}"#, + r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-10T00:00:00Z","status":"active","decommissionStatus":"none","rebalanceStatus":"failed","decommissionInfo":null}"#, ); let output = Command::new(rc_binary()) @@ -107,6 +114,9 @@ fn pool_status_dispatches_by_id_pool_json() { let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output"); assert_eq!(payload["id"], 1); assert_eq!(payload["cmdline"], "/data/pool1/disk{1...4}"); + assert_eq!(payload["status"], "active"); + assert_eq!(payload["decommissionStatus"], "none"); + assert_eq!(payload["rebalanceStatus"], "failed"); let request = receiver .recv_timeout(Duration::from_secs(5)) diff --git a/crates/core/src/admin/cluster.rs b/crates/core/src/admin/cluster.rs index 463c702..34481fd 100644 --- a/crates/core/src/admin/cluster.rs +++ b/crates/core/src/admin/cluster.rs @@ -650,11 +650,71 @@ pub struct PoolStatus { #[serde(default, rename = "lastUpdate")] pub last_update: String, + /// Pool lifecycle status. + #[serde(default)] + pub status: String, + + /// Decommission operation status for this pool. + #[serde(default, rename = "decommissionStatus")] + pub decommission_status: String, + + /// Rebalance operation status for this pool. + #[serde(default, rename = "rebalanceStatus")] + pub rebalance_status: String, + + /// Total pool size in bytes. + #[serde(default, rename = "totalSize")] + pub total_size: u64, + + /// Current free size in bytes. + #[serde(default, rename = "currentSize")] + pub current_size: u64, + + /// Used pool size in bytes. + #[serde(default, rename = "usedSize")] + pub used_size: u64, + + /// Used capacity ratio in the range 0.0..=1.0. + #[serde(default)] + pub used: f64, + /// Decommission status and progress for this pool. #[serde(default, rename = "decommissionInfo")] pub decommission: Option, } +/// Decommission status response. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct DecommissionStatus { + /// Per-pool decommission status. + #[serde(default)] + pub pools: Vec, +} + +/// Decommission operation status for a single pool. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct DecommissionPoolStatus { + /// Zero-based pool ID. + #[serde(default)] + pub id: usize, + + /// Pool command line used by the server process. + #[serde(default, rename = "cmdline")] + pub cmd_line: String, + + /// Decommission operation status for this pool. + #[serde(default)] + pub status: String, + + /// Pool lifecycle status. + #[serde(default, rename = "poolStatus")] + pub pool_status: String, + + /// Decommission state and progress for this pool. + #[serde(default, rename = "decommissionInfo")] + pub decommission: Option, +} + /// Decommission state and progress for a server pool. #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct PoolDecommissionInfo { @@ -686,6 +746,34 @@ pub struct PoolDecommissionInfo { #[serde(default)] pub canceled: bool, + /// Whether decommission is queued. + #[serde(default)] + pub queued: bool, + + /// Buckets waiting to be decommissioned. + #[serde(default, rename = "queuedBuckets")] + pub queued_buckets: Vec, + + /// Buckets already decommissioned. + #[serde(default, rename = "decommissionedBuckets")] + pub decommissioned_buckets: Vec, + + /// Current bucket. + #[serde(default)] + pub bucket: String, + + /// Current prefix. + #[serde(default)] + pub prefix: String, + + /// Current object. + #[serde(default)] + pub object: String, + + /// Current decommission stage. + #[serde(default)] + pub stage: String, + /// Number of successfully decommissioned objects. #[serde(default, rename = "objectsDecommissioned")] pub objects_decommissioned: u64, @@ -701,6 +789,10 @@ pub struct PoolDecommissionInfo { /// Bytes that failed to move off the pool. #[serde(default, rename = "bytesDecommissionedFailed")] pub bytes_decommissioned_failed: u64, + + /// Reason why decommission is waiting. + #[serde(default, rename = "waitingReason")] + pub waiting_reason: Option, } /// Response from starting a rebalance operation. @@ -954,17 +1046,44 @@ mod tests { #[test] fn test_pool_status_deserialization() { - let json = r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":{"startTime":"2026-05-06T00:00:01Z","startSize":100,"totalSize":1000,"currentSize":600,"complete":false,"failed":false,"canceled":false,"objectsDecommissioned":2,"objectsDecommissionedFailed":1,"bytesDecommissioned":128,"bytesDecommissionedFailed":64}}"#; + let json = r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","status":"decommissioning","decommissionStatus":"running","rebalanceStatus":"none","totalSize":1000,"currentSize":600,"usedSize":400,"used":0.4,"decommissionInfo":{"startTime":"2026-05-06T00:00:01Z","startSize":100,"totalSize":1000,"currentSize":600,"complete":false,"failed":false,"canceled":false,"queued":true,"queuedBuckets":["bucket-a"],"decommissionedBuckets":["bucket-b"],"bucket":"bucket-a","prefix":"","object":"object.txt","stage":"migrate_object","objectsDecommissioned":2,"objectsDecommissionedFailed":1,"bytesDecommissioned":128,"bytesDecommissionedFailed":64,"waitingReason":"queued"}}"#; let status: PoolStatus = serde_json::from_str(json).unwrap(); assert_eq!(status.id, 1); assert_eq!(status.cmd_line, "/data/pool1/disk{1...4}"); + assert_eq!(status.status, "decommissioning"); + assert_eq!(status.decommission_status, "running"); + assert_eq!(status.rebalance_status, "none"); + assert_eq!(status.used_size, 400); let info = status.decommission.expect("decommission info exists"); + assert!(info.queued); + assert_eq!(info.queued_buckets, vec!["bucket-a"]); + assert_eq!(info.bucket, "bucket-a"); + assert_eq!(info.object, "object.txt"); + assert_eq!(info.waiting_reason.as_deref(), Some("queued")); assert_eq!(info.objects_decommissioned, 2); assert_eq!(info.bytes_decommissioned_failed, 64); } + #[test] + fn test_decommission_status_deserialization() { + let json = r#"{"pools":[{"id":2,"cmdline":"/data/pool2/disk{1...4}","status":"failed","poolStatus":"blocked","decommissionInfo":{"failed":true,"totalSize":1000,"currentSize":900}}]}"#; + + let status: DecommissionStatus = serde_json::from_str(json).unwrap(); + + assert_eq!(status.pools.len(), 1); + assert_eq!(status.pools[0].id, 2); + assert_eq!(status.pools[0].status, "failed"); + assert_eq!(status.pools[0].pool_status, "blocked"); + assert!( + status.pools[0] + .decommission + .as_ref() + .is_some_and(|info| info.failed) + ); + } + #[test] fn test_rebalance_status_deserialization() { let json = r#"{"id":"rebalance-1","pools":[{"id":0,"status":"Started","used":0.5,"lastError":null,"cleanupWarnings":{"count":1,"lastMsg":"cleanup warning","lastBucket":"bucket","lastObject":"object","lastAt":"2026-06-12T00:00:00Z"},"progress":{"objects":3,"versions":4,"bytes":1024,"remainingBuckets":2,"bucket":"bucket","object":"object","elapsed":10,"eta":20}}],"stoppedAt":null}"#; diff --git a/crates/core/src/admin/mod.rs b/crates/core/src/admin/mod.rs index 19f5769..1f90a40 100644 --- a/crates/core/src/admin/mod.rs +++ b/crates/core/src/admin/mod.rs @@ -8,11 +8,11 @@ pub mod tier; mod types; pub use cluster::{ - BackendInfo, BackendType, BucketsInfo, ClusterInfo, DiskInfo, HealDriveInfo, HealDriveInfos, - HealResultItem, HealScanMode, HealStartRequest, HealStatus, HealingDiskInfo, MemStats, - ObjectsInfo, PoolDecommissionInfo, PoolErasureSetInfo, PoolStatus, PoolTarget, - RebalanceCleanupWarnings, RebalancePoolProgress, RebalancePoolStatus, RebalanceStartResult, - RebalanceStatus, ServerInfo, UsageInfo, + BackendInfo, BackendType, BucketsInfo, ClusterInfo, DecommissionPoolStatus, DecommissionStatus, + DiskInfo, HealDriveInfo, HealDriveInfos, HealResultItem, HealScanMode, HealStartRequest, + HealStatus, HealingDiskInfo, MemStats, ObjectsInfo, PoolDecommissionInfo, PoolErasureSetInfo, + PoolStatus, PoolTarget, RebalanceCleanupWarnings, RebalancePoolProgress, RebalancePoolStatus, + RebalanceStartResult, RebalanceStatus, ServerInfo, UsageInfo, }; pub use tier::{ TierAliyun, TierAzure, TierConfig, TierCreds, TierGCS, TierHuaweicloud, TierMinIO, TierR2, @@ -65,6 +65,9 @@ pub trait AdminApi: Send + Sync { /// Clear failed or canceled decommissioning metadata for a storage pool async fn decommission_clear(&self, target: PoolTarget) -> Result<()>; + /// Get decommissioning status + async fn decommission_status(&self, target: Option) -> Result; + /// Start a rebalance operation async fn rebalance_start(&self) -> Result; diff --git a/crates/s3/src/admin.rs b/crates/s3/src/admin.rs index f463c21..be6e452 100644 --- a/crates/s3/src/admin.rs +++ b/crates/s3/src/admin.rs @@ -10,10 +10,11 @@ use aws_sigv4::http_request::{ }; use aws_sigv4::sign::v4; use rc_core::admin::{ - AccessKeyInfo, AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, Group, - GroupStatus, HealScanMode, HealStartRequest, HealStatus, Policy, PolicyEntity, PolicyInfo, - PoolStatus, PoolTarget, RebalanceStartResult, RebalanceStatus, ServiceAccount, - ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, UserStatus, + AccessKeyInfo, AdminApi, BucketQuota, ClusterInfo, CreateServiceAccountRequest, + DecommissionPoolStatus, DecommissionStatus, Group, GroupStatus, HealScanMode, HealStartRequest, + HealStatus, Policy, PolicyEntity, PolicyInfo, PoolStatus, PoolTarget, RebalanceStartResult, + RebalanceStatus, ServiceAccount, ServiceAccountCreateResponse, UpdateGroupMembersRequest, User, + UserStatus, }; use rc_core::{Alias, Error, Result}; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue}; @@ -571,6 +572,24 @@ impl AdminApi for AdminClient { .await } + async fn decommission_status(&self, target: Option) -> Result { + if let Some(target) = target { + let query = pool_target_query(&target); + let pool = self + .request::( + Method::GET, + "/decommission/status", + Some(&query), + None, + ) + .await?; + Ok(DecommissionStatus { pools: vec![pool] }) + } else { + self.request(Method::GET, "/decommission/status", None, None) + .await + } + } + async fn rebalance_start(&self) -> Result { self.request(Method::POST, "/rebalance/start", None, None) .await @@ -1534,6 +1553,60 @@ mod tests { handle.join().expect("server thread should finish"); } + #[tokio::test] + async fn test_decommission_status_uses_status_route() { + let (endpoint, receiver, handle) = start_admin_test_server( + "200 OK", + r#"{"pools":[{"id":0,"cmdline":"/data/pool0/disk{1...4}","status":"running","poolStatus":"decommissioning","decommissionInfo":null}]}"#, + ); + let client = admin_client_for_endpoint(&endpoint); + + let status = client + .decommission_status(None) + .await + .expect("decommission status request"); + + assert_eq!(status.pools.len(), 1); + assert_eq!(status.pools[0].status, "running"); + assert_eq!(status.pools[0].pool_status, "decommissioning"); + + let request = receiver.recv().expect("captured request"); + assert_eq!(request.method, "GET"); + assert_eq!(request.target, "/rustfs/admin/v3/decommission/status"); + assert!(request.body.is_empty()); + handle.join().expect("server thread should finish"); + } + + #[tokio::test] + async fn test_decommission_status_uses_status_route_with_by_id_query() { + let (endpoint, receiver, handle) = start_admin_test_server( + "200 OK", + r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","status":"failed","poolStatus":"blocked","decommissionInfo":null}"#, + ); + let client = admin_client_for_endpoint(&endpoint); + + let status = client + .decommission_status(Some(PoolTarget { + pool: "1".to_string(), + by_id: true, + })) + .await + .expect("decommission status request"); + + assert_eq!(status.pools.len(), 1); + assert_eq!(status.pools[0].id, 1); + assert_eq!(status.pools[0].status, "failed"); + + let request = receiver.recv().expect("captured request"); + assert_eq!(request.method, "GET"); + assert_eq!( + request.target, + "/rustfs/admin/v3/decommission/status?pool=1&by-id=true" + ); + assert!(request.body.is_empty()); + handle.join().expect("server thread should finish"); + } + #[tokio::test] async fn test_decommission_start_posts_pool_query() { let (endpoint, receiver, handle) = start_admin_test_server("200 OK", ""); diff --git a/schemas/output_v2.json b/schemas/output_v2.json index 9d74be4..63449ea 100644 --- a/schemas/output_v2.json +++ b/schemas/output_v2.json @@ -356,6 +356,40 @@ "type": "boolean", "description": "Whether decommission was canceled" }, + "queued": { + "type": "boolean", + "description": "Whether decommission is queued" + }, + "queuedBuckets": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Buckets waiting to be decommissioned" + }, + "decommissionedBuckets": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Buckets already decommissioned" + }, + "bucket": { + "type": "string", + "description": "Current bucket" + }, + "prefix": { + "type": "string", + "description": "Current prefix" + }, + "object": { + "type": "string", + "description": "Current object" + }, + "stage": { + "type": "string", + "description": "Current decommission stage" + }, "objectsDecommissioned": { "type": "integer", "description": "Objects successfully moved off the pool" @@ -371,6 +405,17 @@ "bytesDecommissionedFailed": { "type": "integer", "description": "Bytes that failed to move off the pool" + }, + "waitingReason": { + "oneOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "description": "Reason why decommission is waiting" } } }, @@ -380,6 +425,13 @@ "id", "cmdline", "lastUpdate", + "status", + "decommissionStatus", + "rebalanceStatus", + "totalSize", + "currentSize", + "usedSize", + "used", "decommissionInfo" ], "properties": { @@ -395,6 +447,34 @@ "$ref": "#/definitions/timestamp", "description": "Last pool metadata update timestamp" }, + "status": { + "type": "string", + "description": "Pool lifecycle status" + }, + "decommissionStatus": { + "type": "string", + "description": "Decommission operation status for this pool" + }, + "rebalanceStatus": { + "type": "string", + "description": "Rebalance operation status for this pool" + }, + "totalSize": { + "type": "integer", + "description": "Total pool size in bytes" + }, + "currentSize": { + "type": "integer", + "description": "Current free bytes in the pool" + }, + "usedSize": { + "type": "integer", + "description": "Used pool size in bytes" + }, + "used": { + "type": "number", + "description": "Used capacity ratio in the range 0.0..=1.0" + }, "decommissionInfo": { "oneOf": [ { @@ -408,6 +488,59 @@ } } }, + "decommissionPoolStatus": { + "type": "object", + "required": [ + "id", + "cmdline", + "status", + "poolStatus", + "decommissionInfo" + ], + "properties": { + "id": { + "type": "integer", + "description": "Zero-based pool ID" + }, + "cmdline": { + "type": "string", + "description": "Pool command line used by the server process" + }, + "status": { + "type": "string", + "description": "Decommission operation status for this pool" + }, + "poolStatus": { + "type": "string", + "description": "Pool lifecycle status" + }, + "decommissionInfo": { + "oneOf": [ + { + "$ref": "#/definitions/poolDecommissionInfo" + }, + { + "type": "null" + } + ], + "description": "Decommission state and progress for this pool" + } + } + }, + "decommissionStatus": { + "type": "object", + "required": [ + "pools" + ], + "properties": { + "pools": { + "type": "array", + "items": { + "$ref": "#/definitions/decommissionPoolStatus" + } + } + } + }, "rebalancePoolProgress": { "type": "object", "required": [ @@ -796,6 +929,11 @@ } } }, + { + "title": "admin decommission status", + "description": "Decommission status output", + "$ref": "#/definitions/decommissionStatus" + }, { "title": "admin rebalance status", "description": "Rebalance status output",