From 2e6162109187e5e2fa461b7b2b122cce3d8babd2 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:00:37 +0800 Subject: [PATCH 1/6] [rust] Add Fluss 1.x protocol support to the admin client Add 27 new admin methods to FlussAdmin: - Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats - KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot - ACL management: create_acls, list_acls, drop_acls - Cluster configuration: describe_cluster_configs, alter_cluster_configs - Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance - Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets - Monitoring: get_cluster_health, list_remote_log_manifests - KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease --- crates/fluss/src/client/admin.rs | 390 +++++++++++++++++- crates/fluss/src/metadata/acl.rs | 74 +++- crates/fluss/src/metadata/cluster_health.rs | 59 +++ crates/fluss/src/metadata/config.rs | 29 +- crates/fluss/src/metadata/database.rs | 18 + crates/fluss/src/metadata/kv_snapshot.rs | 170 ++++++++ crates/fluss/src/metadata/lake_snapshot.rs | 82 ++++ crates/fluss/src/metadata/mod.rs | 10 + crates/fluss/src/metadata/producer_offsets.rs | 24 +- crates/fluss/src/metadata/rebalance.rs | 125 ++++++ crates/fluss/src/metadata/remote_log.rs | 59 +++ crates/fluss/src/metadata/table_change.rs | 12 + crates/fluss/src/metadata/table_stats.rs | 76 +++- 13 files changed, 1119 insertions(+), 9 deletions(-) create mode 100644 crates/fluss/src/metadata/cluster_health.rs create mode 100644 crates/fluss/src/metadata/kv_snapshot.rs create mode 100644 crates/fluss/src/metadata/lake_snapshot.rs create mode 100644 crates/fluss/src/metadata/rebalance.rs create mode 100644 crates/fluss/src/metadata/remote_log.rs diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index d3d5a5e3..e3d78d7e 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -18,14 +18,27 @@ use crate::client::metadata::Metadata; use crate::cluster::ServerNode; use crate::metadata::{ - DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec, - PhysicalTablePath, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, + AclFilter, AclInfo, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots, AlterConfig, + AlterTableChanges, ClusterHealth, CreateAclResult, DatabaseDescriptor, DatabaseInfo, + DatabaseSummary, DescribeConfig, DropAclsFilterResult, JsonSerde, KvSnapshotLeaseForTable, + KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, PartitionInfo, + PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, RebalanceProgress, + RemoteLogManifestEntry, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, + TableStats, }; use crate::rpc::message::{ + AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, + AlterDatabaseRequest, AlterTableRequest, CancelRebalanceRequest, CreateAclsRequest, CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, - DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, - GetLatestLakeSnapshotRequest, GetTableRequest, GetTableSchemaRequestMsg, ListDatabasesRequest, - ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest, + DeleteProducerOffsetsRequest, DescribeClusterConfigsRequest, DropAclsRequest, + DropDatabaseRequest, DropKvSnapshotLeaseRequest, DropPartitionRequest, DropTableRequest, + GetClusterHealthRequest, GetDatabaseInfoRequest, GetKvSnapshotMetadataRequest, + GetLakeSnapshotRequest, GetLatestKvSnapshotsRequest, GetLatestLakeSnapshotRequest, + GetProducerOffsetsRequest, GetTableRequest, GetTableSchemaRequestMsg, GetTableStatsRequest, + ListAclsRequest, ListDatabaseSummariesRequest, ListDatabasesRequest, ListKvSnapshotsRequest, + ListPartitionInfosRequest, ListRebalanceProgressRequest, ListRemoteLogManifestsRequest, + ListTablesRequest, RebalanceRequest, RegisterProducerOffsetsRequest, + ReleaseKvSnapshotLeaseRequest, RemoveServerTagRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; @@ -483,4 +496,371 @@ impl FlussAdmin { } Ok(tasks) } + + /// List database summaries (name, created_time, table_count). + pub async fn list_database_summaries(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListDatabaseSummariesRequest::new()) + .await?; + Ok(response + .database_summary + .iter() + .map(DatabaseSummary::from_pb) + .collect()) + } + + /// Alter a database's configuration. + pub async fn alter_database( + &self, + name: &str, + config_changes: Vec, + ignore_if_not_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterDatabaseRequest::new( + name, + ignore_if_not_exists, + config_changes, + )) + .await?; + Ok(()) + } + + /// Alter a table: config changes plus any combination of add/drop/rename/modify columns. + /// Bundle the column-level edits in [`AlterTableChanges`]. + pub async fn alter_table( + &self, + table_path: &TablePath, + ignore_if_not_exists: bool, + changes: AlterTableChanges, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterTableRequest::new( + table_path, + ignore_if_not_exists, + changes.config_changes, + changes.add_columns, + changes.drop_columns, + changes.rename_columns, + changes.modify_columns, + )) + .await?; + Ok(()) + } + + /// Get table statistics for buckets. Pass empty `target_columns` to request stats for all columns. + pub async fn get_table_stats( + &self, + table_id: i64, + buckets_req: Vec, + target_columns: Vec, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetTableStatsRequest::new( + table_id, + buckets_req, + target_columns, + )) + .await?; + Ok(TableStats::from_pb(&response)) + } + + /// Get the latest KV snapshots for a table (optionally scoped to one partition). + pub async fn get_latest_kv_snapshots( + &self, + table_path: &TablePath, + partition_name: Option<&str>, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetLatestKvSnapshotsRequest::new(table_path, partition_name)) + .await?; + Ok(LatestKvSnapshots::from_pb(&response)) + } + + /// Get KV snapshot metadata (manifest file list). + pub async fn get_kv_snapshot_metadata( + &self, + table_id: i64, + partition_id: Option, + bucket_id: i32, + snapshot_id: i64, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetKvSnapshotMetadataRequest::new( + table_id, + partition_id, + bucket_id, + snapshot_id, + )) + .await?; + Ok(KvSnapshotMetadata::from_pb(&response)) + } + + /// Acquire a KV snapshot lease. Returns the snapshots the server could not lease. + pub async fn create_kv_snapshot_lease( + &self, + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(AcquireKvSnapshotLeaseRequest::new( + lease_id, + lease_duration_ms, + snapshots_to_lease, + )) + .await?; + Ok(AcquireKvSnapshotLeaseResult::from_pb(&response)) + } + + /// Get a specific lake snapshot for a table. + pub async fn get_lake_snapshot( + &self, + table_path: &TablePath, + snapshot_id: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None)) + .await?; + Ok(LakeSnapshotInfo::from_pb(&response)) + } + + /// Create ACLs. Returns one result per submitted ACL (success or per-ACL error). + pub async fn create_acls(&self, acl: Vec) -> Result> { + let response = self + .admin_gateway() + .await? + .request(CreateAclsRequest::new(acl)) + .await?; + response + .acl_res + .iter() + .map(CreateAclResult::from_pb) + .collect() + } + + /// List ACLs matching a filter. + pub async fn list_acls(&self, acl_filter: AclFilter) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListAclsRequest::new(acl_filter)) + .await?; + response.acl.iter().map(AclInfo::from_pb).collect() + } + + /// Drop ACLs matching filters. Returns one result per submitted filter. + pub async fn drop_acls(&self, acl_filter: Vec) -> Result> { + let response = self + .admin_gateway() + .await? + .request(DropAclsRequest::new(acl_filter)) + .await?; + response + .filter_results + .iter() + .map(DropAclsFilterResult::from_pb) + .collect() + } + + /// Describe cluster configuration. + pub async fn describe_cluster_configs(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(DescribeClusterConfigsRequest::new()) + .await?; + Ok(response + .configs + .iter() + .map(DescribeConfig::from_pb) + .collect()) + } + + /// Alter cluster configuration. + pub async fn alter_cluster_configs(&self, alter_configs: Vec) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterClusterConfigsRequest::new(alter_configs)) + .await?; + Ok(()) + } + + /// Add a tag to servers. + pub async fn add_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AddServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Remove a tag from servers. + pub async fn remove_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(RemoveServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Trigger a rebalance. Returns the rebalance id assigned by the server. + pub async fn rebalance(&self, goals: Vec) -> Result { + let response = self + .admin_gateway() + .await? + .request(RebalanceRequest::new(goals)) + .await?; + Ok(response.rebalance_id) + } + + /// List rebalance progress (for a specific rebalance id, or all in-flight ones if `None`). + pub async fn list_rebalance_progress( + &self, + rebalance_id: Option<&str>, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(ListRebalanceProgressRequest::new(rebalance_id)) + .await?; + Ok(RebalanceProgress::from_pb(&response)) + } + + /// Cancel a rebalance. + pub async fn cancel_rebalance(&self, rebalance_id: Option<&str>) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(CancelRebalanceRequest::new(rebalance_id)) + .await?; + Ok(()) + } + + /// Register producer offsets. Returns the server-side result code (if any). + pub async fn register_producer_offsets( + &self, + producer_id: &str, + table_offsets: Vec, + ttl_ms: Option, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(RegisterProducerOffsetsRequest::new( + producer_id, + table_offsets, + ttl_ms, + )) + .await?; + Ok(response.result) + } + + /// Get producer offsets. + pub async fn get_producer_offsets(&self, producer_id: &str) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(ProducerOffsets::from_pb(&response)) + } + + /// Delete producer offsets. + pub async fn delete_producer_offsets(&self, producer_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DeleteProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(()) + } + + /// Get cluster health status. + pub async fn get_cluster_health(&self) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetClusterHealthRequest::new()) + .await?; + Ok(ClusterHealth::from_pb(&response)) + } + + /// List remote log manifests for a table (optionally scoped to one partition). + pub async fn list_remote_log_manifests( + &self, + table_id: i64, + partition_id: Option, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListRemoteLogManifestsRequest::new(table_id, partition_id)) + .await?; + Ok(response + .manifests + .iter() + .map(RemoteLogManifestEntry::from_pb) + .collect()) + } + + /// List active KV snapshots for a table (optionally scoped to one partition). + pub async fn list_kv_snapshots( + &self, + table_id: i64, + partition_id: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(ListKvSnapshotsRequest::new(table_id, partition_id)) + .await?; + Ok(ActiveKvSnapshots::from_pb(&response)) + } + + /// Release specific bucket snapshots from a KV snapshot lease. + pub async fn release_kv_snapshot_lease( + &self, + lease_id: &str, + buckets_to_release: Vec, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(ReleaseKvSnapshotLeaseRequest::new( + lease_id, + buckets_to_release, + )) + .await?; + Ok(()) + } + + /// Drop an entire KV snapshot lease. + pub async fn drop_kv_snapshot_lease(&self, lease_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DropKvSnapshotLeaseRequest::new(lease_id)) + .await?; + Ok(()) + } } diff --git a/crates/fluss/src/metadata/acl.rs b/crates/fluss/src/metadata/acl.rs index 84ec97af..f69ec952 100644 --- a/crates/fluss/src/metadata/acl.rs +++ b/crates/fluss/src/metadata/acl.rs @@ -16,7 +16,9 @@ // under the License. use crate::error::{Error, Result}; -use crate::proto::{PbAclFilter, PbAclInfo}; +use crate::proto::{ + PbAclFilter, PbAclInfo, PbCreateAclRespInfo, PbDropAclsFilterResult, PbDropAclsMatchingAcl, +}; /// Mirrors Java `org.apache.fluss.security.acl.ResourceType`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -197,6 +199,76 @@ impl AclFilter { } } +/// One per ACL submitted to `create_acls`: success or a server-side error. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreateAclResult { + pub acl: AclInfo, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclError { + pub code: i32, + pub message: Option, +} + +impl CreateAclResult { + pub fn from_pb(pb: &PbCreateAclRespInfo) -> Result { + Ok(Self { + acl: AclInfo::from_pb(&pb.acl)?, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + +/// One ACL matched by a filter in `drop_acls`. Reports the bound ACL and any +/// server-side error encountered while dropping it. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropAclMatchingAcl { + pub acl: AclInfo, + pub error: Option, +} + +impl DropAclMatchingAcl { + pub fn from_pb(pb: &PbDropAclsMatchingAcl) -> Result { + Ok(Self { + acl: AclInfo::from_pb(&pb.acl)?, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + +/// One per filter submitted to `drop_acls`: the matching ACLs that were +/// targeted plus any filter-level error. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropAclsFilterResult { + pub matching_acls: Vec, + pub error: Option, +} + +impl DropAclsFilterResult { + pub fn from_pb(pb: &PbDropAclsFilterResult) -> Result { + let matching_acls = pb + .matching_acls + .iter() + .map(DropAclMatchingAcl::from_pb) + .collect::>>()?; + Ok(Self { + matching_acls, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/cluster_health.rs b/crates/fluss/src/metadata/cluster_health.rs new file mode 100644 index 00000000..c984ab05 --- /dev/null +++ b/crates/fluss/src/metadata/cluster_health.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::GetClusterHealthResponse; + +/// Result of `get_cluster_health`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ClusterHealth { + pub num_replicas: i32, + pub in_sync_replicas: i32, + pub num_leader_replicas: i32, + pub active_leader_replicas: i32, + pub status: i32, +} + +impl ClusterHealth { + pub fn from_pb(pb: &GetClusterHealthResponse) -> Self { + Self { + num_replicas: pb.num_replicas, + in_sync_replicas: pb.in_sync_replicas, + num_leader_replicas: pb.num_leader_replicas, + active_leader_replicas: pb.active_leader_replicas, + status: pb.status, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cluster_health_from_pb() { + let pb = GetClusterHealthResponse { + num_replicas: 5, + in_sync_replicas: 4, + num_leader_replicas: 3, + active_leader_replicas: 3, + status: 1, + }; + let h = ClusterHealth::from_pb(&pb); + assert_eq!(h.num_replicas, 5); + assert_eq!(h.status, 1); + } +} diff --git a/crates/fluss/src/metadata/config.rs b/crates/fluss/src/metadata/config.rs index 4e045c45..4dd7d8bc 100644 --- a/crates/fluss/src/metadata/config.rs +++ b/crates/fluss/src/metadata/config.rs @@ -16,7 +16,7 @@ // under the License. use crate::error::{Error, Result}; -use crate::proto::PbAlterConfig; +use crate::proto::{PbAlterConfig, PbDescribeConfig}; /// Mirrors Java `org.apache.fluss.config.cluster.AlterConfigOpType`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -88,6 +88,33 @@ impl AlterConfig { } } +/// One entry in the response of `describe_cluster_configs`. Mirrors Java's +/// `org.apache.fluss.config.cluster.DescribeConfig`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DescribeConfig { + pub config_key: String, + pub config_value: Option, + pub config_source: String, +} + +impl DescribeConfig { + pub fn from_pb(pb: &PbDescribeConfig) -> Self { + Self { + config_key: pb.config_key.clone(), + config_value: pb.config_value.clone(), + config_source: pb.config_source.clone(), + } + } + + pub fn to_pb(&self) -> PbDescribeConfig { + PbDescribeConfig { + config_key: self.config_key.clone(), + config_value: self.config_value.clone(), + config_source: self.config_source.clone(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/database.rs b/crates/fluss/src/metadata/database.rs index 15fefb54..5403e9a3 100644 --- a/crates/fluss/src/metadata/database.rs +++ b/crates/fluss/src/metadata/database.rs @@ -205,6 +205,24 @@ impl DatabaseDescriptor { } } +/// Lightweight summary of a database returned by `list_database_summaries`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DatabaseSummary { + pub database_name: String, + pub created_time: i64, + pub table_count: i32, +} + +impl DatabaseSummary { + pub fn from_pb(pb: &crate::proto::PbDatabaseSummary) -> Self { + Self { + database_name: pb.database_name.clone(), + created_time: pb.created_time, + table_count: pb.table_count, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/kv_snapshot.rs b/crates/fluss/src/metadata/kv_snapshot.rs new file mode 100644 index 00000000..f58149ba --- /dev/null +++ b/crates/fluss/src/metadata/kv_snapshot.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{ + AcquireKvSnapshotLeaseResponse, GetKvSnapshotMetadataResponse, GetLatestKvSnapshotsResponse, + ListKvSnapshotsResponse, PbKvSnapshot, PbRemotePathAndLocalFile, +}; + +use crate::metadata::KvSnapshotLeaseForTable; + +/// Per-bucket KV snapshot info. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshot { + pub bucket_id: i32, + pub snapshot_id: Option, + pub log_offset: Option, +} + +impl KvSnapshot { + pub fn from_pb(pb: &PbKvSnapshot) -> Self { + Self { + bucket_id: pb.bucket_id, + snapshot_id: pb.snapshot_id, + log_offset: pb.log_offset, + } + } +} + +/// Result of `get_latest_kv_snapshots`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LatestKvSnapshots { + pub table_id: i64, + pub partition_id: Option, + pub latest_snapshots: Vec, +} + +impl LatestKvSnapshots { + pub fn from_pb(pb: &GetLatestKvSnapshotsResponse) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + latest_snapshots: pb + .latest_snapshots + .iter() + .map(KvSnapshot::from_pb) + .collect(), + } + } +} + +/// One file in a KV snapshot manifest: its remote path and the local filename +/// the server expects clients to materialize it as. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemotePathAndLocalFile { + pub remote_path: String, + pub local_file_name: String, +} + +impl RemotePathAndLocalFile { + pub fn from_pb(pb: &PbRemotePathAndLocalFile) -> Self { + Self { + remote_path: pb.remote_path.clone(), + local_file_name: pb.local_file_name.clone(), + } + } +} + +/// Result of `get_kv_snapshot_metadata`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotMetadata { + pub log_offset: i64, + pub snapshot_files: Vec, +} + +impl KvSnapshotMetadata { + pub fn from_pb(pb: &GetKvSnapshotMetadataResponse) -> Self { + Self { + log_offset: pb.log_offset, + snapshot_files: pb + .snapshot_files + .iter() + .map(RemotePathAndLocalFile::from_pb) + .collect(), + } + } +} + +/// Result of `acquire_kv_snapshot_lease` — any snapshots the server could not +/// lease (typically because they were evicted concurrently). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AcquireKvSnapshotLeaseResult { + pub unavailable_snapshots: Vec, +} + +impl AcquireKvSnapshotLeaseResult { + pub fn from_pb(pb: &AcquireKvSnapshotLeaseResponse) -> Self { + Self { + unavailable_snapshots: pb + .unavailable_snapshots + .iter() + .map(KvSnapshotLeaseForTable::from_pb) + .collect(), + } + } +} + +/// Result of `list_kv_snapshots`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ActiveKvSnapshots { + pub table_id: i64, + pub partition_id: Option, + pub active_snapshots: Vec, +} + +impl ActiveKvSnapshots { + pub fn from_pb(pb: &ListKvSnapshotsResponse) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + active_snapshots: pb + .active_snapshots + .iter() + .map(KvSnapshot::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kv_snapshot_from_pb() { + let pb = PbKvSnapshot { + bucket_id: 3, + snapshot_id: Some(7), + log_offset: Some(42), + }; + let s = KvSnapshot::from_pb(&pb); + assert_eq!(s.bucket_id, 3); + assert_eq!(s.snapshot_id, Some(7)); + assert_eq!(s.log_offset, Some(42)); + } + + #[test] + fn test_remote_path_and_local_file_from_pb() { + let pb = PbRemotePathAndLocalFile { + remote_path: "s3://bucket/snap/1.sst".to_string(), + local_file_name: "1.sst".to_string(), + }; + let f = RemotePathAndLocalFile::from_pb(&pb); + assert_eq!(f.remote_path, "s3://bucket/snap/1.sst"); + assert_eq!(f.local_file_name, "1.sst"); + } +} diff --git a/crates/fluss/src/metadata/lake_snapshot.rs b/crates/fluss/src/metadata/lake_snapshot.rs new file mode 100644 index 00000000..b88d53f9 --- /dev/null +++ b/crates/fluss/src/metadata/lake_snapshot.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{GetLakeSnapshotResponse, PbLakeSnapshotForBucket}; + +/// One bucket's slice of a lake snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LakeBucketSnapshot { + pub partition_id: Option, + pub bucket_id: i32, + pub log_offset: Option, + pub partition_name: Option, +} + +impl LakeBucketSnapshot { + pub fn from_pb(pb: &PbLakeSnapshotForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + log_offset: pb.log_offset, + partition_name: pb.partition_name.clone(), + } + } +} + +/// Result of `get_lake_snapshot` — a specific snapshot's bucket layout. +/// (Distinct from [`LakeSnapshot`](super::LakeSnapshot), which represents the +/// "latest" snapshot summary returned by `get_latest_lake_snapshot`.) +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LakeSnapshotInfo { + pub table_id: i64, + pub snapshot_id: i64, + pub bucket_snapshots: Vec, +} + +impl LakeSnapshotInfo { + pub fn from_pb(pb: &GetLakeSnapshotResponse) -> Self { + Self { + table_id: pb.table_id, + snapshot_id: pb.snapshot_id, + bucket_snapshots: pb + .bucket_snapshots + .iter() + .map(LakeBucketSnapshot::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lake_bucket_snapshot_from_pb() { + let pb = PbLakeSnapshotForBucket { + partition_id: Some(1), + bucket_id: 2, + log_offset: Some(3), + partition_name: Some("date=2024-01-01".to_string()), + }; + let s = LakeBucketSnapshot::from_pb(&pb); + assert_eq!(s.bucket_id, 2); + assert_eq!(s.partition_id, Some(1)); + assert_eq!(s.log_offset, Some(3)); + assert_eq!(s.partition_name.as_deref(), Some("date=2024-01-01")); + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 0249fefe..4eaa5c5b 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -16,15 +16,20 @@ // under the License. mod acl; +mod cluster_health; mod config; mod data_lake_format; mod database; mod datatype; mod goal_type; mod json_serde; +mod kv_snapshot; mod kv_snapshot_lease; +mod lake_snapshot; mod partition; mod producer_offsets; +mod rebalance; +mod remote_log; mod schema_util; mod server_tag; mod table; @@ -32,15 +37,20 @@ mod table_change; mod table_stats; pub use acl::*; +pub use cluster_health::*; pub use config::*; pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use goal_type::*; pub use json_serde::*; +pub use kv_snapshot::*; pub use kv_snapshot_lease::*; +pub use lake_snapshot::*; pub use partition::*; pub use producer_offsets::*; +pub use rebalance::*; +pub use remote_log::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use server_tag::*; pub use table::*; diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs index f0daddbc..501d5a46 100644 --- a/crates/fluss/src/metadata/producer_offsets.rs +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::{PbBucketOffset, PbProducerTableOffsets}; +use crate::proto::{GetProducerOffsetsResponse, PbBucketOffset, PbProducerTableOffsets}; /// Per-bucket producer log-end offset. #[derive(Debug, Clone, PartialEq, Eq)] @@ -74,6 +74,28 @@ impl ProducerTableOffsets { } } +/// Result of `get_producer_offsets`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProducerOffsets { + pub producer_id: Option, + pub expiration_time: Option, + pub table_offsets: Vec, +} + +impl ProducerOffsets { + pub fn from_pb(pb: &GetProducerOffsetsResponse) -> Self { + Self { + producer_id: pb.producer_id.clone(), + expiration_time: pb.expiration_time, + table_offsets: pb + .table_offsets + .iter() + .map(ProducerTableOffsets::from_pb) + .collect(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/rebalance.rs b/crates/fluss/src/metadata/rebalance.rs new file mode 100644 index 00000000..dfb6d75d --- /dev/null +++ b/crates/fluss/src/metadata/rebalance.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{ + ListRebalanceProgressResponse, PbRebalancePlanForBucket, PbRebalanceProgressForBucket, + PbRebalanceProgressForTable, +}; + +/// Per-bucket plan in a rebalance: who the leader was and who it will be, who +/// the replicas were and who they will be. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketRebalancePlan { + pub partition_id: Option, + pub bucket_id: i32, + pub original_leader: Option, + pub new_leader: Option, + pub original_replicas: Vec, + pub new_replicas: Vec, +} + +impl BucketRebalancePlan { + pub fn from_pb(pb: &PbRebalancePlanForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + original_leader: pb.original_leader, + new_leader: pb.new_leader, + original_replicas: pb.original_replicas.clone(), + new_replicas: pb.new_replicas.clone(), + } + } +} + +/// Per-bucket rebalance progress: the planned move and its current status code. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketRebalanceProgress { + pub rebalance_plan: BucketRebalancePlan, + pub rebalance_status: i32, +} + +impl BucketRebalanceProgress { + pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Self { + Self { + rebalance_plan: BucketRebalancePlan::from_pb(&pb.rebalance_plan), + rebalance_status: pb.rebalance_status, + } + } +} + +/// All bucket progress for one table. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableRebalanceProgress { + pub table_id: i64, + pub buckets_progress: Vec, +} + +impl TableRebalanceProgress { + pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Self { + Self { + table_id: pb.table_id, + buckets_progress: pb + .buckets_progress + .iter() + .map(BucketRebalanceProgress::from_pb) + .collect(), + } + } +} + +/// Result of `list_rebalance_progress`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RebalanceProgress { + pub rebalance_id: Option, + pub rebalance_status: Option, + pub table_progress: Vec, +} + +impl RebalanceProgress { + pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Self { + Self { + rebalance_id: pb.rebalance_id.clone(), + rebalance_status: pb.rebalance_status, + table_progress: pb + .table_progress + .iter() + .map(TableRebalanceProgress::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_rebalance_plan_from_pb() { + let pb = PbRebalancePlanForBucket { + partition_id: Some(1), + bucket_id: 2, + original_leader: Some(3), + new_leader: Some(4), + original_replicas: vec![3, 5, 6], + new_replicas: vec![4, 5, 6], + }; + let p = BucketRebalancePlan::from_pb(&pb); + assert_eq!(p.bucket_id, 2); + assert_eq!(p.new_leader, Some(4)); + assert_eq!(p.new_replicas, vec![4, 5, 6]); + } +} diff --git a/crates/fluss/src/metadata/remote_log.rs b/crates/fluss/src/metadata/remote_log.rs new file mode 100644 index 00000000..d3fd3e33 --- /dev/null +++ b/crates/fluss/src/metadata/remote_log.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TableBucket; +use crate::proto::PbRemoteLogManifestEntry; + +/// One bucket's remote-log manifest pointer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteLogManifestEntry { + pub table_bucket: TableBucket, + pub remote_log_manifest_path: String, + pub remote_log_end_offset: i64, +} + +impl RemoteLogManifestEntry { + pub fn from_pb(pb: &PbRemoteLogManifestEntry) -> Self { + Self { + table_bucket: TableBucket::from_pb(&pb.table_bucket), + remote_log_manifest_path: pb.remote_log_manifest_path.clone(), + remote_log_end_offset: pb.remote_log_end_offset, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::PbTableBucket; + + #[test] + fn test_remote_log_manifest_entry_from_pb() { + let pb = PbRemoteLogManifestEntry { + table_bucket: PbTableBucket { + table_id: 1, + partition_id: None, + bucket_id: 2, + }, + remote_log_manifest_path: "s3://bucket/manifest.json".to_string(), + remote_log_end_offset: 999, + }; + let m = RemoteLogManifestEntry::from_pb(&pb); + assert_eq!(m.remote_log_end_offset, 999); + assert_eq!(m.table_bucket.bucket_id(), 2); + } +} diff --git a/crates/fluss/src/metadata/table_change.rs b/crates/fluss/src/metadata/table_change.rs index 2ecf6cfa..f95aebd9 100644 --- a/crates/fluss/src/metadata/table_change.rs +++ b/crates/fluss/src/metadata/table_change.rs @@ -121,6 +121,18 @@ impl RenameColumn { } } +/// Bundle of column-level changes for a single `alter_table` call. Empty `Vec`s +/// mean "no change of that kind"; pass `Default::default()` to send only +/// config changes. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct AlterTableChanges { + pub config_changes: Vec, + pub add_columns: Vec, + pub drop_columns: Vec, + pub rename_columns: Vec, + pub modify_columns: Vec, +} + /// Modify a column's type/comment/position. Mirrors the `ModifyColumn` variant of /// Java `TableChange`. All fields except `column_name` are optional — only the /// non-`None` ones are applied. diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs index ad2d1fc2..a649844e 100644 --- a/crates/fluss/src/metadata/table_stats.rs +++ b/crates/fluss/src/metadata/table_stats.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::PbTableStatsReqForBucket; +use crate::proto::{GetTableStatsResponse, PbTableStatsReqForBucket, PbTableStatsRespForBucket}; /// Per-bucket request item for `GetTableStats`. /// Mirrors the bucket-stats request shape used by the Java client. @@ -48,6 +48,51 @@ impl BucketStatsRequest { } } +/// Per-bucket stats result returned by `GetTableStats`. `row_count` is `None` +/// when the server returned an error for the bucket; check `error` in that case. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStats { + pub bucket_id: i32, + pub partition_id: Option, + pub row_count: Option, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStatsError { + pub code: i32, + pub message: Option, +} + +impl BucketStats { + pub fn from_pb(pb: &PbTableStatsRespForBucket) -> Self { + let error = pb.error_code.map(|code| BucketStatsError { + code, + message: pb.error_message.clone(), + }); + Self { + bucket_id: pb.bucket_id, + partition_id: pb.partition_id, + row_count: pb.row_count, + error, + } + } +} + +/// Full result of `GetTableStats` — one entry per requested bucket. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableStats { + pub buckets: Vec, +} + +impl TableStats { + pub fn from_pb(pb: &GetTableStatsResponse) -> Self { + Self { + buckets: pb.buckets_resp.iter().map(BucketStats::from_pb).collect(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -62,4 +107,33 @@ mod tests { assert_eq!(BucketStatsRequest::from_pb(&pb), req); } } + + #[test] + fn test_bucket_stats_from_pb_ok() { + let pb = PbTableStatsRespForBucket { + error_code: None, + error_message: None, + partition_id: Some(1), + bucket_id: 7, + row_count: Some(123), + }; + let s = BucketStats::from_pb(&pb); + assert_eq!(s.bucket_id, 7); + assert_eq!(s.row_count, Some(123)); + assert!(s.error.is_none()); + } + + #[test] + fn test_bucket_stats_from_pb_err() { + let pb = PbTableStatsRespForBucket { + error_code: Some(7), + error_message: Some("nope".to_string()), + partition_id: None, + bucket_id: 2, + row_count: None, + }; + let s = BucketStats::from_pb(&pb); + assert_eq!(s.error.as_ref().unwrap().code, 7); + assert_eq!(s.error.as_ref().unwrap().message.as_deref(), Some("nope")); + } } From f289f88e391ed0476f83052ea1394ee59970d391 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:02:41 +0800 Subject: [PATCH 2/6] Address reviewer feedback: add domain enums, type aliases, expose readable - Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs - Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs - Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs - Expose readable parameter in admin.get_lake_snapshot() instead of hardcoding None Addresses reviewer feedback on PR #631. Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/client/admin.rs | 11 ++- crates/fluss/src/metadata/cluster_health.rs | 64 ++++++++++-- crates/fluss/src/metadata/kv_snapshot.rs | 11 ++- crates/fluss/src/metadata/lake_snapshot.rs | 7 +- crates/fluss/src/metadata/producer_offsets.rs | 7 +- crates/fluss/src/metadata/rebalance.rs | 98 +++++++++++++++---- 6 files changed, 159 insertions(+), 39 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index e3d78d7e..f815ef7f 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -632,11 +632,16 @@ impl FlussAdmin { &self, table_path: &TablePath, snapshot_id: Option, + readable: Option, ) -> Result { let response = self .admin_gateway() .await? - .request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None)) + .request(GetLakeSnapshotRequest::new( + table_path, + snapshot_id, + readable, + )) .await?; Ok(LakeSnapshotInfo::from_pb(&response)) } @@ -743,7 +748,7 @@ impl FlussAdmin { .await? .request(ListRebalanceProgressRequest::new(rebalance_id)) .await?; - Ok(RebalanceProgress::from_pb(&response)) + RebalanceProgress::from_pb(&response) } /// Cancel a rebalance. @@ -802,7 +807,7 @@ impl FlussAdmin { .await? .request(GetClusterHealthRequest::new()) .await?; - Ok(ClusterHealth::from_pb(&response)) + ClusterHealth::from_pb(&response) } /// List remote log manifests for a table (optionally scoped to one partition). diff --git a/crates/fluss/src/metadata/cluster_health.rs b/crates/fluss/src/metadata/cluster_health.rs index c984ab05..938d1f5f 100644 --- a/crates/fluss/src/metadata/cluster_health.rs +++ b/crates/fluss/src/metadata/cluster_health.rs @@ -15,8 +15,41 @@ // specific language governing permissions and limitations // under the License. +use crate::error::{Error, Result}; use crate::proto::GetClusterHealthResponse; +/// Mirrors Java `org.apache.fluss.client.admin.ClusterHealthStatus`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ClusterHealthStatus { + Green, + Yellow, + Red, + Unknown, +} + +impl ClusterHealthStatus { + pub fn to_i32(self) -> i32 { + match self { + Self::Green => 0, + Self::Yellow => 1, + Self::Red => 2, + Self::Unknown => 3, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Green), + 1 => Ok(Self::Yellow), + 2 => Ok(Self::Red), + 3 => Ok(Self::Unknown), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported ClusterHealthStatus: {value}"), + }), + } + } +} + /// Result of `get_cluster_health`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ClusterHealth { @@ -24,18 +57,18 @@ pub struct ClusterHealth { pub in_sync_replicas: i32, pub num_leader_replicas: i32, pub active_leader_replicas: i32, - pub status: i32, + pub status: ClusterHealthStatus, } impl ClusterHealth { - pub fn from_pb(pb: &GetClusterHealthResponse) -> Self { - Self { + pub fn from_pb(pb: &GetClusterHealthResponse) -> Result { + Ok(Self { num_replicas: pb.num_replicas, in_sync_replicas: pb.in_sync_replicas, num_leader_replicas: pb.num_leader_replicas, active_leader_replicas: pb.active_leader_replicas, - status: pb.status, - } + status: ClusterHealthStatus::try_from_i32(pb.status)?, + }) } } @@ -43,6 +76,23 @@ impl ClusterHealth { mod tests { use super::*; + #[test] + fn test_cluster_health_status_roundtrip() { + for s in [ + ClusterHealthStatus::Green, + ClusterHealthStatus::Yellow, + ClusterHealthStatus::Red, + ClusterHealthStatus::Unknown, + ] { + assert_eq!(ClusterHealthStatus::try_from_i32(s.to_i32()).unwrap(), s); + } + } + + #[test] + fn test_cluster_health_status_unknown_value() { + assert!(ClusterHealthStatus::try_from_i32(99).is_err()); + } + #[test] fn test_cluster_health_from_pb() { let pb = GetClusterHealthResponse { @@ -52,8 +102,8 @@ mod tests { active_leader_replicas: 3, status: 1, }; - let h = ClusterHealth::from_pb(&pb); + let h = ClusterHealth::from_pb(&pb).unwrap(); assert_eq!(h.num_replicas, 5); - assert_eq!(h.status, 1); + assert_eq!(h.status, ClusterHealthStatus::Yellow); } } diff --git a/crates/fluss/src/metadata/kv_snapshot.rs b/crates/fluss/src/metadata/kv_snapshot.rs index f58149ba..054254ad 100644 --- a/crates/fluss/src/metadata/kv_snapshot.rs +++ b/crates/fluss/src/metadata/kv_snapshot.rs @@ -19,13 +19,14 @@ use crate::proto::{ AcquireKvSnapshotLeaseResponse, GetKvSnapshotMetadataResponse, GetLatestKvSnapshotsResponse, ListKvSnapshotsResponse, PbKvSnapshot, PbRemotePathAndLocalFile, }; +use crate::{BucketId, PartitionId, TableId}; use crate::metadata::KvSnapshotLeaseForTable; /// Per-bucket KV snapshot info. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshot { - pub bucket_id: i32, + pub bucket_id: BucketId, pub snapshot_id: Option, pub log_offset: Option, } @@ -43,8 +44,8 @@ impl KvSnapshot { /// Result of `get_latest_kv_snapshots`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct LatestKvSnapshots { - pub table_id: i64, - pub partition_id: Option, + pub table_id: TableId, + pub partition_id: Option, pub latest_snapshots: Vec, } @@ -121,8 +122,8 @@ impl AcquireKvSnapshotLeaseResult { /// Result of `list_kv_snapshots`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ActiveKvSnapshots { - pub table_id: i64, - pub partition_id: Option, + pub table_id: TableId, + pub partition_id: Option, pub active_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/lake_snapshot.rs b/crates/fluss/src/metadata/lake_snapshot.rs index b88d53f9..5ff50ad6 100644 --- a/crates/fluss/src/metadata/lake_snapshot.rs +++ b/crates/fluss/src/metadata/lake_snapshot.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{GetLakeSnapshotResponse, PbLakeSnapshotForBucket}; +use crate::{BucketId, PartitionId, TableId}; /// One bucket's slice of a lake snapshot. #[derive(Debug, Clone, PartialEq, Eq)] pub struct LakeBucketSnapshot { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub log_offset: Option, pub partition_name: Option, } @@ -42,7 +43,7 @@ impl LakeBucketSnapshot { /// "latest" snapshot summary returned by `get_latest_lake_snapshot`.) #[derive(Debug, Clone, PartialEq, Eq)] pub struct LakeSnapshotInfo { - pub table_id: i64, + pub table_id: TableId, pub snapshot_id: i64, pub bucket_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs index 501d5a46..f367c390 100644 --- a/crates/fluss/src/metadata/producer_offsets.rs +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{GetProducerOffsetsResponse, PbBucketOffset, PbProducerTableOffsets}; +use crate::{BucketId, PartitionId, TableId}; /// Per-bucket producer log-end offset. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketOffset { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub log_end_offset: Option, } @@ -46,7 +47,7 @@ impl BucketOffset { /// All bucket offsets of a single table belonging to one producer. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProducerTableOffsets { - pub table_id: i64, + pub table_id: TableId, pub bucket_offsets: Vec, } diff --git a/crates/fluss/src/metadata/rebalance.rs b/crates/fluss/src/metadata/rebalance.rs index dfb6d75d..13826f9e 100644 --- a/crates/fluss/src/metadata/rebalance.rs +++ b/crates/fluss/src/metadata/rebalance.rs @@ -15,17 +15,57 @@ // specific language governing permissions and limitations // under the License. +use crate::error::{Error, Result}; use crate::proto::{ ListRebalanceProgressResponse, PbRebalancePlanForBucket, PbRebalanceProgressForBucket, PbRebalanceProgressForTable, }; +use crate::{BucketId, PartitionId, TableId}; + +/// Mirrors Java `org.apache.fluss.cluster.rebalance.RebalanceStatus`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RebalanceStatus { + NotStarted, + Rebalancing, + Failed, + Completed, + Canceled, + Timeout, +} + +impl RebalanceStatus { + pub fn to_i32(self) -> i32 { + match self { + Self::NotStarted => 0, + Self::Rebalancing => 1, + Self::Failed => 2, + Self::Completed => 3, + Self::Canceled => 4, + Self::Timeout => 5, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::NotStarted), + 1 => Ok(Self::Rebalancing), + 2 => Ok(Self::Failed), + 3 => Ok(Self::Completed), + 4 => Ok(Self::Canceled), + 5 => Ok(Self::Timeout), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported RebalanceStatus: {value}"), + }), + } + } +} /// Per-bucket plan in a rebalance: who the leader was and who it will be, who /// the replicas were and who they will be. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketRebalancePlan { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub original_leader: Option, pub new_leader: Option, pub original_replicas: Vec, @@ -49,35 +89,35 @@ impl BucketRebalancePlan { #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketRebalanceProgress { pub rebalance_plan: BucketRebalancePlan, - pub rebalance_status: i32, + pub rebalance_status: RebalanceStatus, } impl BucketRebalanceProgress { - pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Self { - Self { + pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Result { + Ok(Self { rebalance_plan: BucketRebalancePlan::from_pb(&pb.rebalance_plan), - rebalance_status: pb.rebalance_status, - } + rebalance_status: RebalanceStatus::try_from_i32(pb.rebalance_status)?, + }) } } /// All bucket progress for one table. #[derive(Debug, Clone, PartialEq, Eq)] pub struct TableRebalanceProgress { - pub table_id: i64, + pub table_id: TableId, pub buckets_progress: Vec, } impl TableRebalanceProgress { - pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Self { - Self { + pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Result { + Ok(Self { table_id: pb.table_id, buckets_progress: pb .buckets_progress .iter() .map(BucketRebalanceProgress::from_pb) - .collect(), - } + .collect::>>()?, + }) } } @@ -85,21 +125,24 @@ impl TableRebalanceProgress { #[derive(Debug, Clone, PartialEq, Eq)] pub struct RebalanceProgress { pub rebalance_id: Option, - pub rebalance_status: Option, + pub rebalance_status: Option, pub table_progress: Vec, } impl RebalanceProgress { - pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Self { - Self { + pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Result { + Ok(Self { rebalance_id: pb.rebalance_id.clone(), - rebalance_status: pb.rebalance_status, + rebalance_status: pb + .rebalance_status + .map(RebalanceStatus::try_from_i32) + .transpose()?, table_progress: pb .table_progress .iter() .map(TableRebalanceProgress::from_pb) - .collect(), - } + .collect::>>()?, + }) } } @@ -107,6 +150,25 @@ impl RebalanceProgress { mod tests { use super::*; + #[test] + fn test_rebalance_status_roundtrip() { + for s in [ + RebalanceStatus::NotStarted, + RebalanceStatus::Rebalancing, + RebalanceStatus::Failed, + RebalanceStatus::Completed, + RebalanceStatus::Canceled, + RebalanceStatus::Timeout, + ] { + assert_eq!(RebalanceStatus::try_from_i32(s.to_i32()).unwrap(), s); + } + } + + #[test] + fn test_rebalance_status_unknown() { + assert!(RebalanceStatus::try_from_i32(99).is_err()); + } + #[test] fn test_bucket_rebalance_plan_from_pb() { let pb = PbRebalancePlanForBucket { From dcb50120522ddd81e14757586e0cbc49bf50a20e Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:08:55 +0800 Subject: [PATCH 3/6] Apply type aliases to remaining metadata types (table_stats, kv_snapshot_lease) Extends the BucketId/TableId/PartitionId alias consistency fix to table_stats.rs (BucketStatsRequest, BucketStats) and kv_snapshot_lease.rs (KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable). Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/metadata/kv_snapshot_lease.rs | 7 ++++--- crates/fluss/src/metadata/table_stats.rs | 11 ++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/metadata/kv_snapshot_lease.rs b/crates/fluss/src/metadata/kv_snapshot_lease.rs index 22679136..98638f5d 100644 --- a/crates/fluss/src/metadata/kv_snapshot_lease.rs +++ b/crates/fluss/src/metadata/kv_snapshot_lease.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable}; +use crate::{BucketId, PartitionId, TableId}; /// One bucket's slot in a KV-snapshot lease request. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForBucket { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub snapshot_id: i64, } @@ -46,7 +47,7 @@ impl KvSnapshotLeaseForBucket { /// All the buckets of a single table that should be leased together. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForTable { - pub table_id: i64, + pub table_id: TableId, pub bucket_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs index a649844e..53c6f72f 100644 --- a/crates/fluss/src/metadata/table_stats.rs +++ b/crates/fluss/src/metadata/table_stats.rs @@ -16,17 +16,18 @@ // under the License. use crate::proto::{GetTableStatsResponse, PbTableStatsReqForBucket, PbTableStatsRespForBucket}; +use crate::{BucketId, PartitionId}; /// Per-bucket request item for `GetTableStats`. /// Mirrors the bucket-stats request shape used by the Java client. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketStatsRequest { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, } impl BucketStatsRequest { - pub fn new(partition_id: Option, bucket_id: i32) -> Self { + pub fn new(partition_id: Option, bucket_id: BucketId) -> Self { Self { partition_id, bucket_id, @@ -52,8 +53,8 @@ impl BucketStatsRequest { /// when the server returned an error for the bucket; check `error` in that case. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketStats { - pub bucket_id: i32, - pub partition_id: Option, + pub bucket_id: BucketId, + pub partition_id: Option, pub row_count: Option, pub error: Option, } From 4a9cdf6771ba1880fb5f8511402757adf36e620f Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:11:46 +0800 Subject: [PATCH 4/6] Update admin.rs to use GoalType/ServerTag enums after rebase Now that pr/3 provides GoalType and ServerTag enums in the RPC wrappers, update the admin client methods to use them in their public signatures too. Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/client/admin.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index f815ef7f..e1c618e7 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -20,11 +20,11 @@ use crate::cluster::ServerNode; use crate::metadata::{ AclFilter, AclInfo, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots, AlterConfig, AlterTableChanges, ClusterHealth, CreateAclResult, DatabaseDescriptor, DatabaseInfo, - DatabaseSummary, DescribeConfig, DropAclsFilterResult, JsonSerde, KvSnapshotLeaseForTable, - KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, PartitionInfo, - PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, RebalanceProgress, - RemoteLogManifestEntry, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, - TableStats, + DatabaseSummary, DescribeConfig, DropAclsFilterResult, GoalType, JsonSerde, + KvSnapshotLeaseForTable, KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, + PartitionInfo, PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, + RebalanceProgress, RemoteLogManifestEntry, Schema, SchemaInfo, ServerTag, TableBucket, + TableDescriptor, TableInfo, TablePath, TableStats, }; use crate::rpc::message::{ AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, @@ -709,7 +709,7 @@ impl FlussAdmin { } /// Add a tag to servers. - pub async fn add_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + pub async fn add_server_tag(&self, server_ids: Vec, server_tag: ServerTag) -> Result<()> { let _response = self .admin_gateway() .await? @@ -719,7 +719,11 @@ impl FlussAdmin { } /// Remove a tag from servers. - pub async fn remove_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + pub async fn remove_server_tag( + &self, + server_ids: Vec, + server_tag: ServerTag, + ) -> Result<()> { let _response = self .admin_gateway() .await? @@ -729,7 +733,7 @@ impl FlussAdmin { } /// Trigger a rebalance. Returns the rebalance id assigned by the server. - pub async fn rebalance(&self, goals: Vec) -> Result { + pub async fn rebalance(&self, goals: Vec) -> Result { let response = self .admin_gateway() .await? From 6c52a1f9a6ac775347767c86ff1e3a1e9da148e0 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Wed, 24 Jun 2026 09:48:43 +0800 Subject: [PATCH 5/6] Apply TableId/PartitionId/BucketId type aliases to admin methods Update the new admin method signatures introduced by this PR to use the i64/i32 type aliases from lib.rs instead of raw primitives, matching the underlying RPC message wrappers. Co-Authored-By: Claude Opus 4.7 --- crates/fluss/src/client/admin.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index e1c618e7..d91cc66c 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -557,7 +557,7 @@ impl FlussAdmin { /// Get table statistics for buckets. Pass empty `target_columns` to request stats for all columns. pub async fn get_table_stats( &self, - table_id: i64, + table_id: TableId, buckets_req: Vec, target_columns: Vec, ) -> Result { @@ -590,9 +590,9 @@ impl FlussAdmin { /// Get KV snapshot metadata (manifest file list). pub async fn get_kv_snapshot_metadata( &self, - table_id: i64, - partition_id: Option, - bucket_id: i32, + table_id: TableId, + partition_id: Option, + bucket_id: BucketId, snapshot_id: i64, ) -> Result { let response = self @@ -817,8 +817,8 @@ impl FlussAdmin { /// List remote log manifests for a table (optionally scoped to one partition). pub async fn list_remote_log_manifests( &self, - table_id: i64, - partition_id: Option, + table_id: TableId, + partition_id: Option, ) -> Result> { let response = self .admin_gateway() @@ -835,8 +835,8 @@ impl FlussAdmin { /// List active KV snapshots for a table (optionally scoped to one partition). pub async fn list_kv_snapshots( &self, - table_id: i64, - partition_id: Option, + table_id: TableId, + partition_id: Option, ) -> Result { let response = self .admin_gateway() From 828f9febf2dcadffca42d27043bf38993c96cd51 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 26 Jun 2026 09:28:02 +0800 Subject: [PATCH 6/6] Address reviewer feedback round 2 - Add `RegisterProducerResult` enum mirroring Java's `RegisterResult` (Created=0, AlreadyExists=1); `register_producer_offsets` now returns `Option` instead of raw `Option`. - Expose `comment: Option<&str>` on `alter_database` and the `AlterDatabaseRequest` wrapper (was hardcoded to `None`). - Replace fully-qualified paths with imports: `BucketStatsRequest` in admin.rs, `PbDatabaseSummary` in metadata/database.rs, and `AlterConfig` self-reference in metadata/table_change.rs. - Rename `create_acls(acl)` to `create_acls(acls)` and `drop_acls(acl_filter)` to `drop_acls(acl_filters)` for plural consistency with the `Vec` argument type. Co-Authored-By: Claude Opus 4.7 --- crates/fluss/src/client/admin.rs | 34 ++++++---- crates/fluss/src/metadata/database.rs | 3 +- crates/fluss/src/metadata/mod.rs | 2 + .../src/metadata/register_producer_result.rs | 66 +++++++++++++++++++ crates/fluss/src/metadata/table_change.rs | 3 +- .../fluss/src/rpc/message/alter_database.rs | 3 +- 6 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 crates/fluss/src/metadata/register_producer_result.rs diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index d91cc66c..7015e573 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -19,12 +19,12 @@ use crate::client::metadata::Metadata; use crate::cluster::ServerNode; use crate::metadata::{ AclFilter, AclInfo, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots, AlterConfig, - AlterTableChanges, ClusterHealth, CreateAclResult, DatabaseDescriptor, DatabaseInfo, - DatabaseSummary, DescribeConfig, DropAclsFilterResult, GoalType, JsonSerde, + AlterTableChanges, BucketStatsRequest, ClusterHealth, CreateAclResult, DatabaseDescriptor, + DatabaseInfo, DatabaseSummary, DescribeConfig, DropAclsFilterResult, GoalType, JsonSerde, KvSnapshotLeaseForTable, KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, PartitionInfo, PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, - RebalanceProgress, RemoteLogManifestEntry, Schema, SchemaInfo, ServerTag, TableBucket, - TableDescriptor, TableInfo, TablePath, TableStats, + RebalanceProgress, RegisterProducerResult, RemoteLogManifestEntry, Schema, SchemaInfo, + ServerTag, TableBucket, TableDescriptor, TableInfo, TablePath, TableStats, }; use crate::rpc::message::{ AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, @@ -511,11 +511,12 @@ impl FlussAdmin { .collect()) } - /// Alter a database's configuration. + /// Alter a database: config changes and/or an updated comment. pub async fn alter_database( &self, name: &str, config_changes: Vec, + comment: Option<&str>, ignore_if_not_exists: bool, ) -> Result<()> { let _response = self @@ -525,6 +526,7 @@ impl FlussAdmin { name, ignore_if_not_exists, config_changes, + comment, )) .await?; Ok(()) @@ -558,7 +560,7 @@ impl FlussAdmin { pub async fn get_table_stats( &self, table_id: TableId, - buckets_req: Vec, + buckets_req: Vec, target_columns: Vec, ) -> Result { let response = self @@ -647,11 +649,11 @@ impl FlussAdmin { } /// Create ACLs. Returns one result per submitted ACL (success or per-ACL error). - pub async fn create_acls(&self, acl: Vec) -> Result> { + pub async fn create_acls(&self, acls: Vec) -> Result> { let response = self .admin_gateway() .await? - .request(CreateAclsRequest::new(acl)) + .request(CreateAclsRequest::new(acls)) .await?; response .acl_res @@ -671,11 +673,14 @@ impl FlussAdmin { } /// Drop ACLs matching filters. Returns one result per submitted filter. - pub async fn drop_acls(&self, acl_filter: Vec) -> Result> { + pub async fn drop_acls( + &self, + acl_filters: Vec, + ) -> Result> { let response = self .admin_gateway() .await? - .request(DropAclsRequest::new(acl_filter)) + .request(DropAclsRequest::new(acl_filters)) .await?; response .filter_results @@ -765,13 +770,13 @@ impl FlussAdmin { Ok(()) } - /// Register producer offsets. Returns the server-side result code (if any). + /// Register producer offsets. Returns the server-side registration outcome (if any). pub async fn register_producer_offsets( &self, producer_id: &str, table_offsets: Vec, ttl_ms: Option, - ) -> Result> { + ) -> Result> { let response = self .admin_gateway() .await? @@ -781,7 +786,10 @@ impl FlussAdmin { ttl_ms, )) .await?; - Ok(response.result) + response + .result + .map(RegisterProducerResult::try_from_i32) + .transpose() } /// Get producer offsets. diff --git a/crates/fluss/src/metadata/database.rs b/crates/fluss/src/metadata/database.rs index 5403e9a3..76fe5114 100644 --- a/crates/fluss/src/metadata/database.rs +++ b/crates/fluss/src/metadata/database.rs @@ -18,6 +18,7 @@ use crate::error::Error::JsonSerdeError; use crate::error::Result; use crate::metadata::JsonSerde; +use crate::proto::PbDatabaseSummary; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::collections::HashMap; @@ -214,7 +215,7 @@ pub struct DatabaseSummary { } impl DatabaseSummary { - pub fn from_pb(pb: &crate::proto::PbDatabaseSummary) -> Self { + pub fn from_pb(pb: &PbDatabaseSummary) -> Self { Self { database_name: pb.database_name.clone(), created_time: pb.created_time, diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 4eaa5c5b..d992273c 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -29,6 +29,7 @@ mod lake_snapshot; mod partition; mod producer_offsets; mod rebalance; +mod register_producer_result; mod remote_log; mod schema_util; mod server_tag; @@ -50,6 +51,7 @@ pub use lake_snapshot::*; pub use partition::*; pub use producer_offsets::*; pub use rebalance::*; +pub use register_producer_result::*; pub use remote_log::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use server_tag::*; diff --git a/crates/fluss/src/metadata/register_producer_result.rs b/crates/fluss/src/metadata/register_producer_result.rs new file mode 100644 index 00000000..1c0c8e50 --- /dev/null +++ b/crates/fluss/src/metadata/register_producer_result.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; + +/// Mirrors Java `org.apache.fluss.client.admin.RegisterResult`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RegisterProducerResult { + /// Snapshot was newly created (first-startup scenario; no undo recovery needed). + Created, + /// Snapshot already existed (failover scenario; caller should perform undo recovery). + AlreadyExists, +} + +impl RegisterProducerResult { + pub fn to_i32(self) -> i32 { + match self { + Self::Created => 0, + Self::AlreadyExists => 1, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Created), + 1 => Ok(Self::AlreadyExists), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported RegisterProducerResult: {value}"), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_register_producer_result_roundtrip() { + for r in [ + RegisterProducerResult::Created, + RegisterProducerResult::AlreadyExists, + ] { + assert_eq!(RegisterProducerResult::try_from_i32(r.to_i32()).unwrap(), r); + } + } + + #[test] + fn test_register_producer_result_unknown() { + assert!(RegisterProducerResult::try_from_i32(99).is_err()); + } +} diff --git a/crates/fluss/src/metadata/table_change.rs b/crates/fluss/src/metadata/table_change.rs index f95aebd9..3e5866a2 100644 --- a/crates/fluss/src/metadata/table_change.rs +++ b/crates/fluss/src/metadata/table_change.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use super::AlterConfig; use crate::error::{Error, Result}; use crate::proto::{PbAddColumn, PbDropColumn, PbModifyColumn, PbRenameColumn}; @@ -126,7 +127,7 @@ impl RenameColumn { /// config changes. #[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct AlterTableChanges { - pub config_changes: Vec, + pub config_changes: Vec, pub add_columns: Vec, pub drop_columns: Vec, pub rename_columns: Vec, diff --git a/crates/fluss/src/rpc/message/alter_database.rs b/crates/fluss/src/rpc/message/alter_database.rs index 3eb08163..2135e663 100644 --- a/crates/fluss/src/rpc/message/alter_database.rs +++ b/crates/fluss/src/rpc/message/alter_database.rs @@ -33,13 +33,14 @@ impl AlterDatabaseRequest { database_name: &str, ignore_if_not_exists: bool, config_changes: Vec, + comment: Option<&str>, ) -> Self { AlterDatabaseRequest { inner_request: proto::AlterDatabaseRequest { database_name: database_name.to_string(), ignore_if_not_exists, config_changes: config_changes.iter().map(AlterConfig::to_pb).collect(), - comment: None, + comment: comment.map(str::to_string), }, } }