[rust] Add KvBatchScanner for full PK-table bucket scans #633

Open
gnuhpc wants to merge 10 commits into apache:main from gnuhpc:pr/6-kv-batch-scanner

@gnuhpc gnuhpc commented Jun 19, 2026

Contributor

What

Introduces a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first next_batch() opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close.
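The lifecycle described above (open on the first next_batch(), iterate afterwards, best-effort close on drop) can be sketched roughly as follows. This is a synchronous illustration only, not the code in this PR: the ScanServer trait, MockServer, the u64 scanner id, and the Vec<u32> batch type are all hypothetical stand-ins, and the sketch assumes the server releases the cursor itself once a scan is exhausted.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Per-bucket scan state: Pending -> Active -> Done.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BucketState {
    Pending,
    Active { scanner_id: u64 },
    Done,
}

/// Hypothetical stand-in for the server side of a scan RPC.
trait ScanServer {
    fn open(&mut self) -> u64;
    /// Returns None once the cursor is exhausted.
    fn fetch(&mut self, scanner_id: u64) -> Option<Vec<u32>>;
    fn close(&mut self, scanner_id: u64);
}

struct Scanner<S: ScanServer> {
    server: S,
    state: BucketState,
}

impl<S: ScanServer> Scanner<S> {
    fn new(server: S) -> Self {
        Scanner { server, state: BucketState::Pending }
    }

    fn next_batch(&mut self) -> Option<Vec<u32>> {
        // The first call lazily opens the server-side cursor.
        if self.state == BucketState::Pending {
            let scanner_id = self.server.open();
            self.state = BucketState::Active { scanner_id };
        }
        match self.state {
            BucketState::Active { scanner_id } => {
                let batch = self.server.fetch(scanner_id);
                if batch.is_none() {
                    // Assumption: an exhausted cursor needs no explicit close.
                    self.state = BucketState::Done;
                }
                batch
            }
            _ => None,
        }
    }
}

impl<S: ScanServer> Drop for Scanner<S> {
    fn drop(&mut self) {
        // Best-effort close: only a still-open cursor is closed.
        if let BucketState::Active { scanner_id } = self.state {
            self.server.close(scanner_id);
        }
    }
}

/// In-memory server that records open/close calls in a shared log.
struct MockServer {
    batches: Vec<Vec<u32>>,
    log: Rc<RefCell<Vec<String>>>,
}

impl ScanServer for MockServer {
    fn open(&mut self) -> u64 {
        self.log.borrow_mut().push("open".to_string());
        7
    }
    fn fetch(&mut self, _scanner_id: u64) -> Option<Vec<u32>> {
        if self.batches.is_empty() { None } else { Some(self.batches.remove(0)) }
    }
    fn close(&mut self, _scanner_id: u64) {
        self.log.borrow_mut().push("close".to_string());
    }
}
```

Dropping a Scanner that is still Active records a close in the mock's log, while a scanner that never called next_batch() or that ran to Done sends nothing.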

  • client/table/kv_batch_scanner.rs: KvBatchScanner with a per-bucket state machine (Pending → Active → Done), best-effort close on Drop, and retry-with-backoff on any retriable server error (leader-election races on a freshly created bucket, transient TooManyScanners, …), not just TooManyScanners.
  • client/table/scanner.rs: TableScan::create_kv_batch_scanner() with PK / bucket-range validation.
  • config.rs: scanner_kv_fetch_max_bytes (default 4 MB, matching Java CLIENT_SCANNER_KV_FETCH_MAX_BYTES).
  • rpc/fluss_api_error.rs: new error codes 66–69 (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) with correct retriable classification.
  • client/table/batch_scanner.rs: expose the KV decode helpers (pub(super)) for reuse by the new scanner.
  • tests/integration/batch_scanner.rs: 4 KV integration tests; they tolerate UnsupportedVersion so they also pass (as no-ops) against 0.9.x servers that lack ScanKv.
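The retry-with-backoff behaviour listed above could look something like the sketch below. It is an assumption-laden illustration rather than the PR's implementation: ScanError, its two variants, retry_with_backoff, and the doubling delay are hypothetical, and only TooManyScanners is treated as retriable here because that is the one classification the description states explicitly. The sleep function is injected so the logic can be exercised without real waiting.

```rust
use std::time::Duration;

/// Hypothetical error type; only the retriable/non-retriable split matters here.
#[derive(Debug, Clone, PartialEq)]
enum ScanError {
    /// Described in the PR as transient, so retried.
    TooManyScanners,
    /// Assumed non-retriable for this sketch.
    InvalidScanRequest,
}

impl ScanError {
    fn is_retriable(&self) -> bool {
        matches!(self, ScanError::TooManyScanners)
    }
}

/// Runs `op` up to `max_attempts` times, doubling the delay after each
/// retriable failure. Non-retriable errors are returned immediately.
fn retry_with_backoff<T>(
    max_attempts: u32,
    initial_delay: Duration,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, ScanError>,
) -> Result<T, ScanError> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if e.is_retriable() && attempt < max_attempts => {
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
            // Either non-retriable, or the attempt budget is used up.
            Err(e) => return Err(e),
        }
    }
}
```

In a real client the `sleep` argument would be something like `std::thread::sleep` or an async timer; passing it in keeps the policy separate from the clock.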

Verification

The build, cargo clippy, and cargo fmt are all clean. Integration suite against a real Fluss 1.x cluster: 70/72 pass. The only 2 failures are the SASL negative-auth tests (test_sasl_connect_with_wrong_password, test_sasl_connect_with_unknown_user). They fail because the test docker image does not enforce SASL auth (verified: valid, wrong, and unknown credentials all connect). The Rust client's SASL handshake is correct: it sends AuthenticateRequest, retries on RetriableAuthenticateException, and propagates non-retriable errors. Fluss's own reference test, SaslAuthenticationITCase, rejects bad credentials with an identical config. So this is a test-image/server packaging issue, not an SDK defect.

Stack

Part 6/6 (final), stacked on #632 → … → #628. All target main.

🤖 Generated with Claude Code

gnuhpc commented Jun 20, 2026

Contributor Author

Rebased on top of updated #632. ScanKvRequest::new is now pub(crate) (consumer is internal-only — KvBatchScanner). The KvBatchScanner public API itself was already domain-type-clean.

Verified against real Java Fluss 0.9.1-incubating and 1.0-SNAPSHOT-dev — all 4 new kv_batch_scanner_* integration tests pass on 0.9.x (ScanKv on 0.9.x returns UnsupportedVersion, tolerated by design) and on 1.x.
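One way a test can tolerate UnsupportedVersion so that it becomes a no-op on 0.9.x is sketched below. The ClientError enum and the tolerate_unsupported helper are hypothetical names for illustration; the actual integration tests may structure this differently.

```rust
/// Hypothetical client error type for this sketch.
#[derive(Debug, PartialEq)]
enum ClientError {
    UnsupportedVersion,
    Other(String),
}

/// Maps a scan result to:
/// - Ok(Some(v)) when the scan ran,
/// - Ok(None) when the server lacks the API (the test should skip),
/// - Err(e) for any other failure, which should still fail the test.
fn tolerate_unsupported<T>(result: Result<T, ClientError>) -> Result<Option<T>, ClientError> {
    match result {
        Ok(value) => Ok(Some(value)),
        Err(ClientError::UnsupportedVersion) => Ok(None),
        Err(e) => Err(e),
    }
}
```

A test would return early when it sees Ok(None) and run its assertions only on Ok(Some(..)), so unrelated errors are never swallowed.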

warmbupt and others added 10 commits June 22, 2026 10:34
Add message wrappers for the remaining 1.x RPC APIs:
- KV snapshot lifecycle: acquire/release/drop lease, list, metadata,
  latest snapshots, lake snapshot
- Server management: add/remove server tag, rebalance + progress +
  cancel, get cluster health, list remote log manifests
- Producer offsets: register/get/delete
- ScanKv (API 1061): full KV-table bucket scan request/response

Replace raw i32 with proper Rust enums matching the Java Fluss
definitions: GoalType (ReplicaDistribution/LeaderDistribution/RackAware)
and ServerTag (PermanentOffline/TemporaryOffline). Addresses reviewer
feedback on PR apache#630.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Add 27 new admin methods to FlussAdmin:
- Database/table extensions: list_database_summaries, alter_database,
  alter_table, get_table_stats
- KV snapshot operations: get_latest_kv_snapshots,
  get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot
- ACL management: create_acls, list_acls, drop_acls
- Cluster configuration: describe_cluster_configs, alter_cluster_configs
- Server management: add_server_tag, remove_server_tag, rebalance,
  list_rebalance_progress, cancel_rebalance
- Producer offsets: register_producer_offsets, get_producer_offsets,
  delete_producer_offsets
- Monitoring: get_cluster_health, list_remote_log_manifests
- KV snapshots: list_kv_snapshots, release_kv_snapshot_lease,
  drop_kv_snapshot_lease

…dable

- Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs
- Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs
- Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in
  kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs
- Expose readable parameter in admin.get_lake_snapshot() instead of
  hardcoding None

Addresses reviewer feedback on PR apache#631.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…hot_lease)

Extends the BucketId/TableId/PartitionId alias consistency fix to
table_stats.rs (BucketStatsRequest, BucketStats) and
kv_snapshot_lease.rs (KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Now that pr/3 provides GoalType and ServerTag enums in the RPC
wrappers, update the admin client methods to use them in their
public signatures too.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

- Replace raw SERVER_TAG_TEMPORARY_OFFLINE const with ServerTag::TemporaryOffline
- Add readable parameter to get_lake_snapshot call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Introduce a stateful, unbounded KV-table scanner using the ScanKv API
(1061). The first next_batch() opens the server-side cursor; subsequent
calls iterate; dropping the scanner sends a best-effort close.

- client/table/kv_batch_scanner.rs: KvBatchScanner with per-bucket state
  machine (Pending -> Active -> Done), best-effort close on Drop, and
  retry-with-backoff on retriable server errors (leader-election races,
  TooManyScanners, etc.) — not just TooManyScanners.
- client/table/scanner.rs: TableScan::create_kv_batch_scanner() with
  PK/bucket validation.
- config.rs: scanner_kv_fetch_max_bytes (default 4MB, matching Java).
- rpc/fluss_api_error.rs: new error codes 66-69
  (ScannerExpiredException, UnknownScannerIdException,
  InvalidScanRequestException, TooManyScanners) with correct retriable
  classification.
- batch_scanner.rs: expose KV decode helpers (pub(super)) for reuse.
- tests: 4 KV integration tests; tolerate UnsupportedVersion so they
  also pass (no-op) against 0.9.x servers that lack ScanKv.
- Add scanner_kv_fetch_max_bytes to C++ binding FfiConfig
- Add 4 new FlussError variants to Elixir binding match
- Replace raw i64/i32 with TableId/PartitionId/BucketId in RPC messages
  and corresponding admin.rs method signatures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gnuhpc gnuhpc force-pushed the pr/6-kv-batch-scanner branch from c421353 to 70c6746 Compare June 22, 2026 02:50
gnuhpc commented Jun 22, 2026

Contributor Author

Fixed compilation of C++ and Elixir bindings:

  • C++ binding: added scanner_kv_fetch_max_bytes field to FfiConfig and header
  • Elixir binding: added match arms for 4 new FlussError variants (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners)

Also applied TableId/PartitionId/BucketId type aliases to RPC message signatures and corresponding admin methods.

All branches rebased onto latest main.
