Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -122,6 +123,18 @@ public final class RecordAccumulator {
private final Clock clock;
private final DynamicWriteBatchSizeEstimator batchSizeEstimator;

// Per-bucket backpressure throttle expiry timestamp. Accessed strictly by key on
// hot paths (get / put / remove); writes happen on every backpressure signal and
// every eviction, so the container is sized for lock-striped O(1) updates without
// any whole-map snapshot cost.
private final ConcurrentMap<TableBucket, Long> throttleExpiryMs = new ConcurrentHashMap<>();
private final long maxThrottleMs;

// Latest Cluster snapshot fed to the metadata-driven throttle sweep. Identity
// equality against this reference short-circuits the sweep when metadata hasn't
// changed.
private volatile Cluster lastClusterRef = Cluster.empty();

// TODO add retryBackoffMs to retry the produce request upon receiving an error.
// TODO add deliveryTimeoutMs to report success or failure on record delivery.
// TODO add nextBatchExpiryTimeMs
Expand Down Expand Up @@ -155,6 +168,7 @@ public final class RecordAccumulator {
(int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes());
this.idempotenceManager = idempotenceManager;
this.clock = clock;
this.maxThrottleMs = conf.get(ConfigOptions.CLIENT_WRITER_KV_BACKPRESSURE_MAX_THROTTLE_MS);
registerMetrics(writerMetricGroup);
}

Expand Down Expand Up @@ -520,6 +534,22 @@ private long bucketReady(
} else {
TableBucket tableBucket =
cluster.getTableBucket(tableIdOpt.get(), physicalTablePath, bucketId);

// If this bucket is throttled, don't mark its node as ready.
// Instead, factor the remaining throttle time into the next check delay.
Long throttleExpiry = throttleExpiryMs.get(tableBucket);
if (throttleExpiry != null) {
long now = clock.milliseconds();
if (now < throttleExpiry) {
nextReadyCheckDelayMs =
Math.min(nextReadyCheckDelayMs, throttleExpiry - now);
continue;
}
// Expired — evict here to reclaim entries for buckets whose deque
// has gone empty and won't reach the drain-time throttle check.
throttleExpiryMs.remove(tableBucket);
}

Integer leader = cluster.leaderFor(tableBucket);
if (leader == null) {
// This is a bucket for which leader is not known, but messages are
Expand Down Expand Up @@ -762,7 +792,7 @@ private List<ReadyWriteBatch> drainBatchesForOneNode(Cluster cluster, Integer no
// request size due to compression; in this case we will still
// eventually send this batch in a single request.
break;
} else if (shouldStopDrainBatchesForBucket(first, tableBucket)) {
} else if (shouldSkipBucket(first, tableBucket)) {
// Buckets are independent — skip this one, keep draining others.
continue;
}
Expand All @@ -777,7 +807,7 @@ private List<ReadyWriteBatch> drainBatchesForOneNode(Cluster cluster, Integer no
// we update it and reset the batch sequence. This should be only done when
// all
// its in-flight batches have completed. This is guarantee in
// `shouldStopDrainBatchesForBucket`.
// `shouldSkipBucket`.
idempotenceManager.maybeUpdateWriterId(tableBucket);

// If the batch already has an assigned batch sequence, then we should not
Expand Down Expand Up @@ -845,7 +875,11 @@ private List<ReadyWriteBatch> drainBatchesForOneNode(Cluster cluster, Integer no
return ready;
}

private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket tableBucket) {
private boolean shouldSkipBucket(WriteBatch first, TableBucket tableBucket) {
// Backpressure throttle check: skip this bucket if still under throttle
if (isThrottled(tableBucket)) {
return true;
}
if (idempotenceManager.idempotenceEnabled()) {
if (!idempotenceManager.isWriterIdValid()) {
// we cannot send the batch until we have refreshed writer id.
Expand Down Expand Up @@ -883,6 +917,81 @@ private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket ta
return false;
}

// ---- Backpressure throttle methods ----

/**
* Check if a bucket is currently under backpressure throttle.
*
* <p>Performs lazy eviction: if the throttle has expired, the entry is removed from the map to
* prevent unbounded growth.
*
* @return true if the bucket should be skipped during drain
*/
boolean isThrottled(TableBucket tableBucket) {
Long expiry = throttleExpiryMs.get(tableBucket);
if (expiry == null) {
return false;
}
if (clock.milliseconds() < expiry) {
return true;
}
// Expired — evict to prevent map leak
throttleExpiryMs.remove(tableBucket);
return false;
}

/**
* Update the throttle state for a bucket based on the received pressure signal.
*
* <p>The delay grows quadratically with pressure: {@code delay = maxThrottleMs * p^2}, where
* {@code p ∈ [0, 1)}. This provides meaningful throttling across the full ramp-up window while
* remaining gentle at low pressure.
*
* @param tableBucket the bucket to update
* @param pressure value in {@code [0, 1)}; {@code 0} means recovered, positive values trigger a
* throttle window
*/
void updateThrottle(TableBucket tableBucket, float pressure) {
if (pressure > 0f) {
long delay = (long) (maxThrottleMs * pressure * pressure);
if (delay > 0) {
throttleExpiryMs.put(tableBucket, clock.milliseconds() + delay);
return;
}
}
// Recovered or below the meaningful resolution: remove throttle
throttleExpiryMs.remove(tableBucket);
}

/**
* Apply a hard back-off after the server rejected the write with a {@code
* StorageBackpressureException}. This signals that the storage engine has reached its own
* slowdown threshold; we stall the bucket for the full {@link #maxThrottleMs} window so the
* standard retry path can take effect.
*/
void applyStorageBackpressureBackoff(TableBucket tableBucket) {
throttleExpiryMs.put(tableBucket, clock.milliseconds() + maxThrottleMs);
}

/**
* Evict throttle entries whose buckets no longer exist in the given cluster (leader unknown,
* partition dropped, table dropped).
*
* <p>Invoked on every Sender loop with the current cluster snapshot. The identity short-circuit
* makes this an O(1) no-op when metadata hasn't changed, so the actual O(N) walk only runs once
* per real metadata refresh.
*/
void maybeEvictStaleThrottles(Cluster cluster) {
if (cluster == lastClusterRef) {
return;
}
lastClusterRef = cluster;
if (throttleExpiryMs.isEmpty()) {
return;
}
throttleExpiryMs.keySet().removeIf(tb -> cluster.leaderFor(tb) == null);
}

private int getDrainIndex(int id) {
return nodesDrainIndex.computeIfAbsent(id, s -> 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.exception.OutOfOrderSequenceException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.exception.StorageBackpressureException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
Expand Down Expand Up @@ -209,6 +210,10 @@ private void addToInflightBatches(Map<Integer, List<ReadyWriteBatch>> batches) {
private void sendWriteData() throws Exception {
Cluster clusterSnapshot = metadataUpdater.getCluster();

// Refresh per-bucket throttle entries against the current cluster snapshot,
// dropping any whose bucket has disappeared from metadata.
accumulator.maybeEvictStaleThrottles(clusterSnapshot);

// get the list of buckets with data ready to send.
ReadyCheckResult readyCheckResult = accumulator.ready(clusterSnapshot);

Expand Down Expand Up @@ -498,7 +503,16 @@ private void handlePutKvResponse(
tableId,
respForBucket.hasPartitionId() ? respForBucket.getPartitionId() : null,
respForBucket.getBucketId());

// Update backpressure throttle from pressure signal
if (respForBucket.hasPressure()) {
accumulator.updateThrottle(tb, respForBucket.getPressure());
}

ReadyWriteBatch writeBatch = recordsByBucket.get(tb);
if (writeBatch == null) {
continue;
}
if (respForBucket.hasErrorCode()) {
Set<PhysicalTablePath> invalidMetadataTables =
handleWriteBatchException(
Expand Down Expand Up @@ -534,6 +548,12 @@ private Set<PhysicalTablePath> handleWriteBatchException(
ReadyWriteBatch readyWriteBatch, ApiError error) {
Set<PhysicalTablePath> invalidMetadataTables = new HashSet<>();
WriteBatch writeBatch = readyWriteBatch.writeBatch();
if (error.exception() instanceof StorageBackpressureException) {
// Hard rejection: the storage engine reached its slowdown trigger and rejected the
// write. Apply a hard back-off equal to the configured max throttle window before
// letting the standard retry path re-enqueue the batch.
accumulator.applyStorageBackpressureBackoff(readyWriteBatch.tableBucket());
}
if (error.error() == Errors.DUPLICATE_SEQUENCE_EXCEPTION) {
// If we have received a duplicate batch sequence error, it means that the batch
// sequence has advanced beyond the sequence of the current batch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ private RecordAccumulator createTestRecordAccumulator(
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(totalSize));
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(pageSize));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(batchSize));
// Use a small max throttle to keep test latencies tight; quadratic curve applies.
conf.set(ConfigOptions.CLIENT_WRITER_KV_BACKPRESSURE_MAX_THROTTLE_MS, 1000L);
return new RecordAccumulator(
conf,
new IdempotenceManager(
Expand Down Expand Up @@ -701,4 +703,96 @@ private int getBatchNumInAccum(RecordAccumulator accum) {
+ (bucketBatches2 == null ? 0 : bucketBatches2.size())
+ (bucketBatches3 == null ? 0 : bucketBatches3.size());
}

// ---- Backpressure throttle tests ----

@Test
void testIsThrottledReturnsFalseWhenNoThrottle() {
RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE);
assertThat(accum.isThrottled(tb1)).isFalse();
}

@Test
void testUpdateThrottleWithPositivePressure() {
RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE);
// pressure = 0.5 -> delay = 1000 * 0.5^2 = 250ms
accum.updateThrottle(tb1, 0.5f);
assertThat(accum.isThrottled(tb1)).isTrue();

// Advance time past the throttle
clock.advanceTime(Duration.ofMillis(251));
assertThat(accum.isThrottled(tb1)).isFalse();
}

@Test
void testUpdateThrottleWithHighPressure() {
RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE);
// pressure = 0.9 -> delay = 1000 * 0.9^2 = 810ms
accum.updateThrottle(tb1, 0.9f);
assertThat(accum.isThrottled(tb1)).isTrue();

clock.advanceTime(Duration.ofMillis(500));
assertThat(accum.isThrottled(tb1)).isTrue();

clock.advanceTime(Duration.ofMillis(311));
assertThat(accum.isThrottled(tb1)).isFalse();
}

@Test
void testApplyStorageBackpressureBackoff() {
RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE);
// StorageBackpressureException back-off: stalls the bucket for the full max throttle
// window.
accum.applyStorageBackpressureBackoff(tb1);
assertThat(accum.isThrottled(tb1)).isTrue();

// Still throttled within the configured max throttle window.
clock.advanceTime(Duration.ofMillis(999));
assertThat(accum.isThrottled(tb1)).isTrue();

// Released exactly when the window elapses.
clock.advanceTime(Duration.ofMillis(2));
assertThat(accum.isThrottled(tb1)).isFalse();
}

@Test
void testUpdateThrottleRecovery() {
RecordAccumulator accum = createTestRecordAccumulator(1024, Integer.MAX_VALUE);
// First apply pressure
accum.updateThrottle(tb1, 0.5f);
assertThat(accum.isThrottled(tb1)).isTrue();

// Then recover with pressure=0
accum.updateThrottle(tb1, 0f);
assertThat(accum.isThrottled(tb1)).isFalse();
}

@Test
void testThrottledBucketSkippedInDrain() throws Exception {
IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
long batchSize = getTestBatchSize(row);
RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE);
cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3));

// Append records to tb1 and tb2
accum.append(createRecord(row), writeCallback, cluster, 0, false);
accum.append(createRecord(row), writeCallback, cluster, 1, false);

// Throttle tb1
accum.updateThrottle(tb1, 0.5f);

// Drain should only produce batches for tb2 (tb1 is throttled)
Map<Integer, List<ReadyWriteBatch>> batches =
accum.drain(
cluster,
new HashSet<>(Collections.singletonList(node1.id())),
Integer.MAX_VALUE);
// tb2 should be in the batch, tb1 should not
List<ReadyWriteBatch> node1Batches = batches.get(node1.id());
if (node1Batches != null) {
for (ReadyWriteBatch b : node1Batches) {
assertThat(b.tableBucket()).isNotEqualTo(tb1);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,19 @@ public class ConfigOptions {
+ " Dynamic partition strategy refers to creating partitions based on the data "
+ "being written for partitioned table if the wrote partition don't exists.");

public static final ConfigOption<Long> CLIENT_WRITER_KV_BACKPRESSURE_MAX_THROTTLE_MS =
key("client.writer.kv-backpressure.max-throttle-ms")
.longType()
.defaultValue(3000L)
.withDescription(
"The upper bound of the per-bucket throttle delay the client applies in response to "
+ "KV backpressure signals piggybacked by the server. "
+ "Throttle delay is computed as: max_throttle_ms * p^2, where p is the "
+ "normalized pressure in [0, 1) reported by the server. "
+ "Once the server rejects a write with StorageBackpressureException, "
+ "the client also uses this value as the initial retry backoff so that the "
+ "transition between throttle and rejection is continuous.");

public static final ConfigOption<Duration> CLIENT_REQUEST_TIMEOUT =
key("client.request-timeout")
.durationType()
Expand Down Expand Up @@ -2063,6 +2076,21 @@ public class ConfigOptions {
+ "KV recovery. Should be less than or equal to "
+ "'kv.recover.remote-log.prefetch-num'.");

// ------------------------------------------------------------------------
// ConfigOptions for KV backpressure
// ------------------------------------------------------------------------

public static final ConfigOption<Integer> KV_BACKPRESSURE_L0_SLOWDOWN_TRIGGER =
key("kv.backpressure.l0-slowdown-trigger")
.intType()
.defaultValue(8)
.withDescription(
"The L0 file count at which Fluss starts emitting proactive backpressure to clients. "
+ "This value should be lower than the underlying storage engine's own "
+ "L0 slowdown trigger (RocksDB level0_slowdown_writes_trigger, default 20), "
+ "so that clients begin throttling before the storage engine is forced to throttle itself. "
+ "The gap between this value and the storage trigger forms the throttle ramp-up window.");

// ------------------------------------------------------------------------
// ConfigOptions for metrics
// ------------------------------------------------------------------------
Expand Down
Loading
Loading