Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 91 additions & 35 deletions crates/cli/src/commands/admin/decommission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::Serialize;
use super::{get_admin_client, pool};
use crate::exit_code::ExitCode;
use crate::output::Formatter;
use rc_core::admin::{AdminApi, PoolStatus, PoolTarget};
use rc_core::admin::{AdminApi, DecommissionPoolStatus, PoolDecommissionInfo, PoolTarget};

/// Decommission subcommands
#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -83,11 +83,6 @@ struct DecommissionOperationOutput {
pool: String,
}

#[derive(Serialize)]
struct DecommissionStatusListOutput {
pools: Vec<PoolStatus>,
}

/// Execute a decommission subcommand
pub async fn execute(cmd: DecommissionCommands, formatter: &Formatter) -> ExitCode {
match cmd {
Expand Down Expand Up @@ -138,43 +133,104 @@ async fn execute_status(args: StatusArgs, formatter: &Formatter) -> ExitCode {
Err(code) => return code,
};

if let Some(pool_name) = args.pool {
let target = PoolTarget {
pool: pool_name,
by_id: args.by_id,
};
match client.pool_status(target).await {
Ok(status) => {
if formatter.is_json() {
formatter.json(&status);
} else {
pool::print_pool_status(&status, formatter);
}
ExitCode::Success
}
Err(e) => {
formatter.error(&format!("Failed to get decommission status: {e}"));
ExitCode::GeneralError
let target = args.pool.map(|pool| PoolTarget {
pool,
by_id: args.by_id,
});

match client.decommission_status(target).await {
Ok(status) => {
if formatter.is_json() {
formatter.json(&status);
} else {
print_decommission_status(&status.pools, formatter);
}
ExitCode::Success
}
} else {
match client.list_pools().await {
Ok(pools) => {
if formatter.is_json() {
formatter.json(&DecommissionStatusListOutput { pools });
} else {
pool::print_pool_list(&pools, formatter);
}
ExitCode::Success
Err(e) => {
formatter.error(&format!("Failed to get decommission status: {e}"));
ExitCode::GeneralError
}
}
}

fn print_decommission_status(pools: &[DecommissionPoolStatus], formatter: &Formatter) {
formatter.println(&formatter.style_name("Decommission:"));
if pools.is_empty() {
formatter.println(" No decommission status found.");
return;
}

for item in pools {
let decommission = item.decommission.as_ref();
let operation_status = status_or_default(&item.status, decommission_state(decommission));
let pool_status = status_or_default(&item.pool_status, "unknown");

formatter.println(&format!(
" Pool {}: {}",
item.id,
formatter.style_url(&item.cmd_line)
));
formatter.println(&format!(
" Status: {}",
pool::style_state(operation_status, formatter)
));
formatter.println(&format!(
" Pool status: {}",
pool::style_state(pool_status, formatter)
));

if let Some(info) = decommission {
let used = info.total_size.saturating_sub(info.current_size);
if info.total_size > 0 {
formatter.println(&format!(
" Usage: {} / {}",
formatter.style_size(&pool::format_bytes(used)),
pool::format_bytes(info.total_size)
));
}
Err(e) => {
formatter.error(&format!("Failed to get decommission status: {e}"));
ExitCode::GeneralError

if info.objects_decommissioned > 0
|| info.objects_decommissioned_failed > 0
|| info.bytes_decommissioned > 0
|| info.bytes_decommissioned_failed > 0
{
formatter.println(&format!(
" Progress: {}, {} failed; {}, {} failed bytes",
info.objects_decommissioned,
info.objects_decommissioned_failed,
pool::format_bytes(info.bytes_decommissioned),
pool::format_bytes(info.bytes_decommissioned_failed)
));
}
}
}
}

fn status_or_default<'a>(status: &'a str, default: &'a str) -> &'a str {
if status.is_empty() { default } else { status }
}

fn decommission_state(decommission: Option<&PoolDecommissionInfo>) -> &'static str {
let Some(info) = decommission else {
return "none";
};

if info.complete {
"complete"
} else if info.failed {
"failed"
} else if info.canceled {
"canceled"
} else if info.queued {
"queued"
} else if info.start_time.is_some() {
"running"
} else {
"none"
}
}

async fn execute_cancel(args: CancelArgs, formatter: &Formatter) -> ExitCode {
let client = match get_admin_client(&args.alias, formatter) {
Ok(c) => c,
Expand Down
75 changes: 62 additions & 13 deletions crates/cli/src/commands/admin/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,34 @@ pub(super) fn print_pool_list(pools: &[PoolStatus], formatter: &Formatter) {

pub(super) fn print_pool_status(pool: &PoolStatus, formatter: &Formatter) {
let decommission = pool.decommission.as_ref();
let state = decommission_state(decommission);
let lifecycle_state = pool_lifecycle_state(pool);
let decommission_state = pool_decommission_state(pool);
let rebalance_state = status_or_default(&pool.rebalance_status, "none");
let used = decommission
.map(|info| info.total_size.saturating_sub(info.current_size))
.or_else(|| (pool.total_size > 0).then_some(pool.used_size))
.unwrap_or_default();
let total = decommission
.map(|info| info.total_size)
.or_else(|| (pool.total_size > 0).then_some(pool.total_size))
.unwrap_or_default();
let total = decommission.map(|info| info.total_size).unwrap_or_default();

formatter.println(&format!(
" Pool {}: {}",
pool.id,
formatter.style_url(&pool.cmd_line)
));
formatter.println(&format!(
" Status: {}",
style_state(lifecycle_state, formatter)
));
formatter.println(&format!(
" Decommission: {}",
style_state(state, formatter)
style_state(decommission_state, formatter)
));
formatter.println(&format!(
" Rebalance: {}",
style_state(rebalance_state, formatter)
));

if total > 0 {
Expand All @@ -169,9 +183,29 @@ pub(super) fn print_pool_status(pool: &PoolStatus, formatter: &Formatter) {
}
}

fn pool_lifecycle_state(pool: &PoolStatus) -> &str {
if !pool.status.is_empty() {
return pool.status.as_str();
}

match decommission_state(pool.decommission.as_ref()) {
"complete" => "decommissioned",
"failed" | "canceled" => "blocked",
_ => "active",
}
}

fn pool_decommission_state(pool: &PoolStatus) -> &str {
if !pool.decommission_status.is_empty() {
return pool.decommission_status.as_str();
}

decommission_state(pool.decommission.as_ref())
}

fn decommission_state(decommission: Option<&PoolDecommissionInfo>) -> &'static str {
let Some(info) = decommission else {
return "not started";
return "none";
};

if info.complete {
Expand All @@ -180,24 +214,32 @@ fn decommission_state(decommission: Option<&PoolDecommissionInfo>) -> &'static s
"failed"
} else if info.canceled {
"canceled"
} else if info.queued {
"queued"
} else if info.start_time.is_some() {
"running"
} else {
"not started"
"none"
}
}

fn style_state(state: &str, formatter: &Formatter) -> String {
pub(super) fn style_state(state: &str, formatter: &Formatter) -> String {
match state {
"complete" => formatter.style_size(state),
"failed" => formatter.theme().error.apply_to(state).to_string(),
"canceled" => formatter.theme().warning.apply_to(state).to_string(),
"running" => formatter.style_name(state),
"active" | "complete" | "completed" | "decommissioned" => formatter.style_size(state),
"blocked" | "failed" => formatter.theme().error.apply_to(state).to_string(),
"canceled" | "queued" | "stopping" | "stopped" => {
formatter.theme().warning.apply_to(state).to_string()
}
"decommissioning" | "running" | "started" => formatter.style_name(state),
_ => formatter.style_date(state),
}
}

fn format_bytes(bytes: u64) -> String {
fn status_or_default<'a>(status: &'a str, default: &'static str) -> &'a str {
if status.is_empty() { default } else { status }
}

pub(super) fn format_bytes(bytes: u64) -> String {
const KB: u64 = 1024;
const MB: u64 = KB * 1024;
const GB: u64 = MB * 1024;
Expand All @@ -222,10 +264,17 @@ mod tests {

#[test]
fn test_decommission_state() {
assert_eq!(decommission_state(None), "not started");
assert_eq!(decommission_state(None), "none");
assert_eq!(
decommission_state(Some(&PoolDecommissionInfo::default())),
"not started"
"none"
);
assert_eq!(
decommission_state(Some(&PoolDecommissionInfo {
queued: true,
..Default::default()
})),
"queued"
);
assert_eq!(
decommission_state(Some(&PoolDecommissionInfo {
Expand Down
56 changes: 53 additions & 3 deletions crates/cli/tests/admin_decommission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ fn decommission_start_dispatches_by_id_pool_json() {
}

#[test]
fn decommission_status_without_pool_dispatches_to_pool_list_json() {
fn decommission_status_without_pool_dispatches_to_decommission_status_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(
r#"[{"id":0,"cmdline":"/data/pool0/disk{1...4}","lastUpdate":"2026-05-06T00:00:00Z","decommissionInfo":null}]"#,
r#"{"pools":[{"id":0,"cmdline":"/data/pool0/disk{1...4}","status":"none","poolStatus":"active","decommissionInfo":null}]}"#,
);

let output = Command::new(rc_binary())
Expand All @@ -77,12 +77,62 @@ fn decommission_status_without_pool_dispatches_to_pool_list_json() {
assert_eq!(pools.len(), 1);
assert_eq!(pools[0]["id"], 0);
assert_eq!(pools[0]["cmdline"], "/data/pool0/disk{1...4}");
assert_eq!(pools[0]["status"], "none");
assert_eq!(pools[0]["poolStatus"], "active");

let request = receiver
.recv_timeout(Duration::from_secs(5))
.expect("captured admin request");
assert_eq!(request.method, "GET");
assert_eq!(request.target, "/rustfs/admin/v3/pools/list");
assert_eq!(request.target, "/rustfs/admin/v3/decommission/status");

handle.join().expect("admin test server finished");
}

#[test]
fn decommission_status_with_pool_dispatches_to_decommission_status_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(
r#"{"id":1,"cmdline":"/data/pool1/disk{1...4}","status":"failed","poolStatus":"blocked","decommissionInfo":null}"#,
);

let output = Command::new(rc_binary())
.args([
"--json",
"admin",
"decommission",
"status",
"myalias",
"1",
"--by-id",
])
.env("RC_CONFIG_DIR", config_dir.path())
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
.output()
.expect("run rc command");

assert!(
output.status.success(),
"stderr: {}",
String::from_utf8_lossy(&output.stderr)
);

let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
let pools = payload["pools"].as_array().expect("pools array");
assert_eq!(pools.len(), 1);
assert_eq!(pools[0]["id"], 1);
assert_eq!(pools[0]["status"], "failed");
assert_eq!(pools[0]["poolStatus"], "blocked");

let request = receiver
.recv_timeout(Duration::from_secs(5))
.expect("captured admin request");
assert_eq!(request.method, "GET");
assert_eq!(
request.target,
"/rustfs/admin/v3/decommission/status?pool=1&by-id=true"
);

handle.join().expect("admin test server finished");
}
Expand Down
Loading
Loading