Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.server;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.cluster.ConfigEntry;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.metadata.MetadataProvider;
import org.apache.fluss.server.metadata.PartitionMetadata;
import org.apache.fluss.server.metadata.PartitionNegativeCache;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metadata.TableMetadata;
import org.apache.fluss.server.tablet.TabletService;
Expand Down Expand Up @@ -143,6 +145,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR
protected final MetadataManager metadataManager;
protected final @Nullable Authorizer authorizer;
protected final DynamicConfigManager dynamicConfigManager;
protected final PartitionNegativeCache partitionNegativeCache;

private long tokenLastUpdateTimeMs = 0;
private ObtainedSecurityToken securityToken = null;
Expand All @@ -164,9 +167,15 @@ public RpcServiceBase(
this.metadataManager = metadataManager;
this.authorizer = authorizer;
this.dynamicConfigManager = dynamicConfigManager;
this.partitionNegativeCache = new PartitionNegativeCache();
this.ioExecutor = ioExecutor;
}

/**
 * Returns the {@link PartitionNegativeCache} instance created by this service's constructor.
 *
 * <p>Exposed so that tests can inspect or pre-populate the negative cache that is consulted
 * while processing metadata requests.
 *
 * @return the partition negative cache held by this service
 */
@VisibleForTesting
public PartitionNegativeCache getPartitionNegativeCache() {
return partitionNegativeCache;
}

@Override
public ServerType providerType() {
return provider;
Expand Down Expand Up @@ -618,6 +627,14 @@ protected MetadataResponse processMetadataRequest(
metadataProvider.getPhysicalTablePathFromCache(partitionId);
if (physicalTablePath.isPresent()) {
partitionPaths.add(physicalTablePath.get());
} else if (partitionNegativeCache.isKnownNonExistent(partitionId)) {
// Fast-path only after the positive metadata cache misses. A stale negative-cache
// entry must not hide a partition that has already been synced into metadata cache.
throw new PartitionNotExistException(
String.format(
"The partition id '%d' does not exist or you don't have"
+ " permission to access it.",
partitionId));
} else {
partitionIdsNotExistsInCache.add(partitionId);
}
Expand All @@ -634,6 +651,12 @@ protected MetadataResponse processMetadataRequest(
if (partitionIdAndPaths.containsKey(partitionId)) {
partitionPaths.add(partitionIdAndPaths.get(partitionId));
} else {
// Only cache when the authoritative partition assignment is also gone. A miss
// from the scoped table-path lookup may simply mean that the request omitted
// the owning table or the session is not authorized for it.
if (isPartitionAssignmentMissingFromZk(partitionId)) {
partitionNegativeCache.markNonExistent(partitionId);
}
throw new PartitionNotExistException(
String.format(
"The partition id '%d' does not exist or you don't have permission to access it.",
Expand Down Expand Up @@ -674,4 +697,15 @@ protected MetadataResponse processMetadataRequest(
return buildMetadataResponse(
coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata);
}

/**
 * Checks whether the partition assignment for the given partition can no longer be found
 * through {@code zkClient}.
 *
 * @param partitionId the id of the partition whose assignment is looked up
 * @return {@code true} if no assignment is present for the partition, {@code false} otherwise
 * @throws FlussRuntimeException if the lookup itself fails; the original failure is kept as the
 *     cause
 */
private boolean isPartitionAssignmentMissingFromZk(long partitionId) {
    final boolean assignmentPresent;
    try {
        assignmentPresent = zkClient.getPartitionAssignment(partitionId).isPresent();
    } catch (Exception lookupFailure) {
        // A failed lookup is not evidence of absence, so surface it instead of answering.
        String message =
                String.format(
                        "Failed to get partition assignment for partition %d.", partitionId);
        throw new FlussRuntimeException(message, lookupFailure);
    }
    return !assignmentPresent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.metadata;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.shaded.guava32.com.google.common.base.Ticker;
import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache;
import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.SystemClock;

import javax.annotation.concurrent.ThreadSafe;

import java.time.Duration;
import java.util.Objects;

/**
 * A thread-safe negative cache for partition IDs that are known to not exist in ZooKeeper.
 *
 * <p>This cache helps reduce ZooKeeper pressure when clients repeatedly request metadata for
 * partitions that have been deleted (e.g., during hourly partition rotation). Instead of querying
 * ZK every time, the "not exist" result is cached and returned directly.
 *
 * <p>The cache uses access-time-based TTL: entries are evicted after a configurable duration of no
 * access. As long as clients keep asking for the same non-existent partition, the cache entry stays
 * alive and protects ZK. The cache is also bounded by a maximum number of entries.
 */
@ThreadSafe
public class PartitionNegativeCache {

    /** Default TTL for negative cache entries: 10 minutes of no access. */
    private static final Duration DEFAULT_TTL = Duration.ofMinutes(10);

    /** Default maximum number of negative cache entries. */
    private static final long DEFAULT_MAXIMUM_SIZE = 10_000L;

    /** Presence of a key means "known non-existent"; the value itself carries no information. */
    private final Cache<Long, Boolean> cache;

    /** Creates a cache with the default TTL (10 minutes) and maximum size (10,000 entries). */
    public PartitionNegativeCache() {
        this(DEFAULT_TTL, DEFAULT_MAXIMUM_SIZE);
    }

    /**
     * Creates a cache that measures time with the system clock.
     *
     * @param ttl how long an entry may stay unaccessed before it expires; must be positive
     * @param maximumSize the maximum number of entries to retain; must be positive
     * @throws IllegalArgumentException if {@code ttl} or {@code maximumSize} is not positive
     * @throws NullPointerException if {@code ttl} is {@code null}
     */
    public PartitionNegativeCache(Duration ttl, long maximumSize) {
        this(ttl, maximumSize, SystemClock.getInstance());
    }

    /**
     * Creates a cache whose expiration is driven by the given clock, so tests can control time.
     *
     * @param ttl how long an entry may stay unaccessed before it expires; must be positive
     * @param maximumSize the maximum number of entries to retain; must be positive
     * @param clock the time source read (in nanoseconds) by the cache's ticker
     * @throws IllegalArgumentException if {@code ttl} or {@code maximumSize} is not positive
     * @throws NullPointerException if {@code ttl} or {@code clock} is {@code null}
     */
    @VisibleForTesting
    PartitionNegativeCache(Duration ttl, long maximumSize, Clock clock) {
        // Fail fast: a null clock would otherwise only blow up later, on the first cache
        // operation that reads the ticker, far away from the faulty call site.
        Objects.requireNonNull(ttl, "ttl must not be null");
        Objects.requireNonNull(clock, "clock must not be null");
        if (ttl.isZero() || ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must be positive.");
        }
        if (maximumSize <= 0) {
            throw new IllegalArgumentException("Maximum size must be positive.");
        }
        this.cache =
                CacheBuilder.newBuilder()
                        .maximumSize(maximumSize)
                        .expireAfterAccess(ttl)
                        .ticker(
                                // Ticker is an abstract class, so a lambda cannot be used here.
                                new Ticker() {
                                    @Override
                                    public long read() {
                                        return clock.nanoseconds();
                                    }
                                })
                        .build();
    }

    /**
     * Checks if the given partition ID is known to not exist.
     *
     * <p>If the entry exists and is not expired, Guava refreshes its access time and this method
     * returns {@code true}. If the entry is expired or doesn't exist, returns {@code false}.
     *
     * @param partitionId the partition ID to check
     * @return {@code true} if the partition is known to not exist, {@code false} otherwise
     */
    public boolean isKnownNonExistent(long partitionId) {
        return cache.getIfPresent(partitionId) != null;
    }

    /**
     * Marks the given partition ID as known to not exist. The entry will remain in the cache until
     * it hasn't been accessed for the configured TTL duration or the bounded cache evicts it.
     *
     * @param partitionId the partition ID to mark as non-existent
     */
    public void markNonExistent(long partitionId) {
        cache.put(partitionId, Boolean.TRUE);
    }

    /**
     * Returns the current number of entries in the cache.
     *
     * <p>Pending cache maintenance is run first, so entries that have already expired are not
     * counted. The value is the count reported by the underlying cache at the time of the call.
     *
     * @return the number of entries currently held by the cache
     */
    @VisibleForTesting
    public long size() {
        cache.cleanUp();
        return cache.size();
    }

    /** Removes all entries from the cache. */
    @VisibleForTesting
    public void clear() {
        cache.invalidateAll();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.metadata;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.server.testutils.FlussClusterExtension;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.Collections;

import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createPartition;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropPartitionRequest;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** ITCase for {@link PartitionNegativeCache} integration with metadata request processing. */
class PartitionNegativeCacheITCase {

    @RegisterExtension
    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
            FlussClusterExtension.builder().setNumOfTabletServers(1).build();

    private static CoordinatorGateway coordinatorGateway;

    @BeforeAll
    static void setup() {
        coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
    }

    @Test
    void testNegativeCacheForDeletedPartition() throws Exception {
        FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();

        // A partitioned table without auto-partitioning, so the test fully controls which
        // partitions exist.
        TablePath tablePath = TablePath.of("test_db_neg_cache", "partitioned_table");
        TableDescriptor descriptor =
                TableDescriptor.builder()
                        .schema(DATA1_SCHEMA)
                        .distributedBy(1)
                        .partitionedBy("b")
                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, false)
                        .build();
        createTable(FLUSS_CLUSTER_EXTENSION, tablePath, descriptor);

        // Create the partition that is dropped later and remember its id.
        PartitionSpec partitionSpec =
                new PartitionSpec(Collections.singletonMap("b", "p_to_delete"));
        long partitionId =
                createPartition(FLUSS_CLUSTER_EXTENSION, tablePath, partitionSpec, false);

        PartitionNegativeCache negativeCache =
                FLUSS_CLUSTER_EXTENSION
                        .getCoordinatorServer()
                        .getCoordinatorService()
                        .getPartitionNegativeCache();

        // Step 1: a stale negative-cache entry must not hide a partition that exists in the
        // metadata cache.
        MetadataRequest existingPartitionRequest = newMetadataRequest(tablePath, partitionId);
        negativeCache.markNonExistent(partitionId);
        retry(
                Duration.ofMinutes(1),
                () -> coordinatorGateway.metadata(existingPartitionRequest).get());
        negativeCache.clear();

        // Step 2: drop the partition.
        coordinatorGateway
                .dropPartition(newDropPartitionRequest(tablePath, partitionSpec, false))
                .get();

        // Step 3: retry until the drop is visible, i.e. the metadata request fails and the
        // partition id has been recorded in the negative cache.
        retry(
                Duration.ofMinutes(1),
                () -> {
                    assertPartitionNotExist(newMetadataRequest(tablePath, partitionId));
                    assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue();
                });

        // The coordinator's negative cache now holds the partition id.
        assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue();

        // Step 4: a follow-up request must fail as well (expected to be answered from the
        // negative cache, without a ZK query).
        assertPartitionNotExist(newMetadataRequest(tablePath, partitionId));

        // The entry is still present after being hit.
        assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue();
    }

    /** Builds a metadata request for the given partition id together with its table path. */
    private static MetadataRequest newMetadataRequest(TablePath tablePath, long partitionId) {
        MetadataRequest request = new MetadataRequest();
        request.addPartitionsId(partitionId);
        request.addTablePath()
                .setDatabaseName(tablePath.getDatabaseName())
                .setTableName(tablePath.getTableName());
        return request;
    }

    /** Asserts that the coordinator rejects the request with a partition-not-exist error. */
    private static void assertPartitionNotExist(MetadataRequest request) {
        assertThatThrownBy(() -> coordinatorGateway.metadata(request).get())
                .cause()
                .isInstanceOf(PartitionNotExistException.class);
    }
}
Loading