Skip to content

Commit

Permalink
NIFI-13030 Adding endpoint for comparing versions of registered flows
Browse files Browse the repository at this point in the history
This closes apache#8670

Signed-off-by: Peter Gyori <pgyori@apache.org>
  • Loading branch information
simonbence authored and pgyori committed Jun 5, 2024
1 parent 21f0ca4 commit 0a5be35
Show file tree
Hide file tree
Showing 12 changed files with 802 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.nifi.registry.flow;

import java.util.Objects;

/**
* Information for locating a flow version in a flow registry.
*/
Expand All @@ -43,4 +45,19 @@ public void setVersion(final String version) {
this.version = version;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final FlowVersionLocation that = (FlowVersionLocation) o;
return Objects.equals(getBranch(), that.getBranch())
&& Objects.equals(getBucketId(), that.getBucketId())
&& Objects.equals(getFlowId(), that.getFlowId())
&& Objects.equals(version, that.version);
}

@Override
public int hashCode() {
return Objects.hash(getBranch(), getBucketId(), getFlowId(), version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import jakarta.xml.bind.annotation.XmlType;

import java.util.Objects;

@XmlType(name = "difference")
public class DifferenceDTO {
private String differenceType;
Expand All @@ -44,4 +46,16 @@ public void setDifference(String difference) {
this.difference = difference;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DifferenceDTO that = (DifferenceDTO) o;
return Objects.equals(differenceType, that.differenceType) && Objects.equals(difference, that.difference);
}

@Override
public int hashCode() {
return Objects.hash(differenceType, difference);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.nifi.parameter.ParameterGroupConfiguration;
import org.apache.nifi.registry.flow.FlowLocation;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.FlowVersionLocation;
import org.apache.nifi.registry.flow.RegisterAction;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
Expand Down Expand Up @@ -1501,6 +1502,16 @@ Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final
*/
RegisteredFlow deleteVersionedFlow(String registryId, String branch, String bucketId, String flowId);

/**
* Returns the differences of version B from version A.
*
* @param registryId the ID of the registry
* @param versionLocationA Location of the baseline snapshot of the comparison
* @param versionLocationB location of the compared snapshot
* @return the differences between the snapshots
*/
FlowComparisonEntity getVersionDifference(String registryId, FlowVersionLocation versionLocationA, FlowVersionLocation versionLocationB);

/**
* Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5258,6 +5258,42 @@ public RegisteredFlow deleteVersionedFlow(final String registryId, final String
}
}

@Override
public FlowComparisonEntity getVersionDifference(final String registryId, FlowVersionLocation versionLocationA, FlowVersionLocation versionLocationB) {
final FlowComparisonEntity result = new FlowComparisonEntity();

if (versionLocationA.equals(versionLocationB)) {
// If both versions are the same, there is no need for comparison. Comparing them should have the same result but with the cost of some calls to the registry.
// Note: because of this optimization we return an empty non-error response in case of non-existing registry, bucket, flow or version if the versions are the same.
result.setComponentDifferences(Collections.emptySet());
return result;
}

final FlowSnapshotContainer snapshotA = this.getVersionedFlowSnapshot(
registryId, versionLocationA.getBranch(), versionLocationA.getBucketId(), versionLocationA.getFlowId(), versionLocationA.getVersion(), true);
final FlowSnapshotContainer snapshotB = this.getVersionedFlowSnapshot(
registryId, versionLocationB.getBranch(), versionLocationB.getBucketId(), versionLocationB.getFlowId(), versionLocationB.getVersion(), true);

final VersionedProcessGroup flowContentsA = snapshotA.getFlowSnapshot().getFlowContents();
final VersionedProcessGroup flowContentsB = snapshotB.getFlowSnapshot().getFlowContents();

final FlowComparator flowComparator = new StandardFlowComparator(
new StandardComparableDataFlow("Flow A", flowContentsA),
new StandardComparableDataFlow("Flow B", flowContentsB),
Collections.emptySet(), // Replacement of an external ControllerService is recognized as property change
new ConciseEvolvingDifferenceDescriptor(),
Function.identity(),
VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.DEEP
);

final FlowComparison flowComparison = flowComparator.compare();
final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtosForLocalModifications(flowComparison, flowContentsA, controllerFacade.getFlowManager());
result.setComponentDifferences(differenceDtos);

return result;
}

@Override
public boolean isAnyProcessGroupUnderVersionControl(final String groupId) {
return isProcessGroupUnderVersionControl(processGroupDAO.getProcessGroup(groupId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowVersionLocation;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
Expand All @@ -60,6 +61,8 @@
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ClusterSummaryDTO;
import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
import org.apache.nifi.web.api.dto.DifferenceDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
Expand Down Expand Up @@ -89,6 +92,7 @@
import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity;
import org.apache.nifi.web.api.entity.FlowAnalysisRuleTypesEntity;
import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.FlowRegistryBranchEntity;
import org.apache.nifi.web.api.entity.FlowRegistryBranchesEntity;
Expand Down Expand Up @@ -145,6 +149,7 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import org.apache.nifi.web.util.PaginationHelper;

import java.text.Collator;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -2057,6 +2062,100 @@ public Response getDetails(
return generateOkResponse(flowDetails).build();
}

@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("registries/{registry-id}/branches/{branch-id-a}/buckets/{bucket-id-a}/flows/{flow-id-a}/{version-a}/diff/branches/{branch-id-b}/buckets/{bucket-id-b}/flows/{flow-id-b}/{version-b}")
@Operation(
summary = "Gets the differences between two versions of the same versioned flow, the basis of the comparison will be the first version",
responses = @ApiResponse(content = @Content(schema = @Schema(implementation = FlowComparisonEntity.class))),
security = {
@SecurityRequirement(name = "Read - /flow")
}
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
}
)
public Response getVersionDifferences(
@Parameter(
description = "The registry client id.",
required = true
)
@PathParam("registry-id") String registryId,

@Parameter(
description = "The branch id for the base version.",
required = true
)
@PathParam("branch-id-a") String branchIdA,

@Parameter(
description = "The bucket id for the base version.",
required = true
)
@PathParam("bucket-id-a") String bucketIdA,

@Parameter(
description = "The flow id for the base version.",
required = true
)
@PathParam("flow-id-a") String flowIdA,

@Parameter(
description = "The base version.",
required = true
)
@PathParam("version-a") String versionA,

@Parameter(
description = "The branch id for the compared version.",
required = true
)
@PathParam("branch-id-b") String branchIdB,

@Parameter(
description = "The bucket id for the compared version.",
required = true
)
@PathParam("bucket-id-b") String bucketIdB,

@Parameter(
description = "The flow id for the compared version.",
required = true
)
@PathParam("flow-id-b") String flowIdB,

@Parameter(
description = "The compared version.",
required = true
)
@PathParam("version-b") String versionB,
@QueryParam("offset")
@Parameter(description = "Must be a non-negative number. Specifies the starting point of the listing. 0 means start from the beginning.")
@DefaultValue("0")
int offset,
@QueryParam("limit")
@Parameter(description = "Limits the number of differences listed. This might lead to partial result. 0 means no limitation is applied.")
@DefaultValue("1000")
int limit
) {
authorizeFlow();
FlowVersionLocation baseVersionLocation = new FlowVersionLocation(branchIdA, bucketIdA, flowIdA, versionA);
FlowVersionLocation comparedVersionLocation = new FlowVersionLocation(branchIdB, bucketIdB, flowIdB, versionB);
final FlowComparisonEntity versionDifference = serviceFacade.getVersionDifference(registryId, baseVersionLocation, comparedVersionLocation);
// Note: with the current implementation, this is deterministic. However, the internal data structure used in comparison is set, thus
// later changes might cause discrepancies. Practical use of the endpoint usually remains within one "page" though.
return generateOkResponse(limitDifferences(versionDifference, offset, limit))
.type(MediaType.APPLICATION_JSON_TYPE)
.build();
}

@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -2104,6 +2203,24 @@ public Response getVersions(
return generateOkResponse(versionedFlowSnapshotMetadataSetEntity).build();
}

private static FlowComparisonEntity limitDifferences(final FlowComparisonEntity original, final int offset, final int limit) {
final List<ComponentDifferenceDTO> limited = PaginationHelper.paginateByContainedItems(
original.getComponentDifferences(), offset, limit, ComponentDifferenceDTO::getDifferences, FlowResource::limitDifferences);
final FlowComparisonEntity result = new FlowComparisonEntity();
result.setComponentDifferences(new HashSet<>(limited));
return result;
}

private static ComponentDifferenceDTO limitDifferences(final ComponentDifferenceDTO original, final List<DifferenceDTO> partial) {
final ComponentDifferenceDTO result = new ComponentDifferenceDTO();
result.setComponentType(original.getComponentType());
result.setComponentId(original.getComponentId());
result.setComponentName(original.getComponentName());
result.setProcessGroupId(original.getProcessGroupId());
result.setDifferences(partial);
return result;
}

// --------------
// bulletin board
// --------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;

/**
* This implementation includes the lower boundary but does not include the higher boundary.
*/
final class ClosedOpenInterval implements Interval {
private final int lowerBoundary;
private final int higherBoundary;

/**
* @param lowerBoundary Inclusive index of lower boundary
* @param higherBoundary Exclusive index of higher boundary. In case of 0, the higher boundary is unspecified and the interval is open.
*/
ClosedOpenInterval(final int lowerBoundary, final int higherBoundary) {
if (lowerBoundary < 0) {
throw new IllegalArgumentException("Lower boundary cannot be negative");
}

if (higherBoundary < 0) {
throw new IllegalArgumentException("Higher boundary cannot be negative");
}

if (higherBoundary <= lowerBoundary && higherBoundary != 0) {
throw new IllegalArgumentException(
"Higher boundary cannot be lower than or equal to lower boundary except when unspecified. Higher boundary is considered unspecified when the value is set to 0"
);
}

this.lowerBoundary = lowerBoundary;
this.higherBoundary = higherBoundary;
}

@Override
public RelativePosition getRelativePositionOf(final int otherIntervalLowerBoundary, final int otherIntervalHigherBoundary) {
if (otherIntervalLowerBoundary < 0) {
throw new IllegalArgumentException("Lower boundary cannot be negative");
}

if (otherIntervalHigherBoundary <= 0) {
// Note: as a design decision the implementation currently does not support comparison with unspecified higher boundary
throw new IllegalArgumentException("Higher boundary must be positive");
}

if (otherIntervalLowerBoundary >= otherIntervalHigherBoundary) {
throw new IllegalArgumentException("Higher boundary must be greater than lower boundary");
}

if (otherIntervalHigherBoundary <= lowerBoundary) {
return RelativePosition.BEFORE;
} else if (otherIntervalLowerBoundary < lowerBoundary && otherIntervalHigherBoundary > higherBoundary && !this.isEndUnspecified()) {
return RelativePosition.EXCEEDS;
} else if (otherIntervalLowerBoundary < lowerBoundary) {
return RelativePosition.TAIL_INTERSECTS;
} else if (otherIntervalHigherBoundary <= higherBoundary || this.isEndUnspecified()) {
return RelativePosition.WITHIN;
} else if (otherIntervalLowerBoundary < higherBoundary) {
return RelativePosition.HEAD_INTERSECTS;
} else {
return RelativePosition.AFTER;
}
}

private boolean isEndUnspecified() {
return higherBoundary == 0;
}
}
Loading

0 comments on commit 0a5be35

Please sign in to comment.