Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions data-prepper-plugins/split-event-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

// Coverage gate for this plugin: verification fails when measured coverage is below the limit.
jacocoTestCoverageVerification {
// Produce the coverage report before the verification task runs.
dependsOn jacocoTestReport
violationRules {
rule {
limit {
// NOTE(review): with the JaCoCo plugin's default metric this presumably means a
// covered ratio of 1.0, i.e. 100% coverage - confirm against the plugin defaults.
minimum = 1.0
}
}
}
}


// Compile/runtime dependencies of the split-event processor plugin.
dependencies {
// Data Prepper API project (sibling project in this build).
implementation project(':data-prepper-api')
// Shared code from the common plugins project.
implementation project(':data-prepper-plugins:common')
// Jackson databind; no version is given here, so it is presumably managed elsewhere in the build.
implementation 'com.fasterxml.jackson.core:jackson-databind'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.splitevent;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Function;
import java.util.regex.Pattern;


@DataPrepperPlugin(name = "split_event", pluginType = Processor.class, pluginConfigurationType = SplitEventProcessorConfig.class)
public class SplitEventProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
// Plain delimiter read from the configuration. The constructor requires that exactly one of
// delimiter / delimiterRegex is non-empty.
final String delimiter;
// Regular-expression delimiter read from the configuration.
final String delimiterRegex;
// Key of the event field whose value is split.
final String field;
// Compiled form of delimiterRegex; null when the plain delimiter is configured instead.
final Pattern pattern;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to retain this as a field. The lambda will hold a reference to the Pattern.

// Turns a field value into its split parts; chosen once in the constructor depending on
// whether a plain delimiter or a regular expression was configured.
private final Function<String, String[]> splitter;

@DataPrepperPluginConstructor
public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventProcessorConfig config) {
super(pluginMetrics);
this.delimiter = config.getDelimiter();
this.delimiterRegex = config.getDelimiterRegex();
this.field = config.getField();

if(delimiterRegex != null && !delimiterRegex.isEmpty()
&& delimiter != null && !delimiter.isEmpty()) {
throw new IllegalArgumentException("delimiter and delimiter_regex cannot be defined at the same time");
} else if((delimiterRegex == null || delimiterRegex.isEmpty()) &&
(delimiter == null || delimiter.isEmpty())) {
throw new IllegalArgumentException("delimiter or delimiter_regex needs to be defined");
}

if(delimiterRegex != null && !delimiterRegex.isEmpty()) {
pattern = Pattern.compile(delimiterRegex);
splitter = pattern::split;
} else {
splitter = inputString -> inputString.split(delimiter);
pattern = null;
}
}

/**
 * Splits each event on the configured field and emits one event per split value.
 *
 * <p>For an event whose field value splits into several parts, a copy of the event is created
 * for every part except the last, and the original event is updated in place to carry the last
 * part. Events are passed through unchanged when the field is missing, when its value is not a
 * string (including {@code null}), or when splitting yields at most one part.
 *
 * @param records the incoming records
 * @return the records to pass on, in input order, with split copies preceding their origin event
 */
@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
    final Collection<Record<Event>> newRecords = new ArrayList<>();
    for (final Record<Event> record : records) {
        final Event recordEvent = record.getData();

        if (!recordEvent.containsKey(field)) {
            newRecords.add(record);
            continue;
        }

        final Object value = recordEvent.get(field, Object.class);

        // Only string values can be split. Pass anything else (numbers, maps, null) through
        // untouched rather than failing the whole batch with a ClassCastException or NPE.
        if (!(value instanceof String)) {
            newRecords.add(record);
            continue;
        }

        // Split the value according to the configured delimiter.
        final String[] splitValues = splitter.apply((String) value);

        // With no splits, or an empty result, keep the original record as it is.
        if (splitValues.length <= 1) {
            newRecords.add(record);
            continue;
        }

        // Create a new event for every split value except the last one.
        for (int i = 0; i < splitValues.length - 1; i++) {
            final Record<Event> newRecord = createNewRecordFromEvent(recordEvent, splitValues[i]);
            addToAcknowledgementSetFromOriginEvent(newRecord.getData(), recordEvent);
            newRecords.add(newRecord);
        }

        // The original event is reused to hold the last split value.
        recordEvent.put(field, splitValues[splitValues.length - 1]);
        newRecords.add(record);
    }
    return newRecords;
}

/**
 * Builds a new record from a copy of the given event, with the configured field replaced by
 * one split value.
 *
 * @param recordEvent the event to copy
 * @param splitValue the value to store under the configured field in the copy
 * @return a record wrapping the copied event
 */
protected Record<Event> createNewRecordFromEvent(final Event recordEvent, String splitValue) {
    final JacksonEvent newRecordEvent = JacksonEvent.fromEvent(recordEvent);
    newRecordEvent.put(field, splitValue);
    return new Record<>(newRecordEvent);
}

/**
 * Adds a newly created event to the acknowledgement set of the event it was split from,
 * when the origin event has an event handle that carries an acknowledgement set.
 *
 * @param recordEvent the newly created event
 * @param originRecordEvent the event the new one was derived from
 */
protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) {
    final DefaultEventHandle originHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();

    // Nothing to do when the origin has no handle or no acknowledgement set.
    if (originHandle == null || originHandle.getAcknowledgementSet() == null) {
        return;
    }

    originHandle.getAcknowledgementSet().add(recordEvent);
}

/**
 * No-op: this method body is empty, so nothing is done to prepare for shutdown.
 */
@Override
public void prepareForShutdown() {
}

/**
 * Reports whether the processor can be shut down.
 *
 * @return always {@code true}
 */
@Override
public boolean isReadyForShutdown() {
return true;
}

/**
 * No-op: this method body is empty, so shutdown performs no work here.
 */
@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.splitevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;


/**
 * Configuration for the {@code split_event} processor.
 *
 * <p>Holds the name of the field to split and the two alternative delimiter settings,
 * {@code delimiter} and {@code delimiter_regex}. This class only declares per-field
 * constraints; it does not itself check how the two delimiter settings relate to each other.
 */
public class SplitEventProcessorConfig {
    /** Key of the event field whose value is split; must be present and non-empty. */
    @NotEmpty
    @NotNull
    @JsonProperty("field")
    private String field;

    /** Regular expression used as the delimiter; may be left unset. */
    @JsonProperty("delimiter_regex")
    private String delimiterRegex;

    /** Delimiter of exactly one character when set; may be left unset. */
    @Size(min = 1, max = 1)
    @JsonProperty("delimiter")
    private String delimiter;

    /**
     * @return the key of the field to split
     */
    public String getField() {
        return field;
    }

    /**
     * @return the configured delimiter regular expression, or {@code null} if not set
     */
    public String getDelimiterRegex() {
        return delimiterRegex;
    }

    /**
     * @return the configured delimiter, or {@code null} if not set
     */
    public String getDelimiter() {
        return delimiter;
    }
}
Loading