Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Configuration Transformation #4446

Merged
merged 24 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f43aaf8
Adding templates
srikanthjg Apr 5, 2024
7f80316
Added Dynamic yaml transformer
srikanthjg Apr 8, 2024
b549e41
Added Rule evaluator
srikanthjg Apr 8, 2024
769a3ae
Added rule evaluator
srikanthjg Apr 11, 2024
1817b56
Added json walk
srikanthjg Apr 12, 2024
1ba89fb
Add transformation logic
srikanthjg Apr 12, 2024
7ac2c0a
Add dynamic rule
srikanthjg Apr 13, 2024
2060882
Almost working
srikanthjg Apr 14, 2024
fc50983
Adding multiple pipelines part1
srikanthjg Apr 15, 2024
a63961d
Adding multiple pipelines part2-incomplete
srikanthjg Apr 15, 2024
8b877b5
Works e2e for 1 pipeline
srikanthjg Apr 15, 2024
97472c0
Added multi pipeline template and pipelinemodel support, works for do…
srikanthjg Apr 16, 2024
40750e3
added tests for models and fixed beans, one more fix needed for bean
srikanthjg Apr 17, 2024
abe79a1
Fixed IT and beans
srikanthjg Apr 17, 2024
422e3ab
Update bean to have only pipelineDataModel and not parser
srikanthjg Apr 17, 2024
b89e3a9
Add banner
srikanthjg Apr 18, 2024
147cd86
Code cleanup and add comments
srikanthjg Apr 19, 2024
6d4ed2b
Support user pipeline configuration dynamic transformation based on
srikanthjg Apr 5, 2024
a7badaf
Address comments
srikanthjg Apr 22, 2024
8eb6894
Added Function Call support in templates
srikanthjg Apr 24, 2024
b0544f2
Added Function Call support in templates
srikanthjg Apr 24, 2024
5b4825f
Modify documentDB template.
srikanthjg Apr 24, 2024
d04768c
Code clean up
srikanthjg Apr 25, 2024
36b1dba
Code clean up
srikanthjg Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Works e2e for 1 pipeline
Signed-off-by: srigovs <srigovs@amazon.com>
  • Loading branch information
srikanthjg committed Apr 25, 2024
commit 8b877b5d30d332e43587c25b6d057262519e2ec3
2 changes: 1 addition & 1 deletion data-prepper-pipeline-parser/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ group = 'org.opensearch.dataprepper.core'
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.apache.commons:commons-collections4:4.4'
implementation 'org.apache.commons:commons-jexl3:3.2.1'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are not using this. Please remove.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.pipeline.parser.transformer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -21,7 +22,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -34,6 +34,10 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
private final PipelinesDataFlowModel preTransformedPipelinesDataFlowModel;
Pattern placeholderPattern = Pattern.compile("\\{\\{\\s*(.+?)\\s*}}");

// Pattern pipelineNamePlaceholderPattern = Pattern.compile("\\{\\{\\s*(.+?)\\s*}}");
String pipelineNamePlaceholderRegex = "\\{\\{\\s*" + Pattern.quote("pipeline-name") + "\\s*\\}\\}";
String templatePipelineRootString = "templatePipelines";

// Configuration necessary for JsonPath to work with Jackson
Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
Expand Down Expand Up @@ -64,79 +68,83 @@ public DynamicConfigTransformer(PipelinesDataflowModelParser pipelinesDataflowMo
public PipelinesDataFlowModel transformConfiguration(PipelineTemplateModel templateModel) {
RuleEvaluatorResult ruleEvaluatorResult = ruleEvaluator.isTransformationNeeded(preTransformedPipelinesDataFlowModel);

if (ruleEvaluatorResult.isEvaluatedResult() == false) {
if (ruleEvaluatorResult.isEvaluatedResult() == false ||
ruleEvaluatorResult.getPipelineName() == null) {
return preTransformedPipelinesDataFlowModel;
}

//To differentiate between sub-pipelines.
//To differentiate between sub-pipelines that dont need transformation.
String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName();
try {

String templateJsonString = objectMapper.writeValueAsString(templateModel);

Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
Map<String, PipelineModel> pipelineMap = new HashMap<>();
pipelineMap.put(pipelineNameThatNeedsTransformation,
pipelines.get(pipelineNameThatNeedsTransformation));
String pipelineJson = objectMapper.writeValueAsString(pipelineMap);


String templateJsonStringWithPipelinePlaceholder = objectMapper.writeValueAsString(templateModel);
String templateJsonString = replaceTemplatePipelineName(templateJsonStringWithPipelinePlaceholder,
pipelineNameThatNeedsTransformation);
//find all placeholderPattern in template json string
// K:placeholder , V:jsonPath in templateJson
Map<String, String> placeholdersMap = findPlaceholdersWithPaths(templateJsonString);
validateAllPlaceHoldersFound(placeholdersMap, templateJsonString);
JsonNode templateRootNode = objectMapper.readTree(templateJsonString);


// get exact path in pipelineJson - this is to avoid
// getting array values(even though it might not be an array) given
// a recursive expression like "$..<>"
// K:jsonPath expression V:exactPath(can be string or array)
Map<String, Object> pipelineExactPathMap = findExactPath(placeholdersMap,pipelineJson);
// K:jsonPath, V:exactPath
Map<String, String> pipelineExactPathMap = findExactPath(placeholdersMap, pipelineNameThatNeedsTransformation);


//replace placeholder with actual value in the template context
placeholdersMap.forEach((placeholder, templateJsonPath) -> {
/**
* $..source for example always returns an array irrespective
* of the values source contains.
*/
// String pipelineGenericJsonPath = getValueFromPlaceHolder(placeholder);
// JsonNode pipelineNode1 = pipelineReadContext.read(pipelineJsonPath, JsonNode.class);
// String pipelineExactJsonPath = getExactPath(pipelineGenericJsonPath);

// JsonNode pipelineNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelinesDataFlowModelJsonString).read(pipelineExactJsonPath);

// //TODO --> had changed these 3 lines so that i dont get a ArrayNode on query every time; but this seems to lead to some RuntimeException.
// String parentPath = templateJsonPath.substring(0, templateJsonPath.lastIndexOf('.'));
// String fieldName = templateJsonPath.substring(templateJsonPath.lastIndexOf('.') + 1);
// JsonNode pipelineNode3 = JsonPath.using(parseConfigWithJsonNode).parse(templateRootNode).read(parentPath);

//replace pipelineNode in the template
// replaceNode(templateRootNode, templateJsonPath, pipelineNode);
String pipelineExactJsonPath = pipelineExactPathMap.get(placeholder);
JsonNode pipelineNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelineJson).read(pipelineExactJsonPath);
replaceNode(templateRootNode, templateJsonPath, pipelineNode);
});

//update template json
String transformedJson = objectMapper.writeValueAsString(templateRootNode);

//transform transformedJson to pipelinesModels
// PipelineModel transformedPipelineMode = objectMapper.readValue(transformedJson, PipelineModel.class);
//


// Map<String, PipelineModel> transformedPipelines = new HashMap<>;
// put all except transformedPipelineName
// Copy version and pipelineConfiguration as is.
//
// PipelinesDataFlowModel transformedPipelinesDataFlowModel = createTransformedPipelineDataFlowModel(preTransformedPipelinesDataFlowModel,
// transformedJson,pipelineNameThatNeedsTransformation);
// objectMapper.readValue(transformedJson, PipelinesDataFlowModel.class);
//convert TransformedJson to PipelineModel with the data from preTransformedDataFlowModel.
//transform transformedJson to Map
Map<String,Object> transformedConfigMap = objectMapper.readValue(transformedJson, Map.class);

//TODO
return null;

// get the root of the Transformed Pipeline Model, to get the actual pipelines.
// direct conversion to PipelineDataModel throws exception.
Map<String, PipelineModel> transformedPipelines = (Map<String, PipelineModel>) transformedConfigMap.get(templatePipelineRootString);
pipelines.forEach((pipelineName, pipeline)->{
if(!pipelineName.equals(pipelineNameThatNeedsTransformation)){
transformedPipelines.put(pipelineName,pipeline);
}
});
PipelinesDataFlowModel transformedPipelinesDataFlowModel = new PipelinesDataFlowModel(
preTransformedPipelinesDataFlowModel.getDataPrepperVersion(),
preTransformedPipelinesDataFlowModel.getPipelineExtensions(),
transformedPipelines
);
return transformedPipelinesDataFlowModel;
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String replaceTemplatePipelineName(String templateJsonStringWithPipelinePlaceholder, String pipelineName) {
return templateJsonStringWithPipelinePlaceholder.replaceAll(pipelineNamePlaceholderRegex, pipelineName);
}

private void validateAllPlaceHoldersFound(Map<String, String> placeholdersMap, String templateJson) {
assert (placeholdersMap.size() == countMatches(templateJson));
}
//
// private String getExactPath(String pipelineJson, String pipelineGenericJsonPath) {
// try {
Expand Down Expand Up @@ -183,41 +191,53 @@ private void populateMapWithPlaceholderPaths(JsonNode currentNode, String curren
}
}

private Map<String, String> findExactPath(String json) throws IOException {

JsonNode rootNode = objectMapper.readTree(json);
/**
*
* @param placeholdersMap
* @param pipelineName
* @return
* @throws IOException
*/
private Map<String, String> findExactPath(Map<String, String> placeholdersMap, String pipelineName) throws IOException {
Map<String, String> mapWithPaths = new HashMap<>();
populateMapWithPlaceholderPaths(rootNode, "", mapWithPaths);
return mapWithPaths;
}

private void populateMapWithExactPath(JsonNode currentNode, String currentPath, Map<String, String> placeholdersWithPaths) {
if (currentNode.isObject()) {
Iterator<Map.Entry<String, JsonNode>> fields = currentNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
String path = currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
populateMapWithPlaceholderPaths(entry.getValue(), path, placeholdersWithPaths);
}
} else if (currentNode.isArray()) {
for (int i = 0; i < currentNode.size(); i++) {
String path = currentPath + "[" + i + "]";
populateMapWithPlaceholderPaths(currentNode.get(i), path, placeholdersWithPaths);
}
} else if (currentNode.isValueNode()) {
String placeHolderValue = currentNode.asText();
Matcher matcher = placeholderPattern.matcher(placeHolderValue);
if (matcher.find()) {
placeholdersWithPaths.put(placeHolderValue, currentPath);
for (String genericPathPlaceholder: placeholdersMap.keySet()){
String genericPath = getValueFromPlaceHolder(genericPathPlaceholder);
if(genericPath.contains("$.*.")){
String exactPath = genericPath.replace("$.*.","$."+pipelineName+".");
mapWithPaths.put(genericPathPlaceholder,exactPath);
}
}
return mapWithPaths;
}
//
// private void populateMapWithExactPath(JsonNode currentNode, String currentPath, Map<String, String> mapWithPaths) {
// if (currentNode.isObject()) {
// //Iterate all nodes in array
// Iterator<Map.Entry<String, JsonNode>> fields = currentNode.fields();
// while (fields.hasNext()) {
// Map.Entry<String, JsonNode> entry = fields.next();
// String path = currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
// populateMapWithPlaceholderPaths(entry.getValue(), path, mapWithPaths);
// }
// } else if (currentNode.isArray()) {
// //Iterate all nodes in array
// for (int i = 0; i < currentNode.size(); i++) {
// String path = currentPath + "[" + i + "]";
// populateMapWithPlaceholderPaths(currentNode.get(i), path, mapWithPaths);
// }
// } else if (currentNode.isValueNode()) {
// //Dont do anything if it is a Value Node.
// String placeHolderValue = currentNode.asText();
// Matcher matcher = placeholderPattern.matcher(placeHolderValue);
// if (matcher.find()) {
// mapWithPaths.put(placeHolderValue, currentPath);
// }
// }
// }

private String getValueFromPlaceHolder(String placeholder) {
if (placeholder.length() < 4) {
//TODO
//invalid placeholder
throw new RuntimeException();
throw new RuntimeException("Invalid placeholder value");
}

//remove the first 2 and last 2 characters.
Expand All @@ -244,4 +264,14 @@ public void replaceNode(JsonNode root, String jsonPath, JsonNode newNode) {
throw new PathNotFoundException(e);
}
}

private int countMatches(String json) {
Matcher matcher = placeholderPattern.matcher(json);

int count = 0;
while (matcher.find()) {
count++;
}
return count;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## Configuration Transformation
Supports transformation of configuration from user provided configuration to
a transformed configuration based on template and rules.

## Usage

User give configuration passes through rules, if the rules are valid,
the template for the transformations are dynamically chosen and applied.

**User config**

**Template**

**Rule**

**Expected Transformed Config**


### Assumptions
1. Deep scan or recursive expressions like`$..` is NOT supported. Always use a more specific expression.
In the event specific variables in a path are not known, use wildcards.
2. User could provide multiple pipelines in their user config but
there can be only one pipeline that can support transformation.
3. There cannot be multiple transformations in a single pipeline.
4. `{{ .. }}` is the placeholder in the template.
`{{ pipeline-name }}` is handled differently as compared to other placeholders
as other placeholders are jsonPaths.
Loading