Pipeline Configuration Transformation #4446

Merged
merged 24 commits into from
Apr 25, 2024
f43aaf8
Adding templates
srikanthjg Apr 5, 2024
7f80316
Added Dynamic yaml transformer
srikanthjg Apr 8, 2024
b549e41
Added Rule evaluator
srikanthjg Apr 8, 2024
769a3ae
Added rule evaluator
srikanthjg Apr 11, 2024
1817b56
Added json walk
srikanthjg Apr 12, 2024
1ba89fb
Add transformation logic
srikanthjg Apr 12, 2024
7ac2c0a
Add dynamic rule
srikanthjg Apr 13, 2024
2060882
Almost working
srikanthjg Apr 14, 2024
fc50983
Adding multiple pipelines part1
srikanthjg Apr 15, 2024
a63961d
Adding multiple pipelines part2-incomplete
srikanthjg Apr 15, 2024
8b877b5
Works e2e for 1 pipeline
srikanthjg Apr 15, 2024
97472c0
Added multi pipeline template and pipelinemodel support, works for do…
srikanthjg Apr 16, 2024
40750e3
added tests for models and fixed beans, one more fix needed for bean
srikanthjg Apr 17, 2024
abe79a1
Fixed IT and beans
srikanthjg Apr 17, 2024
422e3ab
Update bean to have only pipelineDataModel and not parser
srikanthjg Apr 17, 2024
b89e3a9
Add banner
srikanthjg Apr 18, 2024
147cd86
Code cleanup and add comments
srikanthjg Apr 19, 2024
6d4ed2b
Support user pipeline configuration dynamic transformation based on
srikanthjg Apr 5, 2024
a7badaf
Address comments
srikanthjg Apr 22, 2024
8eb6894
Added Function Call support in templates
srikanthjg Apr 24, 2024
b0544f2
Added Function Call support in templates
srikanthjg Apr 24, 2024
5b4825f
Modify documentDB template.
srikanthjg Apr 24, 2024
d04768c
Code clean up
srikanthjg Apr 25, 2024
36b1dba
Code clean up
srikanthjg Apr 25, 2024
Code cleanup and add comments
Signed-off-by: srigovs <srigovs@amazon.com>
srikanthjg committed Apr 25, 2024
commit 147cd86baf317802c7fdf336700246492fc6b5c6
3 changes: 2 additions & 1 deletion data-prepper-pipeline-parser/build.gradle
@@ -19,7 +19,6 @@ dependencies {
implementation 'org.projectlombok:lombok:1.18.22'
implementation 'com.jayway.jsonpath:json-path:2.6.0'
implementation 'javax.inject:javax.inject:1'
// implementation 'org.testng:testng:7.1.0'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
}
@@ -28,6 +27,8 @@
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
Member:

Do you need this dependency?

testImplementation 'org.assertj:assertj-core:3.20.2'
Member:

We should be able to remove this. There is one import to change to Hamcrest, and then we can remove it.

Contributor Author:

Hamcrest does not seem to support `usingRecursiveComparison`.


compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
}
@@ -35,14 +35,9 @@ public PipelinesDataflowModelParser(final PipelineConfigurationReader pipelineCo
}


//TODO
// move transformation code after user pipeline validation in pipelineTransformer.java
public PipelinesDataFlowModel parseConfiguration() {
final List<PipelinesDataFlowModel> pipelinesDataFlowModels = parseStreamsToPipelinesDataFlowModel();
PipelinesDataFlowModel pipelinesDataFlowModel = mergePipelinesDataModels(pipelinesDataFlowModels);

// performPipelineConfigurationTransformationIfNeeded(pipelinesDataFlowModel);

return pipelinesDataFlowModel;
}

@@ -60,21 +55,6 @@ private List<PipelinesDataFlowModel> parseStreamsToPipelinesDataFlowModel() {
.collect(Collectors.toList());
}

//TODO
// private PipelinesDataFlowModel performPipelineConfigurationTransformationIfNeeded(PipelinesDataFlowModel pipelinesDataFlowModel) {
//
// //check if transformation is required based on rules present in yaml file
// RuleEvaluator ruleEvaluator = new RuleEvaluator(pipelinesDataFlowModel);
// PipelineConfigurationTransformer configurationTransformer = new DynamicYamlTransformer();
//
// if (ruleEvaluator.isTransformationNeeded()) {
// PipelineTemplateModel templateModel = ruleEvaluator.getTemplateModel();
// PipelinesDataFlowModel transformedPipelineDataModel = configurationTransformer.transformConfiguration(pipelinesDataFlowModel, templateModel);
// return transformedPipelineDataModel;
// }
// return pipelinesDataFlowModel;
// }

private PipelinesDataFlowModel parseStreamToPipelineDataFlowModel(final InputStream configurationInputStream) {
try (final InputStream pipelineConfigurationInputStream = configurationInputStream) {
final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(pipelineConfigurationInputStream,
@@ -8,20 +8,68 @@ User-provided configuration passes through rules; if the rules are valid, the template for the transformation is dynamically chosen and applied.

**User config**
```yaml
simple-pipeline:
source:
someSource:
hostname: "database.example.com"
port: "27017"
sink:
- opensearch:
hosts: ["https://search-service.example.com"]
index: "my_index"

```

**Template**
```yaml
"<<pipeline-name>>-transformed":
source: "<<$.*.someSource>>"
sink:
- opensearch:
hosts: "<<$.*.sink[?(@.opensearch)].opensearch.hosts>>"
port: "<<$.*.someSource.documentdb.port>>"
index: "<<$.*.sink[0].opensearch.index>>"
aws:
sts_role_arn: "arn123"
region: "us-test-1"
dlq:
s3:
bucket: "test-bucket"
```

**Rule**
```yaml
apply_when:
- "$..source.someSource"
```

**Expected Transformed Config**

```yaml
simple-pipeline-transformed:
source:
someSource:
hostname: "database.example.com"
port: "27017"
sink:
- opensearch:
hosts: ["https://search-service.example.com"]
port: "27017"
index: "my_index"
aws:
sts_role_arn: "arn123"
region: "us-test-1"
dlq:
s3:
bucket: "test-bucket"
```
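Each JsonPath placeholder in the template is resolved against the user config to produce values like the port and index above. A simplified sketch of that resolution, assuming dot-separated paths with a single `*` wildcard for the pipeline name (the real code uses the jayway json-path library; `PathResolver` is a hypothetical name):

```java
import java.util.Map;

// Resolves a simplified "$.*.a.b" path against the user config,
// represented as nested Maps. "*" matches the single pipeline entry.
public class PathResolver {
    public static Object resolve(String path, Map<String, Object> config) {
        // Strip the leading "$." and split on dots:
        // "$.*.source.someSource.port" -> ["*", "source", "someSource", "port"]
        String[] segments = path.substring(2).split("\\.");
        Object current = config;
        for (String segment : segments) {
            if (!(current instanceof Map)) {
                return null; // path runs off the end of the config
            }
            Map<?, ?> map = (Map<?, ?>) current;
            if (segment.equals("*")) {
                // Wildcard: descend into the (single) pipeline entry.
                current = map.values().iterator().next();
            } else {
                current = map.get(segment);
            }
        }
        return current;
    }
}
```

JsonPath filters such as `[?(@.opensearch)]` and array indices are not handled here; they require the full library.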

### Assumptions
1. In the template definition, deep-scan (recursive) expressions like `$..` are NOT supported. Always use a more specific expression.
   When specific segments of a path are not known, use wildcards.
2. The user config may define multiple pipelines, but only one of those pipelines can support transformation.
3. There cannot be multiple transformations in a single pipeline.
4. `<<$ .. >>` is the placeholder syntax in the template. //TODO
   `<<pipeline-name>>` is handled differently from the other placeholders,
   as the other placeholders are JsonPaths.
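The special handling in assumption 4 can be sketched as follows: scan a template line for `<<...>>` tokens, then branch on whether the token is the literal `pipeline-name` or a JsonPath (which starts with `$`). This is a hypothetical illustration with illustrative names, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderScanner {
    // Matches <<...>> tokens, tolerating surrounding whitespace inside the markers.
    private static final Pattern PLACEHOLDER = Pattern.compile("<<\\s*(.+?)\\s*>>");

    // Extracts the inner text of every placeholder found in a template line.
    public static List<String> extract(String templateLine) {
        List<String> found = new ArrayList<>();
        Matcher m = PLACEHOLDER.matcher(templateLine);
        while (m.find()) {
            found.add(m.group(1));
        }
        return found;
    }

    // JsonPath placeholders start with "$"; <<pipeline-name>> does not,
    // so it is substituted directly rather than resolved as a path.
    public static boolean isJsonPath(String placeholder) {
        return placeholder.startsWith("$");
    }
}
```

For example, scanning `"<<pipeline-name>>-transformed"` yields the `pipeline-name` token, which is replaced with the matched pipeline's name, while `<<$.*.sink[0].opensearch.index>>` would be routed to JsonPath resolution.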
@@ -19,6 +19,8 @@
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;
import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
@@ -27,6 +29,7 @@

public class RuleEvaluator {
Member:

The name RuleEvaluator is quite broad. Maybe we can name this TransformationRuleEvaluator.

Member:

Also, make this an interface. Then implement in a single implementation. Perhaps DocumentDbRuleEvaluator (since the current on is focused on that only).

Contributor Author:

Will address this comment in the subsequent PR.


private static final Logger LOG = LoggerFactory.getLogger(RuleEvaluator.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
private final TransformersFactory transformersFactory;
@@ -41,34 +44,36 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
}

/**
* Evaluates the model based on pre-defined rules. The result contains the name
* of the pipeline that needs transformation, the evaluated boolean result,
* and the corresponding template model.
* Assumption: only one pipeline can have transformation.
* @param pipelinesModel the pipelines data flow model to evaluate
* @return RuleEvaluatorResult
*/
private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
// TODO: dynamically determine the plugin name needed for transformation based on the rule that applies
PLUGIN_NAME = "documentdb";
Contributor Author:

This is only a temporary implementation; we will move toward scanning the folder and fetching and applying the rule file dynamically.

String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();

for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
try {
String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
if (evaluate(pipelineJson, pluginRulesPath)) {
String templateFilePath = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
PipelineTemplateModel templateModel = yamlMapper.readValue(new File(templateFilePath),
PipelineTemplateModel.class);
return RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
.withPipelineTemplateModel(templateModel)
.withPipelineName(entry.getKey())
.build();
}
} catch (JsonProcessingException e) {
LOG.error("Error processing json");
throw new RuntimeException(e);
} catch (IOException e) {
LOG.error("Error reading file");
throw new RuntimeException(e);
}
}
@@ -82,7 +87,6 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel)
private Boolean evaluate(String pipelinesJson,
String rulePath) {

// ReadContext readPathContext = null;
Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
.mappingProvider(new JacksonMappingProvider())
@@ -98,12 +102,10 @@ private Boolean evaluate(String pipelinesJson,
Object result = readPathContext.read(rule);
}
} catch (IOException e) {
//TODO --> this is failing the integ tests in core
// reason - transformer not injected correctly
// throw new RuntimeException(format("error reading file %s.",rulePath));

LOG.warn("Error reading file {}", rulePath);
return false;
} catch (PathNotFoundException e) {
LOG.warn("Path not found {}", rulePath);
return false;
}
return true;