Add store-backed FlowStore to datasourcex-util#36
Merged
Conversation
A map whose entries are backed by an external store with a bounded in-memory hot set, read-through on miss, and changes exposed as a coroutine Flow of versioned deltas.
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
rossdanderson
commented
Jun 2, 2026
Source versions from the writer (DB), not an app-side VersionSource: the version is now commit-ordered and durable, fixing same-key write ordering and removing the in-memory counter that reset on restart. CacheWriter.write/delete return the assigned version; VersionSource and its implementations are deleted. - All cache writes are version-gated via Caffeine's per-key atomic compute (lock-free), so a read-through, owner commit, or out-of-order delta cannot clobber a newer entry. Owner commits emit the delta before refreshing the cache to close a cancellation gap. - Add tombstones (CacheEntry: Live/Tombstone) so a removal cannot be resurrected by a stale older read-through. - Bound the signal SharedFlow buffer (default 256, tunable) instead of Int.MAX_VALUE. - Reject null/non-integral version in both Jackson deserializers; document the String/JSON-native key/value round-trip limitation. - AutoCommitTxContext is one-shot (reuse throws; rollback-after-commit no-ops). - deleteAll versions per key; document loadAll/loadAllKeys defaults; delegate the CacheLoaderWriter factory overload. - De-flake FlowStoreTest (subscription synchronisation instead of delays); add tombstone, version-parsing, and tx-guard coverage.
- Centralize the VersionedMapEvent -> CacheEntry mapping in one toEntry() helper instead of restating it across the owner and consumer paths. - Add AbstractFlowStore.cacheReflectIfNewer and collapse the consumer's hand-rolled version-gated computeIfPresent onto it. - Drop the redundant loader param from MutableFlowStoreImpl (always the writer). - Rewrite KDoc/sample docs to describe behaviour without rejected-alternative framing, and drop stale references to the removed delta channel.
asFlow() is delta-only (replay = 0), so a subscriber never sees current state. The new overload runs a bulk query for the current matching entries, emits them, then follows the live stream — version-gated per key so the snapshot and the tail never conflict. The predicate scopes the live tail to the same logical set: a matching upsert enters/updates the view, one that stops matching leaves it as a Removed, and removals forward only for keys in the view (continuous query). The query runs blocking on the store's dispatcher and is not written to the cache. Defaults to always-true, so asFlow(query) follows the whole store. Additive public API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
FlowStore — A store-backed map that gives you a live, version-stamped delta stream plus read-through access to current values, with a bounded Caffeine cache as the hot set. Owners write through a transaction (MutableFlowStore) and consumers converge by following the stream and reading through on a miss.
SharedFlowCache — A keyed cache of SharedFlows that shares one upstream collection per key across its subscribers. Each key is evicted once its upstream ends (completes, errors, or loses all subscribers), so the cache can't grow without bound.