Add ByteDecoder for opensearch_api source to support Kafka buffer#6879
Add ByteDecoder for opensearch_api source to support Kafka buffer#6879divakarsingh wants to merge 1 commit into
Conversation
6fbd388 to
220f714
Compare
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
220f714 to
2a982d8
Compare
dlvenable
left a comment
There was a problem hiding this comment.
Thank you @divakarsingh for this contribution! I have a few comments on the code and the approach.
| public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException { | ||
| final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); | ||
| String line; | ||
| while ((line = reader.readLine()) != null) { |
There was a problem hiding this comment.
This code should be the same as the existing code in OpenSearchAPIService for handling the response for non-byte buffers. The current structure of this class looks reusable. I think we might be able to refactor OpenSearchAPIService to use this in processBulkRequest.
There was a problem hiding this comment.
@dlvenable Refactored OpenSearchAPIService.processBulkRequest to delegate to OpenSearchBulkByteDecoder for the non-byte-buffer path, so there's a single copy of the parsing logic. Let me know if you had a different structure in mind.
The opensearch_api source did not work with Kafka buffer because no ByteDecoder was registered. When buffer.isByteBuffer()=true, raw bytes are written but the consumer side had no way to reconstruct events from the NDJSON bulk format. This adds OpenSearchBulkByteDecoder which parses NDJSON bulk format back into Data Prepper events with correct metadata attributes. OpenSearchAPIService now delegates to the decoder for the non-byte-buffer path as well, eliminating duplicate parsing logic. Resolves opensearch-project#6876 Signed-off-by: Divakar Pratap Singh <divakar.p.singh@gmail.com>
2a982d8 to
504f703
Compare
Description
The
opensearch_apisource did not work with Kafka buffer because noByteDecoderwas registered. Whenbuffer.isByteBuffer()=true, raw bytes are written but the consumer side had no way to reconstruct events from the NDJSON bulkformat.
This adds
OpenSearchBulkByteDecoderwhich parses NDJSON bulk format back into Data Prepper events with correct metadataattributes.
Issues Resolved
Resolves #6876
Check List