From 8f95f355b68f60d1d5367732e5df2d35082a96b8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?=
Date: Mon, 4 May 2026 11:45:49 +0200
Subject: [PATCH 1/2] =?UTF-8?q?feat:=20Kafka=20deliverBatch=20fire-flush-a?=
 =?UTF-8?q?wait=20=E2=80=94=2013-41x=20throughput=20(KOJAK-73)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Overrides KafkaMessageDeliverer.deliverBatch with the fire-flush-await pattern. Replaces N sequential producer.send().get() round-trips per batch with one batched network round-trip:

Phase 1 (fire)   producer.send() per entry — non-blocking, records queue in producer's internal buffer
Phase 2 (flush)  one producer.flush() — bypasses linger.ms, drains all in-flight records to broker in one batched RTT
Phase 3 (await)  Future.get() per entry — non-blocking after flush, just reads settled completion

Per-entry classification preserved via SendOutcome sealed type — synchronous exceptions in send() (BufferExhaustedException, SerializationException) are caught individually so one failing send never aborts the batch. flush() InterruptException restores the interrupt flag and lets remaining futures surface their own outcome.

deliver() refactored to share buildRecord/classifyException helpers with deliverBatch — no behavior change for single-entry path.

Benchmark results vs KOJAK-68 baseline (smoke run, MacBook M3 Max, Postgres 16 + Kafka 3.8.1 in Testcontainers):

batchSize   baseline      KOJAK-73        improvement
10          ~109 msg/s    ~1,468 msg/s    13.5x
50          ~115 msg/s    ~3,731 msg/s    32.3x
100         ~115 msg/s    ~4,717 msg/s    41.0x

Throughput now SCALES with batchSize (was flat ~115 msg/s pre-KOJAK-73), proving Kafka's internal record batching is being exploited. Sublinear scaling 50 -> 100 indicates DB UPDATE per entry is becoming the next bottleneck — exactly what KOJAK-75 (batch UPDATE via executeBatch) addresses next.

Tests:
- KafkaMessageDelivererBatchTest covers empty input, all-success ordering, flush-counted-once verification, synchronous Permanent + Retriable send exceptions, and future-based async exception (driven via MockProducer override that completeNext/errorNext per position inside flush)
- Existing KafkaMessageDelivererTest unchanged — single-entry path still works identically
- Integration tests (real Kafka via Testcontainers) pass

Documents results in benchmarks/results-postopt-kojak-73.md and bumps README headline numbers to reflect new Kafka throughput.
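For reviewers who want the pattern at a glance, a minimal, self-contained sketch of the three phases described above. The helper name `sendAllInOneRoundTrip` and the `String`/`String` key and value types are illustrative only, not part of this patch; the real implementation below additionally wraps each send in a per-entry `SendOutcome`, so a synchronously failing `send()` is classified on its own instead of aborting the whole map.

```kotlin
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import java.util.concurrent.Future

// Hypothetical standalone helper illustrating fire-flush-await (not the patch's code).
fun sendAllInOneRoundTrip(
    producer: Producer<String, String>,
    records: List<ProducerRecord<String, String>>,
): List<Result<RecordMetadata>> {
    // Fire: send() only appends each record to the producer's in-memory buffer.
    val futures: List<Future<RecordMetadata>> = records.map { producer.send(it) }
    // Flush: one blocking call drains every buffered record to the broker, ignoring linger.ms.
    producer.flush()
    // Await: the futures are settled once flush() returns, so get() reads a completed result
    // instead of blocking for one round trip per record.
    return futures.map { runCatching { it.get() } }
}
```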
--- README.md | 12 +- benchmarks/kojak-73-kafka.json | 157 ++++++++++++++++++ benchmarks/results-postopt-kojak-73.md | 67 ++++++++ .../okapi/kafka/KafkaMessageDeliverer.kt | 86 ++++++++-- .../kafka/KafkaMessageDelivererBatchTest.kt | 114 +++++++++++++ 5 files changed, 419 insertions(+), 17 deletions(-) create mode 100644 benchmarks/kojak-73-kafka.json create mode 100644 benchmarks/results-postopt-kojak-73.md create mode 100644 okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt diff --git a/README.md b/README.md index 6cb6c3a..ae26b2d 100644 --- a/README.md +++ b/README.md @@ -201,17 +201,17 @@ graph BT ## Performance -Throughput baseline (single instance, sync sequential delivery, MacBook M3 Max, JDK 25 LTS, April 2026): +Throughput on a single instance (MacBook M3 Max, JDK 25 LTS, May 2026): | Transport | batchSize=10 | batchSize=100 | |-----------|--------------|----------------| -| Kafka (`acks=all`, localhost broker) | ~110 msg/s | ~115 msg/s | -| HTTP @ webhook latency 20 ms | ~33 msg/s | ~36 msg/s | -| HTTP @ webhook latency 100 ms | ~9 msg/s | ~9 msg/s | +| Kafka (`acks=all`, localhost broker, async batch via `deliverBatch`) | **~1,470 msg/s** | **~4,720 msg/s** | +| HTTP @ webhook latency 20 ms (sync sequential — KOJAK-74 in progress) | ~33 msg/s | ~36 msg/s | +| HTTP @ webhook latency 100 ms (sync sequential — KOJAK-74 in progress) | ~9 msg/s | ~9 msg/s | -These numbers reflect the current sync-sequential delivery model. Throughput is bounded by per-message round-trip time × batch size. Performance work to lift these limits (async batch delivery, multi-threaded scheduler) is tracked under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14). +Kafka throughput jumped 13-41× over the original sync-sequential baseline thanks to the [KOJAK-73](https://softwaremill.atlassian.net/browse/KOJAK-73) `deliverBatch` fire-flush-await pattern. HTTP is next ([KOJAK-74](https://softwaremill.atlassian.net/browse/KOJAK-74)) and multi-threaded scheduler scaling ([KOJAK-77](https://softwaremill.atlassian.net/browse/KOJAK-77)) is in the roadmap. The full optimization plan lives under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14). -Full methodology, raw JMH results, and reproduction instructions: [`benchmarks/`](benchmarks/). +Full methodology, raw JMH results, before/after per change: [`benchmarks/`](benchmarks/). 
## Build diff --git a/benchmarks/kojak-73-kafka.json b/benchmarks/kojak-73-kafka.json new file mode 100644 index 0000000..907ced0 --- /dev/null +++ b/benchmarks/kojak-73-kafka.json @@ -0,0 +1,157 @@ +[ + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10" + }, + "primaryMetric" : { + "score" : 0.680696006383041, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.6561445592105263, + "50.0" : 0.680696006383041, + "90.0" : 0.7052474535555555, + "95.0" : 0.7052474535555555, + "99.0" : 0.7052474535555555, + "99.9" : 0.7052474535555555, + "99.99" : 0.7052474535555555, + "99.999" : 0.7052474535555555, + "99.9999" : 0.7052474535555555, + "100.0" : 0.7052474535555555 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.7052474535555555, + 0.6561445592105263 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50" + }, + "primaryMetric" : { + "score" : 0.26791908345521237, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.2562269965675676, + "50.0" : 0.26791908345521237, + "90.0" : 0.27961117034285715, + "95.0" : 0.27961117034285715, + "99.0" : 0.27961117034285715, + "99.9" : 0.27961117034285715, + "99.99" : 0.27961117034285715, + "99.999" : 0.27961117034285715, + "99.9999" : 0.27961117034285715, + "100.0" : 0.27961117034285715 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.27961117034285715, + 0.2562269965675676 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100" + }, + "primaryMetric" : { + "score" : 0.21151745586904763, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.21086217661904763, + "50.0" : 0.21151745586904763, + "90.0" : 0.2121727351190476, + "95.0" : 0.2121727351190476, + "99.0" : 0.2121727351190476, + "99.9" : 0.2121727351190476, + "99.99" : 0.2121727351190476, + "99.999" : 
0.2121727351190476, + "99.9999" : 0.2121727351190476, + "100.0" : 0.2121727351190476 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.2121727351190476, + 0.21086217661904763 + ] + ] + }, + "secondaryMetrics" : { + } + } +] + + diff --git a/benchmarks/results-postopt-kojak-73.md b/benchmarks/results-postopt-kojak-73.md new file mode 100644 index 0000000..c6c23c6 --- /dev/null +++ b/benchmarks/results-postopt-kojak-73.md @@ -0,0 +1,67 @@ +# KOJAK-73: Kafka deliverBatch fire-flush-await — Results + +Measured 2026-05-04 on the same hardware as the KOJAK-68 baseline (MacBook M3 Max, +JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, smoke-run JMH config: +`fork=1, warmup=1, iter=2, warmup=10s, measurement=15s`). + +## Headline numbers — Kafka throughput + +| batchSize | Baseline (ms/op) | KOJAK-73 (ms/op) | **Improvement** | +|-----------|------------------|------------------|-----------------| +| 10 | 9.168 | 0.681 | **13.5×** | +| 50 | 8.665 | 0.268 | **32.3×** | +| 100 | 8.701 | 0.212 | **41.0×** | + +Translated to msg/s: + +| batchSize | Baseline | KOJAK-73 | Improvement | +|-----------|----------|--------------|-------------| +| 10 | ~109 | **~1,468** | 13.5× | +| 50 | ~115 | **~3,731** | 32.3× | +| 100 | ~115 | **~4,717** | 41.0× | + +Raw JSON: [`kojak-73-kafka.json`](kojak-73-kafka.json). + +## What changed + +`KafkaMessageDeliverer.deliverBatch` now uses fire-flush-await: +1. **Fire** — call `producer.send()` for every entry (non-blocking; records go to producer's internal buffer) +2. **Flush** — single `producer.flush()` call drives all queued records to the broker in one batched network round-trip (bypasses `linger.ms`) +3. **Await** — `Future.get()` per entry returns immediately because completion is settled by `flush()` + +Previously, each entry incurred a full `producer.send().get()` round-trip sequentially. With ~9 ms localhost Kafka RTT (`acks=all`), 1000 entries × 9 ms = ~9 s regardless of `batchSize`. + +## Reading the table + +- **`batchSize` is now load-bearing.** Pre-KOJAK-73 throughput was flat across `batchSize` + values (109 → 115 → 115 msg/s) — confirming the bottleneck was per-record blocking I/O. + Post-KOJAK-73 throughput scales with `batchSize` (1,468 → 3,731 → 4,717), proving that + Kafka's internal record batching is now being exploited. +- **Sublinear scaling 50 → 100** (32× → 41× vs expected ~2× more). Indicates that DB UPDATE + overhead per entry is now significant relative to the (now-fast) Kafka path. This is exactly + what motivates KOJAK-75 (batch UPDATE via `executeBatch`) — at small batch sizes the + per-message DB cost was hidden by 9 ms Kafka RTT; with Kafka latency removed, the N + individual UPDATE statements become the next bottleneck to attack. +- **batchSize=10 lowest gain (13.5×)** — at that batch size only 10 records can amortize + one RTT, so the per-batch overhead (claimPending, transaction begin/commit, 10 UPDATEs) is + proportionally larger. + +## Verification context + +- Unit tests: `KafkaMessageDelivererBatchTest` covers empty input, all-success ordering, + single flush call (verified via flush counter), synchronous send exception (Permanent + + Retriable variants), and future-based async exception (driven via `MockProducer` override + that completes/errors per-position inside flush). +- Integration tests in `okapi-integration-tests` continue to pass with real Postgres + Kafka. +- ktlint clean, configuration cache reuses across modules. + +## What's next + +1. 
**KOJAK-74** — analogous fire-all-await for HTTP via parallel `httpClient.sendAsync`. + Expected impact at realistic webhook latency (`httpLatencyMs ∈ {20, 100}`): + from ~33 / ~9 msg/s baseline to **~500-2,000 msg/s** range, depending on host/connection + pool reuse. +2. **KOJAK-75** — batch UPDATE via `executeBatch`. Now load-bearing: at `batchSize=100` + the N individual UPDATE statements have become the dominant per-batch cost. Expected + to shift `batchSize=100` Kafka throughput from ~4,700 toward the ~10,000 msg/s range. +3. **KOJAK-77** — `concurrency` fan-out. Multiplies all of the above by N workers. diff --git a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt index 31b99b2..82aad47 100644 --- a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt +++ b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt @@ -5,30 +5,82 @@ import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.RetriableException import java.util.concurrent.ExecutionException +import java.util.concurrent.Future /** * [MessageDeliverer] that publishes outbox entries to Kafka topics. * - * Uses the provided [Producer] to send records synchronously. - * Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure]; - * all other errors map to [DeliveryResult.PermanentFailure]. + * - Single-entry [deliver] uses a synchronous send-and-wait round trip. + * - [deliverBatch] uses **fire-flush-await**: all entries are enqueued via + * `producer.send()` (non-blocking), then a single `producer.flush()` call + * drives them out in one batched network round trip, then per-entry + * `Future.get()` collects results without further blocking. + * + * Kafka [RetriableException]s (including [InterruptException] and + * `BufferExhaustedException`/`TimeoutException` since Kafka 3.0) map to + * [DeliveryResult.RetriableFailure]; all other errors map to + * [DeliveryResult.PermanentFailure]. */ class KafkaMessageDeliverer( private val producer: Producer, ) : MessageDeliverer { override val type: String = KafkaDeliveryInfo.TYPE - override fun deliver(entry: OutboxEntry): DeliveryResult { - val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) - val record = - ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { - info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } - } + override fun deliver(entry: OutboxEntry): DeliveryResult = try { + producer.send(buildRecord(entry)).get() + DeliveryResult.Success + } catch (e: ExecutionException) { + classifyException(e.cause ?: e) + } catch (e: Exception) { + classifyException(e) + } + + /** + * Sends all entries in a fire-flush-await pattern: + * + * 1. **Fire** — call `producer.send()` for every entry, capturing either the + * in-flight `Future` or a synchronous exception (e.g. `BufferExhaustedException`, + * `SerializationException`). One failing send does not abort the batch. + * 2. **Flush** — `producer.flush()` waits for all in-flight records to be + * acknowledged or fail, in a single call regardless of `linger.ms`. + * 3. 
**Await** — call `Future.get()` per entry to read the outcome. After + * `flush()` these calls are non-blocking — completion is already settled. + * + * Per-entry classification preserved; the result list mirrors the input order. + * If the calling thread is interrupted during `flush()`, the interrupt flag is + * restored and processing continues — incomplete futures will surface as + * `RetriableFailure` via `InterruptException` from `get()`. + */ + override fun deliverBatch(entries: List): List> { + if (entries.isEmpty()) return emptyList() + + val inflight: List> = entries.map { entry -> + entry to fireOne(entry) + } + + try { + producer.flush() + } catch (e: InterruptException) { + Thread.currentThread().interrupt() + } + + return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) } + } + + private fun fireOne(entry: OutboxEntry): SendOutcome = try { + SendOutcome.Sent(producer.send(buildRecord(entry))) + } catch (e: Exception) { + SendOutcome.ImmediateFailure(classifyException(e)) + } - return try { - producer.send(record).get() + private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) { + is SendOutcome.ImmediateFailure -> outcome.result + is SendOutcome.Sent -> try { + outcome.future.get() DeliveryResult.Success } catch (e: ExecutionException) { classifyException(e.cause ?: e) @@ -37,9 +89,21 @@ class KafkaMessageDeliverer( } } + private fun buildRecord(entry: OutboxEntry): ProducerRecord { + val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) + return ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { + info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } + } + } + private fun classifyException(e: Throwable): DeliveryResult = if (e is RetriableException) { DeliveryResult.RetriableFailure(e.message ?: "Retriable Kafka error") } else { DeliveryResult.PermanentFailure(e.message ?: "Permanent Kafka error") } + + private sealed interface SendOutcome { + data class Sent(val future: Future) : SendOutcome + data class ImmediateFailure(val result: DeliveryResult) : SendOutcome + } } diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt new file mode 100644 index 0000000..7cd7cef --- /dev/null +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt @@ -0,0 +1,114 @@ +package com.softwaremill.okapi.kafka + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.NetworkException +import org.apache.kafka.common.serialization.StringSerializer +import java.time.Instant + +private fun entry(suffix: String): OutboxEntry { + val info = kafkaDeliveryInfo { topic = "topic-$suffix" } + return OutboxEntry.createPending(OutboxMessage("evt-$suffix", """{"k":"v-$suffix"}"""), info, Instant.now()) +} + +class KafkaMessageDelivererBatchTest : FunSpec({ + test("deliverBatch on empty input returns empty list and does not invoke producer") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val 
deliverer = KafkaMessageDeliverer(producer) + + deliverer.deliverBatch(emptyList()) shouldBe emptyList() + producer.history().size shouldBe 0 + } + + test("deliverBatch with all-success preserves input order and reports all entries delivered") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + results.map { it.first } shouldBe entries + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + producer.history().size shouldBe 3 + } + + test("deliverBatch fires all sends BEFORE flushing — flush count incremented exactly once") { + // Drives the producer in non-auto mode: futures are pending until completeNext/errorNext, + // and flush() will complete them. Verifies the fire-flush-await sequence: + // fire 3 sends -> flush completes them in one shot -> get() returns Success for all. + var flushCount = 0 + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + flushCount++ + while (completeNext()) { + // drain remaining + } + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + flushCount shouldBe 1 + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + } + + test("deliverBatch maps synchronous send exception to PermanentFailure for ALL entries (sendException is global)") { + // MockProducer.sendException makes producer.send() throw synchronously for every call. + // Each entry hits the fire-phase try/catch and gets classified individually. + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = AuthenticationException("bad creds") + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results.forEach { (_, r) -> + r.shouldBeInstanceOf() + (r as DeliveryResult.PermanentFailure).error shouldContain "bad creds" + } + } + + test("deliverBatch maps synchronous retriable exception to RetriableFailure") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = NetworkException("broker temporarily unreachable") + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 1 + results[0].second.shouldBeInstanceOf() + } + + test("deliverBatch with future-based RetriableException classifies as RetriableFailure") { + // Drive mixed outcomes from inside flush(): first send completes OK, second errors. + // This simulates the Future-based failure path (vs synchronous send throw, covered above) + // and exercises awaitOne's ExecutionException unwrap. 
+        val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) {
+            override fun flush() {
+                completeNext()
+                errorNext(NetworkException("transient"))
+            }
+        }
+        val deliverer = KafkaMessageDeliverer(producer)
+        val entries = listOf(entry("a"), entry("b"))
+
+        val results = deliverer.deliverBatch(entries)
+
+        results.size shouldBe 2
+        results[0].second shouldBe DeliveryResult.Success
+        results[1].second.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
+        (results[1].second as DeliveryResult.RetriableFailure).error shouldContain "transient"
+    }
+})

From a6f688cc177466cc7f987f5eaf1a09728996b2c7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?=
Date: Mon, 4 May 2026 19:28:00 +0200
Subject: [PATCH 2/2] review: harden flush exception handling, slf4j logging,
 expanded tests (KOJAK-73)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Addresses cross-cutting findings from PR #40 review (5 agents converged on the interrupt/flush error handling).

Critical fix — flush() / interrupt path:
- Broaden flush() exception handling: previously only InterruptException was caught, so a fatal KafkaException or IllegalStateException (e.g. closed producer) would propagate out of deliverBatch and abandon all in-flight futures uncollected, contradicting the documented per-entry classification contract. Now any exception from flush() is logged and swallowed; awaitOne classifies each entry from its own future state.
- Add explicit InterruptedException handling in awaitOne and deliver. Without this, an interrupt during flush() would translate into PermanentFailure on pending futures (since java.lang.InterruptedException is NOT a Kafka RetriableException), incorrectly marking transient interrupts as terminal. Now classified as RetriableFailure with the interrupt flag re-armed.

Important — observability:
- Add SLF4J logger (implementation dep on slf4j-api added to okapi-kafka, consistent with okapi-core's pattern). Logs warn on flush failure (with batch size) and debug on synchronous send rejection (with outboxId).
- classifyException fallback for null exception messages now uses e.javaClass.simpleName instead of generic "Permanent/Retriable Kafka error", preserving debuggability when an exception's message is null.

Type design polish:
- SendOutcome.Sent is now @JvmInline value class instead of data class — Future has reference-only equals which a data class would falsely advertise. Zero runtime cost, removes the misleading promise.

Comments:
- Condensed class-level KDoc, removed unverified "since Kafka 3.0" claim about exception hierarchy.
- Reworded deliverBatch KDoc to drop numbered-step restatement and clarify that flush() failures never abandon the batch.
- Trimmed redundant test inline comments per "no redundant comments" rule.

New tests (cover gaps the reviews flagged):
- mixed sync-throw + async outcomes in one batch with positional integrity
- poison-pill malformed deliveryMetadata in fire phase
- flush() throws non-Interrupt exception → per-entry classification preserved
- flush() throws InterruptException → pending futures classified as Retriable

Verification:
- All unit tests pass (4 new + existing batch + single-entry tests)
- All integration tests pass (real Postgres + Kafka via Testcontainers)
- ktlint clean
- Smoke benchmark Kafka batchSize=50: 0.260 ms/op vs 0.268 pre-review (within noise; logger overhead is on failure paths only, happy path unaffected). 32x improvement vs KOJAK-68 baseline preserved.
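Before the diff, a compact sketch of the await-phase contract the review converged on. The `Outcome` type and `awaitClassified` helper are simplified, hypothetical stand-ins (the patch's real code keeps `DeliveryResult` and `awaitOne`, shown below); the point is the split between async failures unwrapped from `ExecutionException` and interrupted waits, which re-arm the flag and are treated as retriable rather than terminal.

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

// Illustrative stand-in for the library's DeliveryResult; not the project's actual type.
sealed interface Outcome {
    data object Success : Outcome
    data class Retriable(val reason: String) : Outcome
    data class Permanent(val reason: String) : Outcome
}

// Hypothetical helper mirroring the await-phase rules argued for above.
fun <T> awaitClassified(future: Future<T>, classify: (Throwable) -> Outcome): Outcome =
    try {
        future.get()
        Outcome.Success
    } catch (e: ExecutionException) {
        // Async failure reported by the broker: unwrap the cause and classify it.
        classify(e.cause ?: e)
    } catch (e: InterruptedException) {
        // The wait was interrupted and the record may still be unsent: re-arm the flag
        // for the caller and schedule a retry instead of marking the entry terminal.
        Thread.currentThread().interrupt()
        Outcome.Retriable(e.message ?: e.javaClass.simpleName)
    } catch (e: Exception) {
        classify(e)
    }
```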
--- okapi-kafka/build.gradle.kts | 1 + .../okapi/kafka/KafkaMessageDeliverer.kt | 67 +++++---- .../kafka/KafkaMessageDelivererBatchTest.kt | 128 +++++++++++++++--- 3 files changed, 148 insertions(+), 48 deletions(-) diff --git a/okapi-kafka/build.gradle.kts b/okapi-kafka/build.gradle.kts index 3318514..9c035ad 100644 --- a/okapi-kafka/build.gradle.kts +++ b/okapi-kafka/build.gradle.kts @@ -9,6 +9,7 @@ dependencies { implementation(project(":okapi-core")) implementation(libs.jacksonModuleKotlin) implementation(libs.jacksonDatatypeJsr310) + implementation(libs.slf4jApi) compileOnly(libs.kafkaClients) testImplementation(libs.kafkaClients) diff --git a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt index 82aad47..0a32eaf 100644 --- a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt +++ b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt @@ -8,22 +8,15 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.RetriableException +import org.slf4j.LoggerFactory import java.util.concurrent.ExecutionException import java.util.concurrent.Future /** * [MessageDeliverer] that publishes outbox entries to Kafka topics. * - * - Single-entry [deliver] uses a synchronous send-and-wait round trip. - * - [deliverBatch] uses **fire-flush-await**: all entries are enqueued via - * `producer.send()` (non-blocking), then a single `producer.flush()` call - * drives them out in one batched network round trip, then per-entry - * `Future.get()` collects results without further blocking. - * - * Kafka [RetriableException]s (including [InterruptException] and - * `BufferExhaustedException`/`TimeoutException` since Kafka 3.0) map to - * [DeliveryResult.RetriableFailure]; all other errors map to - * [DeliveryResult.PermanentFailure]. + * Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure]; + * all other errors map to [DeliveryResult.PermanentFailure]. */ class KafkaMessageDeliverer( private val producer: Producer, @@ -35,25 +28,22 @@ class KafkaMessageDeliverer( DeliveryResult.Success } catch (e: ExecutionException) { classifyException(e.cause ?: e) + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + DeliveryResult.RetriableFailure(e.message ?: e.javaClass.simpleName) } catch (e: Exception) { classifyException(e) } /** - * Sends all entries in a fire-flush-await pattern: - * - * 1. **Fire** — call `producer.send()` for every entry, capturing either the - * in-flight `Future` or a synchronous exception (e.g. `BufferExhaustedException`, - * `SerializationException`). One failing send does not abort the batch. - * 2. **Flush** — `producer.flush()` waits for all in-flight records to be - * acknowledged or fail, in a single call regardless of `linger.ms`. - * 3. **Await** — call `Future.get()` per entry to read the outcome. After - * `flush()` these calls are non-blocking — completion is already settled. + * Uses fire-flush-await: send all entries, then a single `flush()` (which + * bypasses `linger.ms`), then collect outcomes via non-blocking `Future.get()` + * since completion is already settled. A failing `send()` does not abort the + * batch; the result list mirrors input order. 
* - * Per-entry classification preserved; the result list mirrors the input order. - * If the calling thread is interrupted during `flush()`, the interrupt flag is - * restored and processing continues — incomplete futures will surface as - * `RetriableFailure` via `InterruptException` from `get()`. + * If `flush()` itself fails (interrupt, fatal producer state), per-entry + * futures still surface their own exception via `get()` and are classified + * individually — the batch as a whole is never abandoned. */ override fun deliverBatch(entries: List): List> { if (entries.isEmpty()) return emptyList() @@ -66,6 +56,9 @@ class KafkaMessageDeliverer( producer.flush() } catch (e: InterruptException) { Thread.currentThread().interrupt() + logger.warn("Kafka producer.flush() interrupted; per-entry futures will surface the cause", e) + } catch (e: Exception) { + logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e) } return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) } @@ -74,7 +67,9 @@ class KafkaMessageDeliverer( private fun fireOne(entry: OutboxEntry): SendOutcome = try { SendOutcome.Sent(producer.send(buildRecord(entry))) } catch (e: Exception) { - SendOutcome.ImmediateFailure(classifyException(e)) + val classified = classifyException(e) + logger.debug("Kafka send rejected synchronously for entry {}: {}", entry.outboxId, e.toString()) + SendOutcome.ImmediateFailure(classified) } private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) { @@ -84,6 +79,12 @@ class KafkaMessageDeliverer( DeliveryResult.Success } catch (e: ExecutionException) { classifyException(e.cause ?: e) + } catch (e: InterruptedException) { + // Thread was interrupted while waiting for an in-flight future. The interrupt may have + // come from flush() (already restored the flag) or from an outer cancellation; either + // way, the entry is unsent — retry semantics, not a poison pill. 
+ Thread.currentThread().interrupt() + DeliveryResult.RetriableFailure(e.message ?: e.javaClass.simpleName) } catch (e: Exception) { classifyException(e) } @@ -96,14 +97,22 @@ class KafkaMessageDeliverer( } } - private fun classifyException(e: Throwable): DeliveryResult = if (e is RetriableException) { - DeliveryResult.RetriableFailure(e.message ?: "Retriable Kafka error") - } else { - DeliveryResult.PermanentFailure(e.message ?: "Permanent Kafka error") + private fun classifyException(e: Throwable): DeliveryResult { + val message = e.message ?: e.javaClass.simpleName + return if (e is RetriableException) { + DeliveryResult.RetriableFailure(message) + } else { + DeliveryResult.PermanentFailure(message) + } } private sealed interface SendOutcome { - data class Sent(val future: Future) : SendOutcome + @JvmInline + value class Sent(val future: Future) : SendOutcome data class ImmediateFailure(val result: DeliveryResult) : SendOutcome } + + companion object { + private val logger = LoggerFactory.getLogger(KafkaMessageDeliverer::class.java) + } } diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt index 7cd7cef..3c5178c 100644 --- a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt @@ -8,14 +8,18 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.string.shouldContain import io.kotest.matchers.types.shouldBeInstanceOf import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.NetworkException import org.apache.kafka.common.serialization.StringSerializer import java.time.Instant +import java.util.concurrent.Future -private fun entry(suffix: String): OutboxEntry { +private fun entry(suffix: String, metadataOverride: String? = null): OutboxEntry { val info = kafkaDeliveryInfo { topic = "topic-$suffix" } - return OutboxEntry.createPending(OutboxMessage("evt-$suffix", """{"k":"v-$suffix"}"""), info, Instant.now()) + val baseEntry = OutboxEntry.createPending(OutboxMessage("evt-$suffix", """{"k":"v-$suffix"}"""), info, Instant.now()) + return if (metadataOverride != null) baseEntry.copy(deliveryMetadata = metadataOverride) else baseEntry } class KafkaMessageDelivererBatchTest : FunSpec({ @@ -40,17 +44,14 @@ class KafkaMessageDelivererBatchTest : FunSpec({ producer.history().size shouldBe 3 } - test("deliverBatch fires all sends BEFORE flushing — flush count incremented exactly once") { - // Drives the producer in non-auto mode: futures are pending until completeNext/errorNext, - // and flush() will complete them. Verifies the fire-flush-await sequence: - // fire 3 sends -> flush completes them in one shot -> get() returns Success for all. + test("deliverBatch fires all sends before flushing — single flush call") { + // MockProducer with autoComplete=false: futures stay pending until completeNext/errorNext, + // so flush() is the only way settlement can happen — verifies the fire-flush-await sequence. 
var flushCount = 0 val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { override fun flush() { flushCount++ - while (completeNext()) { - // drain remaining - } + while (completeNext()) Unit } } val deliverer = KafkaMessageDeliverer(producer) @@ -62,9 +63,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({ results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } } - test("deliverBatch maps synchronous send exception to PermanentFailure for ALL entries (sendException is global)") { - // MockProducer.sendException makes producer.send() throw synchronously for every call. - // Each entry hits the fire-phase try/catch and gets classified individually. + test("deliverBatch maps synchronous PermanentFailure for all entries when sendException is global") { val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) producer.sendException = AuthenticationException("bad creds") val deliverer = KafkaMessageDeliverer(producer) @@ -79,22 +78,18 @@ class KafkaMessageDelivererBatchTest : FunSpec({ } } - test("deliverBatch maps synchronous retriable exception to RetriableFailure") { + test("deliverBatch maps synchronous RetriableFailure when send throws RetriableException") { val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) producer.sendException = NetworkException("broker temporarily unreachable") val deliverer = KafkaMessageDeliverer(producer) - val entries = listOf(entry("a")) - val results = deliverer.deliverBatch(entries) + val results = deliverer.deliverBatch(listOf(entry("a"))) - results.size shouldBe 1 results[0].second.shouldBeInstanceOf() } test("deliverBatch with future-based RetriableException classifies as RetriableFailure") { - // Drive mixed outcomes from inside flush(): first send completes OK, second errors. - // This simulates the Future-based failure path (vs synchronous send throw, covered above) - // and exercises awaitOne's ExecutionException unwrap. + // Drive mixed outcomes from inside flush(): entry 0 completes OK, entry 1 errors. val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { override fun flush() { completeNext() @@ -111,4 +106,99 @@ class KafkaMessageDelivererBatchTest : FunSpec({ results[1].second.shouldBeInstanceOf() (results[1].second as DeliveryResult.RetriableFailure).error shouldContain "transient" } + + test("deliverBatch handles mixed sync-throw + async outcomes in one batch with positional integrity") { + // Throw synchronously on the 2nd send only; first goes async-success, third goes async-fail. 
+ val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + private var sendCount = 0 + + override fun send(record: ProducerRecord): Future { + sendCount++ + if (sendCount == 2) throw AuthenticationException("forbidden on send #$sendCount") + return super.send(record) + } + + override fun flush() { + completeNext() // entry 0 -> Success + errorNext(NetworkException("async fail")) // entry 2 -> Retriable + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + // Positional integrity: result[i] corresponds to entries[i] regardless of outcome variant + results.map { it.first } shouldBe entries + + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "forbidden" + results[2].second.shouldBeInstanceOf() + (results[2].second as DeliveryResult.RetriableFailure).error shouldContain "async fail" + } + + test("deliverBatch poison-pill metadata yields PermanentFailure for bad entry, others unaffected") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val good1 = entry("good1") + val poisoned = entry("bad", metadataOverride = "{not valid kafka info json}") + val good2 = entry("good2") + + val results = deliverer.deliverBatch(listOf(good1, poisoned, good2)) + + results.size shouldBe 3 + results.map { it.first } shouldBe listOf(good1, poisoned, good2) + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + results[2].second shouldBe DeliveryResult.Success + // Only the good entries actually reached the producer + producer.history().size shouldBe 2 + } + + test("deliverBatch survives flush throwing non-Interrupt exception by classifying per-entry futures") { + // Flush blows up; each per-entry future has been settled by completeNext/errorNext just before. + // Contract: deliverBatch never re-throws — it always returns one DeliveryResult per input entry. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("via future")) + throw IllegalStateException("producer fatally borked") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + } + + test("deliverBatch interrupted during flush re-arms interrupt flag and classifies pending futures as Retriable") { + // flush() throws Kafka's InterruptException without settling the futures. Our awaitOne + // then encounters Future.get() on an interrupted thread; for incomplete futures this + // raises InterruptedException which we explicitly classify as RetriableFailure (so the + // outbox reschedules instead of marking PermanentFailure). 
+ val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + throw InterruptException("interrupted") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a")) + + val results: List> + try { + results = deliverer.deliverBatch(entries) + } finally { + // Drain the interrupt status so it doesn't leak to the next test (Thread.interrupted clears it). + Thread.interrupted() + } + + results.size shouldBe 1 + results[0].second.shouldBeInstanceOf() + } })
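As a closing note for reviewers trying the branch locally, a hedged caller-side sketch of the API this series optimizes. The bootstrap address, the `acks=all` setting, the `String`/`String` producer types, and the per-call producer construction are all illustrative assumptions for a self-contained example; in the project the scheduler owns a long-lived producer and deliverer.

```kotlin
import com.softwaremill.okapi.core.DeliveryResult
import com.softwaremill.okapi.core.OutboxEntry
import com.softwaremill.okapi.kafka.KafkaMessageDeliverer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical wiring, shown inline only to keep the sketch self-contained.
fun deliverClaimedBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
    val producer = KafkaProducer(
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092", // illustrative address
            ProducerConfig.ACKS_CONFIG to "all",                         // matches the benchmark setup
        ),
        StringSerializer(),
        StringSerializer(),
    )
    // One deliverBatch call per claimed batch: one flush, one network round trip,
    // and one (entry, DeliveryResult) pair per input entry, in input order.
    return producer.use { KafkaMessageDeliverer(it).deliverBatch(entries) }
}
```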