From 900043828d817064dc73d29eed95162d456137d7 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Mon, 22 Jun 2026 12:27:13 +0200 Subject: [PATCH] Detect duplicate JSON keys in flow files Resolves #77. Before computing a diff, both flow files are now scanned with a streaming JSON parser that has STRICT_DUPLICATE_DETECTION enabled. If a duplicate key is found the action posts a CAUTION callout in the PR comment with the file path and exact line/column, then exits with code 1 so the PR check is blocked until the file is fixed. - Add validateNoDuplicateKeys() which uses a separate JsonFactory with STRICT_DUPLICATE_DETECTION; re-wraps JsonParseException with the file path for a self-contained error message. - Call it at the start of getDiff() for both pathA (skipped when absent) and pathB. - Catch JsonParseException in executeFlowDiffForOneFlow() and print the CAUTION block; set jsonParseError flag for run() to track. - Track jsonParseError per flow in run() and return RETURN_FAILURE if any parse error occurred. - Tighten the catch for snapshotA from Exception to IOException. - Add test fixture flow_v9_duplicate_key.json and four new tests. --- README.md | 12 ++++ .../java/com/snowflake/openflow/FlowDiff.java | 49 ++++++++++++- .../com/snowflake/openflow/FlowDiffTest.java | 53 ++++++++++++++ .../test/resources/flow_v9_duplicate_key.json | 72 +++++++++++++++++++ 4 files changed, 184 insertions(+), 2 deletions(-) create mode 100644 flow-diff/src/test/resources/flow_v9_duplicate_key.json diff --git a/README.md b/README.md index 72bbdaa..340794a 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,18 @@ Available rules: - `enforcePrioritizer` to check if all connections in the flow are set with the configured list of prioritizers (parameter: `prioritizers`, comma-separated list of expected prioritizers, example: `org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer`) - `backpressureThreshold` to ensure each connection keeps both data size and object count backpressure thresholds greater than zero +## JSON Validation + +Before computing a diff, the action validates both flow files for duplicate JSON keys. Duplicate keys are not valid JSON and silently produce wrong diffs because parsers apply last-wins on repeated keys, discarding earlier occurrences. This commonly occurs when a merge conflict is not fully resolved. + +If a duplicate key is detected the action posts a `[!CAUTION]` block in the PR comment identifying the file and the exact line, then exits with a non-zero status code so the PR check is blocked until the file is fixed: + +```markdown +> [!CAUTION] +> Flow file `submitted-changes/flows/my-flow.json` contains duplicate JSON keys (this typically indicates a merge conflict that was not fully resolved): Duplicate field 'flowContents' +> Line 37, column 17 +``` + ## Example The GitHub Action will automatically publish a comment on the pull request with a comprehensive description of the changes between the flows of the two branches. diff --git a/flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java b/flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java index 8b357c0..e591442 100644 --- a/flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java +++ b/flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -75,6 +76,7 @@ public class FlowDiff { private static Map parameterContexts; private static Map processGroups; private static List checkstyleViolations; + private static boolean jsonParseError; public static void main(String[] args) throws IOException { final int exitCode = run(args); @@ -134,6 +136,7 @@ static int run(String[] args) throws IOException { } boolean hasBlockingCheckstyleViolations = false; + boolean hasParseErrors = false; for (int i = 0; i < pathsA.size(); i++) { System.out.println(""); @@ -141,9 +144,11 @@ static int run(String[] args) throws IOException { flowName = ""; parameterContexts = new HashMap<>(); processGroups = new HashMap<>(); + jsonParseError = false; final boolean flowHasCheckstyleViolations = executeFlowDiffForOneFlow(pathsA.get(i), pathsB.get(i), checkstyleEnabled, rulesConfig); hasBlockingCheckstyleViolations = hasBlockingCheckstyleViolations || flowHasCheckstyleViolations; + hasParseErrors = hasParseErrors || jsonParseError; } // Post to GitHub if credentials are provided @@ -162,6 +167,10 @@ static int run(String[] args) throws IOException { } } + if (hasParseErrors) { + return RETURN_FAILURE; + } + if (checkstyleEnabled && failOnCheckstyleViolations && hasBlockingCheckstyleViolations) { return RETURN_CHECKSTYLE_VIOLATIONS; } @@ -174,7 +183,22 @@ static int run(String[] args) throws IOException { private static boolean executeFlowDiffForOneFlow(final String pathA, final String pathB, final boolean checkstyleEnabled, final CheckstyleRulesConfig rulesConfig) throws IOException { - final Set diffs = getDiff(pathA, pathB, checkstyleEnabled, rulesConfig); + final Set diffs; + try { + diffs = getDiff(pathA, pathB, checkstyleEnabled, rulesConfig); + } catch (JsonParseException e) { + jsonParseError = true; + System.out.println("### Executing Snowflake Flow Diff for flow: `" + pathB + "`"); + System.out.println(""); + System.out.println("> [!CAUTION]"); + System.out.println("> " + e.getOriginalMessage()); + if (e.getLocation() != null) { + System.out.println("> Line " + e.getLocation().getLineNr() + + ", column " + e.getLocation().getColumnNr()); + } + System.out.println(""); + return false; + } final Set bundleChanges = new HashSet<>(); boolean flowHasCheckstyleViolations = false; @@ -566,6 +590,9 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin public static Set getDiff(final String pathA, final String pathB, final boolean checkstyleEnabled, final CheckstyleRulesConfig rulesConfig) throws IOException { + validateNoDuplicateKeys(pathA); + validateNoDuplicateKeys(pathB); + final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -576,7 +603,7 @@ public static Set getDiff(final String pathA, final String pathB FlowSnapshotContainer snapshotA = null; try { snapshotA = getFlowContainer(pathA, factory); - } catch (Exception e) { + } catch (IOException e) { // no original flow - meaning that the Github Action is executed against the // first version of the flow noOriginalFlow = true; @@ -705,6 +732,24 @@ static FlowSnapshotContainer getFlowContainer(final String path, final JsonFacto } } + static void validateNoDuplicateKeys(final String path) throws IOException { + final File file = new File(path); + if (!file.exists()) { + return; + } + final JsonFactory strictFactory = new JsonFactory(); + strictFactory.enable(JsonParser.Feature.STRICT_DUPLICATE_DETECTION); + try (final JsonParser parser = strictFactory.createParser(file)) { + while (parser.nextToken() != null) {} + } catch (JsonParseException e) { + throw new JsonParseException(null, + "Flow file `" + path + "` contains duplicate JSON keys" + + " (this typically indicates a merge conflict that was not fully resolved)" + + ": " + e.getOriginalMessage(), + e.getLocation()); + } + } + static String printComponent(final VersionedComponent component) { String result = component.getComponentType().getTypeName(); diff --git a/flow-diff/src/test/java/com/snowflake/openflow/FlowDiffTest.java b/flow-diff/src/test/java/com/snowflake/openflow/FlowDiffTest.java index bc2a288..fd57be2 100644 --- a/flow-diff/src/test/java/com/snowflake/openflow/FlowDiffTest.java +++ b/flow-diff/src/test/java/com/snowflake/openflow/FlowDiffTest.java @@ -16,14 +16,19 @@ */ package com.snowflake.openflow; +import com.fasterxml.jackson.core.JsonParseException; import org.apache.nifi.registry.flow.diff.DifferenceType; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.junit.jupiter.api.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class FlowDiffTest { @@ -129,4 +134,52 @@ void testCheckstyleFailExitCode() throws IOException { "true" }); assertEquals(2, exitCode); } + + @Test + void testDuplicateKeyInFlowBThrowsException() { + assertThrows(JsonParseException.class, () -> + FlowDiff.getDiff( + "src/test/resources/flow_v1_initial.json", + "src/test/resources/flow_v9_duplicate_key.json", + false, null)); + } + + @Test + void testDuplicateKeyInFlowAThrowsException() { + assertThrows(JsonParseException.class, () -> + FlowDiff.getDiff( + "src/test/resources/flow_v9_duplicate_key.json", + "src/test/resources/flow_v1_initial.json", + false, null)); + } + + @Test + void testDuplicateKeyReturnsFailureExitCode() throws IOException { + final int exitCode = FlowDiff.run(new String[]{ + "src/test/resources/flow_v1_initial.json", + "src/test/resources/flow_v9_duplicate_key.json", + "", "", "" + }); + assertEquals(1, exitCode); + } + + @Test + void testDuplicateKeyOutputContainsCaution() throws IOException { + final ByteArrayOutputStream buf = new ByteArrayOutputStream(); + final PrintStream orig = System.out; + System.setOut(new PrintStream(buf, true, StandardCharsets.UTF_8)); + try { + FlowDiff.run(new String[]{ + "src/test/resources/flow_v1_initial.json", + "src/test/resources/flow_v9_duplicate_key.json", + "", "", "" + }); + } finally { + System.setOut(orig); + } + final String output = buf.toString(StandardCharsets.UTF_8); + assertTrue(output.contains("[!CAUTION]"), "CAUTION callout missing"); + assertTrue(output.contains("duplicate") || output.contains("Duplicate"), "duplicate key message missing"); + assertTrue(output.contains("flowContents"), "duplicate field name missing"); + } } diff --git a/flow-diff/src/test/resources/flow_v9_duplicate_key.json b/flow-diff/src/test/resources/flow_v9_duplicate_key.json new file mode 100644 index 0000000..24febfa --- /dev/null +++ b/flow-diff/src/test/resources/flow_v9_duplicate_key.json @@ -0,0 +1,72 @@ +{ + "externalControllerServices" : { }, + "flow" : { + "createdTimestamp" : 1726000168945, + "description" : "test", + "identifier" : "test", + "lastModifiedTimestamp" : 1726000168945, + "name" : "test", + "versionCount" : 0 + }, + "flowContents" : { + "comments" : "", + "componentType" : "PROCESS_GROUP", + "connections" : [ ], + "controllerServices" : [ ], + "defaultBackPressureDataSizeThreshold" : "1 GB", + "defaultBackPressureObjectThreshold" : 10000, + "defaultFlowFileExpiration" : "0 sec", + "executionEngine" : "INHERITED", + "externalControllerServiceReferences" : { }, + "flowFileConcurrency" : "UNBOUNDED", + "flowFileOutboundPolicy" : "STREAM_WHEN_AVAILABLE", + "funnels" : [ ], + "identifier" : "flow-contents-group", + "inputPorts" : [ ], + "labels" : [ ], + "maxConcurrentTasks" : 1, + "name" : "TestingFlowDiff", + "outputPorts" : [ ], + "position" : { "x" : 0.0, "y" : 0.0 }, + "processGroups" : [ ], + "processors" : [ ], + "remoteProcessGroups" : [ ], + "scheduledState" : "ENABLED", + "statelessFlowTimeout" : "1 min" + }, + "flowContents" : { + "comments" : "", + "componentType" : "PROCESS_GROUP", + "connections" : [ ], + "controllerServices" : [ ], + "defaultBackPressureDataSizeThreshold" : "1 GB", + "defaultBackPressureObjectThreshold" : 10000, + "defaultFlowFileExpiration" : "0 sec", + "executionEngine" : "INHERITED", + "externalControllerServiceReferences" : { }, + "flowFileConcurrency" : "UNBOUNDED", + "flowFileOutboundPolicy" : "STREAM_WHEN_AVAILABLE", + "funnels" : [ ], + "identifier" : "flow-contents-group-duplicate", + "inputPorts" : [ ], + "labels" : [ ], + "maxConcurrentTasks" : 1, + "name" : "TestingFlowDiff", + "outputPorts" : [ ], + "position" : { "x" : 0.0, "y" : 0.0 }, + "processGroups" : [ ], + "processors" : [ ], + "remoteProcessGroups" : [ ], + "scheduledState" : "ENABLED", + "statelessFlowTimeout" : "1 min" + }, + "flowEncodingVersion" : "1.0", + "latest" : false, + "parameterContexts" : { }, + "parameterProviders" : { }, + "snapshotMetadata" : { + "author" : "test", + "flowIdentifier" : "test", + "timestamp" : 0 + } +}