Skip to content

Commit

Permalink
KYLO-3125 fix controller service ref on feed import
Browse files Browse the repository at this point in the history
  • Loading branch information
scottreisdorf committed Nov 27, 2018
1 parent 1567466 commit e7a98fb
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.thinkbiganalytics.nifi.rest.client.NifiClientRuntimeException;
import com.thinkbiganalytics.nifi.rest.client.NifiComponentNotFoundException;
import com.thinkbiganalytics.nifi.rest.model.NiFiAllowableValue;
import com.thinkbiganalytics.nifi.rest.model.NiFiPropertyDescriptor;
import com.thinkbiganalytics.nifi.rest.model.NiFiPropertyDescriptorTransform;
import com.thinkbiganalytics.nifi.rest.model.NifiError;
import com.thinkbiganalytics.nifi.rest.model.NifiProcessGroup;
Expand All @@ -49,6 +50,7 @@
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,6 +102,8 @@ public class TemplateCreationHelper {

private List<NifiProperty> templateProperties;

private List<NifiProperty> originalFeedProperties;

public TemplateCreationHelper(LegacyNifiRestClient restClient) {
this.restClient = restClient;
this.nifiRestClient = restClient.getNiFiRestClient();
Expand Down Expand Up @@ -134,6 +138,14 @@ public void setTemplateProperties(List<NifiProperty> templateProperties) {
this.templateProperties = templateProperties;
}

public List<NifiProperty> getOriginalFeedProperties() {
return originalFeedProperties;
}

public void setOriginalFeedProperties(List<NifiProperty> originalFeedProperties) {
this.originalFeedProperties = originalFeedProperties;
}

/**
* Creates an instance of the supplied template under the temporary inspection group inside its own process group
*
Expand Down Expand Up @@ -378,7 +390,7 @@ private void mergeControllerServices(TemplateInstance templateInstance) {

mergedControllerServices = map;
//merge back in the map of old cs id to the good root cs id
if(templateInstance != null) {
if (templateInstance != null) {
mergedControllerServices.putAll(templateInstance.getCreatedScopeToRootMap().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().getId(), e -> e.getValue())));
}
//validate
Expand All @@ -397,6 +409,38 @@ private boolean hasMatchingService(Map<String, List<ControllerServiceDTO>> nameM
return nameMap.containsKey(name) && !nameMap.get(name).isEmpty();
}

public String getProcessorNameTypeKey(ProcessorDTO dto, String key) {
return dto.getName() + "-" + dto.getType() + "-" + key;
}

private boolean updateProcessorControllerServiceReference(ProcessorDTO processorDTO, PropertyDescriptorDTO propertyDescriptorDTO, NifiProperty matchingProperty,
Set<ProcessorDTO> updatedProcessors, Map<String, String> updatedProcessorProperties) {
NiFiPropertyDescriptor descriptor = matchingProperty.getPropertyDescriptor();
String propertyKey = matchingProperty.getKey();
if (matchingProperty != null && descriptor != null && descriptor.getAllowableValues() != null) {
NiFiAllowableValue matchingValue = descriptor.getAllowableValues().stream()
.filter(niFiAllowableValue -> niFiAllowableValue.getValue().equalsIgnoreCase(matchingProperty.getValue())).findFirst().orElse(null);
if (matchingValue != null) {
String name = matchingValue.getDisplayName();
String
validControllerServiceId = hasMatchingService(enabledServiceNameMap, name) ? enabledServiceNameMap.get(name).get(0).getId()
: hasMatchingService(serviceNameMap, name) ? serviceNameMap.get(name).get(0).getId()
: null;

if (StringUtils.isNotBlank(validControllerServiceId) && (propertyDescriptorDTO.isRequired() || !propertyDescriptorDTO.isRequired() && StringUtils
.isNotBlank(processorDTO.getConfig().getProperties().get(propertyKey)))) {
processorDTO.getConfig().getProperties().put(propertyKey, validControllerServiceId);
updatedProcessorProperties.put(propertyKey, validControllerServiceId);
if (!updatedProcessors.contains(processorDTO)) {
updatedProcessors.add(processorDTO);
}
return true;
}
}
}
return false;
}


private List<ProcessorDTO> reassignControllerServiceIds(List<ProcessorDTO> processors, TemplateInstance instance) {

Expand All @@ -406,38 +450,33 @@ private List<ProcessorDTO> reassignControllerServiceIds(List<ProcessorDTO> proce
Map<String, String> updatedProcessorProperties = new HashMap<>();
processorDTO.getConfig().getDescriptors().forEach((k, v) -> {
if (v.getIdentifiesControllerService() != null) {

boolean idsMatch = getMergedControllerServices().keySet().stream().anyMatch(id -> id.equalsIgnoreCase(processorDTO.getConfig().getProperties().get(k)));
if (!idsMatch && templateProperties != null && !templateProperties.isEmpty()) {

String value = processorDTO.getConfig().getProperties().get(k);

boolean match = getMergedControllerServices().keySet().stream().anyMatch(id -> id.equalsIgnoreCase(value));

if (!match && this.originalFeedProperties != null) {
//try to find an existing controller service that matches the name of the one supplied by this feed
//find the cs name assigned to this feed property
NifiProperty
originalProperty =
this.originalFeedProperties.stream().filter(p -> p.getProcessorNameTypeKey().equalsIgnoreCase(getProcessorNameTypeKey(processorDTO, k))).findFirst().orElse(null);
if (originalProperty != null) {
match = updateProcessorControllerServiceReference(processorDTO, v, originalProperty, updatedProcessors, updatedProcessorProperties);
}
}
if (!match && templateProperties != null && !templateProperties.isEmpty()) {
//attempt to match and set via template properties
NifiProperty matchingProperty = templateProperties.stream().filter(
p -> p.getKey().equalsIgnoreCase(k) && p.getProcessorName().equalsIgnoreCase(processorDTO.getName()) && v.getIdentifiesControllerService()
.equalsIgnoreCase(p.getPropertyDescriptor().getIdentifiesControllerService())
).findFirst().orElse(null);
if (matchingProperty != null && matchingProperty.getPropertyDescriptor() != null && matchingProperty.getPropertyDescriptor().getAllowableValues() != null) {
NiFiAllowableValue matchingValue = matchingProperty.getPropertyDescriptor().getAllowableValues().stream()
.filter(niFiAllowableValue -> niFiAllowableValue.getValue().equalsIgnoreCase(matchingProperty.getValue())).findFirst().orElse(null);
if (matchingValue != null) {
String name = matchingValue.getDisplayName();
String
validControllerServiceId = hasMatchingService(enabledServiceNameMap, name) ? enabledServiceNameMap.get(name).get(0).getId()
: hasMatchingService(serviceNameMap, name) ? serviceNameMap.get(name).get(0).getId()
: null;

if (StringUtils.isNotBlank(validControllerServiceId) && (v.isRequired() || !v.isRequired() && StringUtils
.isNotBlank(processorDTO.getConfig().getProperties().get(k)))) {
processorDTO.getConfig().getProperties().put(k, validControllerServiceId);
updatedProcessorProperties.put(k, validControllerServiceId);
if (!updatedProcessors.contains(processorDTO)) {
updatedProcessors.add(processorDTO);
}
}
}
if (matchingProperty != null) {
match = updateProcessorControllerServiceReference(processorDTO, v, matchingProperty, updatedProcessors, updatedProcessorProperties);
}

}
//if we havent made a match attempt to see if the cs was removed
if (!updatedProcessorProperties.containsKey(k) && !idsMatch && instance != null) {
String value = processorDTO.getConfig().getProperties().get(k);
if (!updatedProcessorProperties.containsKey(k) && !match && instance != null) {
//find the correct reference from that was removed due to a matching service
ControllerServiceDTO controllerServiceDTO = instance.findMatchingControllerService(value);
if (controllerServiceDTO != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public class CreateFeedBuilder {

private Long timestamp;

/**
* a snapshot of the feed properties prior to being merged with the template
* This is so we can reference feed values and determine if a different service should be used
*/
private List<NifiProperty> originalFeedProperties;

protected CreateFeedBuilder(LegacyNifiRestClient restClient, NifiFlowCache nifiFlowCache, FeedMetadata feedMetadata, String templateId, PropertyExpressionResolver propertyExpressionResolver,
NiFiPropertyDescriptorTransform propertyDescriptorTransform, NiFiObjectCache niFiObjectCache, TemplateConnectionUtil templateConnectionUtil) {
Expand Down Expand Up @@ -181,6 +186,11 @@ public CreateFeedBuilder autoAlign(boolean autoAlign) {
return this;
}

public CreateFeedBuilder setOriginalFeedProperties(List<NifiProperty> originalFeedProperties){
this.originalFeedProperties = originalFeedProperties;
return this;
}

/**
* Adds the specified Input Port and Output Port connection to this feed.
*
Expand Down Expand Up @@ -261,6 +271,7 @@ public NifiProcessGroup build() throws FeedCreationException {
//snapshot the existing controller services
eventTime.start();
templateCreationHelper.snapshotControllerServiceReferences();
templateCreationHelper.setOriginalFeedProperties(this.originalFeedProperties);
log.debug("Time to snapshotControllerServices. ElapsedTime: {} ms", eventTime(eventTime));

//create the flow from the template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ private NifiFeed createAndSaveFeed(final FeedMetadata feedMetadata) {
.newFeed(nifiRestClient, nifiFlowCache, feedToSave, registeredTemplate.getNifiTemplateId(), propertyExpressionResolver, propertyDescriptorTransform, niFiObjectCache,
templateConnectionUtil)
.enabled(enabled)
.setOriginalFeedProperties(originalFeedProperties)
.removeInactiveVersionedProcessGroup(removeInactiveNifiVersionedFeedFlows)
.autoAlign(nifiAutoFeedsAlignAfterSave)
.withNiFiTemplateCache(niFiTemplateCache);
Expand Down Expand Up @@ -1310,6 +1311,7 @@ private NifiFeed deployFeed(final FeedMetadata feedMetadata, com.thinkbiganalyti
niFiObjectCache,
templateConnectionUtil)
.enabled(enabled)
.setOriginalFeedProperties(originalFeedProperties)
.removeInactiveVersionedProcessGroup(removeInactiveNifiVersionedFeedFlows)
.autoAlign(nifiAutoFeedsAlignAfterSave)
.withNiFiTemplateCache(niFiTemplateCache);
Expand Down

0 comments on commit e7a98fb

Please sign in to comment.