diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index f7b6de9eb8..1cb0750fba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -17,6 +17,7 @@ package org.apache.fluss.server; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.cluster.ConfigEntry; @@ -86,6 +87,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.metadata.MetadataProvider; import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.PartitionNegativeCache; import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.tablet.TabletService; @@ -143,6 +145,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR protected final MetadataManager metadataManager; protected final @Nullable Authorizer authorizer; protected final DynamicConfigManager dynamicConfigManager; + protected final PartitionNegativeCache partitionNegativeCache; private long tokenLastUpdateTimeMs = 0; private ObtainedSecurityToken securityToken = null; @@ -164,9 +167,15 @@ public RpcServiceBase( this.metadataManager = metadataManager; this.authorizer = authorizer; this.dynamicConfigManager = dynamicConfigManager; + this.partitionNegativeCache = new PartitionNegativeCache(); this.ioExecutor = ioExecutor; } + @VisibleForTesting + public PartitionNegativeCache getPartitionNegativeCache() { + return partitionNegativeCache; + } + @Override public ServerType providerType() { return provider; @@ -618,6 +627,14 @@ protected MetadataResponse processMetadataRequest( metadataProvider.getPhysicalTablePathFromCache(partitionId); if (physicalTablePath.isPresent()) { partitionPaths.add(physicalTablePath.get()); + } else if (partitionNegativeCache.isKnownNonExistent(partitionId)) { + // Fast-path only after the positive metadata cache misses. A stale negative-cache + // entry must not hide a partition that has already been synced into metadata cache. + throw new PartitionNotExistException( + String.format( + "The partition id '%d' does not exist or you don't have" + + " permission to access it.", + partitionId)); } else { partitionIdsNotExistsInCache.add(partitionId); } @@ -634,6 +651,12 @@ protected MetadataResponse processMetadataRequest( if (partitionIdAndPaths.containsKey(partitionId)) { partitionPaths.add(partitionIdAndPaths.get(partitionId)); } else { + // Only cache when the authoritative partition assignment is also gone. A miss + // from the scoped table-path lookup may simply mean that the request omitted + // the owning table or the session is not authorized for it. + if (isPartitionAssignmentMissingFromZk(partitionId)) { + partitionNegativeCache.markNonExistent(partitionId); + } throw new PartitionNotExistException( String.format( "The partition id '%d' does not exist or you don't have permission to access it.", @@ -674,4 +697,15 @@ protected MetadataResponse processMetadataRequest( return buildMetadataResponse( coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata); } + + private boolean isPartitionAssignmentMissingFromZk(long partitionId) { + try { + return !zkClient.getPartitionAssignment(partitionId).isPresent(); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to get partition assignment for partition %d.", partitionId), + e); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java new file mode 100644 index 0000000000..c4ddd260e7 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.shaded.guava32.com.google.common.base.Ticker; +import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache; +import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; + +import javax.annotation.concurrent.ThreadSafe; + +import java.time.Duration; + +/** + * A thread-safe negative cache for partition IDs that are known to not exist in ZooKeeper. + * + *

This cache helps reduce ZooKeeper pressure when clients repeatedly request metadata for + * partitions that have been deleted (e.g., during hourly partition rotation). Instead of querying + * ZK every time, we cache the "not exist" result and return it directly. + * + *

The cache uses access-time-based TTL: entries are evicted after a configurable duration of no + * access. As long as clients keep asking for the same non-existent partition, the cache entry stays + * alive and protects ZK. + */ +@ThreadSafe +public class PartitionNegativeCache { + + /** Default TTL for negative cache entries: 10 minutes of no access. */ + private static final Duration DEFAULT_TTL = Duration.ofMinutes(10); + + /** Default maximum number of negative cache entries. */ + private static final long DEFAULT_MAXIMUM_SIZE = 10_000L; + + private final Cache cache; + + public PartitionNegativeCache() { + this(DEFAULT_TTL, DEFAULT_MAXIMUM_SIZE); + } + + public PartitionNegativeCache(Duration ttl, long maximumSize) { + this(ttl, maximumSize, SystemClock.getInstance()); + } + + @VisibleForTesting + PartitionNegativeCache(Duration ttl, long maximumSize, Clock clock) { + if (ttl.isZero() || ttl.isNegative()) { + throw new IllegalArgumentException("TTL must be positive."); + } + if (maximumSize <= 0) { + throw new IllegalArgumentException("Maximum size must be positive."); + } + this.cache = + CacheBuilder.newBuilder() + .maximumSize(maximumSize) + .expireAfterAccess(ttl) + .ticker( + new Ticker() { + @Override + public long read() { + return clock.nanoseconds(); + } + }) + .build(); + } + + /** + * Checks if the given partition ID is known to not exist. + * + *

If the entry exists and is not expired, Guava refreshes its access time and this method + * returns {@code true}. If the entry is expired or doesn't exist, returns {@code false}. + * + * @param partitionId the partition ID to check + * @return {@code true} if the partition is known to not exist, {@code false} otherwise + */ + public boolean isKnownNonExistent(long partitionId) { + return cache.getIfPresent(partitionId) != null; + } + + /** + * Marks the given partition ID as known to not exist. The entry will remain in the cache until + * it hasn't been accessed for the configured TTL duration or the bounded cache evicts it. + * + * @param partitionId the partition ID to mark as non-existent + */ + public void markNonExistent(long partitionId) { + cache.put(partitionId, Boolean.TRUE); + } + + /** + * Returns the current number of entries in the cache. Includes potentially expired entries that + * haven't been cleaned up yet. + */ + @VisibleForTesting + public long size() { + cache.cleanUp(); + return cache.size(); + } + + /** Removes all entries from the cache. */ + @VisibleForTesting + public void clear() { + cache.invalidateAll(); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java new file mode 100644 index 0000000000..498db45716 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Collections; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createPartition; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropPartitionRequest; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for {@link PartitionNegativeCache} integration with metadata request processing. */ +class PartitionNegativeCacheITCase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder().setNumOfTabletServers(1).build(); + + private static CoordinatorGateway coordinatorGateway; + + @BeforeAll + static void setup() { + coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + } + + @Test + void testNegativeCacheForDeletedPartition() throws Exception { + FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata(); + + // Create a partitioned table with auto-partition disabled. + TablePath tablePath = TablePath.of("test_db_neg_cache", "partitioned_table"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + .partitionedBy("b") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, false) + .build(); + createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + // Create a partition and get its ID. + String partitionName = "p_to_delete"; + long partitionId = + createPartition( + FLUSS_CLUSTER_EXTENSION, + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false); + + PartitionNegativeCache negativeCache = + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() + .getPartitionNegativeCache(); + + // A stale negative-cache entry must not hide a partition that exists in metadata cache. + MetadataRequest existingPartitionRequest = new MetadataRequest(); + existingPartitionRequest.addPartitionsId(partitionId); + existingPartitionRequest + .addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + negativeCache.markNonExistent(partitionId); + retry( + Duration.ofMinutes(1), + () -> coordinatorGateway.metadata(existingPartitionRequest).get()); + negativeCache.clear(); + + // Drop the partition. + coordinatorGateway + .dropPartition( + newDropPartitionRequest( + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false)) + .get(); + + // Wait until the partition is actually deleted from ZK (metadata cache update). + retry( + Duration.ofMinutes(1), + () -> { + // Request metadata with the deleted partition ID - should throw. + MetadataRequest request = new MetadataRequest(); + request.addPartitionsId(partitionId); + request.addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + assertThatThrownBy(() -> coordinatorGateway.metadata(request).get()) + .cause() + .isInstanceOf(PartitionNotExistException.class); + assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); + }); + + // At this point, the partition ID should be in the negative cache. + // Verify the negative cache is populated on the coordinator. + assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); + + // Second request should also fail (served from negative cache, no ZK query). + MetadataRequest secondRequest = new MetadataRequest(); + secondRequest.addPartitionsId(partitionId); + secondRequest + .addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + assertThatThrownBy(() -> coordinatorGateway.metadata(secondRequest).get()) + .cause() + .isInstanceOf(PartitionNotExistException.class); + + // Negative cache should still have the entry. + assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java new file mode 100644 index 0000000000..7aeec015ce --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.utils.clock.ManualClock; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PartitionNegativeCache}. */ +class PartitionNegativeCacheTest { + + private static final Duration TTL = Duration.ofMinutes(10); + private static final long TTL_MS = TTL.toMillis(); + private static final long MAXIMUM_SIZE = 100L; + + @Test + void testMarkAndQueryNonExistent() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + // Initially, partition is not known as non-existent + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + + // Mark it as non-existent + cache.markNonExistent(100L); + + // Now it should be known as non-existent + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + assertThat(cache.size()).isEqualTo(1); + } + + @Test + void testUnknownPartitionReturnsFalse() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + // Never marked partition should return false + assertThat(cache.isKnownNonExistent(999L)).isFalse(); + assertThat(cache.isKnownNonExistent(0L)).isFalse(); + assertThat(cache.isKnownNonExistent(-1L)).isFalse(); + } + + @Test + void testMultiplePartitions() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(1L); + cache.markNonExistent(2L); + cache.markNonExistent(3L); + + assertThat(cache.isKnownNonExistent(1L)).isTrue(); + assertThat(cache.isKnownNonExistent(2L)).isTrue(); + assertThat(cache.isKnownNonExistent(3L)).isTrue(); + assertThat(cache.isKnownNonExistent(4L)).isFalse(); + assertThat(cache.size()).isEqualTo(3); + } + + @Test + void testTtlExpiration() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Advance time past TTL from last access time (which is 0) + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + + // Entry should be removed from cache + assertThat(cache.size()).isEqualTo(0); + } + + @Test + void testExpiresAtTtlBoundary() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(100L); + + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Guava expireAfterAccess expires the entry once the TTL boundary is reached. + cache.markNonExistent(100L); + clock.advanceTime(TTL_MS, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + } + + @Test + void testAccessTimeRefreshKeepsEntryAlive() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(100L); + + // Access at t = TTL - 1 (within TTL window), refreshes access time to TTL - 1 + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Now TTL is measured from last access (TTL_MS - 1). + // At t = 2*TTL - 2, elapsed = TTL - 1, NOT > TTL, still valid. + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); // refreshes to 2*TTL - 2 + + // Access again at t = 3*TTL - 3 to prove indefinite survival + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); // refreshes to 3*TTL - 3 + + // Now stop accessing. Advance past TTL from last access (3*TTL - 3). + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + } + + @Test + void testClear() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(1L); + cache.markNonExistent(2L); + cache.markNonExistent(3L); + assertThat(cache.size()).isEqualTo(3); + + cache.clear(); + assertThat(cache.size()).isEqualTo(0); + assertThat(cache.isKnownNonExistent(1L)).isFalse(); + assertThat(cache.isKnownNonExistent(2L)).isFalse(); + assertThat(cache.isKnownNonExistent(3L)).isFalse(); + } + + @Test + void testRemarkAfterExpiration() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Expire the entry + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + + // Re-mark at new time should work + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // The new entry expires TTL after re-mark time (TTL_MS + 1) + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + } + + @Test + void testIndependentExpiration() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + // Mark partition 1 at t=0 + cache.markNonExistent(1L); + + // Mark partition 2 at t=5min + clock.advanceTime(TTL_MS / 2, TimeUnit.MILLISECONDS); + cache.markNonExistent(2L); + + // At t=TTL+1: partition 1 should expire, partition 2 should still be alive + clock.advanceTime(TTL_MS / 2 + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(1L)).isFalse(); + assertThat(cache.isKnownNonExistent(2L)).isTrue(); // refreshes access time to TTL_MS+1 + + // Partition 2's access time was refreshed to TTL_MS+1 above. + // To expire it, advance past TTL from that refreshed time. + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(2L)).isFalse(); + } + + @Test + void testMaximumSizeBoundsCache() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, 2L, clock); + + cache.markNonExistent(1L); + cache.markNonExistent(2L); + cache.markNonExistent(3L); + + assertThat(cache.size()).isLessThanOrEqualTo(2); + } +}