Add ByteDecoder for opensearch_api source to support Kafka buffer #6879

Open
divakarsingh wants to merge 1 commit into opensearch-project:main from divakarsingh:fix/opensearch-api-kafka-buffer
@divakarsingh

Description
The opensearch_api source did not work with the Kafka buffer because no ByteDecoder was registered for it. When
buffer.isByteBuffer() returns true, the raw request bytes are written to the buffer, but the consumer side had no way to
reconstruct events from the NDJSON bulk format.

This adds OpenSearchBulkByteDecoder, which parses the NDJSON bulk format back into Data Prepper events with the correct
metadata attributes.
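
To illustrate what decoding the NDJSON bulk format involves, here is a minimal sketch. It is not the OpenSearchBulkByteDecoder from this PR: the class BulkNdjsonSketch, the BulkItem type, and the regex-based action detection are hypothetical stand-ins, and it keeps the JSON lines as raw strings instead of building Data Prepper events. It assumes the standard bulk layout, where index, create, and update actions are followed by a document line and delete is not.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; names and structure are assumptions.
final class BulkNdjsonSketch {

    /** One bulk item: the action name plus the raw JSON lines it came from. */
    static final class BulkItem {
        final String action;
        final String actionLine;
        final String documentLine; // null when the action carries no document (delete)

        BulkItem(String action, String actionLine, String documentLine) {
            this.action = action;
            this.actionLine = actionLine;
            this.documentLine = documentLine;
        }
    }

    // Matches the first key of an action line such as {"index": {...}}.
    private static final Pattern ACTION =
            Pattern.compile("^\\s*\\{\\s*\"(index|create|update|delete)\"\\s*:");

    /** Pairs each action line with the document line that follows it, if any. */
    static List<BulkItem> parse(byte[] payload) {
        String[] lines = new String(payload, StandardCharsets.UTF_8).split("\\r?\\n");
        List<BulkItem> items = new ArrayList<>();
        int i = 0;
        while (i < lines.length) {
            String actionLine = lines[i++];
            if (actionLine.trim().isEmpty()) {
                continue; // tolerate blank lines between items
            }
            Matcher matcher = ACTION.matcher(actionLine);
            if (!matcher.find()) {
                throw new IllegalArgumentException("Expected a bulk action line: " + actionLine);
            }
            String action = matcher.group(1);
            String documentLine = null;
            if (!action.equals("delete")) {
                if (i >= lines.length) {
                    throw new IllegalArgumentException("Missing document line for action: " + action);
                }
                documentLine = lines[i++];
            }
            items.add(new BulkItem(action, actionLine, documentLine));
        }
        return items;
    }
}
```

A real decoder would parse both lines with a JSON library and copy fields such as the index name and document ID from the action line into the event metadata; the sketch only shows the line pairing that makes this possible on the consumer side.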

Issues Resolved
Resolves #6876

Check List

  • New functionality includes testing
  • Commits are signed with DCO (Signed-off-by)

@github-actions

github-actions Bot commented May 21, 2026

✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

@divakarsingh divakarsingh force-pushed the fix/opensearch-api-kafka-buffer branch from 220f714 to 2a982d8 Compare May 22, 2026 02:49

@dlvenable dlvenable left a comment

Member

Thank you @divakarsingh for this contribution! I have a few comments on the code and the approach.

public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
    final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
    String line;
    while ((line = reader.readLine()) != null) {

Member

This code should be the same as the existing code in OpenSearchAPIService for handling the response for non-byte buffers. The current structure of this class looks reusable. I think we might be able to refactor OpenSearchAPIService to use this in processBulkRequest.

Author

@dlvenable Refactored OpenSearchAPIService.processBulkRequest to delegate to OpenSearchBulkByteDecoder for the non-byte-buffer path, so there's a single copy of the parsing logic. Let me know if you had a different structure in mind.
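
The shape of that delegation can be sketched as follows. This is a simplified stand-in, not the code in this PR: SharedDecoderSketch, LineDecoder, Buffer, handleRequest, and consume are all hypothetical names, and the "decoder" here only splits the payload into non-empty lines. The point is that the direct path and the byte-buffer path both go through one decoder, so they cannot drift apart.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; names and structure are assumptions.
final class SharedDecoderSketch {

    /** Stand-in for a byte decoder: turns a byte stream into records. */
    interface LineDecoder {
        void parse(InputStream in, Consumer<String> consumer);
    }

    // The single copy of the parsing logic (here: emit every non-empty line).
    static final LineDecoder DECODER = (in, consumer) -> {
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    consumer.accept(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };

    /** Stand-in for a buffer that stores either raw bytes or decoded records. */
    static final class Buffer {
        final boolean byteBuffer;
        final Queue<byte[]> rawBytes = new ArrayDeque<>();
        final List<String> records = new ArrayList<>();

        Buffer(boolean byteBuffer) {
            this.byteBuffer = byteBuffer;
        }
    }

    /** Source side: write raw bytes to a byte buffer, otherwise decode right away. */
    static void handleRequest(byte[] body, Buffer buffer) {
        if (buffer.byteBuffer) {
            buffer.rawBytes.add(body);
        } else {
            DECODER.parse(new ByteArrayInputStream(body), buffer.records::add);
        }
    }

    /** Consumer side: return decoded records, decoding any pending raw bytes first. */
    static List<String> consume(Buffer buffer) {
        List<String> out = new ArrayList<>(buffer.records);
        byte[] raw;
        while ((raw = buffer.rawBytes.poll()) != null) {
            DECODER.parse(new ByteArrayInputStream(raw), out::add);
        }
        return out;
    }
}
```

With this structure, sending the same request body through a byte buffer and a non-byte buffer yields the same records on the consumer side.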

The opensearch_api source did not work with Kafka buffer because no
ByteDecoder was registered. When buffer.isByteBuffer()=true, raw bytes
are written but the consumer side had no way to reconstruct events from
the NDJSON bulk format.

This adds OpenSearchBulkByteDecoder which parses NDJSON bulk format
back into Data Prepper events with correct metadata attributes.
OpenSearchAPIService now delegates to the decoder for the non-byte-buffer
path as well, eliminating duplicate parsing logic.

Resolves opensearch-project#6876

Signed-off-by: Divakar Pratap Singh <divakar.p.singh@gmail.com>
@divakarsingh divakarsingh force-pushed the fix/opensearch-api-kafka-buffer branch from 2a982d8 to 504f703 Compare June 9, 2026 11:45
Development

Successfully merging this pull request may close these issues.

[BUG] opensearch_api source does not work with Kafka buffer - no ByteDecoder registered

2 participants