Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
447 changes: 309 additions & 138 deletions docs/config-app.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -55,83 +55,50 @@ public void disableScan() {
}

public Future<BidsScanResult> submitBids(RedisBidsData bids) {
final Promise<BidsScanResult> scanResult = Promise.promise();

final RedisAPI readRedisNodeAPI = this.readRedisNode.getRedisAPI();
final boolean shouldSubmit = !isScanDisabled
&& readRedisNodeAPI != null && !bids.getBresps().isEmpty();

if (shouldSubmit) {
readRedisNodeAPI.get("function_submit_bids", submitHash -> {
final Object submitHashResult = submitHash.result();
if (submitHashResult != null) {
final List<String> readArgs = List.of(
submitHashResult.toString(),
"0",
toBidsAsJson(bids),
apiKey,
"true");

readRedisNodeAPI.evalsha(readArgs, response -> {
if (response.result() != null) {
final BidsScanResult parserResult = redisParser
.parseBidsScanResult(response.result().toString());
final boolean isAnyRoSkipped = parserResult.getBidScanResults()
.stream().anyMatch(BidScanResult::isRoSkipped);

if (isAnyRoSkipped) {
reSubmitBidsToWriteNode(readArgs, scanResult);
} else {
scanResult.complete(parserResult);
}
} else {
scanResult.complete(getEmptyScanResult());
}
});
} else {
scanResult.complete(getEmptyScanResult());
}
});

return scanResult.future();
if (isScanDisabled || readRedisNodeAPI == null || bids.getBresps().isEmpty()) {
return Future.succeededFuture(getEmptyScanResult());
}

return Future.succeededFuture(getEmptyScanResult());
return readRedisNodeAPI.get("function_submit_bids")
.map(Response::toString)
.map(response -> List.of(response, "0", toBidsAsJson(bids), apiKey, "true"))
.compose(args -> scanBids(args, readRedisNodeAPI))
.otherwise(ignored -> getEmptyScanResult());
}

private Future<BidsScanResult> scanBids(List<String> args, RedisAPI redisAPI) {
return redisAPI.evalsha(args)
.map(Response::toString)
.map(redisParser::parseBidsScanResult)
.compose(parsedResult -> parsedResult.getBidScanResults()
.stream().anyMatch(BidScanResult::isRoSkipped)
? reSubmitBidsToWriteNode(args)
: Future.succeededFuture(parsedResult));
}

private void reSubmitBidsToWriteNode(List<String> readArgs, Promise<BidsScanResult> scanResult) {
private Future<BidsScanResult> reSubmitBidsToWriteNode(List<String> readArgs) {
final RedisAPI writeRedisAPI = this.writeRedisNode.getRedisAPI();
if (writeRedisAPI != null) {
final List<String> writeArgs = readArgs.stream().limit(4).toList();
writeRedisAPI.evalsha(writeArgs, response -> {
if (response.result() != null) {
final BidsScanResult parserResult = redisParser
.parseBidsScanResult(response.result().toString());

scanResult.complete(parserResult);
} else {
scanResult.complete(getEmptyScanResult());
}
});
} else {
scanResult.complete(getEmptyScanResult());
if (writeRedisAPI == null) {
return Future.succeededFuture(getEmptyScanResult());
}

final List<String> writeArgs = readArgs.stream().limit(4).toList();
return writeRedisAPI.evalsha(writeArgs)
.map(Response::toString)
.map(redisParser::parseBidsScanResult);
}

public Future<Boolean> isScanDisabledFlag() {
final RedisAPI redisAPI = this.readRedisNode.getRedisAPI();
final Promise<Boolean> isDisabled = Promise.promise();

if (redisAPI != null) {
redisAPI.get("scan-disabled", scanDisabledValue -> {
final Response scanDisabled = scanDisabledValue.result();
isDisabled.complete(scanDisabled != null && "true".equals(scanDisabled.toString()));
});

return isDisabled.future();
if (redisAPI == null) {
return Future.succeededFuture(true);
}

return Future.succeededFuture(true);
return redisAPI.get("scan-disabled")
.map(Response::toString)
.map(Boolean::parseBoolean)
.otherwise(false);
}

private String toBidsAsJson(RedisBidsData bids) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public RedisAPI getRedisAPI() {
*/
private void createRedisClient(Handler<AsyncResult<RedisConnection>> handler, boolean isReconnect) {
Redis.createClient(vertx, options)
.connect(onConnect -> {
.connect()
.onComplete(onConnect -> {
if (onConnect.succeeded()) {
connection = onConnect.result();
connection.exceptionHandler(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.http.HttpHeaders;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.execution.timeout.Timeout;
Expand Down Expand Up @@ -69,7 +69,7 @@ private String resolveEndpoint(String tenant, String origin) {
}

private static MultiMap headers(OptableTargetingProperties properties, List<String> ips, String userAgent) {
final MultiMap headers = HeadersMultiMap.headers()
final MultiMap headers = HttpHeaders.headers()
.add(HttpUtil.ACCEPT_HEADER, "application/json");

if (userAgent != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.iab.openrtb.response.BidResponse;
import com.iab.openrtb.response.SeatBid;
import io.vertx.core.MultiMap;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.http.HttpHeaders;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.prebid.server.activity.infrastructure.ActivityInfrastructure;
Expand Down Expand Up @@ -193,7 +193,7 @@ protected Device givenDevice() {
}

protected HttpClientResponse givenSuccessHttpResponse(String fileName) {
final MultiMap headers = HeadersMultiMap.headers().add("Content-Type", "application/json");
final MultiMap headers = HttpHeaders.headers().add("Content-Type", "application/json");
return HttpClientResponse.of(HttpStatus.SC_OK, headers, givenBodyFromFile(fileName));
}

Expand Down
2 changes: 1 addition & 1 deletion extra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

<!-- Project production dependency versions -->
<spring.boot.version>3.5.10</spring.boot.version>
<vertx.version>4.5.20</vertx.version>
<vertx.version>5.1.0</vertx.version>
<validation-api.version>2.0.1.Final</validation-api.version>
<commons.collections.version>4.4</commons.collections.version>
<commons.compress.version>1.27.1</commons.compress.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.prebid.server.analytics.reporter.pubstack;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -119,19 +118,20 @@ public String name() {
@Override
public void initialize(Promise<Void> initializePromise) {
vertx.setPeriodic(configurationRefreshDelay, id -> fetchRemoteConfig());
fetchRemoteConfig();
initializePromise.tryComplete();
fetchRemoteConfig().onComplete(initializePromise);
}

void shutdown() {
eventHandlers.values().forEach(PubstackEventHandler::reportEvents);
}

private void fetchRemoteConfig() {
private Future<Void> fetchRemoteConfig() {
logger.info("[pubstack] Updating config: {}", pubstackConfig);
httpClient.get(makeEventEndpointUrl(pubstackConfig.getEndpoint(), pubstackConfig.getScopeId()), timeout)
return httpClient.get(makeEventEndpointUrl(pubstackConfig.getEndpoint(), pubstackConfig.getScopeId()), timeout)
.map(this::processRemoteConfigurationResponse)
.onComplete(this::updateConfigsOnChange);
.map(this::updateConfigsOnChange)
.onFailure(PubstackAnalyticsReporter::logError)
.mapEmpty();
}

private PubstackConfig processRemoteConfigurationResponse(HttpClientResponse response) {
Expand All @@ -148,15 +148,14 @@ private PubstackConfig processRemoteConfigurationResponse(HttpClientResponse res
}
}

private void updateConfigsOnChange(AsyncResult<PubstackConfig> asyncConfigResult) {
if (asyncConfigResult.failed()) {
logger.error("[pubstask] Fail to fetch remote configuration: {}", asyncConfigResult.cause().getMessage());
} else if (!Objects.equals(pubstackConfig, asyncConfigResult.result())) {
final PubstackConfig pubstackConfig = asyncConfigResult.result();
private Void updateConfigsOnChange(PubstackConfig config) {
if (!Objects.equals(pubstackConfig, config)) {
eventHandlers.values().forEach(PubstackEventHandler::reportEvents);
this.pubstackConfig = pubstackConfig;
this.pubstackConfig = config;
updateHandlers(pubstackConfig);
}

return null;
}

private void updateHandlers(PubstackConfig pubstackConfig) {
Expand Down Expand Up @@ -187,4 +186,8 @@ private String makeEventHandlerEndpoint(String endpoint, EventType eventType) {
throw new PreBidException(message);
}
}

private static void logError(Throwable throwable) {
logger.error("[pubstask] Fail to fetch remote configuration: {}", throwable.getCause().getMessage());
}
}
4 changes: 2 additions & 2 deletions src/main/java/org/prebid/server/floors/PriceFloorFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.impl.ConcurrentHashSet;
import lombok.Value;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.ObjectUtils;
Expand Down Expand Up @@ -37,6 +36,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -79,7 +79,7 @@ public PriceFloorFetcher(ApplicationSettings applicationSettings,
this.debugProperties = debugProperties;
this.mapper = Objects.requireNonNull(mapper);

fetchInProgress = new ConcurrentHashSet<>();
fetchInProgress = ConcurrentHashMap.newKeySet();
fetchedData = Caffeine.newBuilder()
.maximumSize(MAXIMUM_CACHE_SIZE)
.<String, AccountFetchContext>build()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
package org.prebid.server.geolocation;

import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.circuitbreaker.CircuitBreakerState;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.prebid.server.execution.timeout.Timeout;
import org.prebid.server.geolocation.model.GeoInfo;
import org.prebid.server.log.ConditionalLogger;
import org.prebid.server.log.Logger;
import org.prebid.server.log.LoggerFactory;
import org.prebid.server.metric.Metrics;
import org.prebid.server.vertx.CircuitBreaker;

import java.time.Clock;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Wrapper for geolocation service with circuit breaker.
*/
public class CircuitBreakerSecuredGeoLocationService implements GeoLocationService {

private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerSecuredGeoLocationService.class);
private static final ConditionalLogger conditionalLogger = new ConditionalLogger(logger);
private static final int LOG_PERIOD_SECONDS = 5;

private final GeoLocationService geoLocationService;
private final CircuitBreaker breaker;
Expand All @@ -31,39 +28,26 @@ public CircuitBreakerSecuredGeoLocationService(Vertx vertx,
Metrics metrics,
int openingThreshold,
long openingIntervalMs,
long closingIntervalMs,
Clock clock) {
long closingIntervalMs) {

this.geoLocationService = Objects.requireNonNull(geoLocationService);

breaker = new CircuitBreaker("geo_cb", Objects.requireNonNull(vertx),
openingThreshold, openingIntervalMs, closingIntervalMs, Objects.requireNonNull(clock))
.openHandler(ignored -> circuitOpened())
.halfOpenHandler(ignored -> circuitHalfOpened())
.closeHandler(ignored -> circuitClosed());
breaker = CircuitBreaker.create(
"geo_cb",
Objects.requireNonNull(vertx),
new CircuitBreakerOptions()
.setNotificationPeriod(0)
.setMaxFailures(openingThreshold)
.setFailuresRollingWindow(openingIntervalMs)
.setResetTimeout(closingIntervalMs));

metrics.createGeoLocationCircuitBreakerGauge(breaker::isOpen);
metrics.createGeoLocationCircuitBreakerGauge(() -> breaker.state() != CircuitBreakerState.CLOSED);

logger.info("Initialized GeoLocation service with Circuit Breaker");
}

@Override
public Future<GeoInfo> lookup(String ip, Timeout timeout) {
return breaker.execute(promise -> geoLocationService.lookup(ip, timeout).onComplete(promise));
}

private void circuitOpened() {
conditionalLogger.warn(
"GeoLocation service is unavailable, circuit opened.",
LOG_PERIOD_SECONDS,
TimeUnit.SECONDS);
}

private void circuitHalfOpened() {
logger.warn("GeoLocation service is ready to try again, circuit half-opened.");
}

private void circuitClosed() {
logger.warn("GeoLocation service becomes working, circuit closed.");
return breaker.execute(() -> geoLocationService.lookup(ip, timeout));
}
}
7 changes: 1 addition & 6 deletions src/main/java/org/prebid/server/handler/SetuidHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
Expand Down Expand Up @@ -333,7 +332,7 @@ private void respondWithCookie(SetuidContext setuidContext) {
setuidContext.getUidsCookie(), bidder, uid);

uidsCookieService.splitUidsIntoCookies(uidsCookieUpdateResult.getValue())
.forEach(cookie -> addCookie(routingContext, cookie));
.forEach(routingContext.response()::addCookie);

if (uidsCookieUpdateResult.isUpdated()) {
metrics.updateUserSyncSetsMetric(bidder);
Expand Down Expand Up @@ -412,8 +411,4 @@ private void handleErrors(Throwable error, RoutingContext routingContext, TcfCon
analyticsDelegator.processEvent(setuidEvent, tcfContext);
}
}

private void addCookie(RoutingContext routingContext, Cookie cookie) {
routingContext.response().headers().add(HttpUtil.SET_COOKIE_HEADER, cookie.encode());
}
}
5 changes: 3 additions & 2 deletions src/main/java/org/prebid/server/json/JacksonMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBufInputStream;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.buffer.BufferInternal;
import org.prebid.server.proto.openrtb.ext.FlexibleExtension;

import java.io.IOException;
Expand Down Expand Up @@ -66,15 +67,15 @@ public <T> T decodeValue(String str, TypeReference<T> type) throws DecodeExcepti

public <T> T decodeValue(Buffer buf, Class<T> clazz) throws DecodeException {
try {
return mapper.readValue((InputStream) new ByteBufInputStream(buf.getByteBuf()), clazz);
return mapper.readValue((InputStream) new ByteBufInputStream(((BufferInternal) buf).getByteBuf()), clazz);
} catch (IOException e) {
throw new DecodeException(FAILED_TO_DECODE.formatted(e.getMessage()), e);
}
}

public <T> T decodeValue(Buffer buf, TypeReference<T> type) throws DecodeException {
try {
return mapper.readValue(new ByteBufInputStream(buf.getByteBuf()), type);
return mapper.readValue(new ByteBufInputStream(((BufferInternal) buf).getByteBuf()), type);
} catch (IOException e) {
throw new DecodeException(FAILED_TO_DECODE.formatted(e.getMessage()), e);
}
Expand Down
Loading
Loading