Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
printConfigurableExtensionProperties(proc);
} else if (diff.getComponentB().getComponentType().equals(ComponentType.CONTROLLER_SERVICE)) {
final VersionedControllerService cs = (VersionedControllerService) diff.getComponentB();
final String pgName = processGroups.get(cs.getGroupIdentifier()).getName();
final VersionedProcessGroup csPg = processGroups.get(cs.getGroupIdentifier());
final String pgName = csPg == null ? cs.getGroupIdentifier() : csPg.getName();
System.out.println("- A " + printComponent(diff.getComponentB())
+ " has been added in Process Group `" + pgName + "` with the below properties:");
printConfigurableExtensionProperties(cs);
Expand Down Expand Up @@ -397,7 +398,12 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
case PARAMETER_ADDED: {
final String paramKey = diff.getFieldName().get();
final VersionedParameterContext pc = (VersionedParameterContext) diff.getComponentB();
final VersionedParameter param = pc.getParameters().stream().filter(p -> p.getName().equals(paramKey)).findFirst().get();
final VersionedParameter param = pc.getParameters() == null ? null
: pc.getParameters().stream().filter(p -> p.getName().equals(paramKey)).findFirst().orElse(null);
if (param == null) {
System.out.println("- In the Parameter Context `" + pc.getName() + "` a parameter has been added: `" + paramKey + "`");
break;
}

final String description;
if (isEmpty(param.getDescription())) {
Expand Down Expand Up @@ -426,8 +432,15 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
final String paramKey = diff.getFieldName().get();
final VersionedParameterContext pcBefore = (VersionedParameterContext) diff.getComponentA();
final VersionedParameterContext pcAfter = (VersionedParameterContext) diff.getComponentB();
final VersionedParameter paramBefore = pcBefore.getParameters().stream().filter(p -> p.getName().equals(paramKey)).findFirst().get();
final VersionedParameter paramAfter = pcAfter.getParameters().stream().filter(p -> p.getName().equals(paramKey)).findFirst().get();
final VersionedParameter paramBefore = pcBefore.getParameters() == null ? null
: pcBefore.getParameters().stream().filter(p -> p.getName().equals(paramKey)).findFirst().orElse(null);
final VersionedParameter paramAfter = pcAfter.getParameters() == null ? null
: pcAfter.getParameters().stream().filter(p -> p.getName().equals(paramKey)).findFirst().orElse(null);
if (paramBefore == null || paramAfter == null) {
System.out.println("- In the Parameter Context `" + pcAfter.getName()
+ "`, the value of the parameter `" + paramKey + "` has changed");
break;
}
System.out.println("- In the Parameter Context `" + pcAfter.getName()
+ "`, the value of the parameter `" + paramKey + "` has changed from "
+ printFromTo(paramBefore.isSensitive() ? "<Sensitive Value>" : paramBefore.getValue(),
Expand Down Expand Up @@ -644,9 +657,9 @@ public int compare(FlowDifference o1, FlowDifference o2) {
return sortedDiffs;
}

private static void registerProcessGroups(VersionedProcessGroup rootPG) {
Set<VersionedProcessGroup> childPGs = rootPG.getProcessGroups();
for (VersionedProcessGroup pg : childPGs) {
private static void registerProcessGroups(final VersionedProcessGroup rootPG) {
final Set<VersionedProcessGroup> childPGs = rootPG.getProcessGroups() != null ? rootPG.getProcessGroups() : Set.of();
for (final VersionedProcessGroup pg : childPGs) {
processGroups.put(pg.getIdentifier(), pg);
registerProcessGroups(pg);
}
Expand Down Expand Up @@ -703,8 +716,11 @@ static String printComponent(final VersionedComponent component) {
}

static String printParameterContext(final VersionedParameterContext pc) {
if (pc == null || pc.getParameters() == null) {
return "{}";
}
final Map<String, String> parameters = new HashMap<>();
for (VersionedParameter p : pc.getParameters()) {
for (final VersionedParameter p : pc.getParameters()) {
if (p.isSensitive()) {
parameters.put(p.getName(), "<Sensitive Value>");
} else {
Expand All @@ -715,7 +731,10 @@ static String printParameterContext(final VersionedParameterContext pc) {
}

static void printConfigurableExtensionProperties(final VersionedConfigurableExtension proc) {
for (String key : proc.getProperties().keySet()) {
if (proc.getProperties() == null) {
return;
}
for (final String key : proc.getProperties().keySet()) {
System.out.println(" - `" + key + "` = `" + proc.getProperties().get(key) + "`");
}
}
Expand All @@ -742,26 +761,26 @@ static void printConnection(final VersionedConnection connection) {

List<String> nonDefaultConfigurations = new ArrayList<>();

if (!connection.getLoadBalanceStrategy().equals("DO_NOT_LOAD_BALANCE")) {
if (!"DO_NOT_LOAD_BALANCE".equals(connection.getLoadBalanceStrategy())) {
String lbConfiguration = "load balancing strategy `" + connection.getLoadBalanceStrategy() + "`";
if (connection.getLoadBalanceStrategy().equals("PARTITION_BY_ATTRIBUTE")) {
if ("PARTITION_BY_ATTRIBUTE".equals(connection.getLoadBalanceStrategy())) {
lbConfiguration += " and partitioning attribute `" + connection.getPartitioningAttribute() + "`";
}
if (!connection.getLoadBalanceCompression().equals("DO_NOT_COMPRESS")) {
if (!"DO_NOT_COMPRESS".equals(connection.getLoadBalanceCompression())) {
lbConfiguration += " and load balancing compression `" + connection.getLoadBalanceCompression() + "`";
}
nonDefaultConfigurations.add(lbConfiguration);
}

if (!connection.getPrioritizers().isEmpty()) {
if (connection.getPrioritizers() != null && !connection.getPrioritizers().isEmpty()) {
nonDefaultConfigurations.add("prioritizers `" + connection.getPrioritizers() + "`");
}

if (!connection.getFlowFileExpiration().equals("0 sec")) {
if (!"0 sec".equals(connection.getFlowFileExpiration())) {
nonDefaultConfigurations.add("FlowFile expiration of `" + connection.getFlowFileExpiration() + "`");
}

if (!connection.getBackPressureDataSizeThreshold().equals("1 GB")) {
if (!"1 GB".equals(connection.getBackPressureDataSizeThreshold())) {
nonDefaultConfigurations.add("backpressure data size threshold of `" + connection.getBackPressureDataSizeThreshold() + "`");
}

Expand Down