Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ Available rules:
- `enforcePrioritizer` to check if all connections in the flow are set with the configured list of prioritizers (parameter: `prioritizers`, comma-separated list of expected prioritizers, example: `org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer`)
- `backpressureThreshold` to ensure each connection keeps both data size and object count backpressure thresholds greater than zero

## JSON Validation

Before computing a diff, the action validates both flow files for duplicate JSON keys. Duplicate keys are not valid JSON and silently produce wrong diffs because parsers apply last-wins on repeated keys, discarding earlier occurrences. This commonly occurs when a merge conflict is not fully resolved.

If a duplicate key is detected the action posts a `[!CAUTION]` block in the PR comment identifying the file and the exact line, then exits with a non-zero status code so the PR check is blocked until the file is fixed:

```markdown
> [!CAUTION]
> Flow file `submitted-changes/flows/my-flow.json` contains duplicate JSON keys (this typically indicates a merge conflict that was not fully resolved): Duplicate field 'flowContents'
> Line 37, column 17
```

## Example

The GitHub Action will automatically publish a comment on the pull request with a comprehensive description of the changes between the flows of the two branches.
Expand Down
49 changes: 47 additions & 2 deletions flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class FlowDiff {
private static Map<String, VersionedParameterContext> parameterContexts;
private static Map<String, VersionedProcessGroup> processGroups;
private static List<String> checkstyleViolations;
private static boolean jsonParseError;

public static void main(String[] args) throws IOException {
final int exitCode = run(args);
Expand Down Expand Up @@ -134,16 +136,19 @@ static int run(String[] args) throws IOException {
}

boolean hasBlockingCheckstyleViolations = false;
boolean hasParseErrors = false;

for (int i = 0; i < pathsA.size(); i++) {
System.out.println("");

flowName = "";
parameterContexts = new HashMap<>();
processGroups = new HashMap<>();
jsonParseError = false;

final boolean flowHasCheckstyleViolations = executeFlowDiffForOneFlow(pathsA.get(i), pathsB.get(i), checkstyleEnabled, rulesConfig);
hasBlockingCheckstyleViolations = hasBlockingCheckstyleViolations || flowHasCheckstyleViolations;
hasParseErrors = hasParseErrors || jsonParseError;
}

// Post to GitHub if credentials are provided
Expand All @@ -162,6 +167,10 @@ static int run(String[] args) throws IOException {
}
}

if (hasParseErrors) {
return RETURN_FAILURE;
}

if (checkstyleEnabled && failOnCheckstyleViolations && hasBlockingCheckstyleViolations) {
return RETURN_CHECKSTYLE_VIOLATIONS;
}
Expand All @@ -174,7 +183,22 @@ static int run(String[] args) throws IOException {

private static boolean executeFlowDiffForOneFlow(final String pathA, final String pathB,
final boolean checkstyleEnabled, final CheckstyleRulesConfig rulesConfig) throws IOException {
final Set<FlowDifference> diffs = getDiff(pathA, pathB, checkstyleEnabled, rulesConfig);
final Set<FlowDifference> diffs;
try {
diffs = getDiff(pathA, pathB, checkstyleEnabled, rulesConfig);
} catch (JsonParseException e) {
jsonParseError = true;
System.out.println("### Executing Snowflake Flow Diff for flow: `" + pathB + "`");
System.out.println("");
System.out.println("> [!CAUTION]");
System.out.println("> " + e.getOriginalMessage());
if (e.getLocation() != null) {
System.out.println("> Line " + e.getLocation().getLineNr()
+ ", column " + e.getLocation().getColumnNr());
}
System.out.println("");
return false;
}
final Set<String> bundleChanges = new HashSet<>();
boolean flowHasCheckstyleViolations = false;

Expand Down Expand Up @@ -566,6 +590,9 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin

public static Set<FlowDifference> getDiff(final String pathA, final String pathB,
final boolean checkstyleEnabled, final CheckstyleRulesConfig rulesConfig) throws IOException {
validateNoDuplicateKeys(pathA);
validateNoDuplicateKeys(pathB);

final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_NULL);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand All @@ -576,7 +603,7 @@ public static Set<FlowDifference> getDiff(final String pathA, final String pathB
FlowSnapshotContainer snapshotA = null;
try {
snapshotA = getFlowContainer(pathA, factory);
} catch (Exception e) {
} catch (IOException e) {
// no original flow - meaning that the Github Action is executed against the
// first version of the flow
noOriginalFlow = true;
Expand Down Expand Up @@ -705,6 +732,24 @@ static FlowSnapshotContainer getFlowContainer(final String path, final JsonFacto
}
}

static void validateNoDuplicateKeys(final String path) throws IOException {
final File file = new File(path);
if (!file.exists()) {
return;
}
final JsonFactory strictFactory = new JsonFactory();
strictFactory.enable(JsonParser.Feature.STRICT_DUPLICATE_DETECTION);
try (final JsonParser parser = strictFactory.createParser(file)) {
while (parser.nextToken() != null) {}
} catch (JsonParseException e) {
throw new JsonParseException(null,
"Flow file `" + path + "` contains duplicate JSON keys"
+ " (this typically indicates a merge conflict that was not fully resolved)"
+ ": " + e.getOriginalMessage(),
e.getLocation());
}
}

static String printComponent(final VersionedComponent component) {
String result = component.getComponentType().getTypeName();

Expand Down
53 changes: 53 additions & 0 deletions flow-diff/src/test/java/com/snowflake/openflow/FlowDiffTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
*/
package com.snowflake.openflow;

import com.fasterxml.jackson.core.JsonParseException;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class FlowDiffTest {
Expand Down Expand Up @@ -129,4 +134,52 @@ void testCheckstyleFailExitCode() throws IOException {
"true" });
assertEquals(2, exitCode);
}

@Test
void testDuplicateKeyInFlowBThrowsException() {
assertThrows(JsonParseException.class, () ->
FlowDiff.getDiff(
"src/test/resources/flow_v1_initial.json",
"src/test/resources/flow_v9_duplicate_key.json",
false, null));
}

@Test
void testDuplicateKeyInFlowAThrowsException() {
assertThrows(JsonParseException.class, () ->
FlowDiff.getDiff(
"src/test/resources/flow_v9_duplicate_key.json",
"src/test/resources/flow_v1_initial.json",
false, null));
}

@Test
void testDuplicateKeyReturnsFailureExitCode() throws IOException {
final int exitCode = FlowDiff.run(new String[]{
"src/test/resources/flow_v1_initial.json",
"src/test/resources/flow_v9_duplicate_key.json",
"", "", ""
});
assertEquals(1, exitCode);
}

@Test
void testDuplicateKeyOutputContainsCaution() throws IOException {
final ByteArrayOutputStream buf = new ByteArrayOutputStream();
final PrintStream orig = System.out;
System.setOut(new PrintStream(buf, true, StandardCharsets.UTF_8));
try {
FlowDiff.run(new String[]{
"src/test/resources/flow_v1_initial.json",
"src/test/resources/flow_v9_duplicate_key.json",
"", "", ""
});
} finally {
System.setOut(orig);
}
final String output = buf.toString(StandardCharsets.UTF_8);
assertTrue(output.contains("[!CAUTION]"), "CAUTION callout missing");
assertTrue(output.contains("duplicate") || output.contains("Duplicate"), "duplicate key message missing");
assertTrue(output.contains("flowContents"), "duplicate field name missing");
}
}
72 changes: 72 additions & 0 deletions flow-diff/src/test/resources/flow_v9_duplicate_key.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"externalControllerServices" : { },
"flow" : {
"createdTimestamp" : 1726000168945,
"description" : "test",
"identifier" : "test",
"lastModifiedTimestamp" : 1726000168945,
"name" : "test",
"versionCount" : 0
},
"flowContents" : {
"comments" : "",
"componentType" : "PROCESS_GROUP",
"connections" : [ ],
"controllerServices" : [ ],
"defaultBackPressureDataSizeThreshold" : "1 GB",
"defaultBackPressureObjectThreshold" : 10000,
"defaultFlowFileExpiration" : "0 sec",
"executionEngine" : "INHERITED",
"externalControllerServiceReferences" : { },
"flowFileConcurrency" : "UNBOUNDED",
"flowFileOutboundPolicy" : "STREAM_WHEN_AVAILABLE",
"funnels" : [ ],
"identifier" : "flow-contents-group",
"inputPorts" : [ ],
"labels" : [ ],
"maxConcurrentTasks" : 1,
"name" : "TestingFlowDiff",
"outputPorts" : [ ],
"position" : { "x" : 0.0, "y" : 0.0 },
"processGroups" : [ ],
"processors" : [ ],
"remoteProcessGroups" : [ ],
"scheduledState" : "ENABLED",
"statelessFlowTimeout" : "1 min"
},
"flowContents" : {
"comments" : "",
"componentType" : "PROCESS_GROUP",
"connections" : [ ],
"controllerServices" : [ ],
"defaultBackPressureDataSizeThreshold" : "1 GB",
"defaultBackPressureObjectThreshold" : 10000,
"defaultFlowFileExpiration" : "0 sec",
"executionEngine" : "INHERITED",
"externalControllerServiceReferences" : { },
"flowFileConcurrency" : "UNBOUNDED",
"flowFileOutboundPolicy" : "STREAM_WHEN_AVAILABLE",
"funnels" : [ ],
"identifier" : "flow-contents-group-duplicate",
"inputPorts" : [ ],
"labels" : [ ],
"maxConcurrentTasks" : 1,
"name" : "TestingFlowDiff",
"outputPorts" : [ ],
"position" : { "x" : 0.0, "y" : 0.0 },
"processGroups" : [ ],
"processors" : [ ],
"remoteProcessGroups" : [ ],
"scheduledState" : "ENABLED",
"statelessFlowTimeout" : "1 min"
},
"flowEncodingVersion" : "1.0",
"latest" : false,
"parameterContexts" : { },
"parameterProviders" : { },
"snapshotMetadata" : {
"author" : "test",
"flowIdentifier" : "test",
"timestamp" : 0
}
}
Loading