[rust] Add KvBatchScanner for full PK-table bucket scans #633
Open
gnuhpc wants to merge 10 commits into
9522a0f to c421353
Rebased on top of updated #632. Verified against real Java Fluss 0.9.1-incubating and 1.0-SNAPSHOT-dev, all 4 new
Add message wrappers for the remaining 1.x RPC APIs:
- KV snapshot lifecycle: acquire/release/drop lease, list, metadata, latest snapshots, lake snapshot
- Server management: add/remove server tag, rebalance + progress + cancel, get cluster health, list remote log manifests
- Producer offsets: register/get/delete
- ScanKv (API 1061): full KV-table bucket scan request/response
Replace raw i32 with proper Rust enums matching the Java Fluss definitions: GoalType (ReplicaDistribution/LeaderDistribution/RackAware) and ServerTag (PermanentOffline/TemporaryOffline).
Addresses reviewer feedback on PR apache#630.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
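As a rough illustration of the raw-i32-to-enum change described in this commit, the sketch below models ServerTag as a closed enum with explicit conversions at the wire boundary. The numeric wire values and the `to_wire` / `from_wire` helper names are assumptions made for this example; they are not taken from the PR or from the Java Fluss definitions.

```rust
/// Server tag as a closed Rust enum instead of a raw i32.
/// ASSUMPTION: the numeric values below are placeholders for illustration,
/// not the values defined by Java Fluss.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTag {
    PermanentOffline,
    TemporaryOffline,
}

impl ServerTag {
    /// Encode the tag for the wire (hypothetical helper name).
    pub fn to_wire(self) -> i32 {
        match self {
            ServerTag::PermanentOffline => 0,
            ServerTag::TemporaryOffline => 1,
        }
    }

    /// Decode a wire value; unknown values are rejected instead of
    /// being passed around as an unchecked integer.
    pub fn from_wire(value: i32) -> Option<ServerTag> {
        match value {
            0 => Some(ServerTag::PermanentOffline),
            1 => Some(ServerTag::TemporaryOffline),
            _ => None,
        }
    }
}
```

With this shape, a caller can only pass a valid tag to an admin method, and an unexpected integer from the server surfaces as `None` at the decoding boundary rather than deeper in the client.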
Add 27 new admin methods to FlussAdmin:
- Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats
- KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot
- ACL management: create_acls, list_acls, drop_acls
- Cluster configuration: describe_cluster_configs, alter_cluster_configs
- Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance
- Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets
- Monitoring: get_cluster_health, list_remote_log_manifests
- KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease
…dable
- Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs
- Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs
- Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs
- Expose readable parameter in admin.get_lake_snapshot() instead of hardcoding None
Addresses reviewer feedback on PR apache#631.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…hot_lease)
Extends the BucketId/TableId/PartitionId alias consistency fix to table_stats.rs (BucketStatsRequest, BucketStats) and kv_snapshot_lease.rs (KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
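The alias change in the two commits above can be pictured with a minimal sketch. The underlying integer widths (i64 for table and partition IDs, i32 for bucket IDs) are an assumption based on the "raw i32/i64" wording, and `BucketRef` and `describe` are hypothetical names used only for this example, not types from the PR.

```rust
// ASSUMPTION: integer widths chosen for illustration.
pub type TableId = i64;
pub type PartitionId = i64;
pub type BucketId = i32;

/// Hypothetical struct showing how the aliases document each field's role.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketRef {
    pub table_id: TableId,
    pub partition_id: Option<PartitionId>,
    pub bucket_id: BucketId,
}

/// Render a bucket reference for logs or error messages.
pub fn describe(b: &BucketRef) -> String {
    match b.partition_id {
        Some(p) => format!("table {} / partition {} / bucket {}", b.table_id, p, b.bucket_id),
        None => format!("table {} / bucket {}", b.table_id, b.bucket_id),
    }
}
```

Type aliases are interchangeable with their underlying integers, so they improve readability of signatures without adding compile-time separation between, say, a TableId and a PartitionId; newtype wrappers would be needed for that.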
Now that pr/3 provides GoalType and ServerTag enums in the RPC wrappers, update the admin client methods to use them in their public signatures too.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace raw SERVER_TAG_TEMPORARY_OFFLINE const with ServerTag::TemporaryOffline
- Add readable parameter to get_lake_snapshot call
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Introduce a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first next_batch() opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close.
- client/table/kv_batch_scanner.rs: KvBatchScanner with per-bucket state machine (Pending -> Active -> Done), best-effort close on Drop, and retry-with-backoff on any retriable server error (leader-election races, TooManyScanners, etc.), not just TooManyScanners.
- client/table/scanner.rs: TableScan::create_kv_batch_scanner() with PK/bucket validation.
- config.rs: scanner_kv_fetch_max_bytes (default 4MB, matching Java).
- rpc/fluss_api_error.rs: new error codes 66-69 (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) with correct retriable classification.
- batch_scanner.rs: expose KV decode helpers (pub(super)) for reuse.
- tests: 4 KV integration tests; tolerate UnsupportedVersion so they also pass (no-op) against 0.9.x servers that lack ScanKv.
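The Pending -> Active -> Done lifecycle with a best-effort close on Drop could look roughly like the synchronous sketch below. It is not the PR's implementation: `ScanRpc`, `BucketScan`, and `MockRpc` are hypothetical names, the RPC layer is replaced by an in-memory mock, and the sketch assumes the server releases its cursor on its own once a scan is exhausted, so only a scan abandoned while Active sends a close.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Per-bucket scan state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanState {
    Pending,
    Active { scanner_id: u64 },
    Done,
}

/// Hypothetical stand-in for the ScanKv RPC layer.
pub trait ScanRpc {
    /// Open a cursor: returns (scanner id, first batch, more data available).
    fn open(&mut self) -> (u64, Vec<u8>, bool);
    /// Continue an open cursor: returns (batch, more data available).
    fn fetch(&mut self, scanner_id: u64) -> (Vec<u8>, bool);
    /// Release the server-side cursor.
    fn close(&mut self, scanner_id: u64);
}

pub struct BucketScan<R: ScanRpc> {
    rpc: R,
    state: ScanState,
}

impl<R: ScanRpc> BucketScan<R> {
    pub fn new(rpc: R) -> Self {
        BucketScan { rpc, state: ScanState::Pending }
    }

    pub fn state(&self) -> ScanState {
        self.state
    }

    /// The first call opens the cursor; later calls iterate; None once Done.
    pub fn next_batch(&mut self) -> Option<Vec<u8>> {
        let (scanner_id, batch, has_more) = match self.state {
            ScanState::Pending => self.rpc.open(),
            ScanState::Active { scanner_id } => {
                let (batch, has_more) = self.rpc.fetch(scanner_id);
                (scanner_id, batch, has_more)
            }
            ScanState::Done => return None,
        };
        self.state = if has_more {
            ScanState::Active { scanner_id }
        } else {
            ScanState::Done
        };
        Some(batch)
    }
}

impl<R: ScanRpc> Drop for BucketScan<R> {
    fn drop(&mut self) {
        // Best-effort close: only a cursor that is still open needs releasing.
        if let ScanState::Active { scanner_id } = self.state {
            self.rpc.close(scanner_id);
        }
    }
}

/// In-memory mock that serves queued batches and records close calls.
pub struct MockRpc {
    pub batches: Vec<Vec<u8>>,
    pub closed: Rc<RefCell<Vec<u64>>>,
}

impl ScanRpc for MockRpc {
    fn open(&mut self) -> (u64, Vec<u8>, bool) {
        let (batch, has_more) = self.fetch(7);
        (7, batch, has_more)
    }

    fn fetch(&mut self, _scanner_id: u64) -> (Vec<u8>, bool) {
        let batch = if self.batches.is_empty() {
            Vec::new()
        } else {
            self.batches.remove(0)
        };
        (batch, !self.batches.is_empty())
    }

    fn close(&mut self, scanner_id: u64) {
        self.closed.borrow_mut().push(scanner_id);
    }
}
```

Keeping the scanner ID inside the Active variant means a close can only be attempted when an ID actually exists, which is what makes the Drop path safe to run unconditionally.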
- Add scanner_kv_fetch_max_bytes to C++ binding FfiConfig
- Add 4 new FlussError variants to Elixir binding match
- Replace raw i64/i32 with TableId/PartitionId/BucketId in RPC messages and corresponding admin.rs method signatures
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
c421353 to 70c6746
Fixed compilation of C++ and Elixir bindings:
Also applied All branches rebased onto latest main.
What
Introduces a stateful, unbounded KV-table scanner using the ScanKv API (1061). The first next_batch() opens the server-side cursor; subsequent calls iterate; dropping the scanner sends a best-effort close.
- client/table/kv_batch_scanner.rs: KvBatchScanner with a per-bucket state machine (Pending → Active → Done), best-effort close on Drop, and retry-with-backoff on any retriable server error (leader-election races on a freshly created bucket, transient TooManyScanners, …), not just TooManyScanners.
- client/table/scanner.rs: TableScan::create_kv_batch_scanner() with PK / bucket-range validation.
- config.rs: scanner_kv_fetch_max_bytes (default 4 MB, matching Java CLIENT_SCANNER_KV_FETCH_MAX_BYTES).
- rpc/fluss_api_error.rs: new error codes 66–69 (ScannerExpiredException, UnknownScannerIdException, InvalidScanRequestException, TooManyScanners) with correct retriable classification.
- client/table/batch_scanner.rs: expose the KV decode helpers (pub(super)) for reuse by the new scanner.
- tests/integration/batch_scanner.rs: 4 KV integration tests; they tolerate UnsupportedVersion so they also pass (as no-ops) against 0.9.x servers that lack ScanKv.
Verification
Built + cargo clippy clean; cargo fmt clean. Integration suite against a real Fluss 1.x cluster: 70/72 pass. The only 2 failures are the SASL negative auth tests (test_sasl_connect_with_wrong_password, test_sasl_connect_with_unknown_user). These fail because the test docker image does not enforce SASL auth (verified: valid/wrong/unknown credentials all connect). The Rust client's SASL handshake is correct (sends AuthenticateRequest, retries RetriableAuthenticateException, propagates non-retriable); Fluss's own reference test SaslAuthenticationITCase with identical config does reject bad credentials. So this is a test-image/server packaging issue, not an SDK defect.
Stack
Part 6/6 (final), stacked on #632 → … → #628. All target main.
🤖 Generated with Claude Code
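The retry-with-backoff behaviour named in the description can be sketched as a small synchronous helper. Everything here is illustrative: `ScanError`, `is_retriable`, and `retry_with_backoff` are hypothetical names. Only TooManyScanners is described as transient in the text above, so treating the other three codes as non-retriable is an assumption of this sketch. The sleep function is injected so the loop can be exercised without real delays.

```rust
use std::time::Duration;

/// Hypothetical error type covering the four new scanner error codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanError {
    ScannerExpired,
    UnknownScannerId,
    InvalidScanRequest,
    TooManyScanners,
}

impl ScanError {
    /// ASSUMPTION: only TooManyScanners is retriable in this sketch; the
    /// classification of the other three is not stated in the description.
    pub fn is_retriable(self) -> bool {
        matches!(self, ScanError::TooManyScanners)
    }
}

/// Run `op` up to `max_attempts` times, doubling the delay after each
/// retriable failure. Non-retriable errors are returned immediately.
pub fn retry_with_backoff<T, F, S>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: F,
    mut sleep: S,
) -> Result<T, ScanError>
where
    F: FnMut() -> Result<T, ScanError>,
    S: FnMut(Duration),
{
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if err.is_retriable() && attempt < max_attempts => {
                sleep(delay);
                // Double the delay; keep the old value if doubling would overflow.
                delay = delay.checked_mul(2).unwrap_or(delay);
                attempt += 1;
            }
            Err(err) => return Err(err),
        }
    }
}
```

In a blocking caller, `std::thread::sleep` can be passed as the `sleep` argument; an async client would await a timer instead, and a production version would normally cap the delay and add jitter.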