AA Processor refactor and make more configurable #223

Open
jacomago wants to merge 13 commits into ChannelFinder:master from jacomago:aa-processor-clarify

@jacomago jacomago commented May 22, 2026

Combination of refactor and fixes for the Archiver Channel Processor

  • Simplify the code by making it more DRY
  • Extract some lambdas for clarity
  • Move the POST-vs-GET decision for archiver requests from AAService to Processor
  • Reduce INFO and WARNING level logging (by swapping to counts instead of full lists) and use lambda logging

Fixes

  • Add a connection timeout
  • Warn when resume updates are not sent

Feature changes

  • Cache the policies from the archivers rather than fetching them every time
    • Policies rarely change, so they should not be fetched on every run
  • Make the processor task executor configurable

Fixes #221

@jacomago jacomago force-pushed the aa-processor-clarify branch 3 times, most recently from 3feb24a to f6be0e5 Compare May 22, 2026 12:31
jacomago and others added 11 commits May 25, 2026 08:19
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Eliminate three duplicate inline stream-filter-findFirst patterns for
channel property lookups.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
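
The helper described in the commit above might look roughly like the following. This is a minimal sketch, not the PR's code: the `Property` class, the `propertyValue` method name, and the property names used in the usage note are stand-ins assumed for illustration.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;

final class PropertyLookupSketch {
    /** Hypothetical stand-in for a channel property (name/value pair). */
    static final class Property {
        final String name;
        final String value;

        Property(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    /** One named helper in place of repeated stream-filter-findFirst chains. */
    static Optional<String> propertyValue(List<Property> properties, String name) {
        return properties.stream()
                .filter(p -> name.equals(p.name))
                .map(p -> p.value)
                .filter(Objects::nonNull) // findFirst() throws on a null element
                .findFirst();
    }
}
```

A call site then reads as `propertyValue(props, "pvStatus")` instead of repeating the pipeline inline.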
Replace a 15-line Optional.map(→Stream).orElse(stream) chain with a
named helper method, making the intent immediately clear at the call site.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
process() was doing three things: guard clauses, mapping channels to per-archiver
PV options, and submitting actions. Extract the latter two into named methods so
process() reads as a three-step narrative.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace the imperative for-loop (with continue guard) with a stream pipeline.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion

- INFO: "Processing N channels." / "Configured N channels." frame each run
- WARNING (new): "No reachable archivers configured" when archiversInfo is empty
- WARNING: per-channel failure now names the channel and archiver alias
- WARNING: archiver response anomalies now identify which archiver and what was wrong
- FINE (was INFO): per-archiver status query — too chatty for production INFO
- FINER: raw status response now includes archiver alias for correlation
- All String.format calls wrapped in lambdas for lazy evaluation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
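
The lazy-evaluation point in the commit above can be illustrated with `java.util.logging`, whose `Logger.log(Level, Supplier<String>)` overload only invokes the supplier when the level is enabled. This is a sketch under the assumption that the processor logs through `java.util.logging`; the class name and the `formatCalls` counter are hypothetical and exist only to make the effect observable.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LazyLoggingSketch {
    static final Logger LOGGER = Logger.getLogger(LazyLoggingSketch.class.getName());

    // Counts how many times a message string was actually built.
    static int formatCalls = 0;

    static String describe(List<String> pvs) {
        formatCalls++;
        return String.format("Processing %d channels.", pvs.size());
    }

    static void logRun(List<String> pvs) {
        // Built only if INFO is enabled for this logger.
        LOGGER.log(Level.INFO, () -> describe(pvs));
        // Skipped entirely (no String.format call) unless FINER is enabled.
        LOGGER.log(Level.FINER, () -> describe(pvs));
    }
}
```

With the logger at INFO, only the first message is formatted; the FINER supplier is never called.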
…urious ARCHIVE

Previously, a failed HTTP status batch silently returned an empty list.
Those PVs were then treated as "not archived" and submitted for archiving
on every subsequent run, spamming the archiver with duplicate requests.

ArchiverService now throws ArchiverServiceException on status fetch failure.
AAChannelProcessor catches it in getArchiveActions() and returns null,
causing submitToArchivers() to skip that archiver entirely for the run.

Also renames getStatusesFromPvListQuery/Body → getStatusesViaGet/Post
to reflect their transport semantics.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
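
The skip-on-failure behaviour described in the commit above can be sketched as follows. Only the names `ArchiverServiceException`, `getArchiveActions`, and `submitToArchivers` come from the commit message; the `StatusFetcher` interface, the method signatures, and the use of plain PV name lists are assumptions made to keep the example self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class StatusFailureSketch {
    /** Stand-in for the exception named in the commit message. */
    static final class ArchiverServiceException extends Exception {
        ArchiverServiceException(String message) {
            super(message);
        }
    }

    /** Hypothetical transport abstraction; the real service signature may differ. */
    interface StatusFetcher {
        List<String> archivedPvs(String archiverUrl, List<String> pvs) throws ArchiverServiceException;
    }

    /** Returns the PVs that still need archiving, or null if status is unknown. */
    static List<String> getArchiveActions(StatusFetcher fetcher, String url, List<String> pvs) {
        try {
            List<String> archived = fetcher.archivedPvs(url, pvs);
            return pvs.stream().filter(pv -> !archived.contains(pv)).collect(Collectors.toList());
        } catch (ArchiverServiceException e) {
            return null; // unknown status must not be read as "not archived"
        }
    }

    /** Counts submitted PVs, skipping any archiver whose status fetch failed. */
    static int submitToArchivers(StatusFetcher fetcher, Map<String, List<String>> pvsByUrl) {
        int submitted = 0;
        for (Map.Entry<String, List<String>> entry : pvsByUrl.entrySet()) {
            List<String> actions = getArchiveActions(fetcher, entry.getKey(), entry.getValue());
            if (actions == null) {
                continue; // skip this archiver for the whole run
            }
            submitted += actions.size();
        }
        return submitted;
    }
}
```

The key design point is that a failed fetch is distinguishable from an empty result, so an unreachable archiver no longer triggers duplicate ARCHIVE requests.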
… transport methods

ArchiverService was holding archiver-alias config (postSupportArchivers) and
making the POST-vs-GET routing decision — a concern that belongs in
AAChannelProcessor alongside all other per-archiver configuration.

ArchiverService now exposes two public transport methods:
  getStatusesViaGet(url, pvs)  — batches internally (URL length limit)
  getStatusesViaPost(url, pvs) — sends in a single POST body

AAChannelProcessor holds @Value("${aa.post_support:}") and selects the
correct transport in getArchiveActions(). The field is volatile so the
runtime setter added later is thread-safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
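
A rough sketch of the routing decision described in the commit above, assuming the configured value ends up as a set of archiver aliases. The class, enum, and method names here are hypothetical; only the idea of a volatile field with a runtime setter comes from the commit message.

```java
import java.util.Collection;
import java.util.Set;

final class TransportSelectionSketch {
    enum Transport { GET, POST }

    // Replaced wholesale on update; volatile makes the new set visible
    // to processing threads without further locking.
    private volatile Set<String> postSupportArchivers = Set.of();

    /** Runtime setter: swaps in an immutable copy of the aliases. */
    void setPostSupportArchivers(Collection<String> aliases) {
        this.postSupportArchivers = Set.copyOf(aliases);
    }

    /** Archivers listed as POST-capable use POST; everything else falls back to GET. */
    Transport transportFor(String archiverAlias) {
        return postSupportArchivers.contains(archiverAlias) ? Transport.POST : Transport.GET;
    }
}
```

Because the set is immutable and only the reference changes, a reader sees either the old or the new configuration, never a partially updated one.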
…ot PV map

Logging the full PV map at INFO generates enormous output in production.
Replace with a count summary at FINE.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Policies are fetched once at startup (@PostConstruct) and refreshed on a
configurable fixed delay (aa.policy_refresh_interval_seconds, default 1 h)
rather than on every process() call. process() now reads a volatile snapshot,
so policy fetches can no longer block or fail mid-run. processorInfo() exposes
LastPolicyRefresh and per-archiver policy counts for observability.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
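
The caching approach in the commit above can be sketched with a volatile snapshot and a fixed-delay refresh. This is not the PR's implementation: it uses a plain `ScheduledExecutorService` rather than Spring scheduling, the class and method names are hypothetical, and keeping the previous snapshot when a refresh fails is an assumption.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class PolicyCacheSketch {
    private final Supplier<Map<String, List<String>>> fetcher;
    private volatile Map<String, List<String>> snapshot = Map.of();
    private volatile Instant lastRefresh;

    PolicyCacheSketch(Supplier<Map<String, List<String>>> fetcher) {
        this.fetcher = fetcher;
    }

    /** Fetches policies; on failure the previous snapshot stays in place (assumption). */
    void refresh() {
        try {
            snapshot = Map.copyOf(fetcher.get());
            lastRefresh = Instant.now();
        } catch (RuntimeException e) {
            // Keep serving the last good snapshot.
        }
    }

    /** Fetches once immediately, then refreshes on a fixed delay using a daemon thread. */
    ScheduledExecutorService start(long intervalSeconds) {
        refresh();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "policy-refresh");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(this::refresh, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    /** Non-blocking read used by the processing path. */
    Map<String, List<String>> policies() {
        return snapshot;
    }

    Instant lastRefresh() {
        return lastRefresh;
    }
}
```

The processing path only ever calls `policies()`, so a slow or failing archiver affects the background refresh rather than a run in progress.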
…fail fast

Without a connect timeout, TCP SYN drops (firewall, no route) stall
for the OS default (~2 min) before returning. Setting connect timeout
equal to the configured read timeout ensures both slow and unreachable
archivers fail at the same bounded deadline.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
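
As an illustration of the timeout pairing described in the commit above, here is a sketch using the JDK's `java.net.http` client. The PR does not say which HTTP client the service uses, so the client choice, the class name, and the example URL are all assumptions; note also that `HttpRequest.Builder.timeout` bounds the wait for a response, which is close to but not identical to a socket read timeout.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

final class TimeoutSketch {
    /** Bounds connection establishment, so dropped SYNs fail at the deadline. */
    static HttpClient clientWithTimeout(Duration timeout) {
        return HttpClient.newBuilder()
                .connectTimeout(timeout)
                .build();
    }

    /** Applies the same duration to the wait for a response on this request. */
    static HttpRequest statusRequest(URI uri, Duration timeout) {
        return HttpRequest.newBuilder(uri)
                .timeout(timeout)
                .GET()
                .build();
    }
}
```

Using one duration for both means a slow archiver and an unreachable one fail at the same bounded deadline.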
@jacomago jacomago force-pushed the aa-processor-clarify branch from a4945df to 8b5e689 Compare May 25, 2026 07:20
jacomago and others added 2 commits May 25, 2026 14:39
Channel processors (e.g. archiver sync on IOC reconnect) are dispatched
asynchronously on every channel write. Previously the pool was anonymous,
sized by a single hard-coded property, and used AbortPolicy — meaning a
saturated queue would propagate an exception to the HTTP caller and cause
receivers to retry, increasing load further.

ChannelFinderProcessorExecutor replaces it with a named @Component whose
size is driven by processors.max_concurrent_updates (default 10). Core and
max are equal to avoid ramp-up lag during bursts of channel updates. The
queue is deliberately shallow (N/4) with a discard-oldest rejection policy:
when the pool is saturated, the stalest pending batch is evicted in favour
of the fresher update, since a newer channel snapshot always supersedes an
older one. Individual pool parameters can be overridden via
processors.task_executor.* properties without touching the shared default.

Tests cover pool-size derivation from defaults and overrides, and confirm
the eviction behaviour under a saturated queue.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
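
The pool shape described in the commit above can be approximated with a plain `ThreadPoolExecutor`. This is a sketch, not the PR's `ChannelFinderProcessorExecutor`: the real component is a Spring bean with overridable properties, while the class name, the integer division for N/4, and the minimum queue depth of 1 used here are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ProcessorExecutorSketch {
    /** Builds a fixed-size pool with a shallow queue and discard-oldest rejection. */
    static ThreadPoolExecutor build(int maxConcurrentUpdates) {
        // Shallow queue: a quarter of the pool size, but never zero (assumption).
        int queueCapacity = Math.max(1, maxConcurrentUpdates / 4);
        return new ThreadPoolExecutor(
                maxConcurrentUpdates,                 // core size
                maxConcurrentUpdates,                 // max size, equal to core: no ramp-up lag
                0L, TimeUnit.MILLISECONDS,            // keep-alive has no effect when core == max
                new ArrayBlockingQueue<>(queueCapacity),
                // When saturated, drop the oldest queued task and enqueue the new one.
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }
}
```

With all workers busy and the queue full, submitting a new task evicts the stalest queued one instead of throwing to the caller, which is the behaviour the commit contrasts with AbortPolicy.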
When the status fetch for an archiver fails (host unreachable), the
processor already skips that archiver to avoid spurious ARCHIVE
submissions. However, channels with pvStatus=Active that are currently
paused in the archiver will remain paused indefinitely — until the
archiver recovers and another channel update happens to arrive.

Add warnSkippedActivePvs() to make that risk visible: after a failed
status fetch, filter the skipped batch to Active channels and log them
at WARNING with a count and up to 10 names, so operators can see which
PVs may need a manual RESUME if the archiver was down for an extended
period.

Test: AAChannelProcessorIT.testStatusFetchFailureSkipsConfigureAndReturnsZero
verifies that when getStatusesViaGet throws, process() returns 0 and
configureAA is never called.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
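
The warning described in the commit above (a count plus up to 10 names) could be assembled along these lines. Only the `warnSkippedActivePvs` idea and the `pvStatus=Active` filter come from the commit message; the `Channel` class, the method name, and the message wording are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class SkippedActiveSketch {
    static final int MAX_NAMES = 10;

    /** Hypothetical stand-in for a channel with its pvStatus property. */
    static final class Channel {
        final String name;
        final String pvStatus;

        Channel(String name, String pvStatus) {
            this.name = name;
            this.pvStatus = pvStatus;
        }
    }

    /** Builds the warning text, or empty if no Active channels were skipped. */
    static Optional<String> skippedActiveWarning(String archiverAlias, List<Channel> skipped) {
        List<String> active = skipped.stream()
                .filter(c -> "Active".equals(c.pvStatus))
                .map(c -> c.name)
                .collect(Collectors.toList());
        if (active.isEmpty()) {
            return Optional.empty();
        }
        // Cap the listed names so the log line stays bounded.
        String names = active.stream().limit(MAX_NAMES).collect(Collectors.joining(", "));
        return Optional.of(String.format(
                "Status fetch failed for archiver %s; %d Active PVs skipped, e.g. %s",
                archiverAlias, active.size(), names));
    }
}
```

The full count is always reported, while the name list is truncated, which matches the PR's broader move from full lists to counts in WARNING output.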
@jacomago jacomago changed the title from "Aa processor clarify" to "AA Processor refactor and make more configurable" May 25, 2026
@jacomago jacomago marked this pull request as ready for review May 25, 2026 13:19
@jacomago jacomago self-assigned this May 25, 2026
Development

Successfully merging this pull request may close these issues.

fetching policy blocks use of aa processor
