Skip to content

Add store-backed FlowStore to datasourcex-util#36

Merged
rossdanderson merged 13 commits into
mainfrom
flow-store
Jun 4, 2026
Merged

Add store-backed FlowStore to datasourcex-util#36
rossdanderson merged 13 commits into
mainfrom
flow-store

Conversation

@rossdanderson
Copy link
Copy Markdown
Collaborator

@rossdanderson rossdanderson commented Jun 2, 2026

FlowStore — A store-backed map that gives you a live, version-stamped delta stream plus read-through access to current values, with a bounded Caffeine cache as the hot set. Owners write through a transaction (MutableFlowStore) and consumers converge by following the stream and reading through on a miss.

SharedFlowCache — A keyed cache of SharedFlows that shares one upstream collection per key across its subscribers. Each key is evicted once its upstream ends (completes, errors, or loses all subscribers), so the cache can't grow without bound.

A map whose entries are backed by an external store with a bounded in-memory hot set, read-through on miss, and changes exposed as a coroutine Flow of versioned deltas.
Comment thread util/build.gradle.kts
Comment thread util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/CacheLoader.kt Outdated
Comment thread util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/CacheWriter.kt Outdated
Comment thread util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/FlowStore.kt Outdated
Comment thread util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/TxContext.kt Outdated
Comment thread util/src/main/kotlin/com/caplin/integration/datasourcex/util/store/CacheLoader.kt Outdated
rossdanderson and others added 12 commits June 2, 2026 17:10
Source versions from the writer (DB), not an app-side VersionSource: the
version is now commit-ordered and durable, fixing same-key write ordering and
removing the in-memory counter that reset on restart. CacheWriter.write/delete
return the assigned version; VersionSource and its implementations are deleted.

- All cache writes are version-gated via Caffeine's per-key atomic compute
  (lock-free), so a read-through, owner commit, or out-of-order delta cannot
  clobber a newer entry. Owner commits emit the delta before refreshing the
  cache to close a cancellation gap.
- Add tombstones (CacheEntry: Live/Tombstone) so a removal cannot be resurrected
  by a stale older read-through.
- Bound the signal SharedFlow buffer (default 256, tunable) instead of
  Int.MAX_VALUE.
- Reject null/non-integral version in both Jackson deserializers; document the
  String/JSON-native key/value round-trip limitation.
- AutoCommitTxContext is one-shot (reuse throws; rollback-after-commit no-ops).
- deleteAll versions per key; document loadAll/loadAllKeys defaults; delegate the
  CacheLoaderWriter factory overload.
- De-flake FlowStoreTest (subscription synchronisation instead of delays); add
  tombstone, version-parsing, and tx-guard coverage.
- Centralize the VersionedMapEvent -> CacheEntry mapping in one toEntry() helper
  instead of restating it across the owner and consumer paths.
- Add AbstractFlowStore.cacheReflectIfNewer and collapse the consumer's
  hand-rolled version-gated computeIfPresent onto it.
- Drop the redundant loader param from MutableFlowStoreImpl (always the writer).
- Rewrite KDoc/sample docs to describe behaviour without rejected-alternative
  framing, and drop stale references to the removed delta channel.
asFlow() is delta-only (replay = 0), so a subscriber never sees current
state. The new overload runs a bulk query for the current matching
entries, emits them, then follows the live stream — version-gated per key
so the snapshot and the tail never conflict.

The predicate scopes the live tail to the same logical set: a matching
upsert enters/updates the view, one that stops matching leaves it as a
Removed, and removals forward only for keys in the view (continuous query).
The query runs blocking on the store's dispatcher and is not written to
the cache. Defaults to always-true, so asFlow(query) follows the whole
store. Additive public API.
@rossdanderson rossdanderson merged commit 344b3f7 into main Jun 4, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant