diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 6987e683..7c2daa67 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1421,6 +1421,8 @@ struct Configuration { size_t lookup_max_inflight_requests{128}; // Maximum number of lookup retries int32_t lookup_max_retries{std::numeric_limits::max()}; + // Maximum bytes to fetch per KV scanner batch request + int32_t scanner_kv_fetch_max_bytes{4 * 1024 * 1024}; }; class Connection { diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 2d62136a..6523c2a8 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -78,6 +78,7 @@ mod ffi { lookup_batch_timeout_ms: u64, lookup_max_inflight_requests: usize, lookup_max_retries: i32, + scanner_kv_fetch_max_bytes: i32, } struct FfiResult { @@ -982,6 +983,7 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { lookup_batch_timeout_ms: config.lookup_batch_timeout_ms, lookup_max_inflight_requests: config.lookup_max_inflight_requests, lookup_max_retries: config.lookup_max_retries, + scanner_kv_fetch_max_bytes: config.scanner_kv_fetch_max_bytes, }; let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await }); diff --git a/bindings/elixir/native/fluss_nif/src/atoms.rs b/bindings/elixir/native/fluss_nif/src/atoms.rs index 45d5aa30..60f2e97d 100644 --- a/bindings/elixir/native/fluss_nif/src/atoms.rs +++ b/bindings/elixir/native/fluss_nif/src/atoms.rs @@ -100,6 +100,10 @@ rustler::atoms! { ineligible_replica_exception, invalid_alter_table_exception, deletion_disabled_exception, + scanner_expired_exception, + unknown_scanner_id_exception, + invalid_scan_request_exception, + too_many_scanners, client_error, } @@ -212,6 +216,10 @@ fn api_error_atom(code: i32) -> Atom { FlussError::IneligibleReplicaException => ineligible_replica_exception(), FlussError::InvalidAlterTableException => invalid_alter_table_exception(), FlussError::DeletionDisabledException => deletion_disabled_exception(), + FlussError::ScannerExpiredException => scanner_expired_exception(), + FlussError::UnknownScannerIdException => unknown_scanner_id_exception(), + FlussError::InvalidScanRequestException => invalid_scan_request_exception(), + FlussError::TooManyScanners => too_many_scanners(), } } diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 821ee52e..18a37728 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -41,6 +41,9 @@ storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] storage-oss = ["opendal/services-oss"] integration_tests = [] +# Gates tests that exercise APIs only available on Fluss 1.x servers. +# Enable alongside `integration_tests` when running against a 1.x server image. +fluss_v1 = [] [dependencies] arrow = { workspace = true } diff --git a/crates/fluss/src/client/table/batch_scanner.rs b/crates/fluss/src/client/table/batch_scanner.rs index 090a7428..5e19202b 100644 --- a/crates/fluss/src/client/table/batch_scanner.rs +++ b/crates/fluss/src/client/table/batch_scanner.rs @@ -219,10 +219,10 @@ fn decode_log_batch( )) } -/// Decode a KV limit-scan [`ValueRecordBatch`] into a single Arrow -/// `RecordBatch`, decoding each record by its own schema id and projecting onto -/// the current schema. -async fn decode_kv_batch( +/// Decode a KV [`ValueRecordBatch`] into a single Arrow `RecordBatch`, +/// decoding each record by its own schema id and projecting onto the current +/// schema. +pub(super) async fn decode_kv_batch( table_info: &TableInfo, schema_getter: &ClientSchemaGetter, projected_fields: Option<&[usize]>, @@ -279,7 +279,7 @@ async fn decode_kv_batch( /// Build one [`FixedSchemaDecoder`] per distinct schema id. The current schema /// decodes without projection; older schemas are fetched and projected onto the /// current schema. -async fn build_kv_decoders( +pub(super) async fn build_kv_decoders( schema_getter: &ClientSchemaGetter, target_schema: &Schema, target_schema_id: i16, @@ -304,7 +304,7 @@ async fn build_kv_decoders( /// Decode every value record into a row shaped by `target_row_type`, build a /// single Arrow batch, keep the last `limit` rows, then apply column projection. -fn value_records_to_record_batch( +pub(super) fn value_records_to_record_batch( batch: &ValueRecordBatch, ranges: &[Range], decoders: &HashMap, @@ -332,7 +332,7 @@ fn value_records_to_record_batch( } /// Read the leading little-endian schema id from a `[schema_id | row]` payload. -fn read_schema_id(payload: &[u8]) -> Result { +pub(super) fn read_schema_id(payload: &[u8]) -> Result { if payload.len() < SCHEMA_ID_LENGTH { return Err(Error::UnexpectedError { message: format!( @@ -366,7 +366,7 @@ fn take_last_rows(batch: RecordBatch, base_offset: i64, limit: usize) -> (Record } /// An empty `RecordBatch` with the (optionally projected) target schema. -fn empty_record_batch( +pub(super) fn empty_record_batch( target_row_type: &RowType, projected_fields: Option<&[usize]>, ) -> Result { @@ -375,7 +375,7 @@ fn empty_record_batch( } /// Project `batch` (shaped by `target_row_type`) onto the requested columns. -fn project_batch( +pub(super) fn project_batch( batch: RecordBatch, target_row_type: &RowType, projected_fields: Option<&[usize]>, diff --git a/crates/fluss/src/client/table/kv_batch_scanner.rs b/crates/fluss/src/client/table/kv_batch_scanner.rs new file mode 100644 index 00000000..a1d14589 --- /dev/null +++ b/crates/fluss/src/client/table/kv_batch_scanner.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::ClientSchemaGetter; +use crate::client::metadata::Metadata; +use crate::client::table::batch_scanner::decode_kv_batch; +use crate::error::{ApiError, Error, FlussError, Result}; +use crate::metadata::{TableBucket, TableInfo}; +use crate::proto::{self, ErrorResponse}; +use crate::record::ScanBatch; +use crate::rpc::RpcClient; +use crate::rpc::message::ScanKvRequest; +use log::warn; +use std::sync::Arc; + +/// Maximum retry attempts for retriable server errors (e.g. leader election +/// races on a freshly created bucket, transient `TooManyScanners`). +const MAX_RETRIABLE_RETRIES: u32 = 5; + +/// Stateful scanner for full KV table scans using the ScanKv API (1061). +/// +/// The server maintains a cursor: the first `next_batch()` opens the scanner, +/// subsequent calls iterate, and dropping the scanner sends a best-effort close. +pub struct KvBatchScanner { + bucket: TableBucket, + state: ScannerState, +} + +enum ScannerState { + Pending(ScanContext), + Active { + ctx: ScanContext, + scanner_id: Vec, + call_seq_id: i32, + }, + Done, +} + +struct ScanContext { + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + schema_getter: Arc, + projected_fields: Option>, + batch_size_bytes: i32, +} + +impl KvBatchScanner { + pub(super) fn new( + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + schema_getter: Arc, + projected_fields: Option>, + bucket: TableBucket, + batch_size_bytes: i32, + ) -> Self { + Self { + bucket, + state: ScannerState::Pending(ScanContext { + rpc_client, + metadata, + table_info, + schema_getter, + projected_fields, + batch_size_bytes, + }), + } + } + + pub async fn next_batch(&mut self) -> Result> { + match std::mem::replace(&mut self.state, ScannerState::Done) { + ScannerState::Done => Ok(None), + ScannerState::Pending(ctx) => self.open_scanner(ctx).await, + ScannerState::Active { + ctx, + scanner_id, + call_seq_id, + } => self.iterate(ctx, scanner_id, call_seq_id).await, + } + } + + pub async fn collect_all_batches(&mut self) -> Result> { + let mut batches = Vec::new(); + while let Some(batch) = self.next_batch().await? { + batches.push(batch); + } + Ok(batches) + } + + pub fn bucket(&self) -> &TableBucket { + &self.bucket + } + + async fn open_scanner(&mut self, ctx: ScanContext) -> Result> { + let bucket_scan_req = proto::PbScanReqForBucket { + table_id: ctx.table_info.table_id, + partition_id: self.bucket.partition_id(), + bucket_id: self.bucket.bucket_id(), + limit: None, + }; + let request = ScanKvRequest::new( + None, + Some(bucket_scan_req), + Some(0), + Some(ctx.batch_size_bytes), + Some(false), + ); + + let response = self + .send_with_retry(&ctx, request, MAX_RETRIABLE_RETRIES) + .await?; + + self.handle_response(ctx, response, 0).await + } + + async fn iterate( + &mut self, + ctx: ScanContext, + scanner_id: Vec, + call_seq_id: i32, + ) -> Result> { + let next_seq = call_seq_id + 1; + let request = ScanKvRequest::new( + Some(scanner_id.clone()), + None, + Some(next_seq), + Some(ctx.batch_size_bytes), + Some(false), + ); + + let response = self + .send_with_retry(&ctx, request, MAX_RETRIABLE_RETRIES) + .await?; + + self.handle_response(ctx, response, next_seq).await + } + + async fn handle_response( + &mut self, + ctx: ScanContext, + response: proto::ScanKvResponse, + call_seq_id: i32, + ) -> Result> { + let scanner_id = response.scanner_id.unwrap_or_default(); + let has_more = response.has_more_results.unwrap_or(false); + let raw = response.records.unwrap_or_default().to_vec(); + let log_offset = response.log_offset.unwrap_or(0); + + let batch = decode_kv_batch( + &ctx.table_info, + &ctx.schema_getter, + ctx.projected_fields.as_deref(), + raw, + usize::MAX, + ) + .await?; + + if has_more { + self.state = ScannerState::Active { + ctx, + scanner_id, + call_seq_id, + }; + } else { + self.state = ScannerState::Done; + } + + if batch.num_rows() == 0 && !has_more { + return Ok(None); + } + + Ok(Some(ScanBatch::new(self.bucket.clone(), batch, log_offset))) + } + + async fn send_with_retry( + &self, + ctx: &ScanContext, + request: ScanKvRequest, + max_retries: u32, + ) -> Result { + let mut attempts = 0; + loop { + let leader = ctx + .metadata + .leader_for(&ctx.table_info.table_path, &self.bucket) + .await? + .ok_or_else(|| { + Error::leader_not_available(format!( + "No leader found for table bucket: {}", + self.bucket + )) + })?; + let connection = ctx.rpc_client.get_connection(&leader).await?; + + let req = rebuild_request(&request); + let response = connection.request(req).await?; + + if let Some(error_code) = response.error_code + && error_code != FlussError::None.code() + { + let fluss_error = FlussError::for_code(error_code); + if fluss_error.is_retriable() && attempts < max_retries { + attempts += 1; + let delay_ms = 100u64 * (1u64 << attempts.min(5)); + warn!( + "Retriable error {:?} (code {}) for bucket {}, retry {}/{} after {}ms", + fluss_error, error_code, self.bucket, attempts, max_retries, delay_ms + ); + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + continue; + } + let err: ApiError = ErrorResponse { + error_code, + error_message: response.error_message.clone(), + } + .into(); + return Err(Error::FlussAPIError { api_error: err }); + } + + return Ok(response); + } + } +} + +fn rebuild_request(original: &ScanKvRequest) -> ScanKvRequest { + ScanKvRequest::new( + original.inner_request.scanner_id.clone(), + original.inner_request.bucket_scan_req, + original.inner_request.call_seq_id, + original.inner_request.batch_size_bytes, + original.inner_request.close_scanner, + ) +} + +impl Drop for KvBatchScanner { + fn drop(&mut self) { + if let ScannerState::Active { + ref ctx, + ref scanner_id, + call_seq_id, + } = self.state + { + let rpc_client = ctx.rpc_client.clone(); + let metadata = ctx.metadata.clone(); + let table_path = ctx.table_info.table_path.clone(); + let bucket = self.bucket.clone(); + let scanner_id = scanner_id.clone(); + let close_seq = call_seq_id + 1; + + tokio::spawn(async move { + let leader = match metadata.leader_for(&table_path, &bucket).await { + Ok(Some(leader)) => leader, + _ => return, + }; + let connection = match rpc_client.get_connection(&leader).await { + Ok(c) => c, + Err(_) => return, + }; + let request = + ScanKvRequest::new(Some(scanner_id), None, Some(close_seq), None, Some(true)); + let _ = connection.request(request).await; + }); + } + } +} diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 657a44bf..5206691d 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2; mod append; mod batch_scanner; +mod kv_batch_scanner; mod lookup; mod log_fetch_buffer; @@ -37,6 +38,7 @@ mod upsert; pub use append::{AppendWriter, TableAppend}; pub use batch_scanner::LimitBatchScanner; +pub use kv_batch_scanner::KvBatchScanner; pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup}; pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader}; pub use remote_log::{ diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index b36b0e42..de11a04f 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -20,6 +20,7 @@ use crate::client::connection::FlussConnection; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; use crate::client::table::batch_scanner::LimitBatchScanner; +use crate::client::table::kv_batch_scanner::KvBatchScanner; use crate::client::table::log_fetch_buffer::{ CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, LogFetchBuffer, RemotePendingFetch, @@ -165,6 +166,54 @@ impl<'a> TableScan<'a> { )) } + /// Creates a stateful unbounded scanner of a PK table bucket using the + /// ScanKv protocol. The first [`KvBatchScanner::next_batch`] opens the + /// server-side scanner; subsequent calls iterate until the bucket is drained. + pub fn create_kv_batch_scanner(self, table_bucket: TableBucket) -> Result { + if !self.table_info.has_primary_key() { + return Err(Error::UnsupportedOperation { + message: "KvBatchScanner is only supported for primary key tables".to_string(), + }); + } + if table_bucket.table_id() != self.table_info.table_id { + return Err(Error::IllegalArgument { + message: format!( + "Bucket table_id {} does not match scan table_id {}", + table_bucket.table_id(), + self.table_info.table_id + ), + }); + } + let num_buckets = self.table_info.get_num_buckets(); + if table_bucket.bucket_id() < 0 || table_bucket.bucket_id() >= num_buckets { + return Err(Error::IllegalArgument { + message: format!( + "Bucket id {} out of range for table with {num_buckets} buckets", + table_bucket.bucket_id() + ), + }); + } + let latest = SchemaInfo::new( + self.table_info.get_schema().clone(), + self.table_info.get_schema_id(), + ); + let schema_getter = Arc::new(ClientSchemaGetter::new( + self.table_info.table_path.clone(), + self.conn.get_admin()?, + latest, + )); + let batch_size_bytes = self.conn.config().scanner_kv_fetch_max_bytes; + Ok(KvBatchScanner::new( + self.conn.get_connections(), + self.metadata.clone(), + self.table_info, + schema_getter, + self.projected_fields, + table_bucket, + batch_size_bytes, + )) + } + /// Projects the scan to only include specified columns by their indices. /// /// # Arguments diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index cad8d9cb..cf7b1766 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -35,6 +35,7 @@ const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1; const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500; const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024; +const DEFAULT_SCANNER_KV_FETCH_MAX_BYTES: i32 = 4 * 1024 * 1024; const DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET: usize = 5; const DEFAULT_WRITER_BUFFER_MEMORY_SIZE: usize = 64 * 1024 * 1024; // 64MB, matching Java const DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS: u64 = u64::MAX; @@ -138,6 +139,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)] pub scanner_log_fetch_max_bytes_for_bucket: i32, + /// Maximum bytes per ScanKv fetch for KvBatchScanner. + /// Default: 4194304 (4MB, matching Java CLIENT_SCANNER_KV_FETCH_MAX_BYTES) + #[arg(long, default_value_t = DEFAULT_SCANNER_KV_FETCH_MAX_BYTES)] + pub scanner_kv_fetch_max_bytes: i32, + /// Whether to enable idempotent writes. When enabled, each batch is tagged with /// a server-allocated writer ID and per-bucket sequence number so the server can /// detect and deduplicate retried batches. @@ -249,6 +255,10 @@ impl std::fmt::Debug for Config { "scanner_log_fetch_max_bytes_for_bucket", &self.scanner_log_fetch_max_bytes_for_bucket, ) + .field( + "scanner_kv_fetch_max_bytes", + &self.scanner_kv_fetch_max_bytes, + ) .field( "scanner_log_fetch_wait_max_time_ms", &self.scanner_log_fetch_wait_max_time_ms, @@ -300,6 +310,7 @@ impl Default for Config { scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES, scanner_log_fetch_wait_max_time_ms: DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS, scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, + scanner_kv_fetch_max_bytes: DEFAULT_SCANNER_KV_FETCH_MAX_BYTES, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, writer_enable_idempotence: true, writer_max_inflight_requests_per_bucket: diff --git a/crates/fluss/src/rpc/fluss_api_error.rs b/crates/fluss/src/rpc/fluss_api_error.rs index 418f5443..8adbd6b5 100644 --- a/crates/fluss/src/rpc/fluss_api_error.rs +++ b/crates/fluss/src/rpc/fluss_api_error.rs @@ -171,6 +171,14 @@ pub enum FlussError { InvalidAlterTableException = 56, /// Deletion operations are disabled on this table. DeletionDisabledException = 57, + /// The scanner has expired on the server. + ScannerExpiredException = 66, + /// The scanner id is unknown to the server. + UnknownScannerIdException = 67, + /// The scan request is invalid. + InvalidScanRequestException = 68, + /// Too many scanners are open on the server. + TooManyScanners = 69, } impl FlussError { @@ -195,6 +203,8 @@ impl FlussError { | FlussError::NotEnoughReplicasAfterAppendException | FlussError::NotEnoughReplicasException | FlussError::LeaderNotAvailableException + | FlussError::ScannerExpiredException + | FlussError::TooManyScanners ) } @@ -298,6 +308,10 @@ impl FlussError { FlussError::DeletionDisabledException => { "Deletion operations are disabled on this table." } + FlussError::ScannerExpiredException => "The scanner has expired on the server.", + FlussError::UnknownScannerIdException => "The scanner id is unknown to the server.", + FlussError::InvalidScanRequestException => "The scan request is invalid.", + FlussError::TooManyScanners => "Too many scanners are open on the server.", } } @@ -372,6 +386,10 @@ impl FlussError { 55 => FlussError::IneligibleReplicaException, 56 => FlussError::InvalidAlterTableException, 57 => FlussError::DeletionDisabledException, + 66 => FlussError::ScannerExpiredException, + 67 => FlussError::UnknownScannerIdException, + 68 => FlussError::InvalidScanRequestException, + 69 => FlussError::TooManyScanners, _ => FlussError::UnknownServerError, } } @@ -473,6 +491,8 @@ mod tests { FlussError::NotEnoughReplicasAfterAppendException, FlussError::NotEnoughReplicasException, FlussError::LeaderNotAvailableException, + FlussError::ScannerExpiredException, + FlussError::TooManyScanners, ]; for err in &retriable { assert!(err.is_retriable(), "{err:?} should be retriable"); @@ -493,6 +513,8 @@ mod tests { FlussError::FencedLeaderEpochException, FlussError::FencedTieringEpochException, FlussError::RetriableAuthenticateException, + FlussError::UnknownScannerIdException, + FlussError::InvalidScanRequestException, ]; for err in &non_retriable { assert!(!err.is_retriable(), "{err:?} should not be retriable"); diff --git a/crates/fluss/tests/integration/admin_extended.rs b/crates/fluss/tests/integration/admin_extended.rs new file mode 100644 index 00000000..63e627fb --- /dev/null +++ b/crates/fluss/tests/integration/admin_extended.rs @@ -0,0 +1,778 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Extended admin integration tests covering APIs available on both Fluss +//! 0.9.x and 1.x servers but not exercised by `admin.rs`. +//! +//! These tests run against the shared cluster gated behind `integration_tests`. +//! Some admin operations have semantics that depend on optional cluster +//! configuration (lake storage, authorization). For those the tests assert the +//! request/response roundtrip succeeds and tolerate a *structured* server-side +//! error (a decoded [`FlussError`]) while still failing on transport/decoding +//! errors — which is what a Rust-client integration test must guard. + +#[cfg(test)] +mod admin_extended_test { + use crate::integration::utils::get_shared_cluster; + use fluss::error::FlussError; + use fluss::metadata::{ + AclFilter, AclInfo, AddColumn, AlterConfig, AlterConfigOpType, AlterTableChanges, + BucketOffset, BucketStatsRequest, ColumnPositionType, DataTypes, DatabaseDescriptorBuilder, + JsonSerde, KvFormat, KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable, LogFormat, + OperationType, PermissionType, ProducerTableOffsets, ResourceType, Schema, ServerTag, + TableDescriptor, TablePath, + }; + + /// Asserts an error decoded into one of the `allowed` Fluss API errors. + /// Panics (failing the test) on a transport/decoding error or an unexpected + /// API error code. + fn assert_expected_api_error(error: fluss::error::Error, allowed: &[FlussError]) { + match error.api_error() { + Some(code) if allowed.contains(&code) => {} + other => { + panic!("Expected one of {allowed:?} but got {other:?} (full error: {error:?})") + } + } + } + + /// Builds a simple non-partitioned log table descriptor (id INT, name STRING). + fn simple_log_table() -> TableDescriptor { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .build() + .expect("build schema"); + TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec![]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .build() + .expect("build table descriptor") + } + + /// Builds a simple primary-key/KV table descriptor (id INT PK, name STRING). + fn simple_kv_table() -> TableDescriptor { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("build schema"); + TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec!["id".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("build table descriptor") + } + + // --------------------------------------------------------------------- + // Group A: Database listing & summaries + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_list_databases() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_a = "test_list_databases_a"; + let db_b = "test_list_databases_b"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + + admin + .create_database(db_a, Some(&descriptor), true) + .await + .unwrap(); + admin + .create_database(db_b, Some(&descriptor), true) + .await + .unwrap(); + + let databases = admin.list_databases().await.expect("should list databases"); + assert!( + databases.iter().any(|d| d == db_a), + "Expected {db_a} in {databases:?}" + ); + assert!( + databases.iter().any(|d| d == db_b), + "Expected {db_b} in {databases:?}" + ); + + admin.drop_database(db_a, true, true).await.unwrap(); + admin.drop_database(db_b, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_list_database_summaries() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_list_db_summaries"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_descriptor = simple_log_table(); + for table in ["summary_t1", "summary_t2"] { + admin + .create_table(&TablePath::new(db_name, table), &table_descriptor, true) + .await + .unwrap(); + } + + let summaries = admin + .list_database_summaries() + .await + .expect("should list database summaries"); + + let summary = summaries + .iter() + .find(|s| s.database_name == db_name) + .unwrap_or_else(|| panic!("Expected summary for {db_name} in {summaries:?}")); + + assert_eq!( + summary.table_count, 2, + "Database {db_name} should report 2 tables" + ); + assert!( + summary.created_time > 0, + "created_time should be positive, got {}", + summary.created_time + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group B: Schema operations + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_table_schema() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_get_table_schema_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "schema_table"); + let table_descriptor = simple_kv_table(); + admin + .create_table(&table_path, &table_descriptor, true) + .await + .unwrap(); + + // Request the latest schema (schema_id = None). + let schema_info = admin + .get_table_schema(&table_path, None) + .await + .expect("should get latest table schema"); + assert!( + schema_info.schema_id() > 0, + "schema_id should be positive, got {}", + schema_info.schema_id() + ); + assert_eq!( + schema_info.schema().columns().len(), + 2, + "schema should have 2 columns" + ); + + // Request the same schema by explicit id. + let by_id = admin + .get_table_schema(&table_path, Some(schema_info.schema_id())) + .await + .expect("should get table schema by id"); + assert_eq!(by_id.schema_id(), schema_info.schema_id()); + assert_eq!(by_id.schema().columns().len(), 2); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group C: Alter operations + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_alter_table_add_column() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_alter_table_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "alter_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + + // Add a nullable "email" column at the end of the schema. + let data_type_json = serde_json::to_vec( + &DataTypes::string() + .serialize_json() + .expect("serialize STRING type"), + ) + .expect("encode data_type_json"); + let add_column = AddColumn { + column_name: "email".to_string(), + data_type_json, + comment: Some("user email".to_string()), + position: ColumnPositionType::Last, + }; + + admin + .alter_table( + &table_path, + false, + AlterTableChanges { + add_columns: vec![add_column], + ..Default::default() + }, + ) + .await + .expect("should add column"); + + let schema_info = admin + .get_table_schema(&table_path, None) + .await + .expect("should get schema after alter"); + assert_eq!( + schema_info.schema().columns().len(), + 3, + "schema should have 3 columns after adding email" + ); + assert!( + schema_info + .schema() + .columns() + .iter() + .any(|c| c.name() == "email"), + "schema should contain the new 'email' column" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_alter_database() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_alter_database_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let config_change = AlterConfig::new( + "custom.key", + Some("custom-value".to_string()), + AlterConfigOpType::Set, + ); + // AlterDatabase is not implemented on every server build (e.g. 0.9.x). + // Accept the server's "unsupported" signal but never a transport failure. + match admin + .alter_database(db_name, vec![config_change], None, false) + .await + { + Ok(()) => { + // Altering a non-existent database with ignore_if_not_exists = true is a no-op. + admin + .alter_database("no_such_db_for_alter", vec![], None, true) + .await + .expect("altering missing db with ignore flag should succeed"); + } + Err(fluss::error::Error::UnsupportedVersion { .. }) => {} + Err(error) => panic!("unexpected error from alter_database: {error:?}"), + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group D: Cluster configuration + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_describe_cluster_configs() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let configs = admin + .describe_cluster_configs() + .await + .expect("should describe cluster configs"); + + for config in &configs { + assert!( + !config.config_key.is_empty(), + "config_key should not be empty" + ); + assert!( + !config.config_source.is_empty(), + "config_source should not be empty for {}", + config.config_key + ); + } + } + + #[tokio::test] + async fn test_alter_cluster_configs() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // Read an existing config that has a value, then SET it to the same value. + // This exercises the write path without changing cluster behaviour. + let described = admin + .describe_cluster_configs() + .await + .expect("should describe cluster configs"); + + let Some(existing) = described.iter().find(|c| c.config_value.is_some()) else { + // No config with a value to round-trip; nothing to assert. + return; + }; + + let alter = AlterConfig::new( + existing.config_key.clone(), + existing.config_value.clone(), + AlterConfigOpType::Set, + ); + + // Many keys are not dynamically alterable; the server rejects those with + // either InvalidConfigException or a generic "not allowed to be changed + // dynamically" error. Either way the request/response roundtrip worked. + if let Err(error) = admin.alter_cluster_configs(vec![alter]).await { + assert_expected_api_error( + error, + &[ + FlussError::InvalidConfigException, + FlussError::UnknownServerError, + ], + ); + } + } + + // --------------------------------------------------------------------- + // Group E: Table statistics + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_table_stats() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_get_table_stats_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "stats_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let buckets_req = vec![BucketStatsRequest::new(None, 0)]; + // GetTableStats is not implemented on every server build. Accept either a + // decoded response or the server's "unsupported" signal, but never a + // transport/decoding failure. + match admin.get_table_stats(table_id, buckets_req, vec![]).await { + Ok(response) => { + for bucket in &response.buckets { + // Per-bucket entries echo the requested bucket id. + assert_eq!( + bucket.bucket_id, 0, + "unexpected bucket id in stats response" + ); + } + } + Err(fluss::error::Error::UnsupportedVersion { .. }) => {} + Err(error) => panic!("unexpected error from get_table_stats: {error:?}"), + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group F: KV snapshot operations (0.9.x) + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_latest_kv_snapshots() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_latest_kv_snapshots_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "latest_kv_snapshots_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let response = admin + .get_latest_kv_snapshots(&table_path, None) + .await + .expect("should get latest kv snapshots"); + assert_eq!( + response.table_id, table_id, + "response table_id should match the requested table" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + #[tokio::test] + async fn test_kv_snapshot_lease_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_kv_snapshot_lease_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "lease_table"); + admin + .create_table(&table_path, &simple_kv_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + // A fresh table has no snapshot to lease, so the requested snapshot is + // reported back as unavailable. The RPC itself must still succeed. + let lease = KvSnapshotLeaseForTable { + table_id, + bucket_snapshots: vec![KvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 0, + }], + }; + let response = admin + .create_kv_snapshot_lease("test-lease-id", 60_000, vec![lease]) + .await + .expect("should acquire kv snapshot lease"); + + // The fresh-table snapshot is unavailable; just confirm the response decodes. + let _ = response.unavailable_snapshots; + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group G: Server tags + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_server_tag_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let nodes = admin.get_server_nodes().await.expect("should get nodes"); + let tablet_id = nodes + .iter() + .find(|n| *n.server_type() == fluss::ServerType::TabletServer) + .map(|n| n.id()) + .expect("expected a tablet server node"); + + // Add then immediately remove a TEMPORARY_OFFLINE tag so cluster state + // is restored. Both RPCs must complete without a transport error. + admin + .add_server_tag(vec![tablet_id], ServerTag::TemporaryOffline) + .await + .expect("should add server tag"); + admin + .remove_server_tag(vec![tablet_id], ServerTag::TemporaryOffline) + .await + .expect("should remove server tag"); + } + + // --------------------------------------------------------------------- + // Group H: Rebalance + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_rebalance_operations() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // No rebalance is running; listing progress and cancelling are read/no-op + // paths. Tolerate a structured server error (e.g. nothing to cancel). + if let Err(error) = admin.list_rebalance_progress(None).await { + assert_expected_api_error( + error, + &[ + FlussError::UnknownServerError, + FlussError::InvalidCoordinatorException, + ], + ); + } + + if let Err(error) = admin.cancel_rebalance(None).await { + assert_expected_api_error( + error, + &[ + FlussError::UnknownServerError, + FlussError::InvalidCoordinatorException, + ], + ); + } + } + + // --------------------------------------------------------------------- + // Group I: Producer offsets + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_producer_offsets_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_producer_offsets_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "producer_offsets_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + let table_id = admin + .get_table_info(&table_path) + .await + .unwrap() + .get_table_id(); + + let producer_id = "test-producer"; + let table_offsets = vec![ProducerTableOffsets { + table_id, + bucket_offsets: vec![BucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: Some(42), + }], + }]; + + admin + .register_producer_offsets(producer_id, table_offsets, None) + .await + .expect("should register producer offsets"); + + let fetched = admin + .get_producer_offsets(producer_id) + .await + .expect("should get producer offsets"); + let registered = fetched + .table_offsets + .iter() + .find(|t| t.table_id == table_id) + .unwrap_or_else(|| panic!("expected offsets for table {table_id}")); + assert_eq!( + registered + .bucket_offsets + .first() + .and_then(|b| b.log_end_offset), + Some(42), + "registered log_end_offset should be 42" + ); + + admin + .delete_producer_offsets(producer_id) + .await + .expect("should delete producer offsets"); + + let after_delete = admin + .get_producer_offsets(producer_id) + .await + .expect("should get producer offsets after delete"); + assert!( + after_delete + .table_offsets + .iter() + .all(|t| t.table_id != table_id), + "offsets for table {table_id} should be gone after delete" + ); + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group J: ACL management + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_acl_lifecycle() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_acl_lifecycle_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let acl = AclInfo { + resource_name: db_name.to_string(), + resource_type: ResourceType::Database, + principal_name: "alice".to_string(), + principal_type: "User".to_string(), + host: "*".to_string(), + operation_type: OperationType::Read, + permission_type: PermissionType::Allow, + }; + + // Authorization may be disabled on the shared cluster. In that case the + // server rejects the request with SecurityDisabledException; otherwise + // the full create/list/drop lifecycle should round-trip. + match admin.create_acls(vec![acl.clone()]).await { + Err(error) => { + assert_expected_api_error( + error, + &[ + FlussError::SecurityDisabledException, + FlussError::AuthorizationException, + ], + ); + } + Ok(_) => { + let filter = AclFilter { + resource_name: Some(db_name.to_string()), + resource_type: ResourceType::Database, + principal_name: Some("alice".to_string()), + principal_type: Some("User".to_string()), + host: Some("*".to_string()), + operation_type: OperationType::Read, + permission_type: PermissionType::Allow, + }; + let listed = admin + .list_acls(filter.clone()) + .await + .expect("should list acls"); + assert!( + listed.iter().any(|a| a.resource_name == db_name), + "created ACL should appear in list: {listed:?}" + ); + + admin + .drop_acls(vec![filter]) + .await + .expect("should drop acls"); + } + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } + + // --------------------------------------------------------------------- + // Group K: Lake snapshots (0.9.x) + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_lake_snapshot_operations() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let db_name = "test_lake_snapshot_db"; + let descriptor = DatabaseDescriptorBuilder::default().build(); + admin + .create_database(db_name, Some(&descriptor), true) + .await + .unwrap(); + + let table_path = TablePath::new(db_name, "lake_table"); + admin + .create_table(&table_path, &simple_log_table(), true) + .await + .unwrap(); + + // Lake storage is typically not configured for the test cluster, so both + // calls are expected to fail with a lake-related API error rather than a + // transport error. + let lake_errors = [ + FlussError::LakeStorageNotConfiguredException, + FlussError::LakeSnapshotNotExist, + ]; + + if let Err(error) = admin.get_latest_lake_snapshot(&table_path).await { + assert_expected_api_error(error, &lake_errors); + } + if let Err(error) = admin.get_lake_snapshot(&table_path, None, None).await { + assert_expected_api_error(error, &lake_errors); + } + + admin.drop_database(db_name, true, true).await.unwrap(); + } +} diff --git a/crates/fluss/tests/integration/admin_v1.rs b/crates/fluss/tests/integration/admin_v1.rs new file mode 100644 index 00000000..6a5bec9c --- /dev/null +++ b/crates/fluss/tests/integration/admin_v1.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for admin APIs that are only available on Fluss 1.x +//! servers (API keys 1057-1058, 1061-1064). The whole module is gated behind +//! the `fluss_v1` feature so it is skipped when running against a 0.9.x server. + +#[cfg(test)] +mod admin_v1_test { + use crate::integration::utils::get_shared_cluster; + use fluss::metadata::DataTypes; + use fluss::metadata::{ + DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor, TablePath, + }; + + /// `get_cluster_health` (API key 1062) reports replica/leader counts and a + /// status code for the cluster. + #[tokio::test] + async fn test_get_cluster_health() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let _response = admin + .get_cluster_health() + .await + .expect("should get cluster health"); + } + + /// `list_kv_snapshots` (API key 1064) returns the active snapshots for a KV + /// table. A freshly created table has none, but the response must echo the + /// requested table id. + #[tokio::test] + async fn test_list_kv_snapshots() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let test_db_name = "test_list_kv_snapshots_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_list_kv_snapshots") + .build(); + + admin + .create_database(test_db_name, Some(&db_descriptor), true) + .await + .expect("Failed to create test database"); + + let test_table_name = "kv_snapshot_table"; + let table_path = TablePath::new(test_db_name, test_table_name); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(1), vec!["id".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create table"); + + let table_info = admin + .get_table_info(&table_path) + .await + .expect("should get table info"); + let table_id = table_info.get_table_id(); + + let response = admin + .list_kv_snapshots(table_id, None) + .await + .expect("should list kv snapshots"); + + assert_eq!( + response.table_id, table_id, + "Response table_id should match request" + ); + + // Cleanup + admin.drop_table(&table_path, true).await.unwrap(); + admin.drop_database(test_db_name, true, true).await.unwrap(); + } + + /// `list_remote_log_manifests` (API key 1063) lists tiered log segments. A + /// newly created table has had no remote log activity yet. + #[tokio::test] + async fn test_list_remote_log_manifests() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let test_db_name = "test_list_remote_log_manifests_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_list_remote_log_manifests") + .build(); + + admin + .create_database(test_db_name, Some(&db_descriptor), true) + .await + .expect("Failed to create test database"); + + let test_table_name = "remote_log_table"; + let table_path = TablePath::new(test_db_name, test_table_name); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("data", DataTypes::string()) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(1), vec![]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create table"); + + let table_info = admin + .get_table_info(&table_path) + .await + .expect("should get table info"); + let table_id = table_info.get_table_id(); + + let manifests = admin + .list_remote_log_manifests(table_id, None) + .await + .expect("should list remote log manifests"); + + // A newly created table with no remote log activity should return empty manifests. + assert!( + manifests.is_empty(), + "Newly created table should have no remote log manifests" + ); + + // Cleanup + admin.drop_table(&table_path, true).await.unwrap(); + admin.drop_database(test_db_name, true, true).await.unwrap(); + } + + /// `drop_kv_snapshot_lease` (API key 1058) removes an entire lease. Dropping + /// a lease that never existed is a server-side no-op. + #[tokio::test] + async fn test_drop_kv_snapshot_lease() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + // Dropping a non-existent lease should succeed (no-op on server) + let result = admin.drop_kv_snapshot_lease("non-existent-lease-id").await; + assert!( + result.is_ok(), + "Dropping non-existent lease should succeed, got: {:?}", + result + ); + } + + /// `release_kv_snapshot_lease` (API key 1057) releases specific buckets from + /// a lease. Releasing an empty bucket set against an unknown lease is a + /// no-op and must not error. + #[tokio::test] + async fn test_release_kv_snapshot_lease() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("should get admin"); + + let result = admin + .release_kv_snapshot_lease("non-existent-lease-id", vec![]) + .await; + assert!( + result.is_ok(), + "Releasing an empty bucket set should succeed, got: {:?}", + result + ); + } +} diff --git a/crates/fluss/tests/integration/batch_scanner.rs b/crates/fluss/tests/integration/batch_scanner.rs index 443d0518..ba3bf901 100644 --- a/crates/fluss/tests/integration/batch_scanner.rs +++ b/crates/fluss/tests/integration/batch_scanner.rs @@ -379,6 +379,235 @@ mod batch_scanner_test { assert!(table.new_scan().limit(-5).is_err()); } + // ---- KvBatchScanner tests (ScanKv protocol) -------------------------------- + + /// Full scan of a PK table bucket: upsert rows, scan without limit, verify + /// all rows are returned. + #[tokio::test] + async fn kv_batch_scanner_reads_all_rows() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_kv_batch_scan_all"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id"]) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["id".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let writer = table + .new_upsert() + .expect("upsert") + .create_writer() + .expect("writer"); + + let expected: HashMap = (1..=10) + .map(|i| { + ( + i, + ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"][i as usize - 1], + ) + }) + .collect(); + for (&id, &name) in &expected { + let mut row = GenericRow::new(2); + row.set_field(0, id); + row.set_field(1, name); + writer.upsert(&row).expect("upsert row"); + } + writer.flush().await.expect("flush"); + + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut scanner = table + .new_scan() + .create_kv_batch_scanner(bucket.clone()) + .expect("create kv batch scanner"); + + let batches = match scanner.collect_all_batches().await { + Ok(b) => b, + // ScanKv (API 1061) is not supported on every server build + // (e.g. 0.9.x); tolerate the server's "unsupported" signal rather + // than failing the suite. Any other error is a real failure. + Err(fluss::error::Error::UnsupportedVersion { .. }) => return, + Err(e) => panic!("collect: {e}"), + }; + let total_rows: usize = batches.iter().map(|b| b.num_records()).sum(); + assert_eq!(total_rows, 10, "should return all 10 upserted rows"); + + let mut found = HashMap::new(); + for batch in &batches { + let rows = batch.batch(); + let ids = rows + .column(0) + .as_any() + .downcast_ref::() + .expect("id column"); + let names = rows + .column(1) + .as_any() + .downcast_ref::() + .expect("name column"); + for i in 0..rows.num_rows() { + found.insert(ids.value(i), names.value(i).to_string()); + } + } + for (&id, &name) in &expected { + assert_eq!( + found.get(&id).map(|s| s.as_str()), + Some(name), + "missing or wrong row for id={id}" + ); + } + } + + /// KvBatchScanner on an empty PK table returns no batches. + #[tokio::test] + async fn kv_batch_scanner_empty_table() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_kv_batch_scan_empty"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("val", DataTypes::string()) + .primary_key(vec!["id"]) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["id".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut scanner = table + .new_scan() + .create_kv_batch_scanner(bucket) + .expect("create kv batch scanner"); + + let batches = match scanner.collect_all_batches().await { + Ok(b) => b, + // ScanKv (API 1061) is not supported on every server build + // (e.g. 0.9.x); tolerate the server's "unsupported" signal rather + // than failing the suite. Any other error is a real failure. + Err(fluss::error::Error::UnsupportedVersion { .. }) => return, + Err(e) => panic!("collect: {e}"), + }; + let total_rows: usize = batches.iter().map(|b| b.num_records()).sum(); + assert_eq!(total_rows, 0, "empty table should yield 0 rows"); + } + + /// KvBatchScanner with column projection. + #[tokio::test] + async fn kv_batch_scanner_with_projection() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_kv_batch_scan_proj"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id"]) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["id".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let writer = table + .new_upsert() + .expect("upsert") + .create_writer() + .expect("writer"); + + for i in 1..=3 { + let mut row = GenericRow::new(3); + row.set_field(0, i); + row.set_field(1, format!("name_{i}")); + row.set_field(2, (i as i64) * 100); + writer.upsert(&row).expect("upsert"); + } + writer.flush().await.expect("flush"); + + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut scanner = table + .new_scan() + .project(&[0, 2]) + .expect("project") + .create_kv_batch_scanner(bucket) + .expect("kv batch scanner"); + + let batches = match scanner.collect_all_batches().await { + Ok(b) => b, + // ScanKv (API 1061) is not supported on every server build + // (e.g. 0.9.x); tolerate the server's "unsupported" signal rather + // than failing the suite. Any other error is a real failure. + Err(fluss::error::Error::UnsupportedVersion { .. }) => return, + Err(e) => panic!("collect: {e}"), + }; + for batch in &batches { + let rows = batch.batch(); + assert_eq!(rows.num_columns(), 2, "projected to id + score"); + assert_eq!(rows.schema().field(0).name(), "id"); + assert_eq!(rows.schema().field(1).name(), "score"); + } + } + + /// KvBatchScanner must reject log tables (no primary key). + #[tokio::test] + async fn kv_batch_scanner_rejects_log_table() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_kv_batch_scan_log_reject"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let bucket = TableBucket::new(table.get_table_info().table_id, 0); + assert!( + table.new_scan().create_kv_batch_scanner(bucket).is_err(), + "must reject non-PK table" + ); + } + /// A configured limit must be rejected by the log scanners rather than /// silently ignored. #[tokio::test] diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 2d2bd152..73ed0651 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -21,6 +21,9 @@ extern crate fluss; #[cfg(feature = "integration_tests")] mod integration { mod admin; + mod admin_extended; + #[cfg(feature = "fluss_v1")] + mod admin_v1; mod batch_scanner; mod fluss_cluster; mod kv_table;