diff --git a/README.md b/README.md index 6cb6c3a..ae26b2d 100644 --- a/README.md +++ b/README.md @@ -201,17 +201,17 @@ graph BT ## Performance -Throughput baseline (single instance, sync sequential delivery, MacBook M3 Max, JDK 25 LTS, April 2026): +Throughput on a single instance (MacBook M3 Max, JDK 25 LTS, May 2026): | Transport | batchSize=10 | batchSize=100 | |-----------|--------------|----------------| -| Kafka (`acks=all`, localhost broker) | ~110 msg/s | ~115 msg/s | -| HTTP @ webhook latency 20 ms | ~33 msg/s | ~36 msg/s | -| HTTP @ webhook latency 100 ms | ~9 msg/s | ~9 msg/s | +| Kafka (`acks=all`, localhost broker, async batch via `deliverBatch`) | **~1,470 msg/s** | **~4,720 msg/s** | +| HTTP @ webhook latency 20 ms (sync sequential — KOJAK-74 in progress) | ~33 msg/s | ~36 msg/s | +| HTTP @ webhook latency 100 ms (sync sequential — KOJAK-74 in progress) | ~9 msg/s | ~9 msg/s | -These numbers reflect the current sync-sequential delivery model. Throughput is bounded by per-message round-trip time × batch size. Performance work to lift these limits (async batch delivery, multi-threaded scheduler) is tracked under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14). +Kafka throughput jumped 13-41× over the original sync-sequential baseline thanks to the [KOJAK-73](https://softwaremill.atlassian.net/browse/KOJAK-73) `deliverBatch` fire-flush-await pattern. HTTP is next ([KOJAK-74](https://softwaremill.atlassian.net/browse/KOJAK-74)) and multi-threaded scheduler scaling ([KOJAK-77](https://softwaremill.atlassian.net/browse/KOJAK-77)) is in the roadmap. The full optimization plan lives under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14). -Full methodology, raw JMH results, and reproduction instructions: [`benchmarks/`](benchmarks/). +Full methodology, raw JMH results, before/after per change: [`benchmarks/`](benchmarks/). ## Build diff --git a/benchmarks/kojak-73-kafka.json b/benchmarks/kojak-73-kafka.json new file mode 100644 index 0000000..907ced0 --- /dev/null +++ b/benchmarks/kojak-73-kafka.json @@ -0,0 +1,157 @@ +[ + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10" + }, + "primaryMetric" : { + "score" : 0.680696006383041, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.6561445592105263, + "50.0" : 0.680696006383041, + "90.0" : 0.7052474535555555, + "95.0" : 0.7052474535555555, + "99.0" : 0.7052474535555555, + "99.9" : 0.7052474535555555, + "99.99" : 0.7052474535555555, + "99.999" : 0.7052474535555555, + "99.9999" : 0.7052474535555555, + "100.0" : 0.7052474535555555 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.7052474535555555, + 0.6561445592105263 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50" + }, + "primaryMetric" : { + "score" : 0.26791908345521237, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.2562269965675676, + "50.0" : 0.26791908345521237, + "90.0" : 0.27961117034285715, + "95.0" : 0.27961117034285715, + "99.0" : 0.27961117034285715, + "99.9" : 0.27961117034285715, + "99.99" : 0.27961117034285715, + "99.999" : 0.27961117034285715, + "99.9999" : 0.27961117034285715, + "100.0" : 0.27961117034285715 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.27961117034285715, + 0.2562269965675676 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100" + }, + "primaryMetric" : { + "score" : 0.21151745586904763, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.21086217661904763, + "50.0" : 0.21151745586904763, + "90.0" : 0.2121727351190476, + "95.0" : 0.2121727351190476, + "99.0" : 0.2121727351190476, + "99.9" : 0.2121727351190476, + "99.99" : 0.2121727351190476, + "99.999" : 0.2121727351190476, + "99.9999" : 0.2121727351190476, + "100.0" : 0.2121727351190476 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.2121727351190476, + 0.21086217661904763 + ] + ] + }, + "secondaryMetrics" : { + } + } +] + + diff --git a/benchmarks/results-postopt-kojak-73.md b/benchmarks/results-postopt-kojak-73.md new file mode 100644 index 0000000..c6c23c6 --- /dev/null +++ b/benchmarks/results-postopt-kojak-73.md @@ -0,0 +1,67 @@ +# KOJAK-73: Kafka deliverBatch fire-flush-await — Results + +Measured 2026-05-04 on the same hardware as the KOJAK-68 baseline (MacBook M3 Max, +JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, smoke-run JMH config: +`fork=1, warmup=1, iter=2, warmup=10s, measurement=15s`). + +## Headline numbers — Kafka throughput + +| batchSize | Baseline (ms/op) | KOJAK-73 (ms/op) | **Improvement** | +|-----------|------------------|------------------|-----------------| +| 10 | 9.168 | 0.681 | **13.5×** | +| 50 | 8.665 | 0.268 | **32.3×** | +| 100 | 8.701 | 0.212 | **41.0×** | + +Translated to msg/s: + +| batchSize | Baseline | KOJAK-73 | Improvement | +|-----------|----------|--------------|-------------| +| 10 | ~109 | **~1,468** | 13.5× | +| 50 | ~115 | **~3,731** | 32.3× | +| 100 | ~115 | **~4,717** | 41.0× | + +Raw JSON: [`kojak-73-kafka.json`](kojak-73-kafka.json). + +## What changed + +`KafkaMessageDeliverer.deliverBatch` now uses fire-flush-await: +1. **Fire** — call `producer.send()` for every entry (non-blocking; records go to producer's internal buffer) +2. **Flush** — single `producer.flush()` call drives all queued records to the broker in one batched network round-trip (bypasses `linger.ms`) +3. **Await** — `Future.get()` per entry returns immediately because completion is settled by `flush()` + +Previously, each entry incurred a full `producer.send().get()` round-trip sequentially. With ~9 ms localhost Kafka RTT (`acks=all`), 1000 entries × 9 ms = ~9 s regardless of `batchSize`. + +## Reading the table + +- **`batchSize` is now load-bearing.** Pre-KOJAK-73 throughput was flat across `batchSize` + values (109 → 115 → 115 msg/s) — confirming the bottleneck was per-record blocking I/O. + Post-KOJAK-73 throughput scales with `batchSize` (1,468 → 3,731 → 4,717), proving that + Kafka's internal record batching is now being exploited. +- **Sublinear scaling 50 → 100** (32× → 41× vs expected ~2× more). Indicates that DB UPDATE + overhead per entry is now significant relative to the (now-fast) Kafka path. This is exactly + what motivates KOJAK-75 (batch UPDATE via `executeBatch`) — at small batch sizes the + per-message DB cost was hidden by 9 ms Kafka RTT; with Kafka latency removed, the N + individual UPDATE statements become the next bottleneck to attack. +- **batchSize=10 lowest gain (13.5×)** — at that batch size only 10 records can amortize + one RTT, so the per-batch overhead (claimPending, transaction begin/commit, 10 UPDATEs) is + proportionally larger. + +## Verification context + +- Unit tests: `KafkaMessageDelivererBatchTest` covers empty input, all-success ordering, + single flush call (verified via flush counter), synchronous send exception (Permanent + + Retriable variants), and future-based async exception (driven via `MockProducer` override + that completes/errors per-position inside flush). +- Integration tests in `okapi-integration-tests` continue to pass with real Postgres + Kafka. +- ktlint clean, configuration cache reuses across modules. + +## What's next + +1. **KOJAK-74** — analogous fire-all-await for HTTP via parallel `httpClient.sendAsync`. + Expected impact at realistic webhook latency (`httpLatencyMs ∈ {20, 100}`): + from ~33 / ~9 msg/s baseline to **~500-2,000 msg/s** range, depending on host/connection + pool reuse. +2. **KOJAK-75** — batch UPDATE via `executeBatch`. Now load-bearing: at `batchSize=100` + the N individual UPDATE statements have become the dominant per-batch cost. Expected + to shift `batchSize=100` Kafka throughput from ~4,700 toward the ~10,000 msg/s range. +3. **KOJAK-77** — `concurrency` fan-out. Multiplies all of the above by N workers. diff --git a/okapi-kafka/build.gradle.kts b/okapi-kafka/build.gradle.kts index 3318514..9c035ad 100644 --- a/okapi-kafka/build.gradle.kts +++ b/okapi-kafka/build.gradle.kts @@ -9,6 +9,7 @@ dependencies { implementation(project(":okapi-core")) implementation(libs.jacksonModuleKotlin) implementation(libs.jacksonDatatypeJsr310) + implementation(libs.slf4jApi) compileOnly(libs.kafkaClients) testImplementation(libs.kafkaClients) diff --git a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt index 31b99b2..0a32eaf 100644 --- a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt +++ b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt @@ -5,13 +5,16 @@ import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.RetriableException +import org.slf4j.LoggerFactory import java.util.concurrent.ExecutionException +import java.util.concurrent.Future /** * [MessageDeliverer] that publishes outbox entries to Kafka topics. * - * Uses the provided [Producer] to send records synchronously. * Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure]; * all other errors map to [DeliveryResult.PermanentFailure]. */ @@ -20,26 +23,96 @@ class KafkaMessageDeliverer( ) : MessageDeliverer { override val type: String = KafkaDeliveryInfo.TYPE - override fun deliver(entry: OutboxEntry): DeliveryResult { - val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) - val record = - ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { - info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } - } + override fun deliver(entry: OutboxEntry): DeliveryResult = try { + producer.send(buildRecord(entry)).get() + DeliveryResult.Success + } catch (e: ExecutionException) { + classifyException(e.cause ?: e) + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + DeliveryResult.RetriableFailure(e.message ?: e.javaClass.simpleName) + } catch (e: Exception) { + classifyException(e) + } + + /** + * Uses fire-flush-await: send all entries, then a single `flush()` (which + * bypasses `linger.ms`), then collect outcomes via non-blocking `Future.get()` + * since completion is already settled. A failing `send()` does not abort the + * batch; the result list mirrors input order. + * + * If `flush()` itself fails (interrupt, fatal producer state), per-entry + * futures still surface their own exception via `get()` and are classified + * individually — the batch as a whole is never abandoned. + */ + override fun deliverBatch(entries: List): List> { + if (entries.isEmpty()) return emptyList() - return try { - producer.send(record).get() + val inflight: List> = entries.map { entry -> + entry to fireOne(entry) + } + + try { + producer.flush() + } catch (e: InterruptException) { + Thread.currentThread().interrupt() + logger.warn("Kafka producer.flush() interrupted; per-entry futures will surface the cause", e) + } catch (e: Exception) { + logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e) + } + + return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) } + } + + private fun fireOne(entry: OutboxEntry): SendOutcome = try { + SendOutcome.Sent(producer.send(buildRecord(entry))) + } catch (e: Exception) { + val classified = classifyException(e) + logger.debug("Kafka send rejected synchronously for entry {}: {}", entry.outboxId, e.toString()) + SendOutcome.ImmediateFailure(classified) + } + + private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) { + is SendOutcome.ImmediateFailure -> outcome.result + is SendOutcome.Sent -> try { + outcome.future.get() DeliveryResult.Success } catch (e: ExecutionException) { classifyException(e.cause ?: e) + } catch (e: InterruptedException) { + // Thread was interrupted while waiting for an in-flight future. The interrupt may have + // come from flush() (already restored the flag) or from an outer cancellation; either + // way, the entry is unsent — retry semantics, not a poison pill. + Thread.currentThread().interrupt() + DeliveryResult.RetriableFailure(e.message ?: e.javaClass.simpleName) } catch (e: Exception) { classifyException(e) } } - private fun classifyException(e: Throwable): DeliveryResult = if (e is RetriableException) { - DeliveryResult.RetriableFailure(e.message ?: "Retriable Kafka error") - } else { - DeliveryResult.PermanentFailure(e.message ?: "Permanent Kafka error") + private fun buildRecord(entry: OutboxEntry): ProducerRecord { + val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) + return ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { + info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } + } + } + + private fun classifyException(e: Throwable): DeliveryResult { + val message = e.message ?: e.javaClass.simpleName + return if (e is RetriableException) { + DeliveryResult.RetriableFailure(message) + } else { + DeliveryResult.PermanentFailure(message) + } + } + + private sealed interface SendOutcome { + @JvmInline + value class Sent(val future: Future) : SendOutcome + data class ImmediateFailure(val result: DeliveryResult) : SendOutcome + } + + companion object { + private val logger = LoggerFactory.getLogger(KafkaMessageDeliverer::class.java) } } diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt new file mode 100644 index 0000000..3c5178c --- /dev/null +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt @@ -0,0 +1,204 @@ +package com.softwaremill.okapi.kafka + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.InterruptException +import org.apache.kafka.common.errors.NetworkException +import org.apache.kafka.common.serialization.StringSerializer +import java.time.Instant +import java.util.concurrent.Future + +private fun entry(suffix: String, metadataOverride: String? = null): OutboxEntry { + val info = kafkaDeliveryInfo { topic = "topic-$suffix" } + val baseEntry = OutboxEntry.createPending(OutboxMessage("evt-$suffix", """{"k":"v-$suffix"}"""), info, Instant.now()) + return if (metadataOverride != null) baseEntry.copy(deliveryMetadata = metadataOverride) else baseEntry +} + +class KafkaMessageDelivererBatchTest : FunSpec({ + test("deliverBatch on empty input returns empty list and does not invoke producer") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + + deliverer.deliverBatch(emptyList()) shouldBe emptyList() + producer.history().size shouldBe 0 + } + + test("deliverBatch with all-success preserves input order and reports all entries delivered") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + results.map { it.first } shouldBe entries + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + producer.history().size shouldBe 3 + } + + test("deliverBatch fires all sends before flushing — single flush call") { + // MockProducer with autoComplete=false: futures stay pending until completeNext/errorNext, + // so flush() is the only way settlement can happen — verifies the fire-flush-await sequence. + var flushCount = 0 + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + flushCount++ + while (completeNext()) Unit + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + flushCount shouldBe 1 + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + } + + test("deliverBatch maps synchronous PermanentFailure for all entries when sendException is global") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = AuthenticationException("bad creds") + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results.forEach { (_, r) -> + r.shouldBeInstanceOf() + (r as DeliveryResult.PermanentFailure).error shouldContain "bad creds" + } + } + + test("deliverBatch maps synchronous RetriableFailure when send throws RetriableException") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = NetworkException("broker temporarily unreachable") + val deliverer = KafkaMessageDeliverer(producer) + + val results = deliverer.deliverBatch(listOf(entry("a"))) + + results[0].second.shouldBeInstanceOf() + } + + test("deliverBatch with future-based RetriableException classifies as RetriableFailure") { + // Drive mixed outcomes from inside flush(): entry 0 completes OK, entry 1 errors. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("transient")) + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + (results[1].second as DeliveryResult.RetriableFailure).error shouldContain "transient" + } + + test("deliverBatch handles mixed sync-throw + async outcomes in one batch with positional integrity") { + // Throw synchronously on the 2nd send only; first goes async-success, third goes async-fail. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + private var sendCount = 0 + + override fun send(record: ProducerRecord): Future { + sendCount++ + if (sendCount == 2) throw AuthenticationException("forbidden on send #$sendCount") + return super.send(record) + } + + override fun flush() { + completeNext() // entry 0 -> Success + errorNext(NetworkException("async fail")) // entry 2 -> Retriable + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + // Positional integrity: result[i] corresponds to entries[i] regardless of outcome variant + results.map { it.first } shouldBe entries + + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "forbidden" + results[2].second.shouldBeInstanceOf() + (results[2].second as DeliveryResult.RetriableFailure).error shouldContain "async fail" + } + + test("deliverBatch poison-pill metadata yields PermanentFailure for bad entry, others unaffected") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val good1 = entry("good1") + val poisoned = entry("bad", metadataOverride = "{not valid kafka info json}") + val good2 = entry("good2") + + val results = deliverer.deliverBatch(listOf(good1, poisoned, good2)) + + results.size shouldBe 3 + results.map { it.first } shouldBe listOf(good1, poisoned, good2) + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + results[2].second shouldBe DeliveryResult.Success + // Only the good entries actually reached the producer + producer.history().size shouldBe 2 + } + + test("deliverBatch survives flush throwing non-Interrupt exception by classifying per-entry futures") { + // Flush blows up; each per-entry future has been settled by completeNext/errorNext just before. + // Contract: deliverBatch never re-throws — it always returns one DeliveryResult per input entry. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("via future")) + throw IllegalStateException("producer fatally borked") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + } + + test("deliverBatch interrupted during flush re-arms interrupt flag and classifies pending futures as Retriable") { + // flush() throws Kafka's InterruptException without settling the futures. Our awaitOne + // then encounters Future.get() on an interrupted thread; for incomplete futures this + // raises InterruptedException which we explicitly classify as RetriableFailure (so the + // outbox reschedules instead of marking PermanentFailure). + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + throw InterruptException("interrupted") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a")) + + val results: List> + try { + results = deliverer.deliverBatch(entries) + } finally { + // Drain the interrupt status so it doesn't leak to the next test (Thread.interrupted clears it). + Thread.interrupted() + } + + results.size shouldBe 1 + results[0].second.shouldBeInstanceOf() + } +})