diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 7a0e0f5dd2..8cad44e743 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -59,6 +59,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -122,6 +123,18 @@ public final class RecordAccumulator { private final Clock clock; private final DynamicWriteBatchSizeEstimator batchSizeEstimator; + // Per-bucket backpressure throttle expiry timestamp. Accessed strictly by key on + // hot paths (get / put / remove); writes happen on every backpressure signal and + // every eviction, so the container is sized for lock-striped O(1) updates without + // any whole-map snapshot cost. + private final ConcurrentMap throttleExpiryMs = new ConcurrentHashMap<>(); + private final long maxThrottleMs; + + // Latest Cluster snapshot fed to the metadata-driven throttle sweep. Identity + // equality against this reference short-circuits the sweep when metadata hasn't + // changed. + private volatile Cluster lastClusterRef = Cluster.empty(); + // TODO add retryBackoffMs to retry the produce request upon receiving an error. // TODO add deliveryTimeoutMs to report success or failure on record delivery. // TODO add nextBatchExpiryTimeMs @@ -155,6 +168,7 @@ public final class RecordAccumulator { (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes()); this.idempotenceManager = idempotenceManager; this.clock = clock; + this.maxThrottleMs = conf.get(ConfigOptions.CLIENT_WRITER_KV_BACKPRESSURE_MAX_THROTTLE_MS); registerMetrics(writerMetricGroup); } @@ -520,6 +534,22 @@ private long bucketReady( } else { TableBucket tableBucket = cluster.getTableBucket(tableIdOpt.get(), physicalTablePath, bucketId); + + // If this bucket is throttled, don't mark its node as ready. + // Instead, factor the remaining throttle time into the next check delay. + Long throttleExpiry = throttleExpiryMs.get(tableBucket); + if (throttleExpiry != null) { + long now = clock.milliseconds(); + if (now < throttleExpiry) { + nextReadyCheckDelayMs = + Math.min(nextReadyCheckDelayMs, throttleExpiry - now); + continue; + } + // Expired — evict here to reclaim entries for buckets whose deque + // has gone empty and won't reach the drain-time throttle check. + throttleExpiryMs.remove(tableBucket); + } + Integer leader = cluster.leaderFor(tableBucket); if (leader == null) { // This is a bucket for which leader is not known, but messages are @@ -762,7 +792,7 @@ private List drainBatchesForOneNode(Cluster cluster, Integer no // request size due to compression; in this case we will still // eventually send this batch in a single request. break; - } else if (shouldStopDrainBatchesForBucket(first, tableBucket)) { + } else if (shouldSkipBucket(first, tableBucket)) { // Buckets are independent — skip this one, keep draining others. continue; } @@ -777,7 +807,7 @@ private List drainBatchesForOneNode(Cluster cluster, Integer no // we update it and reset the batch sequence. This should be only done when // all // its in-flight batches have completed. This is guarantee in - // `shouldStopDrainBatchesForBucket`. + // `shouldSkipBucket`. idempotenceManager.maybeUpdateWriterId(tableBucket); // If the batch already has an assigned batch sequence, then we should not @@ -845,7 +875,11 @@ private List drainBatchesForOneNode(Cluster cluster, Integer no return ready; } - private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket tableBucket) { + private boolean shouldSkipBucket(WriteBatch first, TableBucket tableBucket) { + // Backpressure throttle check: skip this bucket if still under throttle + if (isThrottled(tableBucket)) { + return true; + } if (idempotenceManager.idempotenceEnabled()) { if (!idempotenceManager.isWriterIdValid()) { // we cannot send the batch until we have refreshed writer id. @@ -883,6 +917,81 @@ private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket ta return false; } + // ---- Backpressure throttle methods ---- + + /** + * Check if a bucket is currently under backpressure throttle. + * + *

Performs lazy eviction: if the throttle has expired, the entry is removed from the map to + * prevent unbounded growth. + * + * @return true if the bucket should be skipped during drain + */ + boolean isThrottled(TableBucket tableBucket) { + Long expiry = throttleExpiryMs.get(tableBucket); + if (expiry == null) { + return false; + } + if (clock.milliseconds() < expiry) { + return true; + } + // Expired — evict to prevent map leak + throttleExpiryMs.remove(tableBucket); + return false; + } + + /** + * Update the throttle state for a bucket based on the received pressure signal. + * + *

The delay grows quadratically with pressure: {@code delay = maxThrottleMs * p^2}, where + * {@code p ∈ [0, 1)}. This provides meaningful throttling across the full ramp-up window while + * remaining gentle at low pressure. + * + * @param tableBucket the bucket to update + * @param pressure value in {@code [0, 1)}; {@code 0} means recovered, positive values trigger a + * throttle window + */ + void updateThrottle(TableBucket tableBucket, float pressure) { + if (pressure > 0f) { + long delay = (long) (maxThrottleMs * pressure * pressure); + if (delay > 0) { + throttleExpiryMs.put(tableBucket, clock.milliseconds() + delay); + return; + } + } + // Recovered or below the meaningful resolution: remove throttle + throttleExpiryMs.remove(tableBucket); + } + + /** + * Apply a hard back-off after the server rejected the write with a {@code + * StorageBackpressureException}. This signals that the storage engine has reached its own + * slowdown threshold; we stall the bucket for the full {@link #maxThrottleMs} window so the + * standard retry path can take effect. + */ + void applyStorageBackpressureBackoff(TableBucket tableBucket) { + throttleExpiryMs.put(tableBucket, clock.milliseconds() + maxThrottleMs); + } + + /** + * Evict throttle entries whose buckets no longer exist in the given cluster (leader unknown, + * partition dropped, table dropped). + * + *

Invoked on every Sender loop with the current cluster snapshot. The identity short-circuit + * makes this an O(1) no-op when metadata hasn't changed, so the actual O(N) walk only runs once + * per real metadata refresh. + */ + void maybeEvictStaleThrottles(Cluster cluster) { + if (cluster == lastClusterRef) { + return; + } + lastClusterRef = cluster; + if (throttleExpiryMs.isEmpty()) { + return; + } + throttleExpiryMs.keySet().removeIf(tb -> cluster.leaderFor(tb) == null); + } + private int getDrainIndex(int id) { return nodesDrainIndex.computeIfAbsent(id, s -> 0); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java index a8e62cd370..3c308e0a37 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.RetriableException; +import org.apache.fluss.exception.StorageBackpressureException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; @@ -209,6 +210,10 @@ private void addToInflightBatches(Map> batches) { private void sendWriteData() throws Exception { Cluster clusterSnapshot = metadataUpdater.getCluster(); + // Refresh per-bucket throttle entries against the current cluster snapshot, + // dropping any whose bucket has disappeared from metadata. + accumulator.maybeEvictStaleThrottles(clusterSnapshot); + // get the list of buckets with data ready to send. ReadyCheckResult readyCheckResult = accumulator.ready(clusterSnapshot); @@ -498,7 +503,16 @@ private void handlePutKvResponse( tableId, respForBucket.hasPartitionId() ? respForBucket.getPartitionId() : null, respForBucket.getBucketId()); + + // Update backpressure throttle from pressure signal + if (respForBucket.hasPressure()) { + accumulator.updateThrottle(tb, respForBucket.getPressure()); + } + ReadyWriteBatch writeBatch = recordsByBucket.get(tb); + if (writeBatch == null) { + continue; + } if (respForBucket.hasErrorCode()) { Set invalidMetadataTables = handleWriteBatchException( @@ -534,6 +548,12 @@ private Set handleWriteBatchException( ReadyWriteBatch readyWriteBatch, ApiError error) { Set invalidMetadataTables = new HashSet<>(); WriteBatch writeBatch = readyWriteBatch.writeBatch(); + if (error.exception() instanceof StorageBackpressureException) { + // Hard rejection: the storage engine reached its slowdown trigger and rejected the + // write. Apply a hard back-off equal to the configured max throttle window before + // letting the standard retry path re-enqueue the batch. + accumulator.applyStorageBackpressureBackoff(readyWriteBatch.tableBucket()); + } if (error.error() == Errors.DUPLICATE_SEQUENCE_EXCEPTION) { // If we have received a duplicate batch sequence error, it means that the batch // sequence has advanced beyond the sequence of the current batch. diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 0e4ab9c97f..9f8c41ae7a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -671,6 +671,8 @@ private RecordAccumulator createTestRecordAccumulator( conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(totalSize)); conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(pageSize)); conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(batchSize)); + // Use a small max throttle to keep test latencies tight; quadratic curve applies. + conf.set(ConfigOptions.CLIENT_WRITER_KV_BACKPRESSURE_MAX_THROTTLE_MS, 1000L); return new RecordAccumulator( conf, new IdempotenceManager( @@ -701,4 +703,96 @@ private int getBatchNumInAccum(RecordAccumulator accum) { + (bucketBatches2 == null ? 0 : bucketBatches2.size()) + (bucketBatches3 == null ? 0 : bucketBatches3.size()); } + + // ---- Backpressure throttle tests ---- + + @Test + void testIsThrottledReturnsFalseWhenNoThrottle() { + RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE); + assertThat(accum.isThrottled(tb1)).isFalse(); + } + + @Test + void testUpdateThrottleWithPositivePressure() { + RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE); + // pressure = 0.5 -> delay = 1000 * 0.5^2 = 250ms + accum.updateThrottle(tb1, 0.5f); + assertThat(accum.isThrottled(tb1)).isTrue(); + + // Advance time past the throttle + clock.advanceTime(Duration.ofMillis(251)); + assertThat(accum.isThrottled(tb1)).isFalse(); + } + + @Test + void testUpdateThrottleWithHighPressure() { + RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE); + // pressure = 0.9 -> delay = 1000 * 0.9^2 = 810ms + accum.updateThrottle(tb1, 0.9f); + assertThat(accum.isThrottled(tb1)).isTrue(); + + clock.advanceTime(Duration.ofMillis(500)); + assertThat(accum.isThrottled(tb1)).isTrue(); + + clock.advanceTime(Duration.ofMillis(311)); + assertThat(accum.isThrottled(tb1)).isFalse(); + } + + @Test + void testApplyStorageBackpressureBackoff() { + RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE); + // StorageBackpressureException back-off: stalls the bucket for the full max throttle + // window. + accum.applyStorageBackpressureBackoff(tb1); + assertThat(accum.isThrottled(tb1)).isTrue(); + + // Still throttled within the configured max throttle window. + clock.advanceTime(Duration.ofMillis(999)); + assertThat(accum.isThrottled(tb1)).isTrue(); + + // Released exactly when the window elapses. + clock.advanceTime(Duration.ofMillis(2)); + assertThat(accum.isThrottled(tb1)).isFalse(); + } + + @Test + void testUpdateThrottleRecovery() { + RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE); + // First apply pressure + accum.updateThrottle(tb1, 0.5f); + assertThat(accum.isThrottled(tb1)).isTrue(); + + // Then recover with pressure=0 + accum.updateThrottle(tb1, 0f); + assertThat(accum.isThrottled(tb1)).isFalse(); + } + + @Test + void testThrottledBucketSkippedInDrain() throws Exception { + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + long batchSize = getTestBatchSize(row); + RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE); + cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3)); + + // Append records to tb1 and tb2 + accum.append(createRecord(row), writeCallback, cluster, 0, false); + accum.append(createRecord(row), writeCallback, cluster, 1, false); + + // Throttle tb1 + accum.updateThrottle(tb1, 0.5f); + + // Drain should only produce batches for tb2 (tb1 is throttled) + Map> batches = + accum.drain( + cluster, + new HashSet<>(Collections.singletonList(node1.id())), + Integer.MAX_VALUE); + // tb2 should be in the batch, tb1 should not + List node1Batches = batches.get(node1.id()); + if (node1Batches != null) { + for (ReadyWriteBatch b : node1Batches) { + assertThat(b.tableBucket()).isNotEqualTo(tb1); + } + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index ec819f575f..e09d7ec91b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1235,6 +1235,19 @@ public class ConfigOptions { + " Dynamic partition strategy refers to creating partitions based on the data " + "being written for partitioned table if the wrote partition don't exists."); + public static final ConfigOption CLIENT_WRITER_KV_BACKPRESSURE_MAX_THROTTLE_MS = + key("client.writer.kv-backpressure.max-throttle-ms") + .longType() + .defaultValue(3000L) + .withDescription( + "The upper bound of the per-bucket throttle delay the client applies in response to " + + "KV backpressure signals piggybacked by the server. " + + "Throttle delay is computed as: max_throttle_ms * p^2, where p is the " + + "normalized pressure in [0, 1) reported by the server. " + + "Once the server rejects a write with StorageBackpressureException, " + + "the client also uses this value as the initial retry backoff so that the " + + "transition between throttle and rejection is continuous."); + public static final ConfigOption CLIENT_REQUEST_TIMEOUT = key("client.request-timeout") .durationType() @@ -2063,6 +2076,21 @@ public class ConfigOptions { + "KV recovery. Should be less than or equal to " + "'kv.recover.remote-log.prefetch-num'."); + // ------------------------------------------------------------------------ + // ConfigOptions for KV backpressure + // ------------------------------------------------------------------------ + + public static final ConfigOption KV_BACKPRESSURE_L0_SLOWDOWN_TRIGGER = + key("kv.backpressure.l0-slowdown-trigger") + .intType() + .defaultValue(8) + .withDescription( + "The L0 file count at which Fluss starts emitting proactive backpressure to clients. " + + "This value should be lower than the underlying storage engine's own " + + "L0 slowdown trigger (RocksDB level0_slowdown_writes_trigger, default 20), " + + "so that clients begin throttling before the storage engine is forced to throttle itself. " + + "The gap between this value and the storage trigger forms the throttle ramp-up window."); + // ------------------------------------------------------------------------ // ConfigOptions for metrics // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/StorageBackpressureException.java b/fluss-common/src/main/java/org/apache/fluss/exception/StorageBackpressureException.java new file mode 100644 index 0000000000..83e7dd4be0 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/StorageBackpressureException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown by a tablet server to reject writes when the underlying KV storage has reached its + * write-pressure threshold (i.e. the L0 file count has hit the storage engine's slowdown trigger). + * + *

This is the second tier of the cooperative backpressure model: the first tier is the proactive + * client-side throttle (driven by per-bucket pressure piggybacked on responses); once the storage + * engine itself is about to enter its internal slowdown, the server short-circuits the write and + * returns this retriable exception so that the RPC handler thread is not blocked by the storage + * engine's internal sleep. + * + *

Clients should retry after a backoff equal to the configured throttle ceiling. + */ +@PublicEvolving +public class StorageBackpressureException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public StorageBackpressureException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 07ebf57b6d..4a6b71fe0d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -181,6 +181,28 @@ public class MetricNames { public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX = "rocksdbCompactionTimeMicrosMax"; + // -------------------------------------------------------------------------------------------- + // KV backpressure metrics (table-level) + // -------------------------------------------------------------------------------------------- + /** + * Maximum normalized backpressure value across all buckets of this table, in {@code [0, 1]}. + * Reflects how close the hottest bucket is to the storage engine's hard-rejection trigger. + */ + public static final String KV_BACKPRESSURE_MAX_PRESSURE = "kvBackpressureMaxPressure"; + + /** + * Number of buckets of this table currently under backpressure (i.e. pressure {@code > 0}). + * Reflects how broadly backpressure has spread across the table. + */ + public static final String KV_BACKPRESSURE_AFFECTED_BUCKETS = "kvBackpressureAffectedBuckets"; + + /** + * Total number of write requests rejected with {@code StorageBackpressureException} on this + * table since process start. The rate of this counter reflects how often the storage engine + * crosses its hard-rejection trigger. + */ + public static final String KV_BACKPRESSURE_REJECTIONS_TOTAL = "kvBackpressureRejectionsTotal"; + // Table-level RocksDB metrics (aggregated from all buckets of a table, Sum aggregation) /** Total bytes read across all buckets of this table (Sum aggregation). */ public static final String ROCKSDB_BYTES_READ_TOTAL = "rocksdbBytesReadTotal"; diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/PutKvResultForBucket.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/PutKvResultForBucket.java index 788944d77d..843b2bc3c4 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/PutKvResultForBucket.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/PutKvResultForBucket.java @@ -26,6 +26,10 @@ /** Result of {@link PutKvRequest} for each table bucket. */ @Internal public class PutKvResultForBucket extends WriteResultForBucket { + + /** Backpressure pressure value: 0=normal, (0,1)=DELAYED zone, -1=STOPPED. */ + private float pressure; + public PutKvResultForBucket(TableBucket tableBucket, long changeLogEndOffset) { super(tableBucket, changeLogEndOffset, ApiError.NONE); } @@ -34,6 +38,15 @@ public PutKvResultForBucket(TableBucket tableBucket, ApiError error) { super(tableBucket, -1L, error); } + public float getPressure() { + return pressure; + } + + public PutKvResultForBucket setPressure(float pressure) { + this.pressure = pressure; + return this; + } + @Override public T copy(Errors newError) { //noinspection unchecked diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 0307932ecd..0bb36329d9 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -76,6 +76,7 @@ import org.apache.fluss.exception.ServerNotExistException; import org.apache.fluss.exception.ServerTagAlreadyExistException; import org.apache.fluss.exception.ServerTagNotExistException; +import org.apache.fluss.exception.StorageBackpressureException; import org.apache.fluss.exception.StorageException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; @@ -270,7 +271,11 @@ public enum Errors { DISK_WRITE_LOCKED( 70, "The tablet server has rejected writes because its data disk usage reached the configured write-limit ratio.", - DiskWriteLockedException::new); + DiskWriteLockedException::new), + STORAGE_BACKPRESSURE_EXCEPTION( + 71, + "The tablet server has rejected the write because the KV storage engine has reached its write-pressure threshold.", + StorageBackpressureException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e8381215fc..13436269a5 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -901,6 +901,10 @@ message PbPutKvRespForBucket { // the log end offset (LEO) of the changelog after this write // this is the offset of the next record to be written, used for exactly-once semantics optional int64 log_end_offset = 5; + // backpressure signal from RocksDB write stall detection: + // 0 = normal (no pressure), (0,1) = DELAYED zone (normalized L0 ratio), + // -1 = STOPPED (client should halt sending to this bucket) + optional float pressure = 6; } message PbLookupReqForBucket { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvFlushScheduler.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvFlushScheduler.java new file mode 100644 index 0000000000..6868642a23 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvFlushScheduler.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.utils.ExecutorUtils; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** Shared scheduler for asynchronous KV flushes on one tablet server. */ +public class KvFlushScheduler implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(KvFlushScheduler.class); + + private static final long RETRY_DELAY_MS = 100L; + + private final ThreadPoolExecutor normalExecutor; + private final ThreadPoolExecutor pressureExecutor; + private final ScheduledExecutorService retryExecutor; + private final AtomicLong sequence = new AtomicLong(); + + private volatile boolean closed; + + public KvFlushScheduler(Configuration conf) { + int totalThreads = Math.max(2, conf.get(ConfigOptions.SERVER_IO_POOL_SIZE)); + int pressureThreads = Math.max(1, totalThreads / 4); + int normalThreads = Math.max(1, totalThreads - pressureThreads); + this.normalExecutor = + newExecutor(normalThreads, "kv-flush-normal", new PriorityBlockingQueue<>()); + this.pressureExecutor = + newExecutor(pressureThreads, "kv-flush-pressure", new PriorityBlockingQueue<>()); + this.retryExecutor = + new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("kv-flush-retry")); + LOG.info( + "Created KV flush scheduler with {} normal threads and {} pressure threads.", + normalThreads, + pressureThreads); + } + + public void enqueue(KvTablet tablet) { + if (closed) { + return; + } + FlushTask task = new FlushTask(tablet, sequence.incrementAndGet()); + if (tablet.usePressureFlushLane()) { + pressureExecutor.execute(task); + } else { + normalExecutor.execute(task); + } + } + + public void retryLater(KvTablet tablet) { + if (closed) { + return; + } + retryExecutor.schedule( + () -> tablet.requestFlushRetry(), RETRY_DELAY_MS, TimeUnit.MILLISECONDS); + } + + @Override + public void close() { + closed = true; + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, normalExecutor); + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, pressureExecutor); + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, retryExecutor); + } + + private static ThreadPoolExecutor newExecutor( + int threads, String threadName, PriorityBlockingQueue queue) { + return new ThreadPoolExecutor( + threads, + threads, + 0L, + TimeUnit.MILLISECONDS, + queue, + new ExecutorThreadFactory(threadName)); + } + + private static final class FlushTask implements Runnable, Comparable { + private final KvTablet tablet; + private final long sequence; + private final long score; + + private FlushTask(KvTablet tablet, long sequence) { + this.tablet = tablet; + this.sequence = sequence; + this.score = tablet.flushScore(); + } + + @Override + public void run() { + tablet.runScheduledFlush(); + } + + @Override + public int compareTo(FlushTask that) { + int scoreCompare = Long.compare(that.score, this.score); + if (scoreCompare != 0) { + return scoreCompare; + } + return Long.compare(this.sequence, that.sequence); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 9f2f1f426f..43ebeab42e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -140,6 +140,8 @@ public static RateLimiter getDefaultRateLimiter() { /** Current shared rate limiter configuration in bytes per second. */ private volatile long currentSharedRateLimitBytesPerSec; + private final KvFlushScheduler kvFlushScheduler; + private volatile boolean isShutdown = false; private KvManager( @@ -162,6 +164,7 @@ private KvManager( this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf); this.currentSharedRateLimitBytesPerSec = conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes(); + this.kvFlushScheduler = new KvFlushScheduler(conf); } private static RateLimiter createSharedRateLimiter(Configuration conf) { @@ -205,6 +208,7 @@ public void startup() { public void shutdown() { LOG.info("Shutting down KvManager"); isShutdown = true; + kvFlushScheduler.close(); List kvs = new ArrayList<>(currentKvs.values()); for (KvTablet kvTablet : kvs) { try { @@ -276,6 +280,7 @@ public KvTablet getOrCreateKv( schemaGetter, tableConfig.getChangelogImage(), sharedRocksDBRateLimiter, + kvFlushScheduler, autoIncrementManager); currentKvs.put(tableBucket, tablet); @@ -394,6 +399,7 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti schemaGetter, tableConfig.getChangelogImage(), sharedRocksDBRateLimiter, + kvFlushScheduler, autoIncrementManager); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 0683f5e054..5eb6d694fd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -25,6 +25,7 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.KvStorageException; import org.apache.fluss.exception.SchemaNotExistException; +import org.apache.fluss.exception.StorageBackpressureException; import org.apache.fluss.memory.MemorySegmentPool; import org.apache.fluss.metadata.ChangelogImage; import org.apache.fluss.metadata.DeleteBehavior; @@ -51,6 +52,7 @@ import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer; +import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.PreparedFlush; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; @@ -120,6 +122,8 @@ public final class KvTablet { private final RocksDBKv rocksDBKv; private final KvPreWriteBuffer kvPreWriteBuffer; private final TabletServerMetricGroup serverMetricGroup; + private final KvFlushScheduler kvFlushScheduler; + private final boolean closeFlushScheduler; // A lock that guards all modifications to the kv. private final ReadWriteLock kvLock = new ReentrantReadWriteLock(); @@ -147,6 +151,18 @@ public final class KvTablet { */ private volatile long flushedLogOffset = 0; + @GuardedBy("kvLock") + private FlushState flushState = FlushState.IDLE; + + @GuardedBy("kvLock") + private long requestedFlushOffset = 0; + + private volatile boolean flushBackpressured = false; + + private volatile @Nullable Runnable flushCompleteListener; + + private volatile @Nullable FatalErrorHandler asyncFatalErrorHandler; + private volatile long rowCount; @GuardedBy("kvLock") @@ -169,6 +185,8 @@ private KvTablet( SchemaGetter schemaGetter, ChangelogImage changelogImage, @Nullable RocksDBStatistics rocksDBStatistics, + KvFlushScheduler kvFlushScheduler, + boolean closeFlushScheduler, AutoIncrementManager autoIncrementManager) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; @@ -177,6 +195,8 @@ private KvTablet( this.rocksDBKv = rocksDBKv; this.writeBatchSize = writeBatchSize; this.serverMetricGroup = serverMetricGroup; + this.kvFlushScheduler = kvFlushScheduler; + this.closeFlushScheduler = closeFlushScheduler; this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), serverMetricGroup); this.logFormat = logFormat; this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator); @@ -212,6 +232,83 @@ public static KvTablet create( RateLimiter sharedRateLimiter, AutoIncrementManager autoIncrementManager) throws IOException { + return create( + tablePath, + tableBucket, + logTablet, + kvTabletDir, + serverConf, + serverMetricGroup, + arrowBufferAllocator, + memorySegmentPool, + kvFormat, + rowMerger, + arrowCompressionInfo, + schemaGetter, + changelogImage, + sharedRateLimiter, + new KvFlushScheduler(serverConf), + true, + autoIncrementManager); + } + + public static KvTablet create( + PhysicalTablePath tablePath, + TableBucket tableBucket, + LogTablet logTablet, + File kvTabletDir, + Configuration serverConf, + TabletServerMetricGroup serverMetricGroup, + BufferAllocator arrowBufferAllocator, + MemorySegmentPool memorySegmentPool, + KvFormat kvFormat, + RowMerger rowMerger, + ArrowCompressionInfo arrowCompressionInfo, + SchemaGetter schemaGetter, + ChangelogImage changelogImage, + RateLimiter sharedRateLimiter, + KvFlushScheduler kvFlushScheduler, + AutoIncrementManager autoIncrementManager) + throws IOException { + return create( + tablePath, + tableBucket, + logTablet, + kvTabletDir, + serverConf, + serverMetricGroup, + arrowBufferAllocator, + memorySegmentPool, + kvFormat, + rowMerger, + arrowCompressionInfo, + schemaGetter, + changelogImage, + sharedRateLimiter, + kvFlushScheduler, + false, + autoIncrementManager); + } + + private static KvTablet create( + PhysicalTablePath tablePath, + TableBucket tableBucket, + LogTablet logTablet, + File kvTabletDir, + Configuration serverConf, + TabletServerMetricGroup serverMetricGroup, + BufferAllocator arrowBufferAllocator, + MemorySegmentPool memorySegmentPool, + KvFormat kvFormat, + RowMerger rowMerger, + ArrowCompressionInfo arrowCompressionInfo, + SchemaGetter schemaGetter, + ChangelogImage changelogImage, + RateLimiter sharedRateLimiter, + KvFlushScheduler kvFlushScheduler, + boolean closeFlushScheduler, + AutoIncrementManager autoIncrementManager) + throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter); // Create RocksDB statistics accessor (will be registered to TableMetricGroup by Replica) @@ -243,6 +340,8 @@ public static KvTablet create( schemaGetter, changelogImage, rocksDBStatistics, + kvFlushScheduler, + closeFlushScheduler, autoIncrementManager); } @@ -254,9 +353,12 @@ private static RocksDBKv buildRocksDBKv( new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter); RocksDBKvBuilder rocksDBKvBuilder = new RocksDBKvBuilder( - kvDir, - rocksDBResourceContainer, - rocksDBResourceContainer.getColumnOptions()); + kvDir, + rocksDBResourceContainer, + rocksDBResourceContainer.getColumnOptions()) + .setFlussL0SlowdownTrigger( + configuration.get( + ConfigOptions.KV_BACKPRESSURE_L0_SLOWDOWN_TRIGGER)); return rocksDBKvBuilder.build(); } @@ -376,6 +478,22 @@ public LogAppendInfo putAsLeader( () -> { rocksDBKv.checkIfRocksDBClosed(); + // Write-path admission gate: reject the request if the accumulated + // pre-write buffer would produce enough L0 SSTs (upon flush) to reach + // the storage engine's slowdown trigger. This combines an L0 headroom + // check with a buffer-byte budget so that a single large flush can + // never breach the RocksDB slowdown threshold. + if (rocksDBKv.wouldExceedFlushBudget(kvPreWriteBuffer.pendingFlushBytes())) { + requestFlushInternal( + Math.max(requestedFlushOffset, logTablet.getHighWatermark())); + throw new StorageBackpressureException( + String.format( + "Write rejected for %s: flush budget exceeded " + + "(L0 headroom or buffer size limit reached). " + + "Retry after backoff.", + tableBucket)); + } + SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo(); Schema latestSchema = schemaInfo.getSchema(); short latestSchemaId = (short) schemaInfo.getSchemaId(); @@ -668,41 +786,230 @@ private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Except } public void flush(long exclusiveUpToLogOffset, FatalErrorHandler fatalErrorHandler) { - // todo: need to introduce a backpressure mechanism - // to avoid too much records in kvPreWriteBuffer + boolean madeProgress = + inWriteLock( + kvLock, + () -> { + // when kv manager is closed which means kv tablet is already closed, + // but the tablet server may still handle fetch log request from + // follower as the tablet rpc service is closed asynchronously, then + // update the watermark and then flush the pre-write buffer. + + // In such case, if the tablet is already closed, we won't flush + // pre-write buffer, just warning it. + if (isClosed) { + LOG.warn( + "The kv tablet for {} is already closed, ignore flushing kv pre-write buffer.", + tableBucket); + return false; + } + + // Flush-path predictive gate: skip the flush if it would push L0 to + // or beyond the storage engine's slowdown trigger. The pre-write + // buffer is left intact; the next flush attempt after L0 drops will + // succeed and HW will resume. + if (rocksDBKv.wouldExceedSlowdownTriggerOnFlush()) { + flushBackpressured = true; + return false; + } + + try { + int rowCountDiff = kvPreWriteBuffer.flush(exclusiveUpToLogOffset); + if (exclusiveUpToLogOffset > flushedLogOffset) { + flushedLogOffset = exclusiveUpToLogOffset; + } + if (rowCount != ROW_COUNT_DISABLED) { + // row count is enabled, we update the row count after flush. + long currentRowCount = rowCount; + rowCount = currentRowCount + rowCountDiff; + } + flushBackpressured = false; + return true; + } catch (Throwable t) { + fatalErrorHandler.onFatalError( + new KvStorageException( + "Failed to flush kv pre-write buffer.")); + return false; + } + }); + if (madeProgress) { + notifyFlushComplete(); + } + } + + public void requestFlush(long exclusiveUpToLogOffset, FatalErrorHandler fatalErrorHandler) { + asyncFatalErrorHandler = fatalErrorHandler; + inWriteLock(kvLock, () -> requestFlushInternal(exclusiveUpToLogOffset)); + } + + public void setFlushCompleteListener(@Nullable Runnable flushCompleteListener) { + this.flushCompleteListener = flushCompleteListener; + } + + public long getFlushedLogOffset() { + return flushedLogOffset; + } + + boolean usePressureFlushLane() { + return currentPressure() > 0f; + } + + long flushScore() { + return inReadLock( + kvLock, + () -> { + long offsetLag = Math.max(0L, requestedFlushOffset - flushedLogOffset); + return offsetLag + kvPreWriteBuffer.activeFlushBytes(); + }); + } + + @GuardedBy("kvLock") + private void requestFlushInternal(long exclusiveUpToLogOffset) { + if (isClosed || exclusiveUpToLogOffset <= flushedLogOffset) { + return; + } + if (exclusiveUpToLogOffset > requestedFlushOffset) { + requestedFlushOffset = exclusiveUpToLogOffset; + } + if (flushState == FlushState.IDLE) { + flushState = FlushState.QUEUED; + kvFlushScheduler.enqueue(this); + } + } + + void requestFlushRetry() { inWriteLock( kvLock, () -> { - // when kv manager is closed which means kv tablet is already closed, - // but the tablet server may still handle fetch log request from follower - // as the tablet rpc service is closed asynchronously, then update the watermark - // and then flush the pre-write buffer. + if (!isClosed && flushState == FlushState.L0_BLOCKED) { + flushState = FlushState.QUEUED; + kvFlushScheduler.enqueue(this); + } + }); + } - // In such case, if the tablet is already closed, we won't flush pre-write - // buffer, just warning it. + void runScheduledFlush() { + boolean madeProgress = false; + PreparedFlush preparedFlush = null; + boolean flushCompleted = false; + try { + while (true) { + preparedFlush = prepareScheduledFlush(); + flushCompleted = false; + if (preparedFlush == null) { + break; + } + if (!preparedFlush.isEmpty()) { + writePreparedFlush(preparedFlush); + } + completeScheduledFlush(preparedFlush); + flushCompleted = true; + madeProgress = true; + } + } catch (Throwable t) { + if (preparedFlush != null && !flushCompleted) { + abortScheduledFlush(preparedFlush); + } + failScheduledFlush(t); + } finally { + if (madeProgress || isFlushBackpressured()) { + notifyFlushComplete(); + } + } + } + + private @Nullable PreparedFlush prepareScheduledFlush() { + return inWriteLock( + kvLock, + () -> { if (isClosed) { - LOG.warn( - "The kv tablet for {} is already closed, ignore flushing kv pre-write buffer.", - tableBucket); - } else { - try { - int rowCountDiff = kvPreWriteBuffer.flush(exclusiveUpToLogOffset); - if (exclusiveUpToLogOffset > flushedLogOffset) { - flushedLogOffset = exclusiveUpToLogOffset; - } - if (rowCount != ROW_COUNT_DISABLED) { - // row count is enabled, we update the row count after flush. - long currentRowCount = rowCount; - rowCount = currentRowCount + rowCountDiff; - } - } catch (Throwable t) { - fatalErrorHandler.onFatalError( - new KvStorageException("Failed to flush kv pre-write buffer.")); + flushState = FlushState.IDLE; + return null; + } + long targetOffset = requestedFlushOffset; + if (targetOffset <= flushedLogOffset) { + flushState = FlushState.IDLE; + return null; + } + if (rocksDBKv.wouldExceedSlowdownTriggerOnFlush()) { + flushState = FlushState.L0_BLOCKED; + flushBackpressured = true; + kvFlushScheduler.retryLater(this); + return null; + } + flushState = FlushState.RUNNING; + flushBackpressured = false; + PreparedFlush preparedFlush = kvPreWriteBuffer.prepareFlush(targetOffset); + if (preparedFlush.isEmpty()) { + flushedLogOffset = targetOffset; + } + return preparedFlush; + }); + } + + private void writePreparedFlush(PreparedFlush preparedFlush) throws Exception { + try (ResourceGuard.Lease lease = rocksDBKv.getResourceGuard().acquireResource(); + KvBatchWriter kvBatchWriter = createKvBatchWriter()) { + for (KvPreWriteBuffer.KvEntry entry : preparedFlush.entries()) { + KvPreWriteBuffer.Value value = entry.getValue(); + if (value.get() == null) { + kvBatchWriter.delete(entry.getKey().get()); + } else { + kvBatchWriter.put(entry.getKey().get(), value.get()); + } + } + kvBatchWriter.flush(); + } + } + + private void completeScheduledFlush(PreparedFlush preparedFlush) { + inWriteLock( + kvLock, + () -> { + if (!isClosed) { + int rowCountDiff = kvPreWriteBuffer.completeFlush(preparedFlush); + if (preparedFlush.exclusiveUpToLogSequenceNumber() > flushedLogOffset) { + flushedLogOffset = preparedFlush.exclusiveUpToLogSequenceNumber(); } + if (rowCount != ROW_COUNT_DISABLED) { + rowCount += rowCountDiff; + } + flushBackpressured = false; + } + if (requestedFlushOffset > flushedLogOffset && !isClosed) { + flushState = FlushState.RUNNING; + } else { + flushState = FlushState.IDLE; } }); } + private void abortScheduledFlush(PreparedFlush preparedFlush) { + inWriteLock(kvLock, () -> kvPreWriteBuffer.abortFlush(preparedFlush)); + } + + private void failScheduledFlush(Throwable t) { + inWriteLock( + kvLock, + () -> { + flushState = FlushState.IDLE; + FatalErrorHandler fatalErrorHandler = asyncFatalErrorHandler; + if (fatalErrorHandler != null) { + fatalErrorHandler.onFatalError( + new KvStorageException("Failed to flush kv pre-write buffer.", t)); + } else { + LOG.error("Failed to flush kv pre-write buffer for {}.", tableBucket, t); + } + }); + } + + private void notifyFlushComplete() { + Runnable listener = flushCompleteListener; + if (listener != null) { + listener.run(); + } + } + /** put key,value,logOffset into pre-write buffer directly. */ void putToPreWriteBuffer( ChangeType changeType, byte[] key, @Nullable byte[] value, long logOffset) { @@ -843,19 +1150,25 @@ public KvBatchWriter createKvBatchWriter() { public void close() throws Exception { LOG.info("close kv tablet {} for table {}.", tableBucket, physicalPath); - inWriteLock( - kvLock, - () -> { - if (isClosed) { - return; - } - // Note: RocksDB metrics lifecycle is managed by TableMetricGroup - // No need to close it here - if (rocksDBKv != null) { - rocksDBKv.close(); - } - isClosed = true; - }); + boolean shouldClose = + inWriteLock( + kvLock, + () -> { + if (isClosed) { + return false; + } + isClosed = true; + flushState = FlushState.IDLE; + return true; + }); + if (shouldClose && closeFlushScheduler) { + kvFlushScheduler.close(); + } + if (shouldClose && rocksDBKv != null) { + // Note: RocksDB metrics lifecycle is managed by TableMetricGroup. + // Close outside kvLock so an async flush can finish and release its RocksDB lease. + rocksDBKv.close(); + } } /** Completely delete the kv directory and all contents form the file system with no delay. */ @@ -894,4 +1207,29 @@ KvPreWriteBuffer getKvPreWriteBuffer() { public RocksDBKv getRocksDBKv() { return rocksDBKv; } + + /** + * Returns the current normalized backpressure pressure in {@code [0, 1)}. Sampled by {@code + * DelayedWrite#onComplete} so the value reflects the post-flush L0 state when the response is + * about to be sent back to the client. + */ + public float currentPressure() { + return rocksDBKv.currentPressure(); + } + + /** + * Returns whether the most recent flush attempt was skipped by the predictive L0 gate. While + * {@code true}, the pre-write buffer holds unflushed data and the high watermark must not + * advance. + */ + public boolean isFlushBackpressured() { + return flushBackpressured; + } + + private enum FlushState { + IDLE, + QUEUED, + RUNNING, + L0_BLOCKED + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java index e14d09eacf..d939bba415 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java @@ -29,11 +29,13 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -101,6 +103,12 @@ public class KvPreWriteBuffer implements AutoCloseable { // the max LSN in the buffer private long maxLogSequenceNumber = -1; + // Accumulated byte size of entries that are not yet prepared by an async flush. + private long activeFlushBytes = 0; + + // Accumulated byte size of entries prepared by an async flush but not completed yet. + private long preparedFlushBytes = 0; + public KvPreWriteBuffer( KvBatchWriter kvBatchWriter, TabletServerMetricGroup serverMetricGroup) { this.kvBatchWriter = kvBatchWriter; @@ -158,6 +166,8 @@ private void doPut(ChangeType changeType, Key key, Value value, long lsn) { allKvEntries.addLast(kvEntry); // update the max lsn maxLogSequenceNumber = lsn; + // track accumulated bytes for flush budget gating + activeFlushBytes += entryBytes(key, value); } /** @@ -194,10 +204,21 @@ public void truncateTo(long targetLogSequenceNumber, TruncateReason truncateReas break; } descIter.remove(); + if (entry.state == EntryState.PREPARED) { + throw new IllegalStateException( + "Cannot truncate prepared pre-write entry. logSequenceNumber=" + + entry.getLogSequenceNumber() + + ", targetLogSequenceNumber=" + + targetLogSequenceNumber); + } + activeFlushBytes -= entryBytes(entry.getKey(), entry.getValue()); boolean removed = kvEntryMap.remove(entry.getKey(), entry); // if the latest entry is removed, we need to rollback the previous entry to the map - if (removed && entry.previousEntry != null) { - kvEntryMap.put(entry.getKey(), entry.previousEntry); + if (removed) { + KvEntry previousEntry = previousEntryInBuffer(entry.previousEntry); + if (previousEntry != null) { + kvEntryMap.put(entry.getKey(), previousEntry); + } } } if (!descIter.hasNext()) { @@ -225,6 +246,8 @@ public int flush(long exclusiveUpToLogSequenceNumber) throws IOException { // first remove the entry from the list it.remove(); + activeFlushBytes -= entryBytes(entry.getKey(), entry.getValue()); + entry.state = EntryState.FLUSHED; // then write data using write batch writer Value value = entry.getValue(); @@ -256,6 +279,99 @@ public int flush(long exclusiveUpToLogSequenceNumber) throws IOException { return rowCountDiff; } + /** Returns the accumulated byte size of all entries waiting to be flushed. */ + public long pendingFlushBytes() { + return activeFlushBytes + preparedFlushBytes; + } + + /** Returns the byte size of entries that have not been prepared by an async flush yet. */ + public long activeFlushBytes() { + return activeFlushBytes; + } + + /** Returns the byte size of entries prepared by an async flush but not completed yet. */ + public long preparedFlushBytes() { + return preparedFlushBytes; + } + + /** + * Prepares a prefix of entries for asynchronous flush without removing them from the buffer. + * + *

The prepared entries remain visible through {@link #get(Key)}, so foreground writes can + * still derive correct CDC records while the background flush writes RocksDB. + */ + public PreparedFlush prepareFlush(long exclusiveUpToLogSequenceNumber) { + ArrayList entries = new ArrayList<>(); + int rowCountDiff = 0; + long preparedBytes = 0L; + for (KvEntry entry : allKvEntries) { + if (entry.getLogSequenceNumber() >= exclusiveUpToLogSequenceNumber) { + break; + } + if (entry.state == EntryState.PREPARED) { + throw new IllegalStateException( + "Found an already prepared entry while preparing async flush."); + } + entry.state = EntryState.PREPARED; + entries.add(entry); + long entryBytes = entryBytes(entry.getKey(), entry.getValue()); + preparedBytes += entryBytes; + activeFlushBytes -= entryBytes; + preparedFlushBytes += entryBytes; + if (entry.getChangeType() == ChangeType.INSERT) { + rowCountDiff += 1; + } else if (entry.getChangeType() == ChangeType.DELETE) { + rowCountDiff -= 1; + } + } + return new PreparedFlush( + exclusiveUpToLogSequenceNumber, entries, rowCountDiff, preparedBytes); + } + + /** Completes a prepared async flush and removes flushed entries from the buffer. */ + public int completeFlush(PreparedFlush preparedFlush) { + for (KvEntry entry : preparedFlush.entries) { + KvEntry first = allKvEntries.removeFirst(); + if (first != entry) { + throw new IllegalStateException("Prepared flush entries are no longer a prefix."); + } + if (entry.state != EntryState.PREPARED) { + throw new IllegalStateException("Prepared flush entry is not in PREPARED state."); + } + entry.state = EntryState.FLUSHED; + preparedFlushBytes -= entryBytes(entry.getKey(), entry.getValue()); + kvEntryMap.remove(entry.getKey(), entry); + } + if (allKvEntries.isEmpty()) { + maxLogSequenceNumber = -1; + } + return preparedFlush.rowCountDiff; + } + + /** Aborts a prepared async flush and makes its entries active again. */ + public void abortFlush(PreparedFlush preparedFlush) { + for (KvEntry entry : preparedFlush.entries) { + if (entry.state == EntryState.PREPARED) { + entry.state = EntryState.ACTIVE; + long entryBytes = entryBytes(entry.getKey(), entry.getValue()); + preparedFlushBytes -= entryBytes; + activeFlushBytes += entryBytes; + } + } + } + + private static long entryBytes(Key key, Value value) { + return key.key.length + (value.value != null ? value.value.length : 0); + } + + private static KvEntry previousEntryInBuffer(@Nullable KvEntry entry) { + KvEntry current = entry; + while (current != null && current.state == EntryState.FLUSHED) { + current = current.previousEntry; + } + return current; + } + @VisibleForTesting public Map getKvEntryMap() { return kvEntryMap; @@ -307,6 +423,8 @@ public static class KvEntry { // the previous mapped value in the buffer before this key-value put @Nullable private final KvEntry previousEntry; + private EntryState state = EntryState.ACTIVE; + public static KvEntry of(ChangeType changeType, Key key, Value value, long sequenceNumber) { return new KvEntry(changeType, key, value, sequenceNumber, null); } @@ -387,6 +505,51 @@ public String toString() { } } + /** Prepared entries for an asynchronous flush. */ + public static final class PreparedFlush { + private final long exclusiveUpToLogSequenceNumber; + private final List entries; + private final int rowCountDiff; + private final long preparedBytes; + + private PreparedFlush( + long exclusiveUpToLogSequenceNumber, + List entries, + int rowCountDiff, + long preparedBytes) { + this.exclusiveUpToLogSequenceNumber = exclusiveUpToLogSequenceNumber; + this.entries = entries; + this.rowCountDiff = rowCountDiff; + this.preparedBytes = preparedBytes; + } + + public long exclusiveUpToLogSequenceNumber() { + return exclusiveUpToLogSequenceNumber; + } + + public List entries() { + return entries; + } + + public int rowCountDiff() { + return rowCountDiff; + } + + public long preparedBytes() { + return preparedBytes; + } + + public boolean isEmpty() { + return entries.isEmpty(); + } + } + + private enum EntryState { + ACTIVE, + PREPARED, + FLUSHED + } + /** A key wrapper to wrap a byte array with overriding the hashCode and equals method. */ public static class Key { private final byte[] key; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java index f3998f4435..0e728d3f01 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java @@ -34,6 +34,8 @@ import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -44,6 +46,11 @@ /** A wrapper for the operation of {@link org.rocksdb.RocksDB}. */ public class RocksDBKv implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKv.class); + + /** RocksDB property name for the number of SST files at level 0 (column-family scoped). */ + private static final String NUM_FILES_AT_LEVEL0 = "rocksdb.num-files-at-level0"; + /** The container of RocksDB option factory and predefined options. */ private final RocksDBResourceContainer optionsContainer; @@ -70,6 +77,30 @@ public class RocksDBKv implements AutoCloseable { /** RocksDB Statistics for metrics collection. */ private final @Nullable Statistics statistics; + /** L0 file count at which RocksDB starts throttling writes. Read from ColumnFamilyOptions. */ + private final int level0SlowdownWritesTrigger; + + /** + * Maximum number of L0 files that could be produced between a gate check and the actual + * completion of all pending memtable flushes. Derived from {@code maxWriteBufferNumber}: at + * most 1 file from the flush triggered by this write, plus up to {@code maxWriteBufferNumber - + * 1} immutable memtables already queued for flush that have not yet materialized as L0 SSTs. + * The gate predicate uses this as the headroom to guarantee RocksDB's slowdown trigger is never + * reached. + */ + private final int maxPendingFlushL0Files; + + /** RocksDB memtable size in bytes. Used to compute the flush budget for the write path. */ + private final long writeBufferSize; + + /** + * L0 file count at which Fluss starts emitting proactive backpressure signals (piggybacked on + * PutKv responses) so clients can throttle before the storage engine blocks writes. Strictly + * below {@link #level0SlowdownWritesTrigger}; when {@code >=} the RocksDB L0 slowdown trigger, + * proactive backpressure is treated as misconfigured and disabled. + */ + private final int flussL0SlowdownTrigger; + // mark whether this kv is already closed and prevent duplicate closing private volatile boolean closed = false; @@ -78,13 +109,21 @@ public RocksDBKv( RocksDB db, ResourceGuard rocksDBResourceGuard, ColumnFamilyHandle defaultColumnFamilyHandle, - @Nullable Statistics statistics) { + @Nullable Statistics statistics, + int level0SlowdownWritesTrigger, + int maxWriteBufferNumber, + long writeBufferSize, + int flussL0SlowdownTrigger) { this.optionsContainer = optionsContainer; this.db = db; this.rocksDBResourceGuard = rocksDBResourceGuard; this.writeOptions = optionsContainer.getWriteOptions(); this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; this.statistics = statistics; + this.level0SlowdownWritesTrigger = level0SlowdownWritesTrigger; + this.maxPendingFlushL0Files = maxWriteBufferNumber; + this.writeBufferSize = writeBufferSize; + this.flussL0SlowdownTrigger = flussL0SlowdownTrigger; } public ResourceGuard getResourceGuard() { @@ -230,4 +269,115 @@ public Cache getBlockCache() { public ColumnFamilyHandle getDefaultColumnFamilyHandle() { return defaultColumnFamilyHandle; } + + /** + * Returns whether executing one more flush could push L0 file count to or beyond the storage + * engine's slowdown trigger, accounting for pending memtable flushes that have not yet + * materialized as L0 SSTs. + * + *

Used exclusively by the flush path ({@code KvTablet#flush}). The write path uses + * {@link #wouldExceedFlushBudget(long)} which additionally accounts for buffer accumulation. + * + *

The headroom ({@link #maxPendingFlushL0Files}) is derived from {@code + * maxWriteBufferNumber}: 1 file from this flush + up to {@code maxWriteBufferNumber - 1} + * immutable memtables already queued. This guarantees the slowdown trigger is never reached + * regardless of timing between the gate check and background flush completion. + */ + public boolean wouldExceedSlowdownTriggerOnFlush() { + if (level0SlowdownWritesTrigger <= 0) { + return false; + } + return currentL0FileCount() + maxPendingFlushL0Files >= level0SlowdownWritesTrigger; + } + + /** + * Returns whether accepting more writes would exceed the safe flush budget. Combines two checks + * with a single L0 read: + * + *

    + *
  1. L0 gate: {@code currentL0 + maxPendingFlushL0Files >= slowdownTrigger} + *
  2. Buffer budget: {@code pendingBufferBytes >= remainingL0Slots * writeBufferSize} + *
+ * + *

This guarantees that no flush of the accumulated pre-write buffer can trigger RocksDB's + * {@code level0_slowdown_writes_trigger}, regardless of how many memtable switches the + * WriteBatch induces. + */ + public boolean wouldExceedFlushBudget(long pendingBufferBytes) { + if (level0SlowdownWritesTrigger <= 0) { + return false; + } + long l0 = currentL0FileCount(); + long remainingSlots = (long) level0SlowdownWritesTrigger - l0 - maxPendingFlushL0Files - 1; + if (remainingSlots <= 0) { + return true; + } + if (writeBufferSize <= 0) { + return false; + } + return pendingBufferBytes >= remainingSlots * writeBufferSize; + } + + /** + * Returns the current normalized backpressure pressure in {@code [0, 1)} for piggyback on write + * responses. Mapping: + * + *

    + *
  • {@code l0 < flussL0SlowdownTrigger}: returns {@code 0}. + *
  • {@code flussL0SlowdownTrigger <= l0 < level0SlowdownWritesTrigger}: returns {@code (l0 + * - flussL0SlowdownTrigger) / (level0SlowdownWritesTrigger - flussL0SlowdownTrigger)}. + *
  • {@code l0 >= level0SlowdownWritesTrigger}: clamped to a value strictly below 1. + *
+ * + *

Never throws; hard rejection is the responsibility of {@link + * #wouldExceedSlowdownTriggerOnFlush()} at the write/flush gates. + */ + public float currentPressure() { + if (level0SlowdownWritesTrigger <= flussL0SlowdownTrigger) { + return 0f; + } + long l0Files = currentL0FileCount(); + if (l0Files < flussL0SlowdownTrigger) { + return 0f; + } + int window = level0SlowdownWritesTrigger - flussL0SlowdownTrigger; + long offset = Math.min(l0Files - flussL0SlowdownTrigger, (long) window - 1); + return (float) offset / window; + } + + /** + * Reads the current L0 file count via {@link RocksDB#getProperty(ColumnFamilyHandle, String)}. + * + *

Note: {@code rocksdb.num-files-at-level} is a parametric string property in + * RocksDB, not registered as an int property. {@code getLongProperty} (which maps to C++ {@code + * GetIntProperty}) returns NotFound for it. We therefore use the string accessor and parse the + * result. Returns {@code 0} when the property is unavailable so the caller treats it as "no + * pressure". + * + *

The native call is fenced by {@link #rocksDBResourceGuard} so that {@link #close()} blocks + * until any in-flight pressure sampling completes; touching the RocksDB native handle after + * disposal would otherwise SIGABRT the JVM. When the guard is already closed, returns {@code 0} + * so callers degrade to "no pressure" instead of failing the request. + */ + private long currentL0FileCount() { + final ResourceGuard.Lease lease; + try { + lease = rocksDBResourceGuard.acquireResource(); + } catch (IOException acquireFailed) { + // RocksDB is already closed (or being closed); treat as no pressure. + return 0L; + } + try { + String value = db.getProperty(defaultColumnFamilyHandle, NUM_FILES_AT_LEVEL0); + if (value == null || value.isEmpty()) { + return 0L; + } + return Long.parseLong(value); + } catch (RocksDBException e) { + LOG.warn("Failed to query L0 file count for backpressure", e); + return 0L; + } finally { + lease.close(); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java index 92e86fb964..84cadc0cf7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java @@ -60,6 +60,14 @@ public class RocksDBKvBuilder { /** Path where this configured instance stores its RocksDB database. */ private final File instanceRocksDBPath; + /** + * L0 file count at which Fluss starts emitting proactive piggyback backpressure. Default {@link + * Integer#MAX_VALUE} effectively disables the proactive throttle signal; the storage engine's + * own L0 slowdown trigger still produces a hard rejection ({@link + * org.apache.fluss.exception.StorageBackpressureException}) at the upper bound. + */ + private int flussL0SlowdownTrigger = Integer.MAX_VALUE; + /** The number of (re)tries for loading the RocksDB JNI library. */ private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3; @@ -76,6 +84,18 @@ public RocksDBKvBuilder( this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath); } + /** + * Sets the L0 file count at which Fluss starts emitting proactive backpressure signals + * (piggybacked on PutKv responses) so clients can throttle before the storage engine blocks + * writes. Should be strictly less than the column family's {@code + * level0_slowdown_writes_trigger}. Defaults to {@link Integer#MAX_VALUE} which disables + * proactive backpressure entirely. + */ + public RocksDBKvBuilder setFlussL0SlowdownTrigger(int flussL0SlowdownTrigger) { + this.flussL0SlowdownTrigger = flussL0SlowdownTrigger; + return this; + } + public RocksDBKv build() throws KvBuildingException { ColumnFamilyHandle defaultColumnFamilyHandle = null; RocksDB db = null; @@ -112,7 +132,11 @@ public RocksDBKv build() throws KvBuildingException { db, rocksDBResourceGuard, defaultColumnFamilyHandle, - optionsContainer.getStatistics()); + optionsContainer.getStatistics(), + columnFamilyOptions.level0SlowdownWritesTrigger(), + columnFamilyOptions.maxWriteBufferNumber(), + columnFamilyOptions.writeBufferSize(), + flussL0SlowdownTrigger); } void prepareDirectories() throws IOException { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java index ed5b111019..9f52809fc0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java @@ -50,6 +50,13 @@ public class BucketMetricGroup extends AbstractMetricGroup { // RocksDB statistics for this bucket (null for non-KV tables) private volatile @Nullable RocksDBStatistics rocksDBStatistics; + // Latest normalized KV backpressure level observed on this bucket, in [0, 1]. Only meaningful + // for KV (primary-key) tables: written by the pre-write KV backpressure gate, read by the + // table-level aggregating gauges. Defaults to 0 ("no pressure") for non-KV tables and for KV + // buckets that haven't seen a write yet, so the table-level aggregation is well-defined in + // both cases. + private volatile float kvBackpressureLevel = 0f; + public BucketMetricGroup( MetricRegistry registry, @Nullable String partitionName, @@ -124,6 +131,25 @@ public RocksDBStatistics getRocksDBStatistics() { return rocksDBStatistics; } + /** + * Record the latest normalized KV backpressure level observed on this bucket. Called from the + * pre-write KV backpressure gate so table-level aggregating gauges can read it without going + * through RocksDB again. Only meaningful for KV (primary-key) tables. + * + * @param level normalized pressure in {@code [0, 1]} + */ + public void recordKvBackpressureLevel(float level) { + this.kvBackpressureLevel = level; + } + + /** + * @return the latest KV backpressure level recorded on this bucket, in {@code [0, 1]}. Always 0 + * for non-KV tables. + */ + public float getKvBackpressureLevel() { + return kvBackpressureLevel; + } + @Override public void close() { // Clean up RocksDB statistics before closing the metric group diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java index b103dc1596..1452ccc06c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java @@ -56,6 +56,12 @@ public class TableMetricGroup extends AbstractMetricGroup { // table-level metrics for kv, will be null if the table isn't a kv table private final @Nullable KvMetricGroup kvMetrics; + // Cumulative count of write requests rejected by KV backpressure + // (StorageBackpressureException), + // aggregated across all buckets of this table. Null when the table isn't a KV table, since + // backpressure is only emitted by primary-key tables backed by RocksDB. + private final @Nullable Counter kvBackpressureRejectedRequests; + public TableMetricGroup( MetricRegistry registry, TablePath tablePath, @@ -74,10 +80,15 @@ public TableMetricGroup( logMetrics = new LogMetricGroup(this, TabletType.CDC_LOG); // Register RocksDB aggregated metrics for kv tables registerRocksDBMetrics(); + // Register KV backpressure aggregated metrics for kv tables + kvBackpressureRejectedRequests = new ThreadSafeSimpleCounter(); + counter(MetricNames.KV_BACKPRESSURE_REJECTIONS_TOTAL, kvBackpressureRejectedRequests); + registerKvBackpressureGauges(); } else { // otherwise, create log produce metrics kvMetrics = null; logMetrics = new LogMetricGroup(this, TabletType.LOG); + kvBackpressureRejectedRequests = null; } } @@ -226,6 +237,17 @@ public Counter failedPrefixLookupRequests() { } } + /** + * Increment the table-level counter of write requests rejected by KV backpressure ({@code + * StorageBackpressureException}). Called by the pre-write KV backpressure gate when the storage + * engine has crossed its hard-rejection trigger. No-op for non-KV tables. + */ + public void incKvBackpressureRejectedRequests() { + if (kvBackpressureRejectedRequests != null) { + kvBackpressureRejectedRequests.inc(); + } + } + // ------------------------------------------------------------------------ // bucket groups // ------------------------------------------------------------------------ @@ -330,7 +352,6 @@ private void registerRocksDBMetrics() { .mapToLong(RocksDBStatistics::getCompactionTimeMicros) .max() .orElse(0L)); - // Sum aggregation metrics - track the total value across all buckets gauge( MetricNames.ROCKSDB_BYTES_READ_TOTAL, @@ -390,6 +411,39 @@ private void registerRocksDBMetrics() { .sum()); } + /** + * Register table-level KV backpressure gauges. Reports two orthogonal dimensions of KV + * backpressure state aggregated from per-bucket measurements (KV tables only): + * + *

    + *
  • {@code max-pressure}: peak normalized pressure across all buckets, in {@code [0, + * 1]}. Reflects how close the hottest bucket is to the storage engine's hard-rejection + * trigger. + *
  • {@code affected-buckets}: number of buckets currently under KV backpressure + * (i.e. pressure {@code > 0}). Reflects how broadly backpressure has spread. + *
+ * + *

Per-bucket pressure values are written by the pre-write KV backpressure gate; the gauges + * read them via {@link BucketMetricGroup#getKvBackpressureLevel()} without going through + * RocksDB. + */ + private void registerKvBackpressureGauges() { + gauge( + MetricNames.KV_BACKPRESSURE_MAX_PRESSURE, + () -> + (float) + buckets.values().stream() + .mapToDouble(BucketMetricGroup::getKvBackpressureLevel) + .max() + .orElse(0d)); + gauge( + MetricNames.KV_BACKPRESSURE_AFFECTED_BUCKETS, + () -> + buckets.values().stream() + .filter(b -> b.getKvBackpressureLevel() > 0f) + .count()); + } + /** Metric group for specific kind of tablet of a table. */ private static class TabletMetricGroup extends AbstractMetricGroup { private final TabletType tabletType; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index e11dd69c14..ac851b8958 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -735,7 +735,36 @@ private void dropKv() { private void mayFlushKv(long newHighWatermark) { KvTablet kvTablet = this.kvTablet; if (kvTablet != null) { - kvTablet.flush(newHighWatermark, fatalErrorHandler); + kvTablet.requestFlush(newHighWatermark, fatalErrorHandler); + // If the predictive flush gate rejected this flush, eagerly wake DelayedWrite + // operations waiting on this bucket so they observe the backpressured state and + // surface STORAGE_BACKPRESSURE_EXCEPTION to clients without waiting for the next + // ack-driven tryComplete. + if (kvTablet.isFlushBackpressured()) { + delayedWriteManager.checkAndComplete(new DelayedTableBucketKey(tableBucket)); + } + } + } + + private void onKvFlushComplete() { + boolean leaderHWIncremented = + inWriteLock( + leaderIsrUpdateLock, + () -> { + if (!isLeader()) { + return false; + } + try { + return maybeIncrementLeaderHW(logTablet, clock.milliseconds()); + } catch (IOException e) { + fatalErrorHandler.onFatalError(e); + return false; + } + }); + if (leaderHWIncremented) { + tryCompleteDelayedOperations(); + } else { + delayedWriteManager.checkAndComplete(new DelayedTableBucketKey(tableBucket)); } } @@ -835,7 +864,10 @@ private Optional initKvTablet() { tableBucket, endTime - startTime); - // Register RocksDB statistics to BucketMetricGroup + if (kvTablet != null) { + kvTablet.setFlushCompleteListener(this::onKvFlushComplete); + } + // Register RocksDB statistics now that the kv tablet is fully initialized. if (kvTablet != null && kvTablet.getRocksDBStatistics() != null) { bucketMetricGroup.registerRocksDBStatistics(kvTablet.getRocksDBStatistics()); } @@ -1071,6 +1103,30 @@ public LogAppendInfo appendRecordsToFollower(MemoryLogRecords memoryLogRecords) return logTablet.appendAsFollower(memoryLogRecords); } + /** + * Samples the current backpressure pressure for piggyback on a completed write response. + * Invoked from {@code DelayedWrite#onComplete()} once the bucket's required acks are satisfied; + * the value reflects the post-flush L0 state and is the most accurate snapshot the client can + * act on. Also records the value on this bucket's {@link BucketMetricGroup} for table-level + * aggregation. + */ + public float samplePressureForCompletion() { + return inReadLock( + leaderIsrUpdateLock, + () -> { + if (!isLeader()) { + return 0f; + } + KvTablet kv = this.kvTablet; + if (kv == null) { + return 0f; + } + float pressure = kv.currentPressure(); + bucketMetricGroup.recordKvBackpressureLevel(pressure); + return pressure; + }); + } + public LogAppendInfo putRecordsToLeader( KvRecordBatch kvRecords, @Nullable int[] targetColumns, @@ -1187,11 +1243,15 @@ private boolean maybeIncrementLeaderHW(LogTablet leaderLog, long currentTimeMs) } } - // when the watermark can be advanced, we may need to flush kv first if it's kv replica, - // and then update highWatermark. - // TODO The flushKV and updateHighWatermark need to be atomic operation. See - // https://github.com/apache/fluss/issues/513 - mayFlushKv(newHighWatermark.getMessageOffset()); + KvTablet currentKv = this.kvTablet; + if (currentKv != null + && currentKv.getFlushedLogOffset() < newHighWatermark.getMessageOffset()) { + // The KV view must be flushed before the log high watermark becomes visible. The flush + // itself runs on the shared KV flush scheduler so this RPC worker does not execute + // RocksDB writes. + mayFlushKv(newHighWatermark.getMessageOffset()); + return false; + } Optional oldWatermark = leaderLog.maybeIncrementHighWatermark(newHighWatermark); @@ -1493,6 +1553,16 @@ public LogRecords limitLogScan(int limit) { */ public Tuple2 checkEnoughReplicasReachOffset(long requiredOffset) { if (isLeader()) { + // If the predictive flush gate has rejected the most recent flush, fail fast with a + // backpressure error so the client can throttle before the storage engine stalls. + // Pending data stays in the pre-write buffer; the client retries after backoff. + KvTablet kv = this.kvTablet; + if (kv != null && kv.isFlushBackpressured()) { + bucketMetricGroup.recordKvBackpressureLevel(1f); + bucketMetricGroup.getTableMetricGroup().incKvBackpressureRejectedRequests(); + return Tuple2.of(false, Errors.STORAGE_BACKPRESSURE_EXCEPTION); + } + // Keep the current immutable replica list reference. List curMaximalIsr = isrState.maximalIsr(); if (LOG.isTraceEnabled()) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 280e34124d..865bc92c9e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -158,6 +158,7 @@ public class ReplicaManager implements ServerReconfigurable { private static final Logger LOG = LoggerFactory.getLogger(ReplicaManager.class); public static final String HIGH_WATERMARK_CHECKPOINT_FILE_NAME = "high-watermark-checkpoint"; + private final Configuration conf; private final Scheduler scheduler; private final LogManager logManager; @@ -840,9 +841,11 @@ public void lookups( // the original key bytes are wrapped in KeyRecordBatch, then during putRecordsToKv // they are decoded to rows and immediately re-encoded back to key bytes, causing // redundant encode/decode overhead. + // Use acks=-1 so the callback fires only after async flush completes and + // HW advances, ensuring inserted data is visible in RocksDB for re-lookup. putRecordsToKv( timeoutMs, - requiredAcks, + -1, produceEntryData, schema.getPrimaryKeyIndexes(), MergeMode.DEFAULT, @@ -1355,6 +1358,9 @@ private Map putToLocalKv( tb, appendInfo.firstOffset(), appendInfo.lastOffset()); + // The pressure field is left at its default (0f) here and refreshed once, + // right before the response goes out, by either maybeAddDelayedWrite (acks != -1) + // or DelayedWrite#onComplete (acks == -1) so the client sees the freshest L0 state. putResultForBucketMap.put( tb, new PutKvResultForBucket(tb, appendInfo.lastOffset() + 1)); @@ -1696,6 +1702,21 @@ private void maybeAddDelayedWrite( .map(DelayedTableBucketKey::new) .collect(Collectors.toList())); } else { + // Immediate-response path (acks != -1): refresh KV pressure right before the + // response goes out, mirroring DelayedWrite#onComplete on the acks == -1 path. + writeResults.forEach( + (tb, r) -> { + if (r instanceof PutKvResultForBucket && !r.failed()) { + try { + ((PutKvResultForBucket) r) + .setPressure( + getReplicaOrException(tb) + .samplePressureForCompletion()); + } catch (Exception ignore) { + // leader moved or replica gone, leave default 0f + } + } + }); responseCallback.accept(new ArrayList<>(writeResults.values())); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedWrite.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedWrite.java index 66d2649e78..35a8a251a8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedWrite.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedWrite.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.replica.delay; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.rpc.entity.PutKvResultForBucket; import org.apache.fluss.rpc.entity.WriteResultForBucket; import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.PutKvRequest; @@ -166,21 +167,50 @@ public void onExpiration() { @Override public void onComplete() { List result = - delayedWriteMetadata.getBucketStatusMap().values().stream() - // overwrite the write result with the delayed error if there is one. + delayedWriteMetadata.getBucketStatusMap().entrySet().stream() + // overwrite the write result with the delayed error if there is one, + // and refresh the piggyback pressure sample for successful PutKv buckets. .map( - s -> { + entry -> { + TableBucket tableBucket = entry.getKey(); + DelayedBucketStatus s = entry.getValue(); Errors error = s.getDelayedError(); - if (error != null && error != Errors.NONE) { - return s.getWriteResultForBucket().copy(error); - } else { - return s.getWriteResultForBucket(); - } + T writeResult = + (error != null && error != Errors.NONE) + ? s.getWriteResultForBucket().copy(error) + : s.getWriteResultForBucket(); + refreshPressureIfPutKv(tableBucket, error, writeResult); + return writeResult; }) .collect(Collectors.toList()); callback.accept(result); } + /** + * For successful PutKv responses, re-sample the per-bucket pressure right before the response + * is delivered so the client sees the most up-to-date post-flush L0 state. Failures and + * non-PutKv response types are left untouched. + */ + private void refreshPressureIfPutKv(TableBucket tableBucket, Errors error, T writeResult) { + if (!(writeResult instanceof PutKvResultForBucket)) { + return; + } + if (error != null && error != Errors.NONE) { + return; + } + if (writeResult.failed()) { + return; + } + try { + Replica replica = replicaManager.getReplicaOrException(tableBucket); + ((PutKvResultForBucket) writeResult).setPressure(replica.samplePressureForCompletion()); + } catch (Exception e) { + // Replica may have moved or been deleted between the write and the completion. The + // response still succeeds with pressure left at its initial value (0). + LOG.debug("Skip pressure refresh for {} on completion: {}", tableBucket, e.toString()); + } + } + /** DelayedWriteMetadata. */ public static final class DelayedWriteMetadata { private final int requiredAcks; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..ec39f97653 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -1155,6 +1155,12 @@ public static PutKvResponse makePutKvResponse(Collection k if (logEndOffset >= 0) { putKvBucket.setLogEndOffset(logEndOffset); } + // Backpressure signal: only piggyback when there is actual pressure + // (i.e. L0 has crossed the Fluss-side proactive threshold). + float pressure = bucketResult.getPressure(); + if (pressure > 0f) { + putKvBucket.setPressure(pressure); + } } putKvRespForBucketList.add(putKvBucket); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 3425fa75cf..00f05f166f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -138,7 +138,13 @@ void beforeEach() { } @AfterEach - void afterEach() { + void afterEach() throws Exception { + if (kvTablet != null) { + kvTablet.close(); + } + if (logTablet != null) { + logTablet.close(); + } if (executor != null) { executor.shutdown(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java index cb07c2d66b..3d0777d350 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.kv.prewrite; import org.apache.fluss.server.kv.KvBatchWriter; +import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.PreparedFlush; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.metrics.group.TestingMetricGroups; @@ -253,6 +254,111 @@ private static void bufferDelete( kvPreWriteBuffer.delete(toKey(key), elementCount); } + @Test + void testPrepareAndCompleteFlush() { + KvPreWriteBuffer buffer = + new KvPreWriteBuffer( + new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + + bufferInsert(buffer, "key1", "value1", 1); + bufferInsert(buffer, "key2", "value2", 2); + bufferInsert(buffer, "key3", "value3", 3); + + PreparedFlush preparedFlush = buffer.prepareFlush(3); + + assertThat(preparedFlush.entries()).hasSize(2); + assertThat(preparedFlush.rowCountDiff()).isEqualTo(2); + assertThat(buffer.activeFlushBytes()).isEqualTo(10); + assertThat(buffer.preparedFlushBytes()).isEqualTo(20); + assertThat(buffer.pendingFlushBytes()).isEqualTo(30); + assertThat(getValue(buffer, "key1")).isEqualTo("value1"); + assertThat(getValue(buffer, "key2")).isEqualTo("value2"); + + assertThat(buffer.completeFlush(preparedFlush)).isEqualTo(2); + + assertThat(buffer.activeFlushBytes()).isEqualTo(10); + assertThat(buffer.preparedFlushBytes()).isEqualTo(0); + assertThat(buffer.pendingFlushBytes()).isEqualTo(10); + assertThat(buffer.getAllKvEntries()).hasSize(1); + assertThat(getValue(buffer, "key1")).isNull(); + assertThat(getValue(buffer, "key2")).isNull(); + assertThat(getValue(buffer, "key3")).isEqualTo("value3"); + } + + @Test + void testAbortPreparedFlush() { + KvPreWriteBuffer buffer = + new KvPreWriteBuffer( + new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + + bufferInsert(buffer, "key1", "value1", 1); + bufferDelete(buffer, "key2", 2); + + PreparedFlush preparedFlush = buffer.prepareFlush(3); + assertThat(buffer.activeFlushBytes()).isEqualTo(0); + assertThat(buffer.preparedFlushBytes()).isEqualTo(14); + + buffer.abortFlush(preparedFlush); + + assertThat(buffer.activeFlushBytes()).isEqualTo(14); + assertThat(buffer.preparedFlushBytes()).isEqualTo(0); + assertThat(buffer.pendingFlushBytes()).isEqualTo(14); + assertThat(getValue(buffer, "key1")).isEqualTo("value1"); + assertThat(getValue(buffer, "key2")).isNull(); + + PreparedFlush nextPreparedFlush = buffer.prepareFlush(3); + assertThat(nextPreparedFlush.entries()).hasSize(2); + } + + @Test + void testCannotTruncatePreparedFlush() { + KvPreWriteBuffer buffer = + new KvPreWriteBuffer( + new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + + bufferInsert(buffer, "key1", "value1", 1); + bufferInsert(buffer, "key2", "value2", 2); + buffer.prepareFlush(3); + + assertThatThrownBy(() -> buffer.truncateTo(1, TruncateReason.ERROR)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot truncate prepared pre-write entry."); + } + + @Test + void testPendingFlushBytesTracking() throws Exception { + KvPreWriteBuffer buffer = + new KvPreWriteBuffer( + new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + + // Initially zero + assertThat(buffer.pendingFlushBytes()).isEqualTo(0); + + // Insert key1=value1 (4 + 6 = 10 bytes) + bufferInsert(buffer, "key1", "value1", 1); + assertThat(buffer.pendingFlushBytes()).isEqualTo(10); + + // Insert key2=value22 (4 + 7 = 11 bytes) + bufferInsert(buffer, "key2", "value22", 2); + assertThat(buffer.pendingFlushBytes()).isEqualTo(21); + + // Delete key3: key bytes only (4 + 0 = 4 bytes for delete with null value) + bufferDelete(buffer, "key3", 3); + assertThat(buffer.pendingFlushBytes()).isEqualTo(25); + + // Flush entries with lsn < 2 (only key1): decreases by 10 + buffer.flush(2); + assertThat(buffer.pendingFlushBytes()).isEqualTo(15); + + // TruncateTo(3) removes entries with lsn >= 3 (key3 delete): decreases by 4 + buffer.truncateTo(3, TruncateReason.ERROR); + assertThat(buffer.pendingFlushBytes()).isEqualTo(11); + + // Flush remaining (key2): decreases by 11 + buffer.flush(Long.MAX_VALUE); + assertThat(buffer.pendingFlushBytes()).isEqualTo(0); + } + private static String getValue(KvPreWriteBuffer preWriteBuffer, String keyStr) { KvPreWriteBuffer.Key key = toKey(keyStr); KvPreWriteBuffer.Value value = preWriteBuffer.get(key); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvTest.java index 80d27f8bd1..1202ccebea 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvTest.java @@ -21,6 +21,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.FlushOptions; import java.io.File; import java.nio.file.Path; @@ -64,4 +66,190 @@ void testRocksDbKv(@TempDir Path tempDir) throws Exception { assertThat(rocksDBKv.multiGet(Arrays.asList(key, key2))).containsExactly(null, val2); } } + + // ------------------------------------------------------------------ + // Backpressure tests + // ------------------------------------------------------------------ + + /** + * Verifies that the L0 property is actually readable on frocksdbjni 6.20.3-ververica-2.0. + * + *

This is a regression test: {@code getLongProperty("rocksdb.num-files-at-level0")} throws + * {@code RocksDBException: NotFound} because the property is parametric (string-type), not an + * int-type property. The fix uses {@code getProperty} + {@code Long.parseLong} instead. + */ + @Test + void testCurrentPressure_l0PropertyReadable(@TempDir Path tempDir) throws Exception { + File instanceBasePath = tempDir.toFile(); + RocksDBResourceContainer container = + new RocksDBResourceContainer(new Configuration(), instanceBasePath); + ColumnFamilyOptions cfOpts = container.getColumnOptions(); + + RocksDBKvBuilder builder = + new RocksDBKvBuilder(instanceBasePath, container, cfOpts) + .setFlussL0SlowdownTrigger(2); + + try (RocksDBKv kv = builder.build()) { + // Initially 0 L0 files, should return 0 pressure. + assertThat(kv.currentPressure()).isEqualTo(0f); + + // Write + flush to produce exactly 1 L0 SST. + kv.put(new byte[] {1}, new byte[] {1}); + try (FlushOptions flushOpts = new FlushOptions()) { + flushOpts.setWaitForFlush(true); + kv.db.flush(flushOpts); + } + + // L0 = 1, still below flussL0SlowdownTrigger (2), should still be 0. + assertThat(kv.currentPressure()).isEqualTo(0f); + } + } + + /** + * Verifies the full backpressure logic across the proactive throttle signal and the predictive + * flush gate. Configuration: flussTrigger=2, rocksdbSlowdownTrigger=5. + */ + @Test + void testPressureCurveAndFlushGate(@TempDir Path tempDir) throws Exception { + File instanceBasePath = tempDir.toFile(); + RocksDBResourceContainer container = + new RocksDBResourceContainer(new Configuration(), instanceBasePath); + ColumnFamilyOptions cfOpts = container.getColumnOptions(); + // slowdownTrigger = 6; with default maxWriteBufferNumber = 2 the gate closes when + // L0 + 2 >= 6, i.e. L0 >= 4. High compaction/stop triggers prevent RocksDB from + // interfering during the test. + cfOpts.setLevel0SlowdownWritesTrigger(6); + cfOpts.setLevel0FileNumCompactionTrigger(100); + cfOpts.setLevel0StopWritesTrigger(100); + + RocksDBKvBuilder builder = + new RocksDBKvBuilder(instanceBasePath, container, cfOpts) + .setFlussL0SlowdownTrigger(2); + + try (RocksDBKv kv = builder.build()) { + // --- L0 = 0: no pressure, flush gate open --- + assertThat(kv.currentPressure()).isEqualTo(0f); + assertThat(kv.wouldExceedSlowdownTriggerOnFlush()).isFalse(); + + // Flush to L0 = 2 (reaches flussTrigger). p = (2-2)/(6-2) = 0. + flushNTimes(kv, 2); + assertThat(kv.currentPressure()).isEqualTo(0f); + assertThat(kv.wouldExceedSlowdownTriggerOnFlush()).isFalse(); + + // Flush to L0 = 3: proactive throttle signal kicks in. p = (3-2)/(6-2) = 1/4. + // Gate still open: 3 + 2 = 5 < 6. + flushNTimes(kv, 1); + float p = kv.currentPressure(); + assertThat(p).isCloseTo(1f / 4f, org.assertj.core.data.Offset.offset(0.01f)); + assertThat(kv.wouldExceedSlowdownTriggerOnFlush()).isFalse(); + + // Flush to L0 = 4: gate closes (4 + 2 = 6 >= slowdownTrigger). + // p = (4-2)/(6-2) = 2/4 = 0.5. + flushNTimes(kv, 1); + p = kv.currentPressure(); + assertThat(p).isCloseTo(0.5f, org.assertj.core.data.Offset.offset(0.01f)); + assertThat(kv.wouldExceedSlowdownTriggerOnFlush()).isTrue(); + + // Flush to L0 = 5: gate remains closed. p clamped to (window-1)/window = 3/4. + flushNTimes(kv, 1); + p = kv.currentPressure(); + assertThat(p).isLessThan(1f); + assertThat(p).isCloseTo(3f / 4f, org.assertj.core.data.Offset.offset(0.01f)); + assertThat(kv.wouldExceedSlowdownTriggerOnFlush()).isTrue(); + } + } + + /** Writes a unique key and flushes the memtable N times to produce N L0 SST files. */ + private static void flushNTimes(RocksDBKv kv, int n) throws Exception { + for (int i = 0; i < n; i++) { + // Write a unique key to dirty the memtable. + byte[] key = ("flush-key-" + System.nanoTime() + "-" + i).getBytes(); + kv.put(key, key); + try (FlushOptions flushOpts = new FlushOptions()) { + flushOpts.setWaitForFlush(true); + kv.db.flush(flushOpts); + } + } + } + + /** + * Verifies that {@link RocksDBKv#wouldExceedFlushBudget(long)} correctly rejects writes when + * the pending buffer bytes would produce enough L0 SSTs to breach the slowdown trigger. + * + *

Configuration: slowdownTrigger=6, maxWriteBufferNumber=2, writeBufferSize=1024. + * remainingSlots = slowdownTrigger - currentL0 - maxWriteBufferNumber - 1. + * + *

    + *
  • L0=0 → remainingSlots=3 → budget=3072 → 3071 admitted, 3072 rejected. + *
  • L0=2 → remainingSlots=1 → budget=1024 → 1023 admitted, 1024 rejected. + *
  • L0=4 → remainingSlots=0 → always rejected (L0 gate). + *
+ */ + @Test + void testFlushBudgetRejectsLargeBuffer(@TempDir Path tempDir) throws Exception { + File instanceBasePath = tempDir.toFile(); + RocksDBResourceContainer container = + new RocksDBResourceContainer(new Configuration(), instanceBasePath); + ColumnFamilyOptions cfOpts = container.getColumnOptions(); + cfOpts.setLevel0SlowdownWritesTrigger(6); + cfOpts.setWriteBufferSize(1024); + cfOpts.setMaxWriteBufferNumber(2); + cfOpts.setLevel0FileNumCompactionTrigger(100); + cfOpts.setLevel0StopWritesTrigger(100); + + RocksDBKvBuilder builder = + new RocksDBKvBuilder(instanceBasePath, container, cfOpts) + .setFlussL0SlowdownTrigger(2); + + try (RocksDBKv kv = builder.build()) { + // L0=0: remainingSlots = 6 - 0 - 2 - 1 = 3, budget = 3 * 1024 = 3072 + assertThat(kv.wouldExceedFlushBudget(0)).isFalse(); + assertThat(kv.wouldExceedFlushBudget(3071)).isFalse(); + assertThat(kv.wouldExceedFlushBudget(3072)).isTrue(); + assertThat(kv.wouldExceedFlushBudget(8192)).isTrue(); + + // Flush to L0=2: remainingSlots = 6 - 2 - 2 - 1 = 1, budget = 1024 + flushNTimes(kv, 2); + assertThat(kv.wouldExceedFlushBudget(0)).isFalse(); + assertThat(kv.wouldExceedFlushBudget(1023)).isFalse(); + assertThat(kv.wouldExceedFlushBudget(1024)).isTrue(); + + // Flush to L0=4: remainingSlots = 6 - 4 - 2 = 0 → L0 gate always rejects + flushNTimes(kv, 2); + assertThat(kv.wouldExceedFlushBudget(0)).isTrue(); + } + } + + /** + * Regression test for the JVM crash (exit 134 / SIGABRT) observed in CI for {@code + * KvReplicaRestoreITCase}: after {@link RocksDBKv#close()} releases the native handle, late + * callers from {@code DelayedWrite#onComplete} → {@code Replica#samplePressureForCompletion} + * could still reach {@link RocksDBKv#currentPressure()} → {@code db.getProperty(...)} and touch + * the disposed native handle. The fence inside {@code currentL0FileCount} must turn those late + * calls into a benign "no pressure" reading instead of a native crash. + */ + @Test + void testPressureQueriesAfterCloseAreSafe(@TempDir Path tempDir) throws Exception { + File instanceBasePath = tempDir.toFile(); + RocksDBResourceContainer container = + new RocksDBResourceContainer(new Configuration(), instanceBasePath); + ColumnFamilyOptions cfOpts = container.getColumnOptions(); + cfOpts.setLevel0SlowdownWritesTrigger(5); + cfOpts.setLevel0FileNumCompactionTrigger(100); + cfOpts.setLevel0StopWritesTrigger(100); + + RocksDBKvBuilder builder = + new RocksDBKvBuilder(instanceBasePath, container, cfOpts) + .setFlussL0SlowdownTrigger(2); + + RocksDBKv kv = builder.build(); + // Produce a couple of L0 SSTs so a non-fenced query would otherwise observe non-zero L0. + flushNTimes(kv, 2); + + kv.close(); + + // Both pressure paths must degrade gracefully once the native handle is gone. + assertThat(kv.currentPressure()).isEqualTo(0f); + assertThat(kv.wouldExceedSlowdownTriggerOnFlush()).isFalse(); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index ff6b0246ee..69fd90d55c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -99,6 +99,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -157,6 +158,7 @@ import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs; import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -638,6 +640,9 @@ void testPutKvWithOutOfBatchSequence() throws Exception { future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5)); + // Wait for async flush so HW advances before reading CDC log. + waitForFlush(tb); + // 2. get the cdc-log of this batch (data1). List> expectedLogForData1 = Arrays.asList( @@ -726,6 +731,9 @@ void testPutKvWithOutOfBatchSequence() throws Exception { future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8)); + // Wait for async flush so HW advances. + waitForFlush(tb); + // 6. get the cdc-log of this batch (data2). List> expectedLogForData2 = Arrays.asList( @@ -789,6 +797,9 @@ void testPutKvWithDeleteNonExistsKey() throws Exception { future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 18)); + // Wait for async flush so HW advances before reading CDC log. + waitForFlush(tb); + // 2. get the cdc-log of these batches. CompletableFuture> future1 = new CompletableFuture<>(); @@ -851,6 +862,9 @@ void testLookup() throws Exception { future1::complete); assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 8)); + // Wait for async flush to complete so data is visible in RocksDB. + waitForFlush(tb); + // second lookup key in table, key = 1, value = 1, "a1". Object[] value1 = DATA_1_WITH_KEY_AND_VALUE.get(3).f1; byte[] value1Bytes = @@ -891,6 +905,10 @@ void testLookupWithInsertIfNotExists() throws Exception { List inserted = lookupWithInsert(tb, Arrays.asList(key100, key200)).lookupValues(); assertThat(inserted).hasSize(2).allMatch(Objects::nonNull); + + // Wait for async flush so data is visible via plain lookup. + waitForFlush(tb); + verifyLookup(tb, key100, inserted.get(0)); verifyLookup(tb, key200, inserted.get(1)); @@ -916,6 +934,10 @@ void testLookupWithInsertIfNotExists() throws Exception { List mixed = lookupWithInsert(tb, Arrays.asList(key100, key300)).lookupValues(); assertThat(mixed.get(0)).isEqualTo(inserted.get(0)); // existing assertThat(mixed.get(1)).isNotNull(); // newly inserted + + // Wait for async flush so key300 is visible via plain lookup. + waitForFlush(tb); + verifyLookup(tb, key300, mixed.get(1)); // Verify that only one new log record was created for key300 @@ -966,6 +988,9 @@ void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception { InternalRow row3 = valueDecoder.decodeValue(mixed.get(1)).row; assertThat(row3.getLong(2)).isEqualTo(3L); // continues sequence + // Wait for async flush so HW advances and log is readable. + waitForFlush(tb); + FetchLogResultForBucket logResult = fetchLog(tb, 0L); assertThat(logResult.getHighWatermark()).isEqualTo(3L); LogRecords records = logResult.records(); @@ -1064,6 +1089,9 @@ void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws Exception { // Values should be 1, 2, 3 (in any order due to concurrency) assertThat(autoIncrementValues).containsExactlyInAnyOrder(1L, 2L, 3L); + // Wait for async flush so HW advances. + waitForFlush(tb); + // Verify exactly 3 changelog entries were written (one per unique key) FetchLogResultForBucket logResult = fetchLog(tb, 0L); // Only the first upsert for a given primary key generates changelog records. Subsequent @@ -1109,6 +1137,10 @@ void testLookupWithInsertIfNotExistsMultiBucket() throws Exception { byte[] value0 = inserted.get(tb0).lookupValues().get(0); byte[] value1 = inserted.get(tb1).lookupValues().get(0); + // Wait for async flush so data is visible via plain lookup. + waitForFlush(tb0); + waitForFlush(tb1); + // Verify inserted values via lookup verifyLookup(tb0, key0, value0); verifyLookup(tb1, key1, value1); @@ -1187,6 +1219,10 @@ void testPrefixLookup() throws Exception { PUT_KV_VERSION, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 4)); + + // Wait for async flush so data is visible for prefix lookup. + waitForFlush(tb); + // second prefix lookup in table, prefix key = (1, "a"). Object[] prefixKey1 = new Object[] {1, "a"}; CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(rowType, new int[] {0, 1}); @@ -1269,6 +1305,9 @@ void testLimitScanPrimaryKeyTable() throws Exception { future1::complete); assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 8)); + // Wait for async flush so data is visible in RocksDB for limit scan. + waitForFlush(tb); + // second, limit scan from table with limit builder.append(DEFAULT_SCHEMA_ID, compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a1"})); future = new CompletableFuture<>(); @@ -2585,4 +2624,18 @@ void testStopReplicaSweepsOrphanDirsForNoneReplica() throws Exception { // LogManager should no longer hold the log. assertThat(logManager.getLog(tb)).isNotPresent(); } + + /** + * Wait for the async KV flush to complete and high watermark to advance to at least the current + * log end offset. This is needed because the KV flush scheduler runs on background threads. + */ + private void waitForFlush(TableBucket tb) { + Replica replica = replicaManager.getReplicaOrException(tb); + long expectedOffset = replica.getLocalLogEndOffset(); + retry( + Duration.ofSeconds(10), + () -> + assertThat(replica.getLogHighWatermark()) + .isGreaterThanOrEqualTo(expectedOffset)); + } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala index 11287fb7d4..e4498d8ae8 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala @@ -106,16 +106,37 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession { (0 until table.getTableInfo.getNumBuckets).foreach(i => ls.subscribeFromBeginning(i)) ls } - val scanRecords = logScanner.poll(Duration.ofSeconds(1)) - scanRecords - .iterator() - .asScala - .map(record => (record.getChangeType.shortString(), record.getRow)) - .toArray + // Poll in a loop to accumulate all available records. With async KV flush, + // the high watermark may advance in stages, so a single poll may not + // return all records that were written in the same client flush() call. + // We keep polling until we have received at least some records and then + // get an empty poll (indicating no more data), or until the deadline. + val result = scala.collection.mutable.ArrayBuffer[(String, InternalRow)]() + val deadline = System.currentTimeMillis() + 10000 + var hasReceivedAny = false + var done = false + while (!done && System.currentTimeMillis() < deadline) { + val scanRecords = logScanner.poll(Duration.ofSeconds(1)) + val records = scanRecords + .iterator() + .asScala + .map(record => (record.getChangeType.shortString(), record.getRow)) + .toArray + if (records.nonEmpty) { + hasReceivedAny = true + result ++= records + } else if (hasReceivedAny) { + // We already received records and now got empty - all caught up. + done = true + } + // else: no records received yet, keep polling (HW may not have advanced) + } + result.toArray } protected def flussConf: Configuration = { val conf = new Configuration conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + conf } } diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index ea3fec7fd4..4ffd80a244 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -172,6 +172,7 @@ during the Fluss cluster working. | kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. | | kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. | | kv.rocksdb.shared-rate-limiter-bytes-per-sec | MemorySize | Long.MAX_VALUE | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. | +| kv.backpressure.l0-slowdown-trigger | Integer | 8 | The L0 file count at which Fluss starts emitting proactive backpressure signals to clients. This value should be lower than the underlying storage engine's own L0 slowdown trigger (RocksDB `level0_slowdown_writes_trigger`, default `20`), so that clients begin throttling before the storage engine is forced to throttle itself. The gap between this value and the storage trigger forms the throttle ramp-up window. The default value is `8`. | | kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. | | kv.recover.remote-log.prefetch-num | Integer | 4 | The maximum number of remote log segments that can be downloaded but not yet consumed during KV recovery. A larger value overlaps more remote storage downloads with consumption, at the cost of extra local disk usage. Setting to 1 keeps the historical behavior (one-step prefetch). The default value is 4. | | kv.recover.remote-log.download-threads | Integer | 3 | The number of threads used to download remote log segments during KV recovery. Should be less than or equal to `kv.recover.remote-log.prefetch-num`. The default value is 3. | diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index 4fcc836872..b68341c200 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -1092,6 +1092,41 @@ These metrics use Sum aggregation to show the total value across all tables in a +### KV Backpressure + +KV backpressure metrics report the cooperative backpressure signal computed by primary-key tables on top of RocksDB. These metrics are aggregated from all buckets of a table and expose three orthogonal dimensions: peak intensity, affected breadth, and cumulative hard rejections. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ScopeInfixMetricsDescriptionType
tabletservertablekvBackpressureMaxPressureMaximum normalized backpressure value across all buckets of this table, in [0, 1]. A value approaching 1 indicates the hottest bucket is close to the storage engine's hard-rejection trigger.Gauge
kvBackpressureAffectedBucketsNumber of buckets of this table currently under backpressure (pressure greater than 0).Gauge
kvBackpressureRejectionsTotalTotal number of write requests rejected with StorageBackpressureException on this table since process start.Counter
+ ### Flink connector standard metrics When using Flink to read and write, Fluss has implemented some key standard Flink connector metrics