AA Processor refactor and make more configurable#223
Open
jacomago wants to merge 13 commits into
Open
Conversation
3feb24a to
f6be0e5
Compare
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Eliminate three duplicate inline stream-filter-findFirst patterns for channel property lookups. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace a 15-line Optional.map(→Stream).orElse(stream) chain with a named helper method, making the intent immediately clear at the call site. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
process() was doing three things: guard clauses, mapping channels to per-archiver PV options, and submitting actions. Extract the latter two into named methods so process() reads as a three-step narrative. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace the imperative for-loop (with continue guard) with a stream pipeline. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion - INFO: "Processing N channels." / "Configured N channels." frame each run - WARNING (new): "No reachable archivers configured" when archiversInfo is empty - WARNING: per-channel failure now names the channel and archiver alias - WARNING: archiver response anomalies now identify which archiver and what was wrong - FINE (was INFO): per-archiver status query — too chatty for production INFO - FINER: raw status response now includes archiver alias for correlation - All String.format calls wrapped in lambdas for lazy evaluation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…urious ARCHIVE Previously, a failed HTTP status batch silently returned an empty list. Those PVs were then treated as "not archived" and submitted for archiving on every subsequent run, spamming the archiver with duplicate requests. ArchiverService now throws ArchiverServiceException on status fetch failure. AAChannelProcessor catches it in getArchiveActions() and returns null, causing submitToArchivers() to skip that archiver entirely for the run. Also renames getStatusesFromPvListQuery/Body → getStatusesViaGet/Post to reflect their transport semantics Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… transport methods ArchiverService was holding archiver-alias config (postSupportArchivers) and making the POST-vs-GET routing decision — a concern that belongs in AAChannelProcessor alongside all other per-archiver configuration. ArchiverService now exposes two public transport methods: getStatusesViaGet(url, pvs) — batches internally (URL length limit) getStatusesViaPost(url, pvs) — sends in a single POST body AAChannelProcessor holds @value("${aa.post_support:}") and selects the correct transport in getArchiveActions(). The field is volatile so the runtime setter added later is thread-safe. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ot PV map Logging the full PV map at INFO generates enormous output in production. Replace with a count summary at FINE. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Policies are fetched once at startup (@PostConstruct) and refreshed on a configurable fixed delay (aa.policy_refresh_interval_seconds, default 1 h) rather than on every process() call. process() now reads a volatile snapshot, so policy fetches can no longer block or fail mid-run. processorInfo() exposes LastPolicyRefresh and per-archiver policy counts for observability. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…fail fast Without a connect timeout, TCP SYN drops (firewall, no route) stall for the OS default (~2 min) before returning. Setting connect timeout equal to the configured read timeout ensures both slow and unreachable archivers fail at the same bounded deadline. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
a4945df to
8b5e689
Compare
Channel processors (e.g. archiver sync on IOC reconnect) are dispatched asynchronously on every channel write. Previously the pool was anonymous, sized by a single hard-coded property, and used AbortPolicy — meaning a saturated queue would propagate an exception to the HTTP caller and cause receivers to retry, increasing load further. ChannelFinderProcessorExecutor replaces it with a named @component whose size is driven by processors.max_concurrent_updates (default 10). Core and max are equal to avoid ramp-up lag during bursts of channel updates. The queue is deliberately shallow (N/4) with a discard-oldest rejection policy: when the pool is saturated, the stalest pending batch is evicted in favour of the fresher update, since a newer channel snapshot always supersedes an older one. Individual pool parameters can be overridden via processors.task_executor.* properties without touching the shared default. Tests cover pool-size derivation from defaults and overrides, and confirm the eviction behaviour under a saturated queue. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When the status fetch for an archiver fails (host unreachable), the processor already skips that archiver to avoid spurious ARCHIVE submissions. However, channels with pvStatus=Active that are currently paused in the archiver will remain paused indefinitely — until the archiver recovers and another channel update happens to arrive. Add warnSkippedActivePvs() to make that risk visible: after a failed status fetch, filter the skipped batch to Active channels and log them at WARNING with a count and up to 10 names, so operators can see which PVs may need a manual RESUME if the archiver was down for an extended period. Test: AAChannelProcessorIT.testStatusFetchFailureSkipsConfigureAndReturnsZero verifies that when getStatusesViaGet throws, process() returns 0 and configureAA is never called. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.



Combination of refactor and fixes for the Archiver Channel Processor
Fixes
Feature changes
Fixes #221