Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Configuration Transformation #4446

Merged
merged 24 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f43aaf8
Adding templates
srikanthjg Apr 5, 2024
7f80316
Added Dynamic yaml transformer
srikanthjg Apr 8, 2024
b549e41
Added Rule evaluator
srikanthjg Apr 8, 2024
769a3ae
Added rule evaluator
srikanthjg Apr 11, 2024
1817b56
Added json walk
srikanthjg Apr 12, 2024
1ba89fb
Add transformation logic
srikanthjg Apr 12, 2024
7ac2c0a
Add dynamic rule
srikanthjg Apr 13, 2024
2060882
Almost working
srikanthjg Apr 14, 2024
fc50983
Adding multiple pipelines part1
srikanthjg Apr 15, 2024
a63961d
Adding multiple pipelines part2-incomplete
srikanthjg Apr 15, 2024
8b877b5
Works e2e for 1 pipeline
srikanthjg Apr 15, 2024
97472c0
Added multi pipeline template and pipelinemodel support, works for do…
srikanthjg Apr 16, 2024
40750e3
added tests for models and fixed beans, one more fix needed for bean
srikanthjg Apr 17, 2024
abe79a1
Fixed IT and beans
srikanthjg Apr 17, 2024
422e3ab
Update bean to have only pipelineDataModel and not parser
srikanthjg Apr 17, 2024
b89e3a9
Add banner
srikanthjg Apr 18, 2024
147cd86
Code cleanup and add comments
srikanthjg Apr 19, 2024
6d4ed2b
Support user pipeline configuration dynamic transformation based on
srikanthjg Apr 5, 2024
a7badaf
Address comments
srikanthjg Apr 22, 2024
8eb6894
Added Function Call support in templates
srikanthjg Apr 24, 2024
b0544f2
Added Function Call support in templates
srikanthjg Apr 24, 2024
5b4825f
Modify documentDB template.
srikanthjg Apr 24, 2024
d04768c
Code clean up
srikanthjg Apr 25, 2024
36b1dba
Code clean up
srikanthjg Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add dynamic rule
Signed-off-by: srigovs <srigovs@amazon.com>
  • Loading branch information
srikanthjg committed Apr 25, 2024
commit 7ac2c0a8261f0eb899b725e4dfc74280133689af
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
package org.opensearch.dataprepper.parser.config;

import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader;
import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser;
import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer;
import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;

import javax.inject.Qualifier;

Expand All @@ -37,7 +39,7 @@ public PipelineTransformer pipelineParser(
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory
) {
) {
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
peerForwarderProvider,
Expand All @@ -63,20 +65,41 @@ public PipelinesDataflowModelParser pipelinesDataflowModelParser(

@Bean
public PipelinesDataFlowModel pipelinesDataFlowModel(
@Qualifier("preTransformedDataFlowModel") PipelinesDataFlowModel preTransformedDataFlowModel,
DynamicConfigTransformer pipelineConfigTransformer,
TransformersFactory transformersFactory) {
transformersFactory.getTemplateModel("documentdb");
return pipelineConfigTransformer.transformConfiguration(preTransformedDataFlowModel, templateModel);
}

@Bean(name = "preTransformedDataFlowModel")
public PipelinesDataFlowModel preTransformedDataFlowModel(
final PipelinesDataflowModelParser pipelinesDataflowModelParser) {
return pipelinesDataflowModelParser.parseConfiguration();
}

// TODO
// @Bean
// public PipelinesDataFlowModel pipelinesDataFlowModel(
// @Qualifier("preTransformedDataFlowModel") final PipelinesDataflowModelParser pipelinesDataflowModelParser) {
// return pipelineTransformer(pipelinesDataflowModelParser);
// public PipelineTemplateModel pipelineTemplateModel(
// final PipelineTransformationPathProvider transformationResourcesPathProvider){
// return transformationResourcesPathProvider.get();
// }

// @Bean
// public PipelineTemplateModel pipelineTemplateModel(
// final PipelineTransformationPathProvider transformationResourcesPathProvider){
// return transformationResourcesPathProvider.getTemplateModel();
// }
//
// @Bean(name = "preTransformedDataFlowModel")
// public PipelinesDataFlowModel preTransformedDataFlowModel(
// final PipelinesDataflowModelParser pipelinesDataflowModelParser) {
// return pipelinesDataflowModelParser.parseConfiguration();
// @Bean
// public PipelineTemplateModel pipelineTemplateModel(
// final PipelineTransformationPathProvider transformationResourcesPathProvider){
// return transformationResourcesPathProvider.getTemplateModel();
// }
//
// @Bean
// public PipelineTransformationPathProvider transformationResourcesPathProvider(
// final PipelineTransformationPathProvider transformationResourcesPathProvider){
// return transformationResourcesPathProvider.getTemplateModel();
// }

}
10 changes: 10 additions & 0 deletions data-prepper-pipeline-parser/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ dependencies {
implementation 'org.projectlombok:lombok:1.18.22'
implementation 'com.jayway.jsonpath:json-path:2.6.0'
implementation 'javax.inject:javax.inject:1'
implementation 'org.testng:testng:7.1.0'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
srikanthjg marked this conversation as resolved.
Show resolved Hide resolved
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this dependency?


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.pipeline.parser;

import static java.lang.String.format;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,8 +14,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;

public class PipelineConfigurationFileReader implements PipelineConfigurationReader {
private static final Logger LOG = LoggerFactory.getLogger(PipelineConfigurationFileReader.class);
private final String pipelineConfigurationFileLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
public interface PipelineConfigurationReader {

/**
*
* @return a List of InputStream that contains each of the pipeline configurations.
* the caller of this method is responsible for closing these input streams after they are used
* the caller of this method is responsible for closing these input streams after they are used
*/
List<InputStream> getPipelineConfigurationInputStreams();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.opensearch.dataprepper.pipeline.parser;

import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer;
import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.inject.Named;

@Configuration
public class PipelineTransformationConfiguration {
public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
public static final String RULES_DIRECTORY_PATH = "VALIDATORS_DIRECTORY_PATH";

@Bean
@Named(RULES_DIRECTORY_PATH)
static String provideRulesDirectoryPath() {
return "resources/rules";
}

@Bean
@Named(TEMPLATES_DIRECTORY_PATH)
static String provideTemplateDirectoryPath() {
return "resources/templates";
}

@Bean
TransformersFactory transformersFactory(
@Named(TEMPLATES_DIRECTORY_PATH) String provideTransformerDirectoryPath,
@Named(RULES_DIRECTORY_PATH) String provideTemplateDirectoryPath
) {
return new TransformersFactory(RULES_DIRECTORY_PATH, TEMPLATES_DIRECTORY_PATH);
}

@Bean
public DynamicConfigTransformer pipelineConfigTransformer(
RuleEvaluator ruleEvaluator) {
return new DynamicConfigTransformer(ruleEvaluator);
}

@Bean
public RuleEvaluator ruleEvaluator(TransformersFactory transformersFactory) {
return new RuleEvaluator(transformersFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import static java.lang.String.format;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
//import org.opensearch.dataprepper.pipeline.parser.model.RuleConfig;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
//import org.opensearch.dataprepper.pipeline.parser.rule.RuleParser;
import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicYamlTransformer;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineConfigurationTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,8 +23,6 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class PipelinesDataflowModelParser {
private static final Logger LOG = LoggerFactory.getLogger(PipelinesDataflowModelParser.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory())
Expand All @@ -47,7 +41,7 @@ public PipelinesDataFlowModel parseConfiguration() {
final List<PipelinesDataFlowModel> pipelinesDataFlowModels = parseStreamsToPipelinesDataFlowModel();
PipelinesDataFlowModel pipelinesDataFlowModel = mergePipelinesDataModels(pipelinesDataFlowModels);

performPipelineConfigurationTransformationIfNeeded(pipelinesDataFlowModel);
// performPipelineConfigurationTransformationIfNeeded(pipelinesDataFlowModel);

return pipelinesDataFlowModel;
}
Expand All @@ -66,64 +60,20 @@ private List<PipelinesDataFlowModel> parseStreamsToPipelinesDataFlowModel() {
.collect(Collectors.toList());
}

private void performPipelineConfigurationTransformationIfNeeded(PipelinesDataFlowModel pipelinesDataFlowModel){

/**
* Step1:
* Check rules folder. get all files.
* Apply all rules in every file.
* Get pluginName of the rules that passed
*
* Step2:
* Based on the rule that passed, we will get the plugin name from that, that needs transformation
*
*/



//check if transformation is required based on rules present in yaml file
RuleEvaluator ruleEvaluator = new RuleEvaluator(pipelinesDataFlowModel);
PipelineConfigurationTransformer configurationTransformer = new DynamicYamlTransformer();

if(ruleEvaluator.isTransformationNeeded()){
String templateJson = ruleEvaluator.getTemplateJsonString();
configurationTransformer.transformConfiguration(pipelinesDataFlowModel,templateJson);
}


// RuleParser ruleParser = new RuleParser();
// RuleEvaluator ruleEvaluator = new RuleEvaluator();
//
//// RuleConfig rule = ruleParser.parseRule(templateFileLocation);
// List<> rules = ruleParser.getRules();
//TODO
// private PipelinesDataFlowModel performPipelineConfigurationTransformationIfNeeded(PipelinesDataFlowModel pipelinesDataFlowModel) {
//
// //for all the rules in the rule folder, check if any one of it matches
// // if so, then perform transformation based on template
// for(RuleConfig rule:rules){
// if (ruleEvaluator.isRuleValid(rule, pipelinesDataFlowModel)) {
// //check if transformation is required based on rules present in yaml file
// RuleEvaluator ruleEvaluator = new RuleEvaluator(pipelinesDataFlowModel);
// PipelineConfigurationTransformer configurationTransformer = new DynamicYamlTransformer();
//
// String templateFileLocation = ruleEvaluator.getTemplateFileLocationForTransformation(rule);

//load template dataFlowModel from templateFileLocation.
// PipelinesDataFlowModel templatePipelineDataModels = getTemplateDataFlowModel(templateFileLocation);

// TODO
// validateTemplateModel()

//transform template dataFlowModel based on pipelineDataFlowModel


//TODO
// final List<PipelinesDataFlowModel> pipelineTemplateDataFlowModels = parseStreamsToTemplateDataFlowModel();

//Transform pipeline configuration
// final List<PipelinesDataFlowModel> transformedPipelinesDataFlowModels = transformConfiguration(pipelinesDataFlowModels, pipelineTemplateDataFlowModels);

// return mergePipelinesDataModels(transformedPipelinesDataFlowModels);
// }
// if (ruleEvaluator.isTransformationNeeded()) {
// PipelineTemplateModel templateModel = ruleEvaluator.getTemplateModel();
// PipelinesDataFlowModel transformedPipelineDataModel = configurationTransformer.transformConfiguration(pipelinesDataFlowModel, templateModel);
// return transformedPipelineDataModel;
// }

}
// return pipelinesDataFlowModel;
// }

private PipelinesDataFlowModel parseStreamToPipelineDataFlowModel(final InputStream configurationInputStream) {
try (final InputStream pipelineConfigurationInputStream = configurationInputStream) {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading