Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
<td><h5>blob-write-null-on-missing-file</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to write NULL for a descriptor BLOB value when the referenced file does not exist during Flink writes. When false, the write fails when the descriptor is read.</td>
<td>Whether to write NULL for a descriptor BLOB value when the referenced file or HTTP resource does not exist during Flink writes. When false, the write fails when the descriptor is read.</td>
</tr>
<tr>
<td><h5>blob.split-by-file-size</h5></td>
Expand Down Expand Up @@ -903,6 +903,24 @@
<td>Integer</td>
<td>Level threshold of lookup to generate remote lookup files. Level files below this threshold will not generate remote lookup files.</td>
</tr>
<tr>
<td><h5>manifest-sort.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to invoke manifest sort rewrite during commit.<br />Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered.</td>
</tr>
<tr>
<td><h5>manifest-sort.max-rewrite-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it.</td>
</tr>
<tr>
<td><h5>manifest-sort.partition-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field.</td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
Expand Down Expand Up @@ -933,24 +951,6 @@
<td>Integer</td>
<td>To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.<br />Note: when 'manifest-sort.enabled' is true, this minimum-count gate is only applied to the trailing sub-segment of a section that exceeds 'manifest-sort.max-rewrite-size'. Small under-budget sections are sorted and rewritten directly, so two small manifest files may be merged into one even when their count is below this threshold and full compaction is not triggered.</td>
</tr>
<tr>
<td><h5>manifest-sort.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to invoke manifest sort rewrite during commit.<br />Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered.</td>
</tr>
<tr>
<td><h5>manifest-sort.partition-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field.</td>
</tr>
<tr>
<td><h5>manifest-sort.max-rewrite-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it.</td>
</tr>
<tr>
<td><h5>manifest.target-file-size</h5></td>
<td style="word-wrap: break-word;">8 mb</td>
Expand Down
5 changes: 3 additions & 2 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2453,8 +2453,9 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription(
"Whether to write NULL for a descriptor BLOB value when the "
+ "referenced file does not exist during Flink writes. When "
+ "false, the write fails when the descriptor is read.");
+ "referenced file or HTTP resource does not exist during Flink "
+ "writes. When false, the write fails when the descriptor is "
+ "read.");

@Immutable
public static final ConfigOption<String> BLOB_EXTERNAL_STORAGE_PATH =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.rest.interceptor.TimingInterceptor;

import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpHead;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
Expand All @@ -31,6 +32,7 @@
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.http.ssl.HttpsSupport;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;
Expand Down Expand Up @@ -86,9 +88,65 @@ private static HttpClientConnectionManager configureConnectionManager() {
public static InputStream getAsInputStream(String uri) throws IOException {
HttpGet httpGet = new HttpGet(uri);
CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpGet);
if (response.getCode() != 200) {
throw new RuntimeException("HTTP error code: " + response.getCode());
int statusCode = response.getCode();
if (statusCode != HttpStatus.SC_OK) {
throw httpError(statusCode);
}
return response.getEntity().getContent();
}

/**
* Checks whether an HTTP resource exists. HEAD is attempted first; when HEAD does not return
* 200, a lightweight GET with {@code Range: bytes=0-0} is used to verify readability. This
* avoids treating signed or GET-only URLs as missing when HEAD is rejected or returns a
* different status than GET.
*/
public static boolean exists(String uri) throws IOException {
int headStatusCode = headStatusCode(uri);
if (headStatusCode == HttpStatus.SC_OK) {
return true;
}
int rangeStatusCode = getRangeStatusCode(uri);
if (rangeStatusCode == HttpStatus.SC_OK
|| rangeStatusCode == HttpStatus.SC_PARTIAL_CONTENT) {
return true;
}
if (rangeStatusCode == HttpStatus.SC_NOT_FOUND) {
return false;
}
throw new IOException(
"Unexpected HTTP status code: " + rangeStatusCode + " for uri: " + uri);
}

public static boolean isNotFoundError(Throwable throwable) {
Throwable current = throwable;
while (current != null) {
if (current instanceof RuntimeException
&& current.getMessage() != null
&& current.getMessage().startsWith("HTTP error code: 404")) {
return true;
}
current = current.getCause();
}
return false;
}

private static int headStatusCode(String uri) throws IOException {
HttpHead httpHead = new HttpHead(uri);
try (CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpHead)) {
return response.getCode();
}
}

private static int getRangeStatusCode(String uri) throws IOException {
HttpGet httpGet = new HttpGet(uri);
httpGet.addHeader("Range", "bytes=0-0");
try (CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpGet)) {
return response.getCode();
}
}

private static RuntimeException httpError(int statusCode) {
return new RuntimeException("HTTP error code: " + statusCode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link HttpClientUtils}. */
public class HttpClientUtilsTest {

private HttpServer server;
private int port;

@BeforeEach
public void setUp() throws Exception {
server = HttpServer.create(new InetSocketAddress(0), 0);
port = server.getAddress().getPort();
server.start();
}

@AfterEach
public void tearDown() {
if (server != null) {
server.stop(0);
}
}

@Test
public void testExistsReturnsTrueForAvailableResource() throws Exception {
registerHandler(
"/ok",
exchange -> {
respond(exchange, 200, "abc".getBytes());
});

assertThat(HttpClientUtils.exists(url("/ok"))).isTrue();
}

@Test
public void testExistsReturnsFalseForMissingResource() throws Exception {
registerHandler(
"/missing",
exchange -> {
respond(exchange, 404, new byte[0]);
});

assertThat(HttpClientUtils.exists(url("/missing"))).isFalse();
}

@Test
public void testExistsFallsBackToRangeGetWhenHeadNotAllowed() throws Exception {
registerHandler(
"/no-head",
exchange -> {
if ("HEAD".equals(exchange.getRequestMethod())) {
respond(exchange, 405, new byte[0]);
return;
}
respond(exchange, 200, "abc".getBytes());
});

assertThat(HttpClientUtils.exists(url("/no-head"))).isTrue();
}

@Test
public void testExistsFallsBackToRangeGetWhenHeadReturnsNotFound() throws Exception {
registerHandler(
"/head-404-get-ok",
exchange -> {
if ("HEAD".equals(exchange.getRequestMethod())) {
respond(exchange, 404, new byte[0]);
return;
}
if ("GET".equals(exchange.getRequestMethod())
&& exchange.getRequestHeaders().getFirst("Range") != null) {
respond(exchange, 206, "abc".getBytes());
return;
}
respond(exchange, 404, new byte[0]);
});

assertThat(HttpClientUtils.exists(url("/head-404-get-ok"))).isTrue();
}

@Test
public void testExistsFallsBackToRangeGetWhenHeadReturnsForbidden() throws Exception {
registerHandler(
"/head-403-get-ok",
exchange -> {
if ("HEAD".equals(exchange.getRequestMethod())) {
respond(exchange, 403, new byte[0]);
return;
}
if ("GET".equals(exchange.getRequestMethod())
&& exchange.getRequestHeaders().getFirst("Range") != null) {
respond(exchange, 200, "abc".getBytes());
return;
}
respond(exchange, 403, new byte[0]);
});

assertThat(HttpClientUtils.exists(url("/head-403-get-ok"))).isTrue();
}

@Test
public void testExistsReturnsFalseOnlyWhenRangeGetAlsoNotFound() throws Exception {
registerHandler(
"/head-404-get-404",
exchange -> {
if ("HEAD".equals(exchange.getRequestMethod())) {
respond(exchange, 404, new byte[0]);
return;
}
respond(exchange, 404, new byte[0]);
});

assertThat(HttpClientUtils.exists(url("/head-404-get-404"))).isFalse();
}

@Test
public void testGetAsInputStreamThrowsForNotFound() {
registerHandler(
"/get-missing",
exchange -> {
respond(exchange, 404, new byte[0]);
});

assertThatThrownBy(() -> HttpClientUtils.getAsInputStream(url("/get-missing")))
.isInstanceOf(RuntimeException.class)
.hasMessage("HTTP error code: 404");
}

@Test
public void testIsNotFoundError() {
RuntimeException exception =
new RuntimeException("wrapper", new RuntimeException("HTTP error code: 404"));
assertThat(HttpClientUtils.isNotFoundError(exception)).isTrue();
assertThat(HttpClientUtils.isNotFoundError(new RuntimeException("HTTP error code: 500")))
.isFalse();
}

private void registerHandler(String path, HttpHandler handler) {
server.createContext(path, handler);
}

private String url(String path) {
return "http://127.0.0.1:" + port + path;
}

private static void respond(HttpExchange exchange, int statusCode, byte[] body)
throws IOException {
boolean headRequest = "HEAD".equals(exchange.getRequestMethod());
long responseLength = headRequest ? -1 : body.length;
exchange.sendResponseHeaders(statusCode, responseLength);
if (!headRequest && body.length > 0) {
try (OutputStream outputStream = exchange.getResponseBody()) {
outputStream.write(body);
}
} else {
exchange.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,9 @@ class HttpUriReader implements UriReader {
public SeekableInputStream newInputStream(String uri) throws IOException {
return SeekableInputStream.wrap(HttpClientUtils.getAsInputStream(uri));
}

public boolean exists(String uri) throws IOException {
return HttpClientUtils.exists(uri);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ public UriReader create(String input) {

public boolean exists(String input) throws IOException {
UriReader reader = create(input);
return !(reader instanceof UriReader.FileUriReader)
|| ((UriReader.FileUriReader) reader).exists(input);
if (reader instanceof UriReader.FileUriReader) {
return ((UriReader.FileUriReader) reader).exists(input);
}
if (reader instanceof UriReader.HttpUriReader) {
return ((UriReader.HttpUriReader) reader).exists(input);
}
return true;
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
Expand Down
Loading
Loading