diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt index 460f7a7..39eb076 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt @@ -17,4 +17,30 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel ?: return DeliveryResult.PermanentFailure("No deliverer registered for type '${entry.deliveryType}'") return messageDeliverer.deliver(entry) } + + /** + * Groups entries by [OutboxEntry.deliveryType] and delegates each sub-batch + * to the matching deliverer's [MessageDeliverer.deliverBatch]. Results are + * re-assembled in original input order. + * + * Entries whose type has no registered deliverer are mapped to + * [DeliveryResult.PermanentFailure] (consistent with [deliver]). + */ + override fun deliverBatch(entries: List): List> { + if (entries.isEmpty()) return emptyList() + + val resultByEntry: Map = entries + .groupBy { it.deliveryType } + .flatMap { (type, group) -> + val deliverer = registry[type] + if (deliverer != null) { + deliverer.deliverBatch(group) + } else { + group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") } + } + } + .toMap() + + return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) } + } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt index 7aa20ef..38f1bfe 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt @@ -10,4 +10,18 @@ interface MessageDeliverer { val type: String fun deliver(entry: OutboxEntry): DeliveryResult + + /** + * Delivers a batch of entries, returning per-entry results in the same order + * as the input list. + * + * The default implementation delegates to [deliver] sequentially and is + * appropriate for any transport. Implementations whose underlying I/O can + * be overlapped (e.g. Kafka's internal record batching, parallel HTTP + * `sendAsync`) should override this method to exploit that. + * + * Per-entry result classification (Success / RetriableFailure / PermanentFailure) + * is preserved — callers receive one [DeliveryResult] per input entry. + */ + fun deliverBatch(entries: List): List> = entries.map { entry -> entry to deliver(entry) } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessor.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessor.kt index 539dc92..896abd9 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessor.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessor.kt @@ -11,17 +11,27 @@ class OutboxEntryProcessor( private val retryPolicy: RetryPolicy, private val clock: Clock, ) { - fun process(entry: OutboxEntry): OutboxEntry { + fun process(entry: OutboxEntry): OutboxEntry = applyResult(entry, deliverer.deliver(entry), clock.instant()) + + /** + * Processes a batch of entries via [MessageDeliverer.deliverBatch], applying + * the retry policy per result. Returns processed entries in the same order + * as the input list. + */ + fun processBatch(entries: List): List { + if (entries.isEmpty()) return emptyList() val now = clock.instant() - return when (val result = deliverer.deliver(entry)) { - is DeliveryResult.Success -> entry.toDelivered(now) - is DeliveryResult.RetriableFailure -> - if (retryPolicy.shouldRetry(entry.retries)) { - entry.retry(now, result.error) - } else { - entry.toFailed(now, result.error) - } - is DeliveryResult.PermanentFailure -> entry.toFailed(now, result.error) - } + return deliverer.deliverBatch(entries).map { (entry, result) -> applyResult(entry, result, now) } + } + + private fun applyResult(entry: OutboxEntry, result: DeliveryResult, now: java.time.Instant): OutboxEntry = when (result) { + is DeliveryResult.Success -> entry.toDelivered(now) + is DeliveryResult.RetriableFailure -> + if (retryPolicy.shouldRetry(entry.retries)) { + entry.retry(now, result.error) + } else { + entry.toFailed(now, result.error) + } + is DeliveryResult.PermanentFailure -> entry.toFailed(now, result.error) } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt index 5dbfe71..84cc82d 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxProcessor.kt @@ -6,11 +6,17 @@ import java.time.Duration /** * Orchestrates a single processing cycle: claims pending entries from [OutboxStore], - * delegates each to [OutboxEntryProcessor], and persists the result. + * delegates batch delivery to [OutboxEntryProcessor], and persists each result. * * An optional [OutboxProcessorListener] is notified after each entry and after the * full batch. Exceptions in the listener are caught and logged — they never break * processing. Transaction management is the caller's responsibility. + * + * In the batch processing path, the per-entry `duration` reported in + * [OutboxProcessingEvent] reflects the **wall-clock duration of the whole batch** + * (because transports may overlap their per-entry I/O internally — e.g. Kafka's + * fire-flush-await — making per-entry timing meaningless). Use + * [OutboxProcessorListener.onBatchProcessed] when you need batch-level timing. */ class OutboxProcessor( private val store: OutboxStore, @@ -18,19 +24,28 @@ class OutboxProcessor( private val listener: OutboxProcessorListener? = null, private val clock: Clock = Clock.systemUTC(), ) { + /** + * Claims up to [limit] pending entries, processes them as a batch, and persists + * each result. Returns the number of entries processed (0 if the store had nothing). + */ @JvmOverloads - fun processNext(limit: Int = 10) { + fun processNext(limit: Int = 10): Int { val batchStart = clock.instant() - var count = 0 - store.claimPending(limit).forEach { entry -> - val entryStart = clock.instant() - val updated = entryProcessor.process(entry) - val deliveryDuration = Duration.between(entryStart, clock.instant()) + val claimed = store.claimPending(limit) + if (claimed.isEmpty()) { + notifyBatch(0, Duration.between(batchStart, clock.instant())) + return 0 + } + + val processed = entryProcessor.processBatch(claimed) + val batchDuration = Duration.between(batchStart, clock.instant()) + + processed.forEach { updated -> store.updateAfterProcessing(updated) - count++ - notifyEntry(updated, deliveryDuration) + notifyEntry(updated, batchDuration) } - notifyBatch(count, Duration.between(batchStart, clock.instant())) + notifyBatch(processed.size, Duration.between(batchStart, clock.instant())) + return processed.size } private fun notifyEntry(updated: OutboxEntry, duration: Duration) { diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt new file mode 100644 index 0000000..4716327 --- /dev/null +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt @@ -0,0 +1,101 @@ +package com.softwaremill.okapi.core + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import java.time.Instant + +private fun deliveryInfo(t: String) = object : DeliveryInfo { + override val type = t + override fun serialize(): String = """{"type":"$t"}""" +} + +private fun entryOfType(t: String, id: Int): OutboxEntry = + OutboxEntry.createPending(OutboxMessage("evt-$id", "{}"), deliveryInfo(t), Instant.EPOCH) + +private fun fixedDeliverer(t: String, result: DeliveryResult) = object : MessageDeliverer { + override val type = t + override fun deliver(entry: OutboxEntry): DeliveryResult = result +} + +class CompositeMessageDelivererTest : FunSpec({ + test("deliverBatch groups entries by type, delegates to each transport, preserves input order") { + val composite = CompositeMessageDeliverer( + listOf( + fixedDeliverer("kafka", DeliveryResult.Success), + fixedDeliverer("http", DeliveryResult.RetriableFailure("503")), + ), + ) + val entries = listOf( + entryOfType("kafka", 1), + entryOfType("http", 2), + entryOfType("kafka", 3), + entryOfType("http", 4), + ) + + val results = composite.deliverBatch(entries) + + results.size shouldBe 4 + results.map { it.first } shouldBe entries + results[0].second shouldBe DeliveryResult.Success + results[1].second shouldBe DeliveryResult.RetriableFailure("503") + results[2].second shouldBe DeliveryResult.Success + results[3].second shouldBe DeliveryResult.RetriableFailure("503") + } + + test("deliverBatch fails permanently for entries with no registered deliverer") { + val composite = CompositeMessageDeliverer( + listOf(fixedDeliverer("kafka", DeliveryResult.Success)), + ) + val entries = listOf( + entryOfType("kafka", 1), + entryOfType("missing", 2), + ) + + val results = composite.deliverBatch(entries) + + results.size shouldBe 2 + results[0].second shouldBe DeliveryResult.Success + results[1].second.shouldBeInstanceOf() + (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing" + } + + test("deliverBatch with empty input returns empty list") { + val composite = CompositeMessageDeliverer(emptyList()) + composite.deliverBatch(emptyList()) shouldBe emptyList() + } + + test("deliverBatch uses each transport's overridden deliverBatch (not just deliver)") { + var batchCallsKafka = 0 + var batchCallsHttp = 0 + val kafkaDeliverer = object : MessageDeliverer { + override val type = "kafka" + override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success + override fun deliverBatch(entries: List): List> { + batchCallsKafka++ + return entries.map { it to DeliveryResult.Success } + } + } + val httpDeliverer = object : MessageDeliverer { + override val type = "http" + override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success + override fun deliverBatch(entries: List): List> { + batchCallsHttp++ + return entries.map { it to DeliveryResult.Success } + } + } + val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer)) + + composite.deliverBatch( + listOf( + entryOfType("kafka", 1), + entryOfType("http", 2), + entryOfType("kafka", 3), + ), + ) + + batchCallsKafka shouldBe 1 + batchCallsHttp shouldBe 1 + } +}) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt new file mode 100644 index 0000000..201900c --- /dev/null +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt @@ -0,0 +1,53 @@ +package com.softwaremill.okapi.core + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import java.time.Instant + +private val stubDeliveryInfo = object : DeliveryInfo { + override val type = "stub" + override fun serialize(): String = """{"type":"stub"}""" +} + +private fun stubEntry(id: Int) = OutboxEntry.createPending(OutboxMessage("evt-$id", "{}"), stubDeliveryInfo, Instant.EPOCH) + +private fun delivererReturning(vararg results: DeliveryResult) = object : MessageDeliverer { + override val type = "stub" + private var idx = 0 + override fun deliver(entry: OutboxEntry): DeliveryResult = results[idx++] +} + +class MessageDelivererTest : FunSpec({ + test("default deliverBatch delegates to deliver and preserves input order") { + val deliverer = delivererReturning( + DeliveryResult.Success, + DeliveryResult.RetriableFailure("err1"), + DeliveryResult.PermanentFailure("err2"), + ) + val entries = listOf(stubEntry(1), stubEntry(2), stubEntry(3)) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + results[0].first shouldBe entries[0] + results[0].second shouldBe DeliveryResult.Success + results[1].first shouldBe entries[1] + results[1].second shouldBe DeliveryResult.RetriableFailure("err1") + results[2].first shouldBe entries[2] + results[2].second shouldBe DeliveryResult.PermanentFailure("err2") + } + + test("default deliverBatch on empty input returns empty list without calling deliver") { + var deliverCalls = 0 + val deliverer = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry): DeliveryResult { + deliverCalls++ + return DeliveryResult.Success + } + } + + deliverer.deliverBatch(emptyList()) shouldBe emptyList() + deliverCalls shouldBe 0 + } +}) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessorTest.kt index e968973..aa370a2 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxEntryProcessorTest.kt @@ -90,6 +90,50 @@ class OutboxEntryProcessorTest : } } + given("processBatch() — mixed results") { + `when`("called with three entries returning Success, RetriableFailure, PermanentFailure") { + val deliverer = object : MessageDeliverer { + override val type = "stub" + private val results = listOf( + DeliveryResult.Success, + DeliveryResult.RetriableFailure("retry me"), + DeliveryResult.PermanentFailure("never"), + ) + private var idx = 0 + override fun deliver(entry: OutboxEntry): DeliveryResult = results[idx++] + } + val processor = OutboxEntryProcessor(deliverer, retryPolicy, fixedClock) + val entries = listOf(pendingEntry(), pendingEntry(), pendingEntry()) + val results = processor.processBatch(entries) + + then("preserves input order") { + results.size shouldBe 3 + } + then("first entry is DELIVERED") { + results[0].status shouldBe OutboxStatus.DELIVERED + } + then("second entry is PENDING (retriable, retries remaining)") { + results[1].status shouldBe OutboxStatus.PENDING + results[1].lastError shouldBe "retry me" + } + then("third entry is FAILED (permanent)") { + results[2].status shouldBe OutboxStatus.FAILED + results[2].lastError shouldBe "never" + } + } + } + + given("processBatch() — empty input") { + `when`("called with empty list") { + val processor = OutboxEntryProcessor(stubDeliverer(DeliveryResult.Success), retryPolicy, fixedClock) + val results = processor.processBatch(emptyList()) + + then("returns empty list without invoking deliverer") { + results shouldBe emptyList() + } + } + } + given("process() — PermanentFailure") { `when`("called with retries=0") { val processor = diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt index 167b6a5..b0ce32b 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt @@ -59,7 +59,7 @@ class OutboxProcessorTest : retryPolicy = RetryPolicy(maxRetries = 3), clock = fixedClock, ) - OutboxProcessor(store, entryProcessor).processNext(limit = 10) + val returnedCount = OutboxProcessor(store, entryProcessor).processNext(limit = 10) val results = processedEntries.toList() then("both entries are updated") { @@ -68,6 +68,25 @@ class OutboxProcessorTest : then("both are DELIVERED") { results.all { it.status == OutboxStatus.DELIVERED } shouldBe true } + then("processNext returns the count of processed entries") { + returnedCount shouldBe 2 + } + } + } + + given("processNext() with empty store — return value") { + `when`("called") { + pendingEntries = emptyList() + val entryProcessor = OutboxEntryProcessor( + deliverer = stubDeliverer(DeliveryResult.Success), + retryPolicy = RetryPolicy(maxRetries = 3), + clock = fixedClock, + ) + val returnedCount = OutboxProcessor(store, entryProcessor).processNext() + + then("returns 0") { + returnedCount shouldBe 0 + } } }