Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into feature-httpsink-skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
mallikagogoi7 authored Jun 30, 2023
2 parents 86b909d + 0d29418 commit 69d16cc
Show file tree
Hide file tree
Showing 68 changed files with 3,008 additions and 490 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ subprojects {
}
} else if (details.requested.group == 'log4j' && details.requested.name == 'log4j') {
details.useTarget group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.17.1'
} else if (details.requested.group == 'org.xerial.snappy' && details.requested.name == 'snappy-java') {
details.useTarget group: 'org.xerial.snappy', name: 'snappy-java', version: '1.1.10.1'
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

/**
 * Constructs a SinkModel from its individual components.
 * Note: the rendered diff showed the old 3-arg constructor alongside this one;
 * only the post-change 4-arg form is valid and kept here.
 *
 * @param pluginName the name of the sink plugin
 * @param routes the routes this sink subscribes to; null is treated as no routes
 * @param tagsTargetKey the event key under which tags are written; may be null
 * @param pluginSettings the plugin's configuration settings
 */
SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
    this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
Expand All @@ -46,18 +46,30 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

/**
 * Returns the key under which event tags are written for this sink, if one
 * was configured.
 *
 * @return the tags target key, or null when not configured
 * @since 2.4
 */
public String getTagsTargetKey() {
    final SinkInternalJsonModel internalModel = this.<SinkInternalJsonModel>getInternalJsonModel();
    return internalModel.tagsTargetKey;
}

/**
 * Builder for creating a {@link SinkModel} from an existing {@link PluginModel}.
 * Routes default to an empty list and the tags target key to null.
 * Note: the rendered diff showed two return statements in build(); only the
 * post-change call (with tagsTargetKey) is valid and kept here.
 */
public static class SinkModelBuilder {

    private final PluginModel pluginModel;
    private final List<String> routes;
    private final String tagsTargetKey;

    private SinkModelBuilder(final PluginModel pluginModel) {
        this.pluginModel = pluginModel;
        this.routes = Collections.emptyList();
        this.tagsTargetKey = null;
    }

    /**
     * Builds the SinkModel from the wrapped plugin model's name and settings.
     *
     * @return a new SinkModel with empty routes and no tags target key
     */
    public SinkModel build() {
        return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings());
    }
}

Expand All @@ -70,21 +82,27 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("routes")
private final List<String> routes;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("tags_target_key")
private final String tagsTargetKey;

/**
 * Jackson deserialization entry point; both properties are optional in the
 * incoming JSON/YAML. A null routes list is normalized to an empty list.
 * Note: the rendered diff showed the old 1-arg signature alongside this one;
 * only the post-change 2-arg form is valid and kept here.
 */
@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey) {
    super();
    this.routes = routes != null ? routes : new ArrayList<>();
    this.tagsTargetKey = tagsTargetKey;
}

/**
 * Programmatic construction path used by SinkModel; normalizes a null routes
 * list to an empty list and forwards the plugin settings to the parent model.
 * Note: the rendered diff showed the old 2-arg signature fused with this one;
 * only the post-change 3-arg form is valid and kept here.
 */
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
    super(pluginSettings);
    this.routes = routes != null ? routes : new ArrayList<>();
    this.tagsTargetKey = tagsTargetKey;
}
}

/**
 * Custom deserializer wiring SinkModel to its internal JSON model; the default
 * internal model supplier passes null routes and null tags target key.
 * Note: the rendered diff showed the old single-null supplier alongside the
 * new one; only the post-change two-null form is valid and kept here.
 */
static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
    SinkModelDeserializer() {
        super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,19 +373,9 @@ private String trimKey(final String key) {
}

private boolean isValidKey(final String key) {
char previous = ' ';
char next = ' ';
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);

if (i < key.length() - 1) {
next = key.charAt(i + 1);
}

if ((i == 0 || i == key.length() - 1 || previous == '/' || next == '/') && (c == '_' || c == '.' || c == '-')) {
return false;
}

if (!(c >= 48 && c <= 57
|| c >= 65 && c <= 90
|| c >= 97 && c <= 122
Expand All @@ -397,7 +387,6 @@ private boolean isValidKey(final String key) {

return false;
}
previous = c;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.plugin;

import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.util.List;
Expand All @@ -27,6 +28,18 @@ public interface PluginFactory {
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting);

/**
 * Loads a new instance of a plugin, additionally supplying sink-wide
 * configuration through a {@link SinkContext}.
 *
 * @param baseClass The class type that the plugin is supporting.
 * @param pluginSetting The {@link PluginSetting} to configure this plugin
 * @param sinkContext The {@link SinkContext} to configure this plugin
 * @param <T> The type
 * @return A new instance of your plugin, configured
 * @since 2.4
 */
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final SinkContext sinkContext);

/**
* Loads a specified number of plugin instances. The total number of instances is provided
* by the numberOfInstancesFunction.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import java.util.Collection;

/**
 * Carries sink-wide configuration shared across individual sinks — currently
 * the tags target key and the routes configured for a sink — so plugins can
 * read global settings without parsing pipeline configuration themselves.
 */
public class SinkContext {
    private final String tagsTargetKey;
    private final Collection<String> routes;

    /**
     * Creates a sink context.
     *
     * @param tagsTargetKey the event key under which tags should be written; may be null
     * @param routes the routes configured for the sink; may be null
     */
    public SinkContext(final String tagsTargetKey, final Collection<String> routes) {
        this.tagsTargetKey = tagsTargetKey;
        this.routes = routes;
    }

    /**
     * Returns the routes configured for the sink, exactly as supplied.
     *
     * @return the routes, possibly null
     */
    public Collection<String> getRoutes() {
        return routes;
    }

    /**
     * Returns the target key name for tags, if one was configured for the sink.
     *
     * @return the tags target key, possibly null
     */
    public String getTagsTargetKey() {
        return tagsTargetKey;
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -72,7 +72,7 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing
final DataPrepperVersion version = DataPrepperVersion.parse("2.0");
final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -93,7 +93,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> preppers = Collections.singletonList(new PluginModel("testPrepper", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
Expand Down Expand Up @@ -74,13 +75,15 @@ void serialize_into_known_SinkModel() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), pluginSettings);
final String tagsTargetKey = "tags";
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, pluginSettings);

final String actualJson = objectMapper.writeValueAsString(sinkModel);

final String expectedJson = createStringFromInputStream(this.getClass().getResourceAsStream("sink_plugin.yaml"));

assertThat("---\n" + actualJson, equalTo(expectedJson));
assertThat(sinkModel.getTagsTargetKey(), equalTo(tagsTargetKey));
}

@Test
Expand All @@ -93,7 +96,8 @@ void deserialize_with_any_pluginModel() throws IOException {
assertAll(
() -> assertThat(sinkModel.getPluginName(), equalTo("customPlugin")),
() -> assertThat(sinkModel.getPluginSettings(), notNullValue()),
() -> assertThat(sinkModel.getRoutes(), notNullValue())
() -> assertThat(sinkModel.getRoutes(), notNullValue()),
() -> assertThat(sinkModel.getTagsTargetKey(), nullValue())
);
assertAll(
() -> assertThat(sinkModel.getPluginSettings().size(), equalTo(3)),
Expand Down Expand Up @@ -123,7 +127,7 @@ void serialize_with_just_pluginModel() throws IOException {
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
pluginSettings.put("key3", "value3");
final SinkModel sinkModel = new SinkModel("customPlugin", null, pluginSettings);
final SinkModel sinkModel = new SinkModel("customPlugin", null, null, pluginSettings);

final String actualJson = objectMapper.writeValueAsString(sinkModel);

Expand Down Expand Up @@ -156,10 +160,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
assertThat(actualSinkModel.getPluginSettings(), equalTo(pluginSettings));
assertThat(actualSinkModel.getRoutes(), notNullValue());
assertThat(actualSinkModel.getRoutes(), empty());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
}
}

private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,8 @@ public void testIsValueAList_withNull() {
}

@ParameterizedTest
@ValueSource(strings = {"", "withSpecialChars*$%", "-withPrefixDash", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
"withDashSuffix-", "withDashSuffix-/nestedKey", "withDashPrefix/-nestedKey", "_withUnderscorePrefix", "withUnderscoreSuffix_",
".withDotPrefix", "withDotSuffix.", "with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
@ValueSource(strings = {"", "withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
"with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) {
assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import org.apache.commons.lang3.RandomStringUtils;



public class SinkContextTest {

    @Test
    public void testSinkContextBasic() {
        // Arrange: a random tags key and an empty route list.
        final String expectedTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
        final List<String> expectedRoutes = Collections.emptyList();

        // Act: build the context directly from the two values.
        final SinkContext sinkContext = new SinkContext(expectedTagsTargetKey, expectedRoutes);

        // Assert: both values come back unchanged from the getters.
        assertThat(sinkContext.getTagsTargetKey(), equalTo(expectedTagsTargetKey));
        assertThat(sinkContext.getRoutes(), equalTo(expectedRoutes));
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ customSinkPlugin:
routes:
- "routeA"
- "routeB"
tags_target_key: "tags"
key1: "value1"
key2: "value2"
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,7 +82,7 @@ private static void visitAndValidate(
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipeline);
touchedPipelineSet.add(pipeline);
//if validation is successful, then there is definitely sink
final List<RoutedPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
//Recursively check connected pipelines
for (PluginSetting pluginSetting : connectedPipelinesSettings) {
//Further process only if the sink is of pipeline type
Expand Down Expand Up @@ -159,7 +159,7 @@ private static void validateForOrphans(
throw new RuntimeException("Invalid configuration, cannot proceed with ambiguous configuration");
}
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(currentPipelineName);
final List<RoutedPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
for (PluginSetting pluginSetting : pluginSettings) {
if (PIPELINE_TYPE.equals(pluginSetting.getName()) &&
pluginSetting.getAttributeFromSettings(PIPELINE_ATTRIBUTE_NAME) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator;
Expand Down Expand Up @@ -292,13 +293,13 @@ private Optional<Source> getSourceIfPipelineType(
return Optional.empty();
}

private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final RoutedPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting);
private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final SinkContextPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext());

return new DataFlowComponent<>(sink, pluginSetting.getRoutes());
return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes());
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkContext sinkContext) {
LOG.info("Building [{}] as sink component", pluginSetting.getName());
final Optional<String> pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting);
if (pipelineNameOptional.isPresent()) { //update to ifPresentOrElse when using JDK9
Expand All @@ -307,7 +308,7 @@ private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
sourceConnectorMap.put(pipelineName, pipelineConnector); //TODO retrieve from parent Pipeline using name
return pipelineConnector;
} else {
return pluginFactory.loadPlugin(Sink.class, pluginSetting);
return pluginFactory.loadPlugin(Sink.class, pluginSetting, sinkContext);
}
}

Expand Down Expand Up @@ -337,7 +338,7 @@ private void removeConnectedPipelines(
sourcePipeline, pipelineConfigurationMap, pipelineMap));

//remove sink connected pipelines
final List<RoutedPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
sinkPluginSettings.forEach(sinkPluginSetting -> {
getPipelineNameIfPipelineType(sinkPluginSetting).ifPresent(sinkPipeline -> processRemoveIfRequired(
sinkPipeline, pipelineConfigurationMap, pipelineMap));
Expand Down
Loading

0 comments on commit 69d16cc

Please sign in to comment.