diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index f7b6de9eb8..1cb0750fba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -17,6 +17,7 @@ package org.apache.fluss.server; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.cluster.ConfigEntry; @@ -86,6 +87,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.metadata.MetadataProvider; import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.PartitionNegativeCache; import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.tablet.TabletService; @@ -143,6 +145,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR protected final MetadataManager metadataManager; protected final @Nullable Authorizer authorizer; protected final DynamicConfigManager dynamicConfigManager; + protected final PartitionNegativeCache partitionNegativeCache; private long tokenLastUpdateTimeMs = 0; private ObtainedSecurityToken securityToken = null; @@ -164,9 +167,15 @@ public RpcServiceBase( this.metadataManager = metadataManager; this.authorizer = authorizer; this.dynamicConfigManager = dynamicConfigManager; + this.partitionNegativeCache = new PartitionNegativeCache(); this.ioExecutor = ioExecutor; } + @VisibleForTesting + public PartitionNegativeCache getPartitionNegativeCache() { + return partitionNegativeCache; + } + @Override public ServerType providerType() { return provider; @@ -618,6 +627,14 @@ protected MetadataResponse processMetadataRequest( 
metadataProvider.getPhysicalTablePathFromCache(partitionId); if (physicalTablePath.isPresent()) { partitionPaths.add(physicalTablePath.get()); + } else if (partitionNegativeCache.isKnownNonExistent(partitionId)) { + // Fast-path only after the positive metadata cache misses. A stale negative-cache + // entry must not hide a partition that has already been synced into metadata cache. + throw new PartitionNotExistException( + String.format( + "The partition id '%d' does not exist or you don't have" + + " permission to access it.", + partitionId)); } else { partitionIdsNotExistsInCache.add(partitionId); } @@ -634,6 +651,12 @@ protected MetadataResponse processMetadataRequest( if (partitionIdAndPaths.containsKey(partitionId)) { partitionPaths.add(partitionIdAndPaths.get(partitionId)); } else { + // Only cache when the authoritative partition assignment is also gone. A miss + // from the scoped table-path lookup may simply mean that the request omitted + // the owning table or the session is not authorized for it. 
+ if (isPartitionAssignmentMissingFromZk(partitionId)) { + partitionNegativeCache.markNonExistent(partitionId); + } throw new PartitionNotExistException( String.format( "The partition id '%d' does not exist or you don't have permission to access it.", @@ -674,4 +697,15 @@ protected MetadataResponse processMetadataRequest( return buildMetadataResponse( coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata); } + + private boolean isPartitionAssignmentMissingFromZk(long partitionId) { + try { + return !zkClient.getPartitionAssignment(partitionId).isPresent(); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to get partition assignment for partition %d.", partitionId), + e); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java new file mode 100644 index 0000000000..c4ddd260e7 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.shaded.guava32.com.google.common.base.Ticker; +import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache; +import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; + +import javax.annotation.concurrent.ThreadSafe; + +import java.time.Duration; + +/** + * A thread-safe negative cache for partition IDs that are known to not exist in ZooKeeper. + * + *
This cache helps reduce ZooKeeper pressure when clients repeatedly request metadata for + * partitions that have been deleted (e.g., during hourly partition rotation). Instead of querying + * ZK every time, we cache the "not exist" result and return it directly. + * + *
The cache uses access-time-based TTL: entries are evicted after a configurable duration of no
+ * access. As long as clients keep asking for the same non-existent partition, the cache entry stays
+ * alive and protects ZK.
+ */
+@ThreadSafe
+public class PartitionNegativeCache {
+
+ /** Default TTL for negative cache entries: 10 minutes of no access. */
+ private static final Duration DEFAULT_TTL = Duration.ofMinutes(10);
+
+ /** Default maximum number of negative cache entries. */
+ private static final long DEFAULT_MAXIMUM_SIZE = 10_000L;
+
+ private final Cache If the entry exists and is not expired, Guava refreshes its access time and this method
+ * returns {@code true}. If the entry is expired or doesn't exist, returns {@code false}.
+ *
+ * @param partitionId the partition ID to check
+ * @return {@code true} if the partition is known to not exist, {@code false} otherwise
+ */
+ public boolean isKnownNonExistent(long partitionId) {
+ // A non-null lookup result means a negative entry is currently cached for this ID.
+ final boolean hasNegativeEntry = cache.getIfPresent(partitionId) != null;
+ return hasNegativeEntry;
+ }
+
+ /**
+ * Records the given partition ID as known to not exist by storing a marker entry for it.
+ *
+ * <p>The entry stays cached until it has gone unaccessed for the configured TTL or the
+ * bounded cache evicts it.
+ *
+ * @param partitionId the ID of the partition to remember as non-existent
+ */
+ public void markNonExistent(long partitionId) {
+ final Long key = partitionId;
+ cache.put(key, Boolean.TRUE);
+ }
+
+ /**
+ * Returns the number of entries currently in the cache.
+ *
+ * <p>Pending maintenance is run via {@code cleanUp()} before counting, so entries that have
+ * already expired are expected to be dropped rather than included in the result.
+ */
+ @VisibleForTesting
+ public long size() {
+ cache.cleanUp();
+ final long entryCount = cache.size();
+ return entryCount;
+ }
+
+ /** Removes all entries from the cache, so no partition ID is reported as non-existent. */
+ @VisibleForTesting
+ public void clear() {
+ cache.invalidateAll();
+ }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java
new file mode 100644
index 0000000000..498db45716
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.PartitionNotExistException;
+import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Collections;
+
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createPartition;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropPartitionRequest;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for {@link PartitionNegativeCache} integration with metadata request processing. */
+class PartitionNegativeCacheITCase {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+
+ private static CoordinatorGateway coordinatorGateway;
+
+ @BeforeAll
+ static void setup() {
+ coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+ }
+
+ @Test
+ void testNegativeCacheForDeletedPartition() throws Exception {
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();
+
+ // Create a partitioned table; auto-partition is off so the test controls its partitions.
+ TablePath tablePath = TablePath.of("test_db_neg_cache", "partitioned_table");
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(DATA1_SCHEMA)
+ .distributedBy(1)
+ .partitionedBy("b")
+ .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, false)
+ .build();
+ createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
+
+ // Create a partition and get its ID.
+ String partitionName = "p_to_delete";
+ long partitionId =
+ createPartition(
+ FLUSS_CLUSTER_EXTENSION,
+ tablePath,
+ new PartitionSpec(Collections.singletonMap("b", partitionName)),
+ false);
+
+ PartitionNegativeCache negativeCache =
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getPartitionNegativeCache();
+
+ // A stale negative-cache entry must not hide a partition that exists in metadata cache.
+ MetadataRequest existingPartitionRequest = new MetadataRequest();
+ existingPartitionRequest.addPartitionsId(partitionId);
+ existingPartitionRequest
+ .addTablePath()
+ .setDatabaseName(tablePath.getDatabaseName())
+ .setTableName(tablePath.getTableName());
+ negativeCache.markNonExistent(partitionId);
+ retry(
+ Duration.ofMinutes(1),
+ () -> coordinatorGateway.metadata(existingPartitionRequest).get());
+ negativeCache.clear();
+
+ // Drop the partition.
+ coordinatorGateway
+ .dropPartition(
+ newDropPartitionRequest(
+ tablePath,
+ new PartitionSpec(Collections.singletonMap("b", partitionName)),
+ false))
+ .get();
+
+ // Retry until the deletion reaches ZK; only then is the miss put in the negative cache.
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ // Request metadata with the deleted partition ID - should throw.
+ MetadataRequest request = new MetadataRequest();
+ request.addPartitionsId(partitionId);
+ request.addTablePath()
+ .setDatabaseName(tablePath.getDatabaseName())
+ .setTableName(tablePath.getTableName());
+ assertThatThrownBy(() -> coordinatorGateway.metadata(request).get())
+ .cause()
+ .isInstanceOf(PartitionNotExistException.class);
+ assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue();
+ });
+
+ // At this point, the partition ID should be in the negative cache.
+ // Verify the negative cache is populated on the coordinator.
+ assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue();
+
+ // Second request must also fail (expected negative-cache hit; ZK access not asserted).
+ MetadataRequest secondRequest = new MetadataRequest();
+ secondRequest.addPartitionsId(partitionId);
+ secondRequest
+ .addTablePath()
+ .setDatabaseName(tablePath.getDatabaseName())
+ .setTableName(tablePath.getTableName());
+ assertThatThrownBy(() -> coordinatorGateway.metadata(secondRequest).get())
+ .cause()
+ .isInstanceOf(PartitionNotExistException.class);
+
+ // Negative cache should still have the entry.
+ assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue();
+ }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java
new file mode 100644
index 0000000000..7aeec015ce
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metadata;
+
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link PartitionNegativeCache} using a {@link ManualClock} to control time. */
+class PartitionNegativeCacheTest {
+
+ private static final Duration TTL = Duration.ofMinutes(10);
+ private static final long TTL_MS = TTL.toMillis();
+ private static final long MAXIMUM_SIZE = 100L;
+
+ @Test
+ void testMarkAndQueryNonExistent() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ // Initially, partition is not known as non-existent
+ assertThat(cache.isKnownNonExistent(100L)).isFalse();
+
+ // Mark it as non-existent
+ cache.markNonExistent(100L);
+
+ // Now it should be known as non-existent
+ assertThat(cache.isKnownNonExistent(100L)).isTrue();
+ assertThat(cache.size()).isEqualTo(1);
+ }
+
+ @Test
+ void testUnknownPartitionReturnsFalse() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ // Never marked partition should return false
+ assertThat(cache.isKnownNonExistent(999L)).isFalse();
+ assertThat(cache.isKnownNonExistent(0L)).isFalse();
+ assertThat(cache.isKnownNonExistent(-1L)).isFalse();
+ }
+
+ @Test
+ void testMultiplePartitions() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ cache.markNonExistent(1L);
+ cache.markNonExistent(2L);
+ cache.markNonExistent(3L);
+
+ assertThat(cache.isKnownNonExistent(1L)).isTrue();
+ assertThat(cache.isKnownNonExistent(2L)).isTrue();
+ assertThat(cache.isKnownNonExistent(3L)).isTrue();
+ assertThat(cache.isKnownNonExistent(4L)).isFalse();
+ assertThat(cache.size()).isEqualTo(3);
+ }
+
+ @Test
+ void testTtlExpiration() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ cache.markNonExistent(100L);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue();
+
+ // Advance just past the TTL measured from the last access (the lookup above)
+ clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isFalse();
+
+ // Entry should be removed from cache
+ assertThat(cache.size()).isEqualTo(0);
+ }
+
+ @Test
+ void testExpiresAtTtlBoundary() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ cache.markNonExistent(100L);
+
+ clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue();
+
+ // Guava expireAfterAccess expires the entry once the TTL boundary is reached.
+ cache.markNonExistent(100L);
+ clock.advanceTime(TTL_MS, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isFalse();
+ }
+
+ @Test
+ void testAccessTimeRefreshKeepsEntryAlive() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ cache.markNonExistent(100L);
+
+ // Access at t = TTL - 1 (within TTL window), refreshes access time to TTL - 1
+ clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue();
+
+ // Now TTL is measured from last access (TTL_MS - 1).
+ // At t = 2*TTL - 2, elapsed = TTL - 1, which is < TTL, so the entry is still valid.
+ clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue(); // refreshes to 2*TTL - 2
+
+ // Access again at t = 3*TTL - 3 to prove indefinite survival
+ clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue(); // refreshes to 3*TTL - 3
+
+ // Now stop accessing. Advance past TTL from last access (3*TTL - 3).
+ clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isFalse();
+ }
+
+ @Test
+ void testClear() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ cache.markNonExistent(1L);
+ cache.markNonExistent(2L);
+ cache.markNonExistent(3L);
+ assertThat(cache.size()).isEqualTo(3);
+
+ cache.clear();
+ assertThat(cache.size()).isEqualTo(0);
+ assertThat(cache.isKnownNonExistent(1L)).isFalse();
+ assertThat(cache.isKnownNonExistent(2L)).isFalse();
+ assertThat(cache.isKnownNonExistent(3L)).isFalse();
+ }
+
+ @Test
+ void testRemarkAfterExpiration() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ cache.markNonExistent(100L);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue();
+
+ // Expire the entry
+ clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isFalse();
+
+ // Re-mark at new time should work
+ cache.markNonExistent(100L);
+ assertThat(cache.isKnownNonExistent(100L)).isTrue();
+
+ // The new entry expires TTL after re-mark time (TTL_MS + 1)
+ clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(100L)).isFalse();
+ }
+
+ @Test
+ void testIndependentExpiration() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock);
+
+ // Mark partition 1 at t=0
+ cache.markNonExistent(1L);
+
+ // Mark partition 2 at t=5min
+ clock.advanceTime(TTL_MS / 2, TimeUnit.MILLISECONDS);
+ cache.markNonExistent(2L);
+
+ // At t=TTL+1: partition 1 should expire, partition 2 should still be alive
+ clock.advanceTime(TTL_MS / 2 + 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(1L)).isFalse();
+ assertThat(cache.isKnownNonExistent(2L)).isTrue(); // refreshes access time to TTL_MS+1
+
+ // Partition 2's access time was refreshed to TTL_MS+1 above.
+ // To expire it, advance past TTL from that refreshed time.
+ clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS);
+ assertThat(cache.isKnownNonExistent(2L)).isFalse();
+ }
+
+ @Test
+ void testMaximumSizeBoundsCache() {
+ ManualClock clock = new ManualClock();
+ PartitionNegativeCache cache = new PartitionNegativeCache(TTL, 2L, clock);
+
+ cache.markNonExistent(1L);
+ cache.markNonExistent(2L);
+ cache.markNonExistent(3L);
+
+ assertThat(cache.size()).isLessThanOrEqualTo(2);
+ }
+}