From 2e6162109187e5e2fa461b7b2b122cce3d8babd2 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:00:37 +0800 Subject: [PATCH 1/9] [rust] Add Fluss 1.x protocol support to the admin client Add 26 new admin methods to FlussAdmin: - Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats - KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot - ACL management: create_acls, list_acls, drop_acls - Cluster configuration: describe_cluster_configs, alter_cluster_configs - Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance - Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets - Monitoring: get_cluster_health, list_remote_log_manifests - KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease --- crates/fluss/src/client/admin.rs | 390 +++++++++++++++++- crates/fluss/src/metadata/acl.rs | 74 +++- crates/fluss/src/metadata/cluster_health.rs | 59 +++ crates/fluss/src/metadata/config.rs | 29 +- crates/fluss/src/metadata/database.rs | 18 + crates/fluss/src/metadata/kv_snapshot.rs | 170 ++++++++ crates/fluss/src/metadata/lake_snapshot.rs | 82 ++++ crates/fluss/src/metadata/mod.rs | 10 + crates/fluss/src/metadata/producer_offsets.rs | 24 +- crates/fluss/src/metadata/rebalance.rs | 125 ++++++ crates/fluss/src/metadata/remote_log.rs | 59 +++ crates/fluss/src/metadata/table_change.rs | 12 + crates/fluss/src/metadata/table_stats.rs | 76 +++- 13 files changed, 1119 insertions(+), 9 deletions(-) create mode 100644 crates/fluss/src/metadata/cluster_health.rs create mode 100644 crates/fluss/src/metadata/kv_snapshot.rs create mode 100644 crates/fluss/src/metadata/lake_snapshot.rs create mode 100644 crates/fluss/src/metadata/rebalance.rs create mode 100644 crates/fluss/src/metadata/remote_log.rs diff --git a/crates/fluss/src/client/admin.rs 
b/crates/fluss/src/client/admin.rs index d3d5a5e3..e3d78d7e 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -18,14 +18,27 @@ use crate::client::metadata::Metadata; use crate::cluster::ServerNode; use crate::metadata::{ - DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec, - PhysicalTablePath, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, + AclFilter, AclInfo, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots, AlterConfig, + AlterTableChanges, ClusterHealth, CreateAclResult, DatabaseDescriptor, DatabaseInfo, + DatabaseSummary, DescribeConfig, DropAclsFilterResult, JsonSerde, KvSnapshotLeaseForTable, + KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, PartitionInfo, + PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, RebalanceProgress, + RemoteLogManifestEntry, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, + TableStats, }; use crate::rpc::message::{ + AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, + AlterDatabaseRequest, AlterTableRequest, CancelRebalanceRequest, CreateAclsRequest, CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, - DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, - GetLatestLakeSnapshotRequest, GetTableRequest, GetTableSchemaRequestMsg, ListDatabasesRequest, - ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest, + DeleteProducerOffsetsRequest, DescribeClusterConfigsRequest, DropAclsRequest, + DropDatabaseRequest, DropKvSnapshotLeaseRequest, DropPartitionRequest, DropTableRequest, + GetClusterHealthRequest, GetDatabaseInfoRequest, GetKvSnapshotMetadataRequest, + GetLakeSnapshotRequest, GetLatestKvSnapshotsRequest, GetLatestLakeSnapshotRequest, + GetProducerOffsetsRequest, GetTableRequest, GetTableSchemaRequestMsg, GetTableStatsRequest, + ListAclsRequest, 
ListDatabaseSummariesRequest, ListDatabasesRequest, ListKvSnapshotsRequest, + ListPartitionInfosRequest, ListRebalanceProgressRequest, ListRemoteLogManifestsRequest, + ListTablesRequest, RebalanceRequest, RegisterProducerOffsetsRequest, + ReleaseKvSnapshotLeaseRequest, RemoveServerTagRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; @@ -483,4 +496,371 @@ impl FlussAdmin { } Ok(tasks) } + + /// List database summaries (name, created_time, table_count). + pub async fn list_database_summaries(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListDatabaseSummariesRequest::new()) + .await?; + Ok(response + .database_summary + .iter() + .map(DatabaseSummary::from_pb) + .collect()) + } + + /// Alter a database's configuration. + pub async fn alter_database( + &self, + name: &str, + config_changes: Vec, + ignore_if_not_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterDatabaseRequest::new( + name, + ignore_if_not_exists, + config_changes, + )) + .await?; + Ok(()) + } + + /// Alter a table: config changes plus any combination of add/drop/rename/modify columns. + /// Bundle the column-level edits in [`AlterTableChanges`]. + pub async fn alter_table( + &self, + table_path: &TablePath, + ignore_if_not_exists: bool, + changes: AlterTableChanges, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterTableRequest::new( + table_path, + ignore_if_not_exists, + changes.config_changes, + changes.add_columns, + changes.drop_columns, + changes.rename_columns, + changes.modify_columns, + )) + .await?; + Ok(()) + } + + /// Get table statistics for buckets. Pass empty `target_columns` to request stats for all columns. + pub async fn get_table_stats( + &self, + table_id: i64, + buckets_req: Vec, + target_columns: Vec, + ) -> Result { + let response = self + .admin_gateway() + .await? 
+ .request(GetTableStatsRequest::new( + table_id, + buckets_req, + target_columns, + )) + .await?; + Ok(TableStats::from_pb(&response)) + } + + /// Get the latest KV snapshots for a table (optionally scoped to one partition). + pub async fn get_latest_kv_snapshots( + &self, + table_path: &TablePath, + partition_name: Option<&str>, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetLatestKvSnapshotsRequest::new(table_path, partition_name)) + .await?; + Ok(LatestKvSnapshots::from_pb(&response)) + } + + /// Get KV snapshot metadata (manifest file list). + pub async fn get_kv_snapshot_metadata( + &self, + table_id: i64, + partition_id: Option, + bucket_id: i32, + snapshot_id: i64, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetKvSnapshotMetadataRequest::new( + table_id, + partition_id, + bucket_id, + snapshot_id, + )) + .await?; + Ok(KvSnapshotMetadata::from_pb(&response)) + } + + /// Acquire a KV snapshot lease. Returns the snapshots the server could not lease. + pub async fn create_kv_snapshot_lease( + &self, + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(AcquireKvSnapshotLeaseRequest::new( + lease_id, + lease_duration_ms, + snapshots_to_lease, + )) + .await?; + Ok(AcquireKvSnapshotLeaseResult::from_pb(&response)) + } + + /// Get a specific lake snapshot for a table. + pub async fn get_lake_snapshot( + &self, + table_path: &TablePath, + snapshot_id: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None)) + .await?; + Ok(LakeSnapshotInfo::from_pb(&response)) + } + + /// Create ACLs. Returns one result per submitted ACL (success or per-ACL error). + pub async fn create_acls(&self, acl: Vec) -> Result> { + let response = self + .admin_gateway() + .await? 
+ .request(CreateAclsRequest::new(acl)) + .await?; + response + .acl_res + .iter() + .map(CreateAclResult::from_pb) + .collect() + } + + /// List ACLs matching a filter. + pub async fn list_acls(&self, acl_filter: AclFilter) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListAclsRequest::new(acl_filter)) + .await?; + response.acl.iter().map(AclInfo::from_pb).collect() + } + + /// Drop ACLs matching filters. Returns one result per submitted filter. + pub async fn drop_acls(&self, acl_filter: Vec) -> Result> { + let response = self + .admin_gateway() + .await? + .request(DropAclsRequest::new(acl_filter)) + .await?; + response + .filter_results + .iter() + .map(DropAclsFilterResult::from_pb) + .collect() + } + + /// Describe cluster configuration. + pub async fn describe_cluster_configs(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(DescribeClusterConfigsRequest::new()) + .await?; + Ok(response + .configs + .iter() + .map(DescribeConfig::from_pb) + .collect()) + } + + /// Alter cluster configuration. + pub async fn alter_cluster_configs(&self, alter_configs: Vec) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterClusterConfigsRequest::new(alter_configs)) + .await?; + Ok(()) + } + + /// Add a tag to servers. + pub async fn add_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AddServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Remove a tag from servers. + pub async fn remove_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(RemoveServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Trigger a rebalance. Returns the rebalance id assigned by the server. + pub async fn rebalance(&self, goals: Vec) -> Result { + let response = self + .admin_gateway() + .await? 
+ .request(RebalanceRequest::new(goals)) + .await?; + Ok(response.rebalance_id) + } + + /// List rebalance progress (for a specific rebalance id, or all in-flight ones if `None`). + pub async fn list_rebalance_progress( + &self, + rebalance_id: Option<&str>, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(ListRebalanceProgressRequest::new(rebalance_id)) + .await?; + Ok(RebalanceProgress::from_pb(&response)) + } + + /// Cancel a rebalance. + pub async fn cancel_rebalance(&self, rebalance_id: Option<&str>) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(CancelRebalanceRequest::new(rebalance_id)) + .await?; + Ok(()) + } + + /// Register producer offsets. Returns the server-side result code (if any). + pub async fn register_producer_offsets( + &self, + producer_id: &str, + table_offsets: Vec, + ttl_ms: Option, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(RegisterProducerOffsetsRequest::new( + producer_id, + table_offsets, + ttl_ms, + )) + .await?; + Ok(response.result) + } + + /// Get producer offsets. + pub async fn get_producer_offsets(&self, producer_id: &str) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(ProducerOffsets::from_pb(&response)) + } + + /// Delete producer offsets. + pub async fn delete_producer_offsets(&self, producer_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DeleteProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(()) + } + + /// Get cluster health status. + pub async fn get_cluster_health(&self) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetClusterHealthRequest::new()) + .await?; + Ok(ClusterHealth::from_pb(&response)) + } + + /// List remote log manifests for a table (optionally scoped to one partition). 
+ pub async fn list_remote_log_manifests( + &self, + table_id: i64, + partition_id: Option, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListRemoteLogManifestsRequest::new(table_id, partition_id)) + .await?; + Ok(response + .manifests + .iter() + .map(RemoteLogManifestEntry::from_pb) + .collect()) + } + + /// List active KV snapshots for a table (optionally scoped to one partition). + pub async fn list_kv_snapshots( + &self, + table_id: i64, + partition_id: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(ListKvSnapshotsRequest::new(table_id, partition_id)) + .await?; + Ok(ActiveKvSnapshots::from_pb(&response)) + } + + /// Release specific bucket snapshots from a KV snapshot lease. + pub async fn release_kv_snapshot_lease( + &self, + lease_id: &str, + buckets_to_release: Vec, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(ReleaseKvSnapshotLeaseRequest::new( + lease_id, + buckets_to_release, + )) + .await?; + Ok(()) + } + + /// Drop an entire KV snapshot lease. + pub async fn drop_kv_snapshot_lease(&self, lease_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DropKvSnapshotLeaseRequest::new(lease_id)) + .await?; + Ok(()) + } } diff --git a/crates/fluss/src/metadata/acl.rs b/crates/fluss/src/metadata/acl.rs index 84ec97af..f69ec952 100644 --- a/crates/fluss/src/metadata/acl.rs +++ b/crates/fluss/src/metadata/acl.rs @@ -16,7 +16,9 @@ // under the License. use crate::error::{Error, Result}; -use crate::proto::{PbAclFilter, PbAclInfo}; +use crate::proto::{ + PbAclFilter, PbAclInfo, PbCreateAclRespInfo, PbDropAclsFilterResult, PbDropAclsMatchingAcl, +}; /// Mirrors Java `org.apache.fluss.security.acl.ResourceType`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -197,6 +199,76 @@ impl AclFilter { } } +/// One per ACL submitted to `create_acls`: success or a server-side error. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreateAclResult { + pub acl: AclInfo, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclError { + pub code: i32, + pub message: Option, +} + +impl CreateAclResult { + pub fn from_pb(pb: &PbCreateAclRespInfo) -> Result { + Ok(Self { + acl: AclInfo::from_pb(&pb.acl)?, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + +/// One ACL matched by a filter in `drop_acls`. Reports the bound ACL and any +/// server-side error encountered while dropping it. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropAclMatchingAcl { + pub acl: AclInfo, + pub error: Option, +} + +impl DropAclMatchingAcl { + pub fn from_pb(pb: &PbDropAclsMatchingAcl) -> Result { + Ok(Self { + acl: AclInfo::from_pb(&pb.acl)?, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + +/// One per filter submitted to `drop_acls`: the matching ACLs that were +/// targeted plus any filter-level error. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropAclsFilterResult { + pub matching_acls: Vec, + pub error: Option, +} + +impl DropAclsFilterResult { + pub fn from_pb(pb: &PbDropAclsFilterResult) -> Result { + let matching_acls = pb + .matching_acls + .iter() + .map(DropAclMatchingAcl::from_pb) + .collect::>>()?; + Ok(Self { + matching_acls, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/cluster_health.rs b/crates/fluss/src/metadata/cluster_health.rs new file mode 100644 index 00000000..c984ab05 --- /dev/null +++ b/crates/fluss/src/metadata/cluster_health.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::GetClusterHealthResponse; + +/// Result of `get_cluster_health`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ClusterHealth { + pub num_replicas: i32, + pub in_sync_replicas: i32, + pub num_leader_replicas: i32, + pub active_leader_replicas: i32, + pub status: i32, +} + +impl ClusterHealth { + pub fn from_pb(pb: &GetClusterHealthResponse) -> Self { + Self { + num_replicas: pb.num_replicas, + in_sync_replicas: pb.in_sync_replicas, + num_leader_replicas: pb.num_leader_replicas, + active_leader_replicas: pb.active_leader_replicas, + status: pb.status, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cluster_health_from_pb() { + let pb = GetClusterHealthResponse { + num_replicas: 5, + in_sync_replicas: 4, + num_leader_replicas: 3, + active_leader_replicas: 3, + status: 1, + }; + let h = ClusterHealth::from_pb(&pb); + assert_eq!(h.num_replicas, 5); + assert_eq!(h.status, 1); + } +} diff --git a/crates/fluss/src/metadata/config.rs b/crates/fluss/src/metadata/config.rs index 4e045c45..4dd7d8bc 100644 --- a/crates/fluss/src/metadata/config.rs +++ b/crates/fluss/src/metadata/config.rs @@ -16,7 +16,7 @@ // under the License. 
use crate::error::{Error, Result}; -use crate::proto::PbAlterConfig; +use crate::proto::{PbAlterConfig, PbDescribeConfig}; /// Mirrors Java `org.apache.fluss.config.cluster.AlterConfigOpType`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -88,6 +88,33 @@ impl AlterConfig { } } +/// One entry in the response of `describe_cluster_configs`. Mirrors Java's +/// `org.apache.fluss.config.cluster.DescribeConfig`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DescribeConfig { + pub config_key: String, + pub config_value: Option, + pub config_source: String, +} + +impl DescribeConfig { + pub fn from_pb(pb: &PbDescribeConfig) -> Self { + Self { + config_key: pb.config_key.clone(), + config_value: pb.config_value.clone(), + config_source: pb.config_source.clone(), + } + } + + pub fn to_pb(&self) -> PbDescribeConfig { + PbDescribeConfig { + config_key: self.config_key.clone(), + config_value: self.config_value.clone(), + config_source: self.config_source.clone(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/database.rs b/crates/fluss/src/metadata/database.rs index 15fefb54..5403e9a3 100644 --- a/crates/fluss/src/metadata/database.rs +++ b/crates/fluss/src/metadata/database.rs @@ -205,6 +205,24 @@ impl DatabaseDescriptor { } } +/// Lightweight summary of a database returned by `list_database_summaries`. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DatabaseSummary { + pub database_name: String, + pub created_time: i64, + pub table_count: i32, +} + +impl DatabaseSummary { + pub fn from_pb(pb: &crate::proto::PbDatabaseSummary) -> Self { + Self { + database_name: pb.database_name.clone(), + created_time: pb.created_time, + table_count: pb.table_count, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/kv_snapshot.rs b/crates/fluss/src/metadata/kv_snapshot.rs new file mode 100644 index 00000000..f58149ba --- /dev/null +++ b/crates/fluss/src/metadata/kv_snapshot.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{ + AcquireKvSnapshotLeaseResponse, GetKvSnapshotMetadataResponse, GetLatestKvSnapshotsResponse, + ListKvSnapshotsResponse, PbKvSnapshot, PbRemotePathAndLocalFile, +}; + +use crate::metadata::KvSnapshotLeaseForTable; + +/// Per-bucket KV snapshot info. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshot { + pub bucket_id: i32, + pub snapshot_id: Option, + pub log_offset: Option, +} + +impl KvSnapshot { + pub fn from_pb(pb: &PbKvSnapshot) -> Self { + Self { + bucket_id: pb.bucket_id, + snapshot_id: pb.snapshot_id, + log_offset: pb.log_offset, + } + } +} + +/// Result of `get_latest_kv_snapshots`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LatestKvSnapshots { + pub table_id: i64, + pub partition_id: Option, + pub latest_snapshots: Vec, +} + +impl LatestKvSnapshots { + pub fn from_pb(pb: &GetLatestKvSnapshotsResponse) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + latest_snapshots: pb + .latest_snapshots + .iter() + .map(KvSnapshot::from_pb) + .collect(), + } + } +} + +/// One file in a KV snapshot manifest: its remote path and the local filename +/// the server expects clients to materialize it as. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemotePathAndLocalFile { + pub remote_path: String, + pub local_file_name: String, +} + +impl RemotePathAndLocalFile { + pub fn from_pb(pb: &PbRemotePathAndLocalFile) -> Self { + Self { + remote_path: pb.remote_path.clone(), + local_file_name: pb.local_file_name.clone(), + } + } +} + +/// Result of `get_kv_snapshot_metadata`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotMetadata { + pub log_offset: i64, + pub snapshot_files: Vec, +} + +impl KvSnapshotMetadata { + pub fn from_pb(pb: &GetKvSnapshotMetadataResponse) -> Self { + Self { + log_offset: pb.log_offset, + snapshot_files: pb + .snapshot_files + .iter() + .map(RemotePathAndLocalFile::from_pb) + .collect(), + } + } +} + +/// Result of `acquire_kv_snapshot_lease` — any snapshots the server could not +/// lease (typically because they were evicted concurrently). 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AcquireKvSnapshotLeaseResult { + pub unavailable_snapshots: Vec, +} + +impl AcquireKvSnapshotLeaseResult { + pub fn from_pb(pb: &AcquireKvSnapshotLeaseResponse) -> Self { + Self { + unavailable_snapshots: pb + .unavailable_snapshots + .iter() + .map(KvSnapshotLeaseForTable::from_pb) + .collect(), + } + } +} + +/// Result of `list_kv_snapshots`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ActiveKvSnapshots { + pub table_id: i64, + pub partition_id: Option, + pub active_snapshots: Vec, +} + +impl ActiveKvSnapshots { + pub fn from_pb(pb: &ListKvSnapshotsResponse) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + active_snapshots: pb + .active_snapshots + .iter() + .map(KvSnapshot::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kv_snapshot_from_pb() { + let pb = PbKvSnapshot { + bucket_id: 3, + snapshot_id: Some(7), + log_offset: Some(42), + }; + let s = KvSnapshot::from_pb(&pb); + assert_eq!(s.bucket_id, 3); + assert_eq!(s.snapshot_id, Some(7)); + assert_eq!(s.log_offset, Some(42)); + } + + #[test] + fn test_remote_path_and_local_file_from_pb() { + let pb = PbRemotePathAndLocalFile { + remote_path: "s3://bucket/snap/1.sst".to_string(), + local_file_name: "1.sst".to_string(), + }; + let f = RemotePathAndLocalFile::from_pb(&pb); + assert_eq!(f.remote_path, "s3://bucket/snap/1.sst"); + assert_eq!(f.local_file_name, "1.sst"); + } +} diff --git a/crates/fluss/src/metadata/lake_snapshot.rs b/crates/fluss/src/metadata/lake_snapshot.rs new file mode 100644 index 00000000..b88d53f9 --- /dev/null +++ b/crates/fluss/src/metadata/lake_snapshot.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{GetLakeSnapshotResponse, PbLakeSnapshotForBucket}; + +/// One bucket's slice of a lake snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LakeBucketSnapshot { + pub partition_id: Option, + pub bucket_id: i32, + pub log_offset: Option, + pub partition_name: Option, +} + +impl LakeBucketSnapshot { + pub fn from_pb(pb: &PbLakeSnapshotForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + log_offset: pb.log_offset, + partition_name: pb.partition_name.clone(), + } + } +} + +/// Result of `get_lake_snapshot` — a specific snapshot's bucket layout. +/// (Distinct from [`LakeSnapshot`](super::LakeSnapshot), which represents the +/// "latest" snapshot summary returned by `get_latest_lake_snapshot`.) 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LakeSnapshotInfo { + pub table_id: i64, + pub snapshot_id: i64, + pub bucket_snapshots: Vec, +} + +impl LakeSnapshotInfo { + pub fn from_pb(pb: &GetLakeSnapshotResponse) -> Self { + Self { + table_id: pb.table_id, + snapshot_id: pb.snapshot_id, + bucket_snapshots: pb + .bucket_snapshots + .iter() + .map(LakeBucketSnapshot::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lake_bucket_snapshot_from_pb() { + let pb = PbLakeSnapshotForBucket { + partition_id: Some(1), + bucket_id: 2, + log_offset: Some(3), + partition_name: Some("date=2024-01-01".to_string()), + }; + let s = LakeBucketSnapshot::from_pb(&pb); + assert_eq!(s.bucket_id, 2); + assert_eq!(s.partition_id, Some(1)); + assert_eq!(s.log_offset, Some(3)); + assert_eq!(s.partition_name.as_deref(), Some("date=2024-01-01")); + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 0249fefe..4eaa5c5b 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -16,15 +16,20 @@ // under the License. 
mod acl; +mod cluster_health; mod config; mod data_lake_format; mod database; mod datatype; mod goal_type; mod json_serde; +mod kv_snapshot; mod kv_snapshot_lease; +mod lake_snapshot; mod partition; mod producer_offsets; +mod rebalance; +mod remote_log; mod schema_util; mod server_tag; mod table; @@ -32,15 +37,20 @@ mod table_change; mod table_stats; pub use acl::*; +pub use cluster_health::*; pub use config::*; pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use goal_type::*; pub use json_serde::*; +pub use kv_snapshot::*; pub use kv_snapshot_lease::*; +pub use lake_snapshot::*; pub use partition::*; pub use producer_offsets::*; +pub use rebalance::*; +pub use remote_log::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use server_tag::*; pub use table::*; diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs index f0daddbc..501d5a46 100644 --- a/crates/fluss/src/metadata/producer_offsets.rs +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::{PbBucketOffset, PbProducerTableOffsets}; +use crate::proto::{GetProducerOffsetsResponse, PbBucketOffset, PbProducerTableOffsets}; /// Per-bucket producer log-end offset. #[derive(Debug, Clone, PartialEq, Eq)] @@ -74,6 +74,28 @@ impl ProducerTableOffsets { } } +/// Result of `get_producer_offsets`. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProducerOffsets { + pub producer_id: Option, + pub expiration_time: Option, + pub table_offsets: Vec, +} + +impl ProducerOffsets { + pub fn from_pb(pb: &GetProducerOffsetsResponse) -> Self { + Self { + producer_id: pb.producer_id.clone(), + expiration_time: pb.expiration_time, + table_offsets: pb + .table_offsets + .iter() + .map(ProducerTableOffsets::from_pb) + .collect(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/rebalance.rs b/crates/fluss/src/metadata/rebalance.rs new file mode 100644 index 00000000..dfb6d75d --- /dev/null +++ b/crates/fluss/src/metadata/rebalance.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{ + ListRebalanceProgressResponse, PbRebalancePlanForBucket, PbRebalanceProgressForBucket, + PbRebalanceProgressForTable, +}; + +/// Per-bucket plan in a rebalance: who the leader was and who it will be, who +/// the replicas were and who they will be. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketRebalancePlan { + pub partition_id: Option, + pub bucket_id: i32, + pub original_leader: Option, + pub new_leader: Option, + pub original_replicas: Vec, + pub new_replicas: Vec, +} + +impl BucketRebalancePlan { + pub fn from_pb(pb: &PbRebalancePlanForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + original_leader: pb.original_leader, + new_leader: pb.new_leader, + original_replicas: pb.original_replicas.clone(), + new_replicas: pb.new_replicas.clone(), + } + } +} + +/// Per-bucket rebalance progress: the planned move and its current status code. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketRebalanceProgress { + pub rebalance_plan: BucketRebalancePlan, + pub rebalance_status: i32, +} + +impl BucketRebalanceProgress { + pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Self { + Self { + rebalance_plan: BucketRebalancePlan::from_pb(&pb.rebalance_plan), + rebalance_status: pb.rebalance_status, + } + } +} + +/// All bucket progress for one table. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableRebalanceProgress { + pub table_id: i64, + pub buckets_progress: Vec, +} + +impl TableRebalanceProgress { + pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Self { + Self { + table_id: pb.table_id, + buckets_progress: pb + .buckets_progress + .iter() + .map(BucketRebalanceProgress::from_pb) + .collect(), + } + } +} + +/// Result of `list_rebalance_progress`. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RebalanceProgress { + pub rebalance_id: Option, + pub rebalance_status: Option, + pub table_progress: Vec, +} + +impl RebalanceProgress { + pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Self { + Self { + rebalance_id: pb.rebalance_id.clone(), + rebalance_status: pb.rebalance_status, + table_progress: pb + .table_progress + .iter() + .map(TableRebalanceProgress::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_rebalance_plan_from_pb() { + let pb = PbRebalancePlanForBucket { + partition_id: Some(1), + bucket_id: 2, + original_leader: Some(3), + new_leader: Some(4), + original_replicas: vec![3, 5, 6], + new_replicas: vec![4, 5, 6], + }; + let p = BucketRebalancePlan::from_pb(&pb); + assert_eq!(p.bucket_id, 2); + assert_eq!(p.new_leader, Some(4)); + assert_eq!(p.new_replicas, vec![4, 5, 6]); + } +} diff --git a/crates/fluss/src/metadata/remote_log.rs b/crates/fluss/src/metadata/remote_log.rs new file mode 100644 index 00000000..d3fd3e33 --- /dev/null +++ b/crates/fluss/src/metadata/remote_log.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::metadata::TableBucket; +use crate::proto::PbRemoteLogManifestEntry; + +/// One bucket's remote-log manifest pointer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteLogManifestEntry { + pub table_bucket: TableBucket, + pub remote_log_manifest_path: String, + pub remote_log_end_offset: i64, +} + +impl RemoteLogManifestEntry { + pub fn from_pb(pb: &PbRemoteLogManifestEntry) -> Self { + Self { + table_bucket: TableBucket::from_pb(&pb.table_bucket), + remote_log_manifest_path: pb.remote_log_manifest_path.clone(), + remote_log_end_offset: pb.remote_log_end_offset, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::PbTableBucket; + + #[test] + fn test_remote_log_manifest_entry_from_pb() { + let pb = PbRemoteLogManifestEntry { + table_bucket: PbTableBucket { + table_id: 1, + partition_id: None, + bucket_id: 2, + }, + remote_log_manifest_path: "s3://bucket/manifest.json".to_string(), + remote_log_end_offset: 999, + }; + let m = RemoteLogManifestEntry::from_pb(&pb); + assert_eq!(m.remote_log_end_offset, 999); + assert_eq!(m.table_bucket.bucket_id(), 2); + } +} diff --git a/crates/fluss/src/metadata/table_change.rs b/crates/fluss/src/metadata/table_change.rs index 2ecf6cfa..f95aebd9 100644 --- a/crates/fluss/src/metadata/table_change.rs +++ b/crates/fluss/src/metadata/table_change.rs @@ -121,6 +121,18 @@ impl RenameColumn { } } +/// Bundle of column-level changes for a single `alter_table` call. Empty `Vec`s +/// mean "no change of that kind"; pass `Default::default()` to send only +/// config changes. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct AlterTableChanges { + pub config_changes: Vec, + pub add_columns: Vec, + pub drop_columns: Vec, + pub rename_columns: Vec, + pub modify_columns: Vec, +} + /// Modify a column's type/comment/position. Mirrors the `ModifyColumn` variant of /// Java `TableChange`. All fields except `column_name` are optional — only the /// non-`None` ones are applied. 
diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs index ad2d1fc2..a649844e 100644 --- a/crates/fluss/src/metadata/table_stats.rs +++ b/crates/fluss/src/metadata/table_stats.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::PbTableStatsReqForBucket; +use crate::proto::{GetTableStatsResponse, PbTableStatsReqForBucket, PbTableStatsRespForBucket}; /// Per-bucket request item for `GetTableStats`. /// Mirrors the bucket-stats request shape used by the Java client. @@ -48,6 +48,51 @@ impl BucketStatsRequest { } } +/// Per-bucket stats result returned by `GetTableStats`. `row_count` is `None` +/// when the server returned an error for the bucket; check `error` in that case. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStats { + pub bucket_id: i32, + pub partition_id: Option, + pub row_count: Option, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStatsError { + pub code: i32, + pub message: Option, +} + +impl BucketStats { + pub fn from_pb(pb: &PbTableStatsRespForBucket) -> Self { + let error = pb.error_code.map(|code| BucketStatsError { + code, + message: pb.error_message.clone(), + }); + Self { + bucket_id: pb.bucket_id, + partition_id: pb.partition_id, + row_count: pb.row_count, + error, + } + } +} + +/// Full result of `GetTableStats` — one entry per requested bucket. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableStats { + pub buckets: Vec, +} + +impl TableStats { + pub fn from_pb(pb: &GetTableStatsResponse) -> Self { + Self { + buckets: pb.buckets_resp.iter().map(BucketStats::from_pb).collect(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -62,4 +107,33 @@ mod tests { assert_eq!(BucketStatsRequest::from_pb(&pb), req); } } + + #[test] + fn test_bucket_stats_from_pb_ok() { + let pb = PbTableStatsRespForBucket { + error_code: None, + error_message: None, + partition_id: Some(1), + bucket_id: 7, + row_count: Some(123), + }; + let s = BucketStats::from_pb(&pb); + assert_eq!(s.bucket_id, 7); + assert_eq!(s.row_count, Some(123)); + assert!(s.error.is_none()); + } + + #[test] + fn test_bucket_stats_from_pb_err() { + let pb = PbTableStatsRespForBucket { + error_code: Some(7), + error_message: Some("nope".to_string()), + partition_id: None, + bucket_id: 2, + row_count: None, + }; + let s = BucketStats::from_pb(&pb); + assert_eq!(s.error.as_ref().unwrap().code, 7); + assert_eq!(s.error.as_ref().unwrap().message.as_deref(), Some("nope")); + } } From f289f88e391ed0476f83052ea1394ee59970d391 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:02:41 +0800 Subject: [PATCH 2/9] Address reviewer feedback: add domain enums, type aliases, expose readable - Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs - Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs - Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs - Expose readable parameter in admin.get_lake_snapshot() instead of hardcoding None Addresses reviewer feedback on PR #631. 
Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/client/admin.rs | 11 ++- crates/fluss/src/metadata/cluster_health.rs | 64 ++++++++++-- crates/fluss/src/metadata/kv_snapshot.rs | 11 ++- crates/fluss/src/metadata/lake_snapshot.rs | 7 +- crates/fluss/src/metadata/producer_offsets.rs | 7 +- crates/fluss/src/metadata/rebalance.rs | 98 +++++++++++++++---- 6 files changed, 159 insertions(+), 39 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index e3d78d7e..f815ef7f 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -632,11 +632,16 @@ impl FlussAdmin { &self, table_path: &TablePath, snapshot_id: Option, + readable: Option, ) -> Result { let response = self .admin_gateway() .await? - .request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None)) + .request(GetLakeSnapshotRequest::new( + table_path, + snapshot_id, + readable, + )) .await?; Ok(LakeSnapshotInfo::from_pb(&response)) } @@ -743,7 +748,7 @@ impl FlussAdmin { .await? .request(ListRebalanceProgressRequest::new(rebalance_id)) .await?; - Ok(RebalanceProgress::from_pb(&response)) + RebalanceProgress::from_pb(&response) } /// Cancel a rebalance. @@ -802,7 +807,7 @@ impl FlussAdmin { .await? .request(GetClusterHealthRequest::new()) .await?; - Ok(ClusterHealth::from_pb(&response)) + ClusterHealth::from_pb(&response) } /// List remote log manifests for a table (optionally scoped to one partition). diff --git a/crates/fluss/src/metadata/cluster_health.rs b/crates/fluss/src/metadata/cluster_health.rs index c984ab05..938d1f5f 100644 --- a/crates/fluss/src/metadata/cluster_health.rs +++ b/crates/fluss/src/metadata/cluster_health.rs @@ -15,8 +15,41 @@ // specific language governing permissions and limitations // under the License. +use crate::error::{Error, Result}; use crate::proto::GetClusterHealthResponse; +/// Mirrors Java `org.apache.fluss.client.admin.ClusterHealthStatus`. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ClusterHealthStatus { + Green, + Yellow, + Red, + Unknown, +} + +impl ClusterHealthStatus { + pub fn to_i32(self) -> i32 { + match self { + Self::Green => 0, + Self::Yellow => 1, + Self::Red => 2, + Self::Unknown => 3, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Green), + 1 => Ok(Self::Yellow), + 2 => Ok(Self::Red), + 3 => Ok(Self::Unknown), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported ClusterHealthStatus: {value}"), + }), + } + } +} + /// Result of `get_cluster_health`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ClusterHealth { @@ -24,18 +57,18 @@ pub struct ClusterHealth { pub in_sync_replicas: i32, pub num_leader_replicas: i32, pub active_leader_replicas: i32, - pub status: i32, + pub status: ClusterHealthStatus, } impl ClusterHealth { - pub fn from_pb(pb: &GetClusterHealthResponse) -> Self { - Self { + pub fn from_pb(pb: &GetClusterHealthResponse) -> Result { + Ok(Self { num_replicas: pb.num_replicas, in_sync_replicas: pb.in_sync_replicas, num_leader_replicas: pb.num_leader_replicas, active_leader_replicas: pb.active_leader_replicas, - status: pb.status, - } + status: ClusterHealthStatus::try_from_i32(pb.status)?, + }) } } @@ -43,6 +76,23 @@ impl ClusterHealth { mod tests { use super::*; + #[test] + fn test_cluster_health_status_roundtrip() { + for s in [ + ClusterHealthStatus::Green, + ClusterHealthStatus::Yellow, + ClusterHealthStatus::Red, + ClusterHealthStatus::Unknown, + ] { + assert_eq!(ClusterHealthStatus::try_from_i32(s.to_i32()).unwrap(), s); + } + } + + #[test] + fn test_cluster_health_status_unknown_value() { + assert!(ClusterHealthStatus::try_from_i32(99).is_err()); + } + #[test] fn test_cluster_health_from_pb() { let pb = GetClusterHealthResponse { @@ -52,8 +102,8 @@ mod tests { active_leader_replicas: 3, status: 1, }; - let h = ClusterHealth::from_pb(&pb); + let h = ClusterHealth::from_pb(&pb).unwrap(); 
assert_eq!(h.num_replicas, 5); - assert_eq!(h.status, 1); + assert_eq!(h.status, ClusterHealthStatus::Yellow); } } diff --git a/crates/fluss/src/metadata/kv_snapshot.rs b/crates/fluss/src/metadata/kv_snapshot.rs index f58149ba..054254ad 100644 --- a/crates/fluss/src/metadata/kv_snapshot.rs +++ b/crates/fluss/src/metadata/kv_snapshot.rs @@ -19,13 +19,14 @@ use crate::proto::{ AcquireKvSnapshotLeaseResponse, GetKvSnapshotMetadataResponse, GetLatestKvSnapshotsResponse, ListKvSnapshotsResponse, PbKvSnapshot, PbRemotePathAndLocalFile, }; +use crate::{BucketId, PartitionId, TableId}; use crate::metadata::KvSnapshotLeaseForTable; /// Per-bucket KV snapshot info. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshot { - pub bucket_id: i32, + pub bucket_id: BucketId, pub snapshot_id: Option, pub log_offset: Option, } @@ -43,8 +44,8 @@ impl KvSnapshot { /// Result of `get_latest_kv_snapshots`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct LatestKvSnapshots { - pub table_id: i64, - pub partition_id: Option, + pub table_id: TableId, + pub partition_id: Option, pub latest_snapshots: Vec, } @@ -121,8 +122,8 @@ impl AcquireKvSnapshotLeaseResult { /// Result of `list_kv_snapshots`. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ActiveKvSnapshots { - pub table_id: i64, - pub partition_id: Option, + pub table_id: TableId, + pub partition_id: Option, pub active_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/lake_snapshot.rs b/crates/fluss/src/metadata/lake_snapshot.rs index b88d53f9..5ff50ad6 100644 --- a/crates/fluss/src/metadata/lake_snapshot.rs +++ b/crates/fluss/src/metadata/lake_snapshot.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{GetLakeSnapshotResponse, PbLakeSnapshotForBucket}; +use crate::{BucketId, PartitionId, TableId}; /// One bucket's slice of a lake snapshot. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct LakeBucketSnapshot { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub log_offset: Option, pub partition_name: Option, } @@ -42,7 +43,7 @@ impl LakeBucketSnapshot { /// "latest" snapshot summary returned by `get_latest_lake_snapshot`.) #[derive(Debug, Clone, PartialEq, Eq)] pub struct LakeSnapshotInfo { - pub table_id: i64, + pub table_id: TableId, pub snapshot_id: i64, pub bucket_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs index 501d5a46..f367c390 100644 --- a/crates/fluss/src/metadata/producer_offsets.rs +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{GetProducerOffsetsResponse, PbBucketOffset, PbProducerTableOffsets}; +use crate::{BucketId, PartitionId, TableId}; /// Per-bucket producer log-end offset. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketOffset { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub log_end_offset: Option, } @@ -46,7 +47,7 @@ impl BucketOffset { /// All bucket offsets of a single table belonging to one producer. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProducerTableOffsets { - pub table_id: i64, + pub table_id: TableId, pub bucket_offsets: Vec, } diff --git a/crates/fluss/src/metadata/rebalance.rs b/crates/fluss/src/metadata/rebalance.rs index dfb6d75d..13826f9e 100644 --- a/crates/fluss/src/metadata/rebalance.rs +++ b/crates/fluss/src/metadata/rebalance.rs @@ -15,17 +15,57 @@ // specific language governing permissions and limitations // under the License. 
+use crate::error::{Error, Result}; use crate::proto::{ ListRebalanceProgressResponse, PbRebalancePlanForBucket, PbRebalanceProgressForBucket, PbRebalanceProgressForTable, }; +use crate::{BucketId, PartitionId, TableId}; + +/// Mirrors Java `org.apache.fluss.cluster.rebalance.RebalanceStatus`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RebalanceStatus { + NotStarted, + Rebalancing, + Failed, + Completed, + Canceled, + Timeout, +} + +impl RebalanceStatus { + pub fn to_i32(self) -> i32 { + match self { + Self::NotStarted => 0, + Self::Rebalancing => 1, + Self::Failed => 2, + Self::Completed => 3, + Self::Canceled => 4, + Self::Timeout => 5, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::NotStarted), + 1 => Ok(Self::Rebalancing), + 2 => Ok(Self::Failed), + 3 => Ok(Self::Completed), + 4 => Ok(Self::Canceled), + 5 => Ok(Self::Timeout), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported RebalanceStatus: {value}"), + }), + } + } +} /// Per-bucket plan in a rebalance: who the leader was and who it will be, who /// the replicas were and who they will be. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketRebalancePlan { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub original_leader: Option, pub new_leader: Option, pub original_replicas: Vec, @@ -49,35 +89,35 @@ impl BucketRebalancePlan { #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketRebalanceProgress { pub rebalance_plan: BucketRebalancePlan, - pub rebalance_status: i32, + pub rebalance_status: RebalanceStatus, } impl BucketRebalanceProgress { - pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Self { - Self { + pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Result { + Ok(Self { rebalance_plan: BucketRebalancePlan::from_pb(&pb.rebalance_plan), - rebalance_status: pb.rebalance_status, - } + rebalance_status: RebalanceStatus::try_from_i32(pb.rebalance_status)?, + }) } } /// All bucket progress for one table. #[derive(Debug, Clone, PartialEq, Eq)] pub struct TableRebalanceProgress { - pub table_id: i64, + pub table_id: TableId, pub buckets_progress: Vec, } impl TableRebalanceProgress { - pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Self { - Self { + pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Result { + Ok(Self { table_id: pb.table_id, buckets_progress: pb .buckets_progress .iter() .map(BucketRebalanceProgress::from_pb) - .collect(), - } + .collect::>>()?, + }) } } @@ -85,21 +125,24 @@ impl TableRebalanceProgress { #[derive(Debug, Clone, PartialEq, Eq)] pub struct RebalanceProgress { pub rebalance_id: Option, - pub rebalance_status: Option, + pub rebalance_status: Option, pub table_progress: Vec, } impl RebalanceProgress { - pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Self { - Self { + pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Result { + Ok(Self { rebalance_id: pb.rebalance_id.clone(), - rebalance_status: pb.rebalance_status, + rebalance_status: pb + .rebalance_status + .map(RebalanceStatus::try_from_i32) + .transpose()?, table_progress: pb 
.table_progress .iter() .map(TableRebalanceProgress::from_pb) - .collect(), - } + .collect::>>()?, + }) } } @@ -107,6 +150,25 @@ impl RebalanceProgress { mod tests { use super::*; + #[test] + fn test_rebalance_status_roundtrip() { + for s in [ + RebalanceStatus::NotStarted, + RebalanceStatus::Rebalancing, + RebalanceStatus::Failed, + RebalanceStatus::Completed, + RebalanceStatus::Canceled, + RebalanceStatus::Timeout, + ] { + assert_eq!(RebalanceStatus::try_from_i32(s.to_i32()).unwrap(), s); + } + } + + #[test] + fn test_rebalance_status_unknown() { + assert!(RebalanceStatus::try_from_i32(99).is_err()); + } + #[test] fn test_bucket_rebalance_plan_from_pb() { let pb = PbRebalancePlanForBucket { From dcb50120522ddd81e14757586e0cbc49bf50a20e Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:08:55 +0800 Subject: [PATCH 3/9] Apply type aliases to remaining metadata types (table_stats, kv_snapshot_lease) Extends the BucketId/TableId/PartitionId alias consistency fix to table_stats.rs (BucketStatsRequest, BucketStats) and kv_snapshot_lease.rs (KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable). Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/metadata/kv_snapshot_lease.rs | 7 ++++--- crates/fluss/src/metadata/table_stats.rs | 11 ++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/metadata/kv_snapshot_lease.rs b/crates/fluss/src/metadata/kv_snapshot_lease.rs index 22679136..98638f5d 100644 --- a/crates/fluss/src/metadata/kv_snapshot_lease.rs +++ b/crates/fluss/src/metadata/kv_snapshot_lease.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable}; +use crate::{BucketId, PartitionId, TableId}; /// One bucket's slot in a KV-snapshot lease request. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForBucket { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub snapshot_id: i64, } @@ -46,7 +47,7 @@ impl KvSnapshotLeaseForBucket { /// All the buckets of a single table that should be leased together. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForTable { - pub table_id: i64, + pub table_id: TableId, pub bucket_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs index a649844e..53c6f72f 100644 --- a/crates/fluss/src/metadata/table_stats.rs +++ b/crates/fluss/src/metadata/table_stats.rs @@ -16,17 +16,18 @@ // under the License. use crate::proto::{GetTableStatsResponse, PbTableStatsReqForBucket, PbTableStatsRespForBucket}; +use crate::{BucketId, PartitionId}; /// Per-bucket request item for `GetTableStats`. /// Mirrors the bucket-stats request shape used by the Java client. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketStatsRequest { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, } impl BucketStatsRequest { - pub fn new(partition_id: Option, bucket_id: i32) -> Self { + pub fn new(partition_id: Option, bucket_id: BucketId) -> Self { Self { partition_id, bucket_id, @@ -52,8 +53,8 @@ impl BucketStatsRequest { /// when the server returned an error for the bucket; check `error` in that case. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketStats { - pub bucket_id: i32, - pub partition_id: Option, + pub bucket_id: BucketId, + pub partition_id: Option, pub row_count: Option, pub error: Option, } From 4a9cdf6771ba1880fb5f8511402757adf36e620f Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:11:46 +0800 Subject: [PATCH 4/9] Update admin.rs to use GoalType/ServerTag enums after rebase Now that pr/3 provides GoalType and ServerTag enums in the RPC wrappers, update the admin client methods to use them in their public signatures too. Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/client/admin.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index f815ef7f..e1c618e7 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -20,11 +20,11 @@ use crate::cluster::ServerNode; use crate::metadata::{ AclFilter, AclInfo, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots, AlterConfig, AlterTableChanges, ClusterHealth, CreateAclResult, DatabaseDescriptor, DatabaseInfo, - DatabaseSummary, DescribeConfig, DropAclsFilterResult, JsonSerde, KvSnapshotLeaseForTable, - KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, PartitionInfo, - PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, RebalanceProgress, - RemoteLogManifestEntry, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, - TableStats, + DatabaseSummary, DescribeConfig, DropAclsFilterResult, GoalType, JsonSerde, + KvSnapshotLeaseForTable, KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, + PartitionInfo, PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, + RebalanceProgress, RemoteLogManifestEntry, Schema, SchemaInfo, ServerTag, TableBucket, + TableDescriptor, TableInfo, TablePath, TableStats, }; use crate::rpc::message::{ 
AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, @@ -709,7 +709,7 @@ impl FlussAdmin { } /// Add a tag to servers. - pub async fn add_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + pub async fn add_server_tag(&self, server_ids: Vec, server_tag: ServerTag) -> Result<()> { let _response = self .admin_gateway() .await? @@ -719,7 +719,11 @@ impl FlussAdmin { } /// Remove a tag from servers. - pub async fn remove_server_tag(&self, server_ids: Vec, server_tag: i32) -> Result<()> { + pub async fn remove_server_tag( + &self, + server_ids: Vec, + server_tag: ServerTag, + ) -> Result<()> { let _response = self .admin_gateway() .await? @@ -729,7 +733,7 @@ impl FlussAdmin { } /// Trigger a rebalance. Returns the rebalance id assigned by the server. - pub async fn rebalance(&self, goals: Vec) -> Result { + pub async fn rebalance(&self, goals: Vec) -> Result { let response = self .admin_gateway() .await? From 6c52a1f9a6ac775347767c86ff1e3a1e9da148e0 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Wed, 24 Jun 2026 09:48:43 +0800 Subject: [PATCH 5/9] Apply TableId/PartitionId/BucketId type aliases to admin methods Update the new admin method signatures introduced by this PR to use the i64/i32 type aliases from lib.rs instead of raw primitives, matching the underlying RPC message wrappers. Co-Authored-By: Claude Opus 4.7 --- crates/fluss/src/client/admin.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index e1c618e7..d91cc66c 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -557,7 +557,7 @@ impl FlussAdmin { /// Get table statistics for buckets. Pass empty `target_columns` to request stats for all columns. 
pub async fn get_table_stats( &self, - table_id: i64, + table_id: TableId, buckets_req: Vec, target_columns: Vec, ) -> Result { @@ -590,9 +590,9 @@ impl FlussAdmin { /// Get KV snapshot metadata (manifest file list). pub async fn get_kv_snapshot_metadata( &self, - table_id: i64, - partition_id: Option, - bucket_id: i32, + table_id: TableId, + partition_id: Option, + bucket_id: BucketId, snapshot_id: i64, ) -> Result { let response = self @@ -817,8 +817,8 @@ impl FlussAdmin { /// List remote log manifests for a table (optionally scoped to one partition). pub async fn list_remote_log_manifests( &self, - table_id: i64, - partition_id: Option, + table_id: TableId, + partition_id: Option, ) -> Result> { let response = self .admin_gateway() @@ -835,8 +835,8 @@ impl FlussAdmin { /// List active KV snapshots for a table (optionally scoped to one partition). pub async fn list_kv_snapshots( &self, - table_id: i64, - partition_id: Option, + table_id: TableId, + partition_id: Option, ) -> Result { let response = self .admin_gateway() From c5294800bdf7d822c56ed1b45fde10cf436c59a3 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:00:44 +0800 Subject: [PATCH 6/9] [rust] Add version-aware integration test harness (0.9.x / 1.x) --- crates/fluss/Cargo.toml | 3 + .../fluss/tests/integration/admin_extended.rs | 781 ++++++++++++++++++ crates/fluss/tests/integration/admin_v1.rs | 211 +++++ crates/fluss/tests/test_fluss.rs | 3 + 4 files changed, 998 insertions(+) create mode 100644 crates/fluss/tests/integration/admin_extended.rs create mode 100644 crates/fluss/tests/integration/admin_v1.rs diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 821ee52e..18a37728 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -41,6 +41,9 @@ storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] storage-oss = ["opendal/services-oss"] integration_tests = [] +# Gates tests that exercise APIs only available on Fluss 1.x servers. 
+# Enable alongside `integration_tests` when running against a 1.x server image. +fluss_v1 = [] [dependencies] arrow = { workspace = true } diff --git a/crates/fluss/tests/integration/admin_extended.rs b/crates/fluss/tests/integration/admin_extended.rs new file mode 100644 index 00000000..8c8f71cf --- /dev/null +++ b/crates/fluss/tests/integration/admin_extended.rs @@ -0,0 +1,781 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Extended admin integration tests covering APIs available on both Fluss +//! 0.9.x and 1.x servers but not exercised by `admin.rs`. +//! +//! These tests run against the shared cluster gated behind `integration_tests`. +//! Some admin operations have semantics that depend on optional cluster +//! configuration (lake storage, authorization). For those the tests assert the +//! request/response roundtrip succeeds and tolerate a *structured* server-side +//! error (a decoded [`FlussError`]) while still failing on transport/decoding +//! errors — which is what a Rust-client integration test must guard. 
+ +#[cfg(test)] +mod admin_extended_test { + use crate::integration::utils::get_shared_cluster; + use fluss::error::FlussError; + use fluss::metadata::{ + AclFilter, AclInfo, AddColumn, AlterConfig, AlterConfigOpType, AlterTableChanges, + BucketOffset, BucketStatsRequest, ColumnPositionType, DataTypes, DatabaseDescriptorBuilder, + JsonSerde, KvFormat, KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable, LogFormat, + OperationType, PermissionType, ProducerTableOffsets, ResourceType, Schema, TableDescriptor, + TablePath, + }; + + // ServerTag codes (no domain enum yet — server-tag API is admin-level only). + const SERVER_TAG_TEMPORARY_OFFLINE: i32 = 1; + + /// Asserts an error decoded into one of the `allowed` Fluss API errors. + /// Panics (failing the test) on a transport/decoding error or an unexpected + /// API error code. + fn assert_expected_api_error(error: fluss::error::Error, allowed: &[FlussError]) { + match error.api_error() { + Some(code) if allowed.contains(&code) => {} + other => { + panic!("Expected one of {allowed:?} but got {other:?} (full error: {error:?})") + } + } + } + + /// Builds a simple non-partitioned log table descriptor (id INT, name STRING). + fn simple_log_table() -> TableDescriptor { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .build() + .expect("build schema"); + TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec![]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .build() + .expect("build table descriptor") + } + + /// Builds a simple primary-key/KV table descriptor (id INT PK, name STRING). 
+ fn simple_kv_table() -> TableDescriptor { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("build schema"); + TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec!["id".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("build table descriptor") + } + + // --------------------------------------------------------------------- + // Group A: Database listing & summaries + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_list_databases() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_a = "test_list_databases_a"; + let db_b = "test_list_databases_b"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + + admin + .create_database(db_a, Some(&descriptor), true) + .await + .unwrap(); + admin + .create_database(db_b, Some(&descriptor), true) + .await + .unwrap(); + + let databases = admin.list_databases().await.expect("should list databases"); + assert!( + databases.iter().any(|d| d == db_a), + "Expected {db_a} in {databases:?}" + ); + assert!( + databases.iter().any(|d| d == db_b), + "Expected {db_b} in {databases:?}" + ); + + admin.drop_database(db_a, true, true).await.unwrap(); + admin.drop_database(db_b, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_list_database_summaries() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_list_db_summaries"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); 
+ + let table_descriptor = simple_log_table(); + for table in ["summary_t1", "summary_t2"] { + admin + .create_table(&TablePath::new(db_name, table), &table_descriptor, true) + .await + .unwrap(); + } + + let summaries = admin + .list_database_summaries() + .await + .expect("should list database summaries"); + + let summary = summaries + .iter() + .find(|s| s.database_name == db_name) + .unwrap_or_else(|| panic!("Expected summary for {db_name} in {summaries:?}")); + + assert_eq!( + summary.table_count, 2, + "Database {db_name} should report 2 tables" + ); + assert!( + summary.created_time > 0, + "created_time should be positive, got {}", + summary.created_time + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group B: Schema operations + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_table_schema() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_get_table_schema_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "schema_table"); + let table_descriptor = simple_kv_table(); + admin + .create_table(&table_path, &table_descriptor, true) + .await + .unwrap(); + + // Request the latest schema (schema_id = None). + let schema_info = admin + .get_table_schema(&table_path, None) + .await + .expect("should get latest table schema"); + assert!( + schema_info.schema_id() > 0, + "schema_id should be positive, got {}", + schema_info.schema_id() + ); + assert_eq!( + schema_info.schema().columns().len(), + 2, + "schema should have 2 columns" + ); + + // Request the same schema by explicit id. 
+ let by_id = admin + .get_table_schema(&table_path, Some(schema_info.schema_id())) + .await + .expect("should get table schema by id"); + assert_eq!(by_id.schema_id(), schema_info.schema_id()); + assert_eq!(by_id.schema().columns().len(), 2); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group C: Alter operations + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_alter_table_add_column() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_alter_table_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "alter_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + + // Add a nullable "email" column at the end of the schema. 
+ let data_type_json = serde_json::to_vec( + &DataTypes::string() + .serialize_json() + .expect("serialize STRING type"), + ) + .expect("encode data_type_json"); + let add_column = AddColumn { + column_name: "email".to_string(), + data_type_json, + comment: Some("user email".to_string()), + position: ColumnPositionType::Last, + }; + + admin + .alter_table( + &table_path, + false, + AlterTableChanges { + add_columns: vec![add_column], + ..Default::default() + }, + ) + .await + .expect("should add column"); + + let schema_info = admin + .get_table_schema(&table_path, None) + .await + .expect("should get schema after alter"); + assert_eq!( + schema_info.schema().columns().len(), + 3, + "schema should have 3 columns after adding email" + ); + assert!( + schema_info + .schema() + .columns() + .iter() + .any(|c| c.name() == "email"), + "schema should contain the new 'email' column" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_alter_database() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_alter_database_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let config_change = AlterConfig::new( + "custom.key", + Some("custom-value".to_string()), + AlterConfigOpType::Set, + ); + // AlterDatabase is not implemented on every server build (e.g. 0.9.x). + // Accept the server's "unsupported" signal but never a transport failure. + match admin + .alter_database(db_name, vec![config_change], false) + .await + { + Ok(()) => { + // Altering a non-existent database with ignore_if_not_exists = true is a no-op. 
+ admin + .alter_database("no_such_db_for_alter", vec![], true) + .await + .expect("altering missing db with ignore flag should succeed"); + } + Err(fluss::error::Error::UnsupportedVersion { .. }) => {} + Err(error) => panic!("unexpected error from alter_database: {error:?}"), + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group D: Cluster configuration + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_describe_cluster_configs() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let configs = admin + .describe_cluster_configs() + .await + .expect("should describe cluster configs"); + + for config in &configs { + assert!( + !config.config_key.is_empty(), + "config_key should not be empty" + ); + assert!( + !config.config_source.is_empty(), + "config_source should not be empty for {}", + config.config_key + ); + } + } + + #[tokio::test] + async fn test_alter_cluster_configs() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // Read an existing config that has a value, then SET it to the same value. + // This exercises the write path without changing cluster behaviour. + let described = admin + .describe_cluster_configs() + .await + .expect("should describe cluster configs"); + + let Some(existing) = described.iter().find(|c| c.config_value.is_some()) else { + // No config with a value to round-trip; nothing to assert. 
+ return; + }; + + let alter = AlterConfig::new( + existing.config_key.clone(), + existing.config_value.clone(), + AlterConfigOpType::Set, + ); + + // Many keys are not dynamically alterable; the server rejects those with + // either InvalidConfigException or a generic "not allowed to be changed + // dynamically" error. Either way the request/response roundtrip worked. + if let Err(error) = admin.alter_cluster_configs(vec![alter]).await { + assert_expected_api_error( + error, + &[ + FlussError::InvalidConfigException, + FlussError::UnknownServerError, + ], + ); + } + } + + // --------------------------------------------------------------------- + // Group E: Table statistics + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_table_stats() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_get_table_stats_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "stats_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let buckets_req = vec![BucketStatsRequest::new(None, 0)]; + // GetTableStats is not implemented on every server build. Accept either a + // decoded response or the server's "unsupported" signal, but never a + // transport/decoding failure. + match admin.get_table_stats(table_id, buckets_req, vec![]).await { + Ok(response) => { + for bucket in &response.buckets { + // Per-bucket entries echo the requested bucket id. + assert_eq!( + bucket.bucket_id, 0, + "unexpected bucket id in stats response" + ); + } + } + Err(fluss::error::Error::UnsupportedVersion { .. 
}) => {} + Err(error) => panic!("unexpected error from get_table_stats: {error:?}"), + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group F: KV snapshot operations (0.9.x) + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_latest_kv_snapshots() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_latest_kv_snapshots_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "latest_kv_snapshots_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let response = admin + .get_latest_kv_snapshots(&table_path, None) + .await + .expect("should get latest kv snapshots"); + assert_eq!( + response.table_id, table_id, + "response table_id should match the requested table" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_kv_snapshot_lease_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_kv_snapshot_lease_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "lease_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + // A fresh 
table has no snapshot to lease, so the requested snapshot is + // reported back as unavailable. The RPC itself must still succeed. + let lease = KvSnapshotLeaseForTable { + table_id, + bucket_snapshots: vec![KvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 0, + }], + }; + let response = admin + .create_kv_snapshot_lease("test-lease-id", 60_000, vec![lease]) + .await + .expect("should acquire kv snapshot lease"); + + // The fresh-table snapshot is unavailable; just confirm the response decodes. + let _ = response.unavailable_snapshots; + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group G: Server tags + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_server_tag_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let nodes = admin.get_server_nodes().await.expect("should get nodes"); + let tablet_id = nodes + .iter() + .find(|n| *n.server_type() == fluss::ServerType::TabletServer) + .map(|n| n.id()) + .expect("expected a tablet server node"); + + // Add then immediately remove a TEMPORARY_OFFLINE tag so cluster state + // is restored. Both RPCs must complete without a transport error. 
+ admin + .add_server_tag(vec![tablet_id], SERVER_TAG_TEMPORARY_OFFLINE) + .await + .expect("should add server tag"); + admin + .remove_server_tag(vec![tablet_id], SERVER_TAG_TEMPORARY_OFFLINE) + .await + .expect("should remove server tag"); + } + + // --------------------------------------------------------------------- + // Group H: Rebalance + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_rebalance_operations() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // No rebalance is running; listing progress and cancelling are read/no-op + // paths. Tolerate a structured server error (e.g. nothing to cancel). + if let Err(error) = admin.list_rebalance_progress(None).await { + assert_expected_api_error( + error, + &[ + FlussError::UnknownServerError, + FlussError::InvalidCoordinatorException, + ], + ); + } + + if let Err(error) = admin.cancel_rebalance(None).await { + assert_expected_api_error( + error, + &[ + FlussError::UnknownServerError, + FlussError::InvalidCoordinatorException, + ], + ); + } + } + + // --------------------------------------------------------------------- + // Group I: Producer offsets + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_producer_offsets_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_producer_offsets_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "producer_offsets_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + let table_id = admin + 
.get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let producer_id = "test-producer"; + let table_offsets = vec![ProducerTableOffsets { + table_id, + bucket_offsets: vec![BucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: Some(42), + }], + }]; + + admin + .register_producer_offsets(producer_id, table_offsets, None) + .await + .expect("should register producer offsets"); + + let fetched = admin + .get_producer_offsets(producer_id) + .await + .expect("should get producer offsets"); + let registered = fetched + .table_offsets + .iter() + .find(|t| t.table_id == table_id) + .unwrap_or_else(|| panic!("expected offsets for table {table_id}")); + assert_eq!( + registered + .bucket_offsets + .first() + .and_then(|b| b.log_end_offset), + Some(42), + "registered log_end_offset should be 42" + ); + + admin + .delete_producer_offsets(producer_id) + .await + .expect("should delete producer offsets"); + + let after_delete = admin + .get_producer_offsets(producer_id) + .await + .expect("should get producer offsets after delete"); + assert!( + after_delete + .table_offsets + .iter() + .all(|t| t.table_id != table_id), + "offsets for table {table_id} should be gone after delete" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group J: ACL management + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_acl_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_acl_lifecycle_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let acl = AclInfo { + resource_name: db_name.to_string(), + resource_type: ResourceType::Database, + principal_name: 
"alice".to_string(), + principal_type: "User".to_string(), + host: "*".to_string(), + operation_type: OperationType::Read, + permission_type: PermissionType::Allow, + }; + + // Authorization may be disabled on the shared cluster. In that case the + // server rejects the request with SecurityDisabledException; otherwise + // the full create/list/drop lifecycle should round-trip. + match admin.create_acls(vec![acl.clone()]).await { + Err(error) => { + assert_expected_api_error( + error, + &[ + FlussError::SecurityDisabledException, + FlussError::AuthorizationException, + ], + ); + } + Ok(_) => { + let filter = AclFilter { + resource_name: Some(db_name.to_string()), + resource_type: ResourceType::Database, + principal_name: Some("alice".to_string()), + principal_type: Some("User".to_string()), + host: Some("*".to_string()), + operation_type: OperationType::Read, + permission_type: PermissionType::Allow, + }; + let listed = admin + .list_acls(filter.clone()) + .await + .expect("should list acls"); + assert!( + listed.iter().any(|a| a.resource_name == db_name), + "created ACL should appear in list: {listed:?}" + ); + + admin + .drop_acls(vec![filter]) + .await + .expect("should drop acls"); + } + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group K: Lake snapshots (0.9.x) + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_lake_snapshot_operations() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_lake_snapshot_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "lake_table"); + admin + .create_table(&table_path, &simple_log_table(), 
true) + .await + .unwrap(); + + // Lake storage is typically not configured for the test cluster, so both + // calls are expected to fail with a lake-related API error rather than a + // transport error. + let lake_errors = [ + FlussError::LakeStorageNotConfiguredException, + FlussError::LakeSnapshotNotExist, + ]; + + if let Err(error) = admin.get_latest_lake_snapshot(&table_path).await { + assert_expected_api_error(error, &lake_errors); + } + if let Err(error) = admin.get_lake_snapshot(&table_path, None).await { + assert_expected_api_error(error, &lake_errors); + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } +} diff --git a/crates/fluss/tests/integration/admin_v1.rs b/crates/fluss/tests/integration/admin_v1.rs new file mode 100644 index 00000000..46501d32 --- /dev/null +++ b/crates/fluss/tests/integration/admin_v1.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for admin APIs that are only available on Fluss 1.x +//! servers (API keys 1057-1058, 1061-1064). The whole module is gated behind +//! the `fluss_v1` feature so it is skipped when running against a 0.9.x server. 
+ +#[cfg(test)] +mod admin_v1_test { + use crate::integration::utils::get_shared_cluster; + use fluss::metadata::DataTypes; + use fluss::metadata::{ + DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor, TablePath, + }; + + /// `get_cluster_health` (API key 1062) reports replica/leader counts and a + /// status code for the cluster. + #[tokio::test] + async fn test_get_cluster_health() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let response = admin + .get_cluster_health() + .await + .expect("should get cluster health"); + + assert!( + response.status >= 0, + "Cluster health status should be non-negative, got: {}", + response.status + ); + } + + /// `list_kv_snapshots` (API key 1064) returns the active snapshots for a KV + /// table. A freshly created table has none, but the response must echo the + /// requested table id. + #[tokio::test] + async fn test_list_kv_snapshots() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let test_db_name = "test_list_kv_snapshots_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_list_kv_snapshots") + .build(); + + admin + .create_database(test_db_name, Some(&db_descriptor), true) + .await + .expect("Failed to create test database"); + + let test_table_name = "kv_snapshot_table"; + let table_path = TablePath::new(test_db_name, test_table_name); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(1), vec!["id".to_string()]) + .property("table.replication.factor", "1") + 
.log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create table"); + + let table_info = admin + .get_table_info(&table_path) + .await + .expect("should get table info"); + let table_id = table_info.get_table_id(); + + let response = admin + .list_kv_snapshots(table_id, None) + .await + .expect("should list kv snapshots"); + + assert_eq!( + response.table_id, table_id, + "Response table_id should match request" + ); + + // Cleanup + admin.drop_table(&table_path, true).await.unwrap(); + admin.drop_database(test_db_name, true, true).await.unwrap(); + } + + /// `list_remote_log_manifests` (API key 1063) lists tiered log segments. A + /// newly created table has had no remote log activity yet. + #[tokio::test] + async fn test_list_remote_log_manifests() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let test_db_name = "test_list_remote_log_manifests_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_list_remote_log_manifests") + .build(); + + admin + .create_database(test_db_name, Some(&db_descriptor), true) + .await + .expect("Failed to create test database"); + + let test_table_name = "remote_log_table"; + let table_path = TablePath::new(test_db_name, test_table_name); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("data", DataTypes::string()) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(1), vec![]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + 
.await + .expect("Failed to create table"); + + let table_info = admin + .get_table_info(&table_path) + .await + .expect("should get table info"); + let table_id = table_info.get_table_id(); + + let manifests = admin + .list_remote_log_manifests(table_id, None) + .await + .expect("should list remote log manifests"); + + // A newly created table with no remote log activity should return empty manifests. + assert!( + manifests.is_empty(), + "Newly created table should have no remote log manifests" + ); + + // Cleanup + admin.drop_table(&table_path, true).await.unwrap(); + admin.drop_database(test_db_name, true, true).await.unwrap(); + } + + /// `drop_kv_snapshot_lease` (API key 1058) removes an entire lease. Dropping + /// a lease that never existed is a server-side no-op. + #[tokio::test] + async fn test_drop_kv_snapshot_lease() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // Dropping a non-existent lease should succeed (no-op on server) + let result = admin.drop_kv_snapshot_lease("non-existent-lease-id").await; + assert!( + result.is_ok(), + "Dropping non-existent lease should succeed, got: {:?}", + result + ); + } + + /// `release_kv_snapshot_lease` (API key 1057) releases specific buckets from + /// a lease. Releasing an empty bucket set against an unknown lease is a + /// no-op and must not error. 
+ #[tokio::test] + async fn test_release_kv_snapshot_lease() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let result = admin + .release_kv_snapshot_lease("non-existent-lease-id", vec![]) + .await; + assert!( + result.is_ok(), + "Releasing an empty bucket set should succeed, got: {:?}", + result + ); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 2d2bd152..73ed0651 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -21,6 +21,9 @@ extern crate fluss; #[cfg(feature = "integration_tests")] mod integration { mod admin; + mod admin_extended; + #[cfg(feature = "fluss_v1")] + mod admin_v1; mod batch_scanner; mod fluss_cluster; mod kv_table; From fdb66ef970d2e670b0942915dfdd24a934eb4903 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 03:16:08 +0800 Subject: [PATCH 7/9] Update integration tests to use domain enums and new signatures - Replace raw SERVER_TAG_TEMPORARY_OFFLINE const with ServerTag::TemporaryOffline - Add readable parameter to get_lake_snapshot call Co-Authored-By: Claude Opus 4.6 --- crates/fluss/tests/integration/admin_extended.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/crates/fluss/tests/integration/admin_extended.rs b/crates/fluss/tests/integration/admin_extended.rs index 8c8f71cf..f0a4a42e 100644 --- a/crates/fluss/tests/integration/admin_extended.rs +++ b/crates/fluss/tests/integration/admin_extended.rs @@ -33,13 +33,10 @@ mod admin_extended_test { AclFilter, AclInfo, AddColumn, AlterConfig, AlterConfigOpType, AlterTableChanges, BucketOffset, BucketStatsRequest, ColumnPositionType, DataTypes, DatabaseDescriptorBuilder, JsonSerde, KvFormat, KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable, LogFormat, - OperationType, PermissionType, ProducerTableOffsets, ResourceType, Schema, TableDescriptor, - 
TablePath, + OperationType, PermissionType, ProducerTableOffsets, ResourceType, Schema, ServerTag, + TableDescriptor, TablePath, }; - // ServerTag codes (no domain enum yet — server-tag API is admin-level only). - const SERVER_TAG_TEMPORARY_OFFLINE: i32 = 1; - /// Asserts an error decoded into one of the `allowed` Fluss API errors. /// Panics (failing the test) on a transport/decoding error or an unexpected /// API error code. @@ -546,11 +543,11 @@ mod admin_extended_test { // Add then immediately remove a TEMPORARY_OFFLINE tag so cluster state // is restored. Both RPCs must complete without a transport error. admin - .add_server_tag(vec![tablet_id], SERVER_TAG_TEMPORARY_OFFLINE) + .add_server_tag(vec![tablet_id], ServerTag::TemporaryOffline) .await .expect("should add server tag"); admin - .remove_server_tag(vec![tablet_id], SERVER_TAG_TEMPORARY_OFFLINE) + .remove_server_tag(vec![tablet_id], ServerTag::TemporaryOffline) .await .expect("should remove server tag"); } @@ -772,7 +769,7 @@ mod admin_extended_test { if let Err(error) = admin.get_latest_lake_snapshot(&table_path).await { assert_expected_api_error(error, &lake_errors); } - if let Err(error) = admin.get_lake_snapshot(&table_path, None).await { + if let Err(error) = admin.get_lake_snapshot(&table_path, None, None).await { assert_expected_api_error(error, &lake_errors); } From 5b99fb2ebb3fc9ae05cd63b29ea09d89c658b5fe Mon Sep 17 00:00:00 2001 From: warmbupt Date: Fri, 19 Jun 2026 09:01:29 +0800 Subject: [PATCH 8/9] [rust] Add KvBatchScanner for full PK-table bucket scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first next_batch() opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close. 
- client/table/kv_batch_scanner.rs: KvBatchScanner with per-bucket state machine (Pending -> Active -> Done), best-effort close on Drop, and retry-with-backoff on retriable server errors (leader-election races, TooManyScanners, etc.) — not just TooManyScanners. - client/table/scanner.rs: TableScan::create_kv_batch_scanner() with PK/bucket validation. - config.rs: scanner_kv_fetch_max_bytes (default 4MB, matching Java). - rpc/fluss_api_error.rs: new error codes 66-69 (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) with correct retriable classification. - batch_scanner.rs: expose KV decode helpers (pub(super)) for reuse. - tests: 4 KV integration tests; tolerate UnsupportedVersion so they also pass (no-op) against 0.9.x servers that lack ScanKv. --- .../fluss/src/client/table/batch_scanner.rs | 18 +- .../src/client/table/kv_batch_scanner.rs | 280 ++++++++++++++++++ crates/fluss/src/client/table/mod.rs | 2 + crates/fluss/src/client/table/scanner.rs | 49 +++ crates/fluss/src/config.rs | 11 + crates/fluss/src/rpc/fluss_api_error.rs | 22 ++ .../fluss/tests/integration/batch_scanner.rs | 229 ++++++++++++++ 7 files changed, 602 insertions(+), 9 deletions(-) create mode 100644 crates/fluss/src/client/table/kv_batch_scanner.rs diff --git a/crates/fluss/src/client/table/batch_scanner.rs b/crates/fluss/src/client/table/batch_scanner.rs index 090a7428..5e19202b 100644 --- a/crates/fluss/src/client/table/batch_scanner.rs +++ b/crates/fluss/src/client/table/batch_scanner.rs @@ -219,10 +219,10 @@ fn decode_log_batch( )) } -/// Decode a KV limit-scan [`ValueRecordBatch`] into a single Arrow -/// `RecordBatch`, decoding each record by its own schema id and projecting onto -/// the current schema. -async fn decode_kv_batch( +/// Decode a KV [`ValueRecordBatch`] into a single Arrow `RecordBatch`, +/// decoding each record by its own schema id and projecting onto the current +/// schema. 
+pub(super) async fn decode_kv_batch( table_info: &TableInfo, schema_getter: &ClientSchemaGetter, projected_fields: Option<&[usize]>, @@ -279,7 +279,7 @@ async fn decode_kv_batch( /// Build one [`FixedSchemaDecoder`] per distinct schema id. The current schema /// decodes without projection; older schemas are fetched and projected onto the /// current schema. -async fn build_kv_decoders( +pub(super) async fn build_kv_decoders( schema_getter: &ClientSchemaGetter, target_schema: &Schema, target_schema_id: i16, @@ -304,7 +304,7 @@ async fn build_kv_decoders( /// Decode every value record into a row shaped by `target_row_type`, build a /// single Arrow batch, keep the last `limit` rows, then apply column projection. -fn value_records_to_record_batch( +pub(super) fn value_records_to_record_batch( batch: &ValueRecordBatch, ranges: &[Range], decoders: &HashMap, @@ -332,7 +332,7 @@ fn value_records_to_record_batch( } /// Read the leading little-endian schema id from a `[schema_id | row]` payload. -fn read_schema_id(payload: &[u8]) -> Result { +pub(super) fn read_schema_id(payload: &[u8]) -> Result { if payload.len() < SCHEMA_ID_LENGTH { return Err(Error::UnexpectedError { message: format!( @@ -366,7 +366,7 @@ fn take_last_rows(batch: RecordBatch, base_offset: i64, limit: usize) -> (Record } /// An empty `RecordBatch` with the (optionally projected) target schema. -fn empty_record_batch( +pub(super) fn empty_record_batch( target_row_type: &RowType, projected_fields: Option<&[usize]>, ) -> Result { @@ -375,7 +375,7 @@ fn empty_record_batch( } /// Project `batch` (shaped by `target_row_type`) onto the requested columns. 
-fn project_batch(
+pub(super) fn project_batch(
     batch: RecordBatch,
     target_row_type: &RowType,
     projected_fields: Option<&[usize]>,
diff --git a/crates/fluss/src/client/table/kv_batch_scanner.rs b/crates/fluss/src/client/table/kv_batch_scanner.rs
new file mode 100644
index 00000000..a1d14589
--- /dev/null
+++ b/crates/fluss/src/client/table/kv_batch_scanner.rs
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::ClientSchemaGetter;
+use crate::client::metadata::Metadata;
+use crate::client::table::batch_scanner::decode_kv_batch;
+use crate::error::{ApiError, Error, FlussError, Result};
+use crate::metadata::{TableBucket, TableInfo};
+use crate::proto::{self, ErrorResponse};
+use crate::record::ScanBatch;
+use crate::rpc::RpcClient;
+use crate::rpc::message::ScanKvRequest;
+use log::warn;
+use std::sync::Arc;
+
+/// Maximum retry attempts for retriable server errors (e.g. leader election
+/// races on a freshly created bucket, transient `TooManyScanners`).
+const MAX_RETRIABLE_RETRIES: u32 = 5;
+
+/// Stateful scanner for full KV table scans using the ScanKv API (1061).
+///
+/// The server maintains a cursor: the first `next_batch()` opens the scanner,
+/// subsequent calls iterate, and dropping the scanner sends a best-effort close.
+///
+/// A failed `next_batch()` is terminal: any scanner still open on the server is
+/// closed on a best-effort basis and later calls return `Ok(None)`.
+pub struct KvBatchScanner {
+    bucket: TableBucket,
+    state: ScannerState,
+}
+
+/// Client-side lifecycle of one bucket scan.
+enum ScannerState {
+    /// No request has been sent yet.
+    Pending(ScanContext),
+    /// The server reported more results for `scanner_id`.
+    Active {
+        ctx: ScanContext,
+        scanner_id: Vec<u8>,
+        /// Sequence id of the last request the server answered.
+        call_seq_id: i32,
+    },
+    /// The scan finished or failed; there is nothing left to fetch or close.
+    Done,
+}
+
+/// Handles and settings shared by every request of one scan.
+struct ScanContext {
+    rpc_client: Arc<RpcClient>,
+    metadata: Arc<Metadata>,
+    table_info: TableInfo,
+    schema_getter: Arc<ClientSchemaGetter>,
+    projected_fields: Option<Vec<usize>>,
+    batch_size_bytes: i32,
+}
+
+impl KvBatchScanner {
+    /// Creates a scanner that has not contacted the server yet.
+    pub(super) fn new(
+        rpc_client: Arc<RpcClient>,
+        metadata: Arc<Metadata>,
+        table_info: TableInfo,
+        schema_getter: Arc<ClientSchemaGetter>,
+        projected_fields: Option<Vec<usize>>,
+        bucket: TableBucket,
+        batch_size_bytes: i32,
+    ) -> Self {
+        Self {
+            bucket,
+            state: ScannerState::Pending(ScanContext {
+                rpc_client,
+                metadata,
+                table_info,
+                schema_getter,
+                projected_fields,
+                batch_size_bytes,
+            }),
+        }
+    }
+
+    /// Returns the next batch of rows, or `Ok(None)` once the scan is over.
+    pub async fn next_batch(&mut self) -> Result<Option<ScanBatch>> {
+        // `Done` is parked in `self.state` while the request is in flight, so an
+        // early `?` return leaves the scanner terminal rather than half-advanced.
+        match std::mem::replace(&mut self.state, ScannerState::Done) {
+            ScannerState::Done => Ok(None),
+            ScannerState::Pending(ctx) => self.open_scanner(ctx).await,
+            ScannerState::Active {
+                ctx,
+                scanner_id,
+                call_seq_id,
+            } => self.iterate(ctx, scanner_id, call_seq_id).await,
+        }
+    }
+
+    /// Drains the scanner and returns every remaining batch.
+    pub async fn collect_all_batches(&mut self) -> Result<Vec<ScanBatch>> {
+        let mut batches = Vec::new();
+        while let Some(batch) = self.next_batch().await? {
+            batches.push(batch);
+        }
+        Ok(batches)
+    }
+
+    /// The bucket this scanner reads.
+    pub fn bucket(&self) -> &TableBucket {
+        &self.bucket
+    }
+
+    /// Sends the first request, which names the bucket instead of a scanner id.
+    async fn open_scanner(&mut self, ctx: ScanContext) -> Result<Option<ScanBatch>> {
+        let bucket_scan_req = proto::PbScanReqForBucket {
+            table_id: ctx.table_info.table_id,
+            partition_id: self.bucket.partition_id(),
+            bucket_id: self.bucket.bucket_id(),
+            limit: None,
+        };
+        let request = ScanKvRequest::new(
+            None,
+            Some(bucket_scan_req),
+            Some(0),
+            Some(ctx.batch_size_bytes),
+            Some(false),
+        );
+
+        let response = self
+            .send_with_retry(&ctx, request, MAX_RETRIABLE_RETRIES)
+            .await?;
+
+        self.handle_response(ctx, response, 0).await
+    }
+
+    /// Requests the next page from an already open server-side scanner.
+    async fn iterate(
+        &mut self,
+        ctx: ScanContext,
+        scanner_id: Vec<u8>,
+        call_seq_id: i32,
+    ) -> Result<Option<ScanBatch>> {
+        let next_seq = call_seq_id + 1;
+        let request = ScanKvRequest::new(
+            Some(scanner_id.clone()),
+            None,
+            Some(next_seq),
+            Some(ctx.batch_size_bytes),
+            Some(false),
+        );
+
+        let outcome = self
+            .send_with_retry(&ctx, request, MAX_RETRIABLE_RETRIES)
+            .await;
+        let response = match outcome {
+            Ok(response) => response,
+            Err(err) => {
+                // `self.state` is already `Done`, so `Drop` will not close this
+                // scanner; release it here instead of leaving it open.
+                spawn_close_scanner(&ctx, &self.bucket, scanner_id, next_seq);
+                return Err(err);
+            }
+        };
+
+        self.handle_response(ctx, response, next_seq).await
+    }
+
+    /// Decodes one response and records whether the scan continues.
+    async fn handle_response(
+        &mut self,
+        ctx: ScanContext,
+        response: proto::ScanKvResponse,
+        call_seq_id: i32,
+    ) -> Result<Option<ScanBatch>> {
+        let scanner_id = response.scanner_id.unwrap_or_default();
+        let has_more = response.has_more_results.unwrap_or(false);
+        let raw = response.records.unwrap_or_default().to_vec();
+        let log_offset = response.log_offset.unwrap_or(0);
+
+        let decoded = decode_kv_batch(
+            &ctx.table_info,
+            &ctx.schema_getter,
+            ctx.projected_fields.as_deref(),
+            raw,
+            usize::MAX,
+        )
+        .await;
+        let batch = match decoded {
+            Ok(batch) => batch,
+            Err(err) => {
+                // Same as in `iterate`: a scanner the server still considers
+                // open must be released here because the state is `Done`.
+                if has_more {
+                    spawn_close_scanner(&ctx, &self.bucket, scanner_id, call_seq_id + 1);
+                }
+                return Err(err);
+            }
+        };
+
+        if has_more {
+            self.state = ScannerState::Active {
+                ctx,
+                scanner_id,
+                call_seq_id,
+            };
+        } else {
+            self.state = ScannerState::Done;
+        }
+
+        if batch.num_rows() == 0 && !has_more {
+            return Ok(None);
+        }
+
+        Ok(Some(ScanBatch::new(self.bucket.clone(), batch, log_offset)))
+    }
+
+    /// Sends `request` to the bucket leader, retrying retriable server errors
+    /// with exponential backoff up to `max_retries` times.
+    async fn send_with_retry(
+        &self,
+        ctx: &ScanContext,
+        request: ScanKvRequest,
+        max_retries: u32,
+    ) -> Result<proto::ScanKvResponse> {
+        let mut attempts: u32 = 0;
+        loop {
+            // Look the leader up on every attempt.
+            let leader = ctx
+                .metadata
+                .leader_for(&ctx.table_info.table_path, &self.bucket)
+                .await?
+                .ok_or_else(|| {
+                    Error::leader_not_available(format!(
+                        "No leader found for table bucket: {}",
+                        self.bucket
+                    ))
+                })?;
+            let connection = ctx.rpc_client.get_connection(&leader).await?;
+
+            let req = rebuild_request(&request);
+            let response = connection.request(req).await?;
+
+            if let Some(error_code) = response.error_code
+                && error_code != FlussError::None.code()
+            {
+                let fluss_error = FlussError::for_code(error_code);
+                if fluss_error.is_retriable() && attempts < max_retries {
+                    attempts += 1;
+                    // 200ms, 400ms, ... capped at 3200ms.
+                    let delay_ms = 100u64 * (1u64 << attempts.min(5));
+                    warn!(
+                        "Retriable error {:?} (code {}) for bucket {}, retry {}/{} after {}ms",
+                        fluss_error, error_code, self.bucket, attempts, max_retries, delay_ms
+                    );
+                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
+                    continue;
+                }
+                let err: ApiError = ErrorResponse {
+                    error_code,
+                    error_message: response.error_message.clone(),
+                }
+                .into();
+                return Err(Error::FlussAPIError { api_error: err });
+            }
+
+            return Ok(response);
+        }
+    }
+}
+
+/// Copies `original` so each attempt can hand an owned request to the connection.
+fn rebuild_request(original: &ScanKvRequest) -> ScanKvRequest {
+    ScanKvRequest::new(
+        original.inner_request.scanner_id.clone(),
+        original.inner_request.bucket_scan_req,
+        original.inner_request.call_seq_id,
+        original.inner_request.batch_size_bytes,
+        original.inner_request.close_scanner,
+    )
+}
+
+/// Asks the server to release `scanner_id` without waiting for the outcome.
+///
+/// `close_seq` is the sequence id of the last answered request plus one.
+/// Best effort: lookup and RPC failures are ignored. Nothing is sent when the
+/// calling thread has no Tokio runtime, where `tokio::spawn` would panic.
+fn spawn_close_scanner(
+    ctx: &ScanContext,
+    bucket: &TableBucket,
+    scanner_id: Vec<u8>,
+    close_seq: i32,
+) {
+    let Ok(runtime) = tokio::runtime::Handle::try_current() else {
+        return;
+    };
+    let rpc_client = ctx.rpc_client.clone();
+    let metadata = ctx.metadata.clone();
+    let table_path = ctx.table_info.table_path.clone();
+    let bucket = bucket.clone();
+
+    runtime.spawn(async move {
+        let Ok(Some(leader)) = metadata.leader_for(&table_path, &bucket).await else {
+            return;
+        };
+        let Ok(connection) = rpc_client.get_connection(&leader).await else {
+            return;
+        };
+        let request =
+            ScanKvRequest::new(Some(scanner_id), None, Some(close_seq), None, Some(true));
+        let _ = connection.request(request).await;
+    });
+}
+
+/// Closes the server-side scanner, best effort, if the scan was abandoned midway.
+impl Drop for KvBatchScanner {
+    fn drop(&mut self) {
+        if let ScannerState::Active {
+            ctx,
+            scanner_id,
+            call_seq_id,
+        } = &self.state
+        {
+            spawn_close_scanner(ctx, &self.bucket, scanner_id.clone(), call_seq_id + 1);
+        }
+    }
+}
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 657a44bf..5206691d 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
 mod batch_scanner;
+mod kv_batch_scanner;
 mod lookup;
 
 mod log_fetch_buffer;
@@ -37,6 +38,7 @@ mod upsert;
 
 pub use append::{AppendWriter, TableAppend};
 pub use batch_scanner::LimitBatchScanner;
+pub use kv_batch_scanner::KvBatchScanner;
 pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup};
 pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader};
 pub use remote_log::{
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index b36b0e42..de11a04f 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -20,6 +20,7 @@ use crate::client::connection::FlussConnection;
 use crate::client::credentials::SecurityTokenManager;
 use crate::client::metadata::Metadata;
 use crate::client::table::batch_scanner::LimitBatchScanner;
+use crate::client::table::kv_batch_scanner::KvBatchScanner;
 use crate::client::table::log_fetch_buffer::{
     CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel,
     LogFetchBuffer, RemotePendingFetch,
@@ -165,6 +166,54 @@ impl<'a> TableScan<'a> {
         ))
     }
 
+    /// Creates a stateful unbounded scanner of a PK table bucket using the
+    /// ScanKv protocol. The first [`KvBatchScanner::next_batch`] opens the
+    /// server-side scanner; subsequent calls iterate until the bucket is drained.
+    pub fn create_kv_batch_scanner(self, table_bucket: TableBucket) -> Result<KvBatchScanner> {
+        if !self.table_info.has_primary_key() {
+            return Err(Error::UnsupportedOperation {
+                message: "KvBatchScanner is only supported for primary key tables".to_string(),
+            });
+        }
+        let bucket_table_id = table_bucket.table_id();
+        if bucket_table_id != self.table_info.table_id {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Bucket table_id {} does not match scan table_id {}",
+                    bucket_table_id, self.table_info.table_id
+                ),
+            });
+        }
+        let bucket_id = table_bucket.bucket_id();
+        let num_buckets = self.table_info.get_num_buckets();
+        if !(0..num_buckets).contains(&bucket_id) {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Bucket id {bucket_id} out of range for table with {num_buckets} buckets"
+                ),
+            });
+        }
+        let latest = SchemaInfo::new(
+            self.table_info.get_schema().clone(),
+            self.table_info.get_schema_id(),
+        );
+        let schema_getter = Arc::new(ClientSchemaGetter::new(
+            self.table_info.table_path.clone(),
+            self.conn.get_admin()?,
+            latest,
+        ));
+        let batch_size_bytes = self.conn.config().scanner_kv_fetch_max_bytes;
+        Ok(KvBatchScanner::new(
+            self.conn.get_connections(),
+            self.metadata.clone(),
+            self.table_info,
+            schema_getter,
+            self.projected_fields,
+            table_bucket,
+            batch_size_bytes,
+        ))
+    }
+
     /// Projects the scan to only include specified columns by their indices.
/// /// # Arguments diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index cad8d9cb..cf7b1766 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -35,6 +35,7 @@ const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1; const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500; const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024; +const DEFAULT_SCANNER_KV_FETCH_MAX_BYTES: i32 = 4 * 1024 * 1024; const DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET: usize = 5; const DEFAULT_WRITER_BUFFER_MEMORY_SIZE: usize = 64 * 1024 * 1024; // 64MB, matching Java const DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS: u64 = u64::MAX; @@ -138,6 +139,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)] pub scanner_log_fetch_max_bytes_for_bucket: i32, + /// Maximum bytes per ScanKv fetch for KvBatchScanner. + /// Default: 4194304 (4MB, matching Java CLIENT_SCANNER_KV_FETCH_MAX_BYTES) + #[arg(long, default_value_t = DEFAULT_SCANNER_KV_FETCH_MAX_BYTES)] + pub scanner_kv_fetch_max_bytes: i32, + /// Whether to enable idempotent writes. When enabled, each batch is tagged with /// a server-allocated writer ID and per-bucket sequence number so the server can /// detect and deduplicate retried batches. 
@@ -249,6 +255,10 @@ impl std::fmt::Debug for Config { "scanner_log_fetch_max_bytes_for_bucket", &self.scanner_log_fetch_max_bytes_for_bucket, ) + .field( + "scanner_kv_fetch_max_bytes", + &self.scanner_kv_fetch_max_bytes, + ) .field( "scanner_log_fetch_wait_max_time_ms", &self.scanner_log_fetch_wait_max_time_ms, @@ -300,6 +310,7 @@ impl Default for Config { scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES, scanner_log_fetch_wait_max_time_ms: DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS, scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, + scanner_kv_fetch_max_bytes: DEFAULT_SCANNER_KV_FETCH_MAX_BYTES, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, writer_enable_idempotence: true, writer_max_inflight_requests_per_bucket: diff --git a/crates/fluss/src/rpc/fluss_api_error.rs b/crates/fluss/src/rpc/fluss_api_error.rs index 418f5443..8adbd6b5 100644 --- a/crates/fluss/src/rpc/fluss_api_error.rs +++ b/crates/fluss/src/rpc/fluss_api_error.rs @@ -171,6 +171,14 @@ pub enum FlussError { InvalidAlterTableException = 56, /// Deletion operations are disabled on this table. DeletionDisabledException = 57, + /// The scanner has expired on the server. + ScannerExpiredException = 66, + /// The scanner id is unknown to the server. + UnknownScannerIdException = 67, + /// The scan request is invalid. + InvalidScanRequestException = 68, + /// Too many scanners are open on the server. + TooManyScanners = 69, } impl FlussError { @@ -195,6 +203,8 @@ impl FlussError { | FlussError::NotEnoughReplicasAfterAppendException | FlussError::NotEnoughReplicasException | FlussError::LeaderNotAvailableException + | FlussError::ScannerExpiredException + | FlussError::TooManyScanners ) } @@ -298,6 +308,10 @@ impl FlussError { FlussError::DeletionDisabledException => { "Deletion operations are disabled on this table." 
} + FlussError::ScannerExpiredException => "The scanner has expired on the server.", + FlussError::UnknownScannerIdException => "The scanner id is unknown to the server.", + FlussError::InvalidScanRequestException => "The scan request is invalid.", + FlussError::TooManyScanners => "Too many scanners are open on the server.", } } @@ -372,6 +386,10 @@ impl FlussError { 55 => FlussError::IneligibleReplicaException, 56 => FlussError::InvalidAlterTableException, 57 => FlussError::DeletionDisabledException, + 66 => FlussError::ScannerExpiredException, + 67 => FlussError::UnknownScannerIdException, + 68 => FlussError::InvalidScanRequestException, + 69 => FlussError::TooManyScanners, _ => FlussError::UnknownServerError, } } @@ -473,6 +491,8 @@ mod tests { FlussError::NotEnoughReplicasAfterAppendException, FlussError::NotEnoughReplicasException, FlussError::LeaderNotAvailableException, + FlussError::ScannerExpiredException, + FlussError::TooManyScanners, ]; for err in &retriable { assert!(err.is_retriable(), "{err:?} should be retriable"); @@ -493,6 +513,8 @@ mod tests { FlussError::FencedLeaderEpochException, FlussError::FencedTieringEpochException, FlussError::RetriableAuthenticateException, + FlussError::UnknownScannerIdException, + FlussError::InvalidScanRequestException, ]; for err in &non_retriable { assert!(!err.is_retriable(), "{err:?} should not be retriable"); diff --git a/crates/fluss/tests/integration/batch_scanner.rs b/crates/fluss/tests/integration/batch_scanner.rs index 443d0518..ba3bf901 100644 --- a/crates/fluss/tests/integration/batch_scanner.rs +++ b/crates/fluss/tests/integration/batch_scanner.rs @@ -379,6 +379,235 @@ mod batch_scanner_test { assert!(table.new_scan().limit(-5).is_err()); } + // ---- KvBatchScanner tests (ScanKv protocol) -------------------------------- + + /// Full scan of a PK table bucket: upsert rows, scan without limit, verify + /// all rows are returned. 
+    #[tokio::test]
+    async fn kv_batch_scanner_reads_all_rows() {
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("admin");
+
+        let table_path = TablePath::new("fluss", "test_kv_batch_scan_all");
+        let descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .primary_key(vec!["id"])
+                    .build()
+                    .expect("schema"),
+            )
+            .distributed_by(Some(1), vec!["id".to_string()])
+            .build()
+            .expect("descriptor");
+        create_table(&admin, &table_path, &descriptor).await;
+
+        let table = connection.get_table(&table_path).await.expect("table");
+        let writer = table
+            .new_upsert()
+            .expect("upsert")
+            .create_writer()
+            .expect("writer");
+
+        let expected: HashMap<i32, &str> = (1..=10)
+            .map(|i| {
+                (
+                    i,
+                    ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"][i as usize - 1],
+                )
+            })
+            .collect();
+        for (&id, &name) in &expected {
+            let mut row = GenericRow::new(2);
+            row.set_field(0, id);
+            row.set_field(1, name);
+            writer.upsert(&row).expect("upsert row");
+        }
+        writer.flush().await.expect("flush");
+
+        let table_info = table.get_table_info();
+        let bucket = TableBucket::new(table_info.table_id, 0);
+
+        let mut scanner = table
+            .new_scan()
+            .create_kv_batch_scanner(bucket.clone())
+            .expect("create kv batch scanner");
+
+        let batches = match scanner.collect_all_batches().await {
+            Ok(b) => b,
+            // ScanKv (API 1061) is not supported on every server build
+            // (e.g. 0.9.x); tolerate the server's "unsupported" signal rather
+            // than failing the suite. Any other error is a real failure.
+            Err(fluss::error::Error::UnsupportedVersion { .. }) => return,
+            Err(e) => panic!("collect: {e}"),
+        };
+        let total_rows: usize = batches.iter().map(|b| b.num_records()).sum();
+        assert_eq!(total_rows, 10, "should return all 10 upserted rows");
+
+        let mut found = HashMap::new();
+        for batch in &batches {
+            let rows = batch.batch();
+            let ids = rows
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("id column");
+            let names = rows
+                .column(1)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("name column");
+            for i in 0..rows.num_rows() {
+                found.insert(ids.value(i), names.value(i).to_string());
+            }
+        }
+        for (&id, &name) in &expected {
+            assert_eq!(
+                found.get(&id).map(|s| s.as_str()),
+                Some(name),
+                "missing or wrong row for id={id}"
+            );
+        }
+    }
+
+    /// KvBatchScanner on an empty PK table returns no batches.
+    #[tokio::test]
+    async fn kv_batch_scanner_empty_table() {
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("admin");
+
+        let table_path = TablePath::new("fluss", "test_kv_batch_scan_empty");
+        let descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("val", DataTypes::string())
+                    .primary_key(vec!["id"])
+                    .build()
+                    .expect("schema"),
+            )
+            .distributed_by(Some(1), vec!["id".to_string()])
+            .build()
+            .expect("descriptor");
+        create_table(&admin, &table_path, &descriptor).await;
+
+        let table = connection.get_table(&table_path).await.expect("table");
+        let table_info = table.get_table_info();
+        let bucket = TableBucket::new(table_info.table_id, 0);
+
+        let mut scanner = table
+            .new_scan()
+            .create_kv_batch_scanner(bucket)
+            .expect("create kv batch scanner");
+
+        let batches = match scanner.collect_all_batches().await {
+            Ok(b) => b,
+            // ScanKv (API 1061) is not supported on every server build
+            // (e.g. 0.9.x); tolerate the server's "unsupported" signal rather
+            // than failing the suite. Any other error is a real failure.
+            Err(fluss::error::Error::UnsupportedVersion { .. }) => return,
+            Err(e) => panic!("collect: {e}"),
+        };
+        let total_rows: usize = batches.iter().map(|b| b.num_records()).sum();
+        assert_eq!(total_rows, 0, "empty table should yield 0 rows");
+    }
+
+    /// KvBatchScanner with column projection.
+    #[tokio::test]
+    async fn kv_batch_scanner_with_projection() {
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("admin");
+
+        let table_path = TablePath::new("fluss", "test_kv_batch_scan_proj");
+        let descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("score", DataTypes::bigint())
+                    .primary_key(vec!["id"])
+                    .build()
+                    .expect("schema"),
+            )
+            .distributed_by(Some(1), vec!["id".to_string()])
+            .build()
+            .expect("descriptor");
+        create_table(&admin, &table_path, &descriptor).await;
+
+        let table = connection.get_table(&table_path).await.expect("table");
+        let writer = table
+            .new_upsert()
+            .expect("upsert")
+            .create_writer()
+            .expect("writer");
+
+        for i in 1..=3 {
+            let mut row = GenericRow::new(3);
+            row.set_field(0, i);
+            row.set_field(1, format!("name_{i}"));
+            row.set_field(2, (i as i64) * 100);
+            writer.upsert(&row).expect("upsert");
+        }
+        writer.flush().await.expect("flush");
+
+        let table_info = table.get_table_info();
+        let bucket = TableBucket::new(table_info.table_id, 0);
+
+        let mut scanner = table
+            .new_scan()
+            .project(&[0, 2])
+            .expect("project")
+            .create_kv_batch_scanner(bucket)
+            .expect("kv batch scanner");
+
+        let batches = match scanner.collect_all_batches().await {
+            Ok(b) => b,
+            // Servers without ScanKv (API 1061), e.g. 0.9.x, report it as unsupported.
+            Err(fluss::error::Error::UnsupportedVersion { .. }) => return,
+            Err(e) => panic!("collect: {e}"),
+        };
+        let total_rows: usize = batches.iter().map(|b| b.num_records()).sum();
+        assert_eq!(total_rows, 3, "should return all 3 upserted rows");
+        for batch in &batches {
+            let rows = batch.batch();
+            assert_eq!(rows.num_columns(), 2, "projected to id + score");
+            assert_eq!(rows.schema().field(0).name(), "id");
+            assert_eq!(rows.schema().field(1).name(), "score");
+        }
+    }
+
+    /// KvBatchScanner must reject log tables (no primary key).
+    #[tokio::test]
+    async fn kv_batch_scanner_rejects_log_table() {
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().expect("admin");
+
+        let table_path = TablePath::new("fluss", "test_kv_batch_scan_log_reject");
+        let descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("c1", DataTypes::int())
+                    .build()
+                    .expect("schema"),
+            )
+            .distributed_by(Some(1), vec!["c1".to_string()])
+            .build()
+            .expect("descriptor");
+        create_table(&admin, &table_path, &descriptor).await;
+
+        let table = connection.get_table(&table_path).await.expect("table");
+        let bucket = TableBucket::new(table.get_table_info().table_id, 0);
+        assert!(
+            table.new_scan().create_kv_batch_scanner(bucket).is_err(),
+            "must reject non-PK table"
+        );
+    }
+
     /// A configured limit must be rejected by the log scanners rather than
     /// silently ignored.
#[tokio::test] From d9167a346f92188bb5cefa2d50c2401c696f71bf Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 09:57:22 +0800 Subject: [PATCH 9/9] Fix bindings compilation for KvBatchScanner additions - C++ binding: add scanner_kv_fetch_max_bytes field to FfiConfig and header - Elixir binding: add match arms for 4 new FlussError variants (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) Co-Authored-By: Claude Opus 4.7 --- bindings/cpp/include/fluss.hpp | 2 ++ bindings/cpp/src/lib.rs | 2 ++ bindings/elixir/native/fluss_nif/src/atoms.rs | 8 ++++++++ 3 files changed, 12 insertions(+) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 6987e683..7c2daa67 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1421,6 +1421,8 @@ struct Configuration { size_t lookup_max_inflight_requests{128}; // Maximum number of lookup retries int32_t lookup_max_retries{std::numeric_limits::max()}; + // Maximum bytes to fetch per KV scanner batch request + int32_t scanner_kv_fetch_max_bytes{4 * 1024 * 1024}; }; class Connection { diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 2d62136a..6523c2a8 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -78,6 +78,7 @@ mod ffi { lookup_batch_timeout_ms: u64, lookup_max_inflight_requests: usize, lookup_max_retries: i32, + scanner_kv_fetch_max_bytes: i32, } struct FfiResult { @@ -982,6 +983,7 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { lookup_batch_timeout_ms: config.lookup_batch_timeout_ms, lookup_max_inflight_requests: config.lookup_max_inflight_requests, lookup_max_retries: config.lookup_max_retries, + scanner_kv_fetch_max_bytes: config.scanner_kv_fetch_max_bytes, }; let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await }); diff --git a/bindings/elixir/native/fluss_nif/src/atoms.rs 
b/bindings/elixir/native/fluss_nif/src/atoms.rs index 45d5aa30..60f2e97d 100644 --- a/bindings/elixir/native/fluss_nif/src/atoms.rs +++ b/bindings/elixir/native/fluss_nif/src/atoms.rs @@ -100,6 +100,10 @@ rustler::atoms! { ineligible_replica_exception, invalid_alter_table_exception, deletion_disabled_exception, + scanner_expired_exception, + unknown_scanner_id_exception, + invalid_scan_request_exception, + too_many_scanners, client_error, } @@ -212,6 +216,10 @@ fn api_error_atom(code: i32) -> Atom { FlussError::IneligibleReplicaException => ineligible_replica_exception(), FlussError::InvalidAlterTableException => invalid_alter_table_exception(), FlussError::DeletionDisabledException => deletion_disabled_exception(), + FlussError::ScannerExpiredException => scanner_expired_exception(), + FlussError::UnknownScannerIdException => unknown_scanner_id_exception(), + FlussError::InvalidScanRequestException => invalid_scan_request_exception(), + FlussError::TooManyScanners => too_many_scanners(), } }