Skip to content

Commit

Permalink
NIFI-11706 Add option to create dedicated Parameter Contexts for Impo…
Browse files Browse the repository at this point in the history
…rted Flows

This closes apache#7401

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
simonbence authored and exceptionfactory committed Jun 30, 2023
1 parent 5584a1b commit cadf2fb
Show file tree
Hide file tree
Showing 13 changed files with 597 additions and 64 deletions.
Binary file modified nifi-docs/src/main/asciidoc/images/import-version-dialog.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions nifi-docs/src/main/asciidoc/user-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2440,6 +2440,10 @@ image::import-version-dialog.png["Import Version Dialog"]

Connected registries will appear as options in the Registry drop-down menu. For the chosen Registry, buckets the user has access to will appear as options in the Bucket drop-down menu. The names of the flows in the chosen bucket will appear as options in the Name drop-down menu. Select the desired version of the flow to import and select "Import" for the dataflow to be placed on the canvas.

The import also provides the option to keep or replace existing Parameter Contexts based on name. Keeping the Parameter Contexts (which is the default behaviour) will use the existing Contexts if Contexts with the same name already exists, resulting shared parameter sets between multiple imports.

Unchecking the checkbox named "Keep Existing Parameter Contexts" will result the creation of a new set of Parameter Contexts for the import, making it completely independent of the existing imports. The parameter values of these new Contexts will be set based on the content of the Registry Snapshot.

image::versioned-flow-imported.png["Versioned Flow Imported"]

Since the version imported in this example is the latest version (MySQL CDC, Version 3), the state of the versioned process group is "Up to date" (image:iconUpToDate.png["Up To Date Icon"]). If the version imported had been an older version, the state would be "Stale" (image:iconStale.png["Stale Icon"]).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;

public enum ParameterContextHandlingStrategy {
KEEP_EXISTING, REPLACE
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -225,6 +226,11 @@ public FlowSnapshotContainer getFlowContents(
if (fetchRemoteFlows) {
final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
for (final VersionedProcessGroup child : contents.getProcessGroups()) {
final Map<String, VersionedParameterContext> childParameterContexts = populateVersionedContentsRecursively(context, child, snapshotContainer);

for (final Map.Entry<String, VersionedParameterContext> childParameterContext : childParameterContexts.entrySet()) {
flowSnapshot.getParameterContexts().putIfAbsent(childParameterContext.getKey(), childParameterContext.getValue());
}
populateVersionedContentsRecursively(context, child, snapshotContainer);
}
}
Expand Down Expand Up @@ -297,10 +303,15 @@ private String extractIdentity(final FlowRegistryClientUserContext context) {
return context.getNiFiUserIdentity().orElse(null);
}

private void populateVersionedContentsRecursively(final FlowRegistryClientUserContext context, final VersionedProcessGroup group,
final FlowSnapshotContainer snapshotContainer) throws FlowRegistryException {
private Map<String, VersionedParameterContext> populateVersionedContentsRecursively(
final FlowRegistryClientUserContext context,
final VersionedProcessGroup group,
final FlowSnapshotContainer snapshotContainer
) throws FlowRegistryException {
Map<String, VersionedParameterContext> accumulatedParameterContexts = new HashMap<>();

if (group == null) {
return;
return accumulatedParameterContexts;
}

final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates();
Expand Down Expand Up @@ -330,12 +341,23 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
group.setLogFileSuffix(contents.getLogFileSuffix());
coordinates.setLatest(snapshot.isLatest());

for (final Map.Entry<String, VersionedParameterContext> parameterContext : snapshot.getParameterContexts().entrySet()) {
accumulatedParameterContexts.put(parameterContext.getKey(), parameterContext.getValue());
}

snapshotContainer.addChildSnapshot(snapshot, group);
}

for (final VersionedProcessGroup child : group.getProcessGroups()) {
populateVersionedContentsRecursively(context, child, snapshotContainer);
final Map<String, VersionedParameterContext> childParameterContexts = populateVersionedContentsRecursively(context, child, snapshotContainer);

for (final Map.Entry<String, VersionedParameterContext> childParameterContext : childParameterContexts.entrySet()) {
// We favor the context instance from the enclosing versioned flow
accumulatedParameterContexts.putIfAbsent(childParameterContext.getKey(), childParameterContext.getValue());
}
}

return accumulatedParameterContexts;
}

private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,57 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
Expand Down Expand Up @@ -73,6 +124,7 @@
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ParameterContextHandlingStrategy;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
Expand Down Expand Up @@ -123,6 +175,7 @@
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.apache.nifi.web.util.ParameterContextReplacer;
import org.apache.nifi.web.util.Pause;
import org.apache.nifi.xml.processing.stream.StandardXMLStreamReaderProvider;
import org.apache.nifi.xml.processing.stream.XMLStreamReaderProvider;
Expand All @@ -132,58 +185,6 @@
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* RESTful endpoint for managing a Group.
*/
Expand All @@ -205,6 +206,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
private ConnectionResource connectionResource;
private TemplateResource templateResource;
private ControllerServiceResource controllerServiceResource;
private ParameterContextReplacer parameterContextReplacer;

private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>();
private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100;
Expand Down Expand Up @@ -1969,8 +1971,16 @@ public Response createProcessGroup(
@ApiParam(
value = "The process group configuration details.",
required = true
) final ProcessGroupEntity requestProcessGroupEntity) {

)
final ProcessGroupEntity requestProcessGroupEntity,
@ApiParam(
value = "Handling Strategy controls whether to keep or replace Parameter Contexts",
defaultValue = "KEEP_EXISTING"
)
@QueryParam("parameterContextHandlingStrategy")
@DefaultValue("KEEP_EXISTING")
final ParameterContextHandlingStrategy parameterContextHandlingStrategy
) {
if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) {
throw new IllegalArgumentException("Process group details must be specified.");
}
Expand Down Expand Up @@ -2024,7 +2034,12 @@ public Response createProcessGroup(
}
}

// Step 4: Resolve Bundle info
// Step 4: Replace parameter contexts if necessary
if (ParameterContextHandlingStrategy.REPLACE.equals(parameterContextHandlingStrategy)) {
parameterContextReplacer.replaceParameterContexts(flowSnapshot, serviceFacade.getParameterContexts());
}

// Step 5: Resolve Bundle info
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());

// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
Expand All @@ -2033,7 +2048,7 @@ public Response createProcessGroup(
// If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to.
serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser());

// Step 5: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
// Step 6: Update contents of the ProcessGroupDTO passed in to include the components that need to be added.
requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot);
}

Expand All @@ -2042,7 +2057,7 @@ public Response createProcessGroup(
serviceFacade.verifyImportProcessGroup(versionControlInfo, flowSnapshot.getFlowContents(), groupId);
}

// Step 6: Replicate the request or call serviceFacade.updateProcessGroup
// Step 7: Replicate the request or call serviceFacade.updateProcessGroup
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, requestProcessGroupEntity);
} else if (isDisconnectedFromCluster()) {
Expand Down Expand Up @@ -4787,6 +4802,10 @@ public void setControllerServiceResource(ControllerServiceResource controllerSer
this.controllerServiceResource = controllerServiceResource;
}

public void setParameterContextReplacer(ParameterContextReplacer parameterContextReplacer) {
this.parameterContextReplacer = parameterContextReplacer;
}

private static class DropEntity extends Entity {
final String entityId;
final String dropRequestId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;

import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.entity.ParameterContextEntity;

import java.util.Collection;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class ParameterContextNameCollisionResolver {
private static final String PATTERN_GROUP_NAME = "name";
private static final String PATTERN_GROUP_INDEX = "index";

private static final String LINEAGE_FORMAT = "^(?<" + PATTERN_GROUP_NAME + ">.+?)( \\((?<" + PATTERN_GROUP_INDEX + ">[0-9]+)\\))?$";
private static final Pattern LINEAGE_PATTERN = Pattern.compile(LINEAGE_FORMAT);

private static final String NAME_FORMAT = "%s (%d)";

public String resolveNameCollision(final String originalParameterContextName, final Collection<ParameterContextEntity> existingContexts) {
final Matcher lineageMatcher = LINEAGE_PATTERN.matcher(originalParameterContextName);

if (!lineageMatcher.matches()) {
throw new IllegalArgumentException("Existing Parameter Context name \"(" + originalParameterContextName + "\") cannot be processed");
}

final String lineName = lineageMatcher.group(PATTERN_GROUP_NAME);
final String originalIndex = lineageMatcher.group(PATTERN_GROUP_INDEX);

// Candidates cannot be cached because new context might be added between calls
final Set<ParameterContextDTO> candidates = existingContexts
.stream()
.map(pc -> pc.getComponent())
.filter(dto -> dto.getName().startsWith(lineName))
.collect(Collectors.toSet());

int biggestIndex = (originalIndex == null) ? 0 : Integer.valueOf(originalIndex);

for (final ParameterContextDTO candidate : candidates) {
final Matcher matcher = LINEAGE_PATTERN.matcher(candidate.getName());

if (matcher.matches() && lineName.equals(matcher.group(PATTERN_GROUP_NAME))) {
final String indexGroup = matcher.group(PATTERN_GROUP_INDEX);

if (indexGroup != null) {
int biggestIndexCandidate = Integer.valueOf(indexGroup);

if (biggestIndexCandidate > biggestIndex) {
biggestIndex = biggestIndexCandidate;
}
}
}
}

return String.format(NAME_FORMAT, lineName, biggestIndex + 1);
}
}
Loading

0 comments on commit cadf2fb

Please sign in to comment.