Enhance copy_values processor to selectively copy entries from lists #4085

Merged
merged 4 commits into from
Feb 9, 2024
Changes from 3 commits
@@ -13,41 +13,84 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@DataPrepperPlugin(name = "copy_values", pluginType = Processor.class, pluginConfigurationType = CopyValueProcessorConfig.class)
public class CopyValueProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(CopyValueProcessor.class);
private final CopyValueProcessorConfig config;
private final List<CopyValueProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.entries = config.getEntries();
this.expressionEvaluator = expressionEvaluator;
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();
for(CopyValueProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) {
continue;
}
try {
final Event recordEvent = record.getData();
if (config.getFromList() != null || config.getToList() != null) {
Collaborator

Is this check correct? What is the expected behavior if getFromList() returns null but getToList() is not null?

Collaborator Author

CopyValueProcessorConfig has a validation that requires from_list and to_list to be either both null or both non-null, so the case where getFromList() returns null but getToList() does not should never reach this check.

Collaborator Author

Regarding documentation: since we are adopting the new process of using the documentation website and removing the README (see #2740), I have created an issue to update the documentation there: opensearch-project/documentation-website#6390
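
The both-or-neither rule discussed in this thread can be illustrated in isolation. The following is a minimal sketch in plain Java, not the plugin's code: the class name `ListOptionsValidationSketch` and the method `isBothOrNeither` are hypothetical, and the check is written as a null-ness comparison that is logically equivalent to the `@AssertTrue` method in the config diff.

```java
// Hypothetical stand-in for the from_list/to_list validation; not the plugin's actual class.
final class ListOptionsValidationSketch {

    /** Returns true when both options are set or both are absent. */
    static boolean isBothOrNeither(final String fromList, final String toList) {
        // Equivalent to (both null) || (both non-null).
        return (fromList == null) == (toList == null);
    }

    public static void main(final String[] args) {
        System.out.println(isBothOrNeither(null, null));          // neither option set
        System.out.println(isBothOrNeither("mylist", "newlist")); // both options set
        System.out.println(isBothOrNeither(null, "newlist"));     // only to_list set
    }
}
```

Because a configuration that sets only one of the two options would be rejected by such a check, a processor that tests `getFromList() != null || getToList() != null` would only ever see the both-set or neither-set cases.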

// Copying entries between lists
if (recordEvent.containsKey(config.getToList()) && !config.getOverwriteIfToListExists()) {
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
continue;
}
final List<Map<String, Object>> sourceList = recordEvent.get(config.getFromList(), List.class);
final List<Map<String, Object>> targetList = new ArrayList<>();

final Map<CopyValueProcessorConfig.Entry, Boolean> whenConditions = new HashMap<>();
for (final CopyValueProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) {
whenConditions.put(entry, Boolean.FALSE);
} else {
whenConditions.put(entry, Boolean.TRUE);
}
}
for (final Map<String, Object> sourceField : sourceList) {
final Map<String, Object> targetItem = new HashMap<>();
for (final CopyValueProcessorConfig.Entry entry : entries) {
if (!whenConditions.get(entry) || !sourceField.containsKey(entry.getFromKey())) {
continue;
}
targetItem.put(entry.getToKey(), sourceField.get(entry.getFromKey()));
}
targetList.add(targetItem);
}
recordEvent.put(config.getToList(), targetList);
} else {
// Copying individual entries
for (final CopyValueProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) {
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
continue;
}

if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
Collaborator

Suggest refactoring L77-85 into a private method, shouldCopyEntry or skipCopyEntry.

Collaborator Author

Good one. Made the change.

Collaborator Author

Also opened an issue for adding metrics: #4088
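
One possible shape for the helper suggested in this thread is sketched below. It is an assumption, not the merged code: a plain `Map<String, Object>` stands in for `Event`, the entry's fields are passed as parameters, the `copy_when` evaluation is left out, and the class name `ShouldCopyEntrySketch` is hypothetical. The conditions mirror the ones visible in the diff for copying individual entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a shouldCopyEntry helper; a Map stands in for Event.
final class ShouldCopyEntrySketch {

    static boolean shouldCopyEntry(final Map<String, Object> event,
                                   final String fromKey,
                                   final String toKey,
                                   final boolean overwriteIfToKeyExists) {
        // Nothing to do when source and target are the same key or the source is missing.
        if (fromKey.equals(toKey) || !event.containsKey(fromKey)) {
            return false;
        }
        // Copy when the target is absent, or when overwriting is explicitly allowed.
        return !event.containsKey(toKey) || overwriteIfToKeyExists;
    }

    public static void main(final String[] args) {
        final Map<String, Object> event = new HashMap<>();
        event.put("message", "hello");
        if (shouldCopyEntry(event, "message", "newMessage", false)) {
            event.put("newMessage", event.get("message"));
        }
        System.out.println(event);
    }
}
```

Keeping the decision in one predicate leaves the loop body with a single guard followed by the copy itself.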

final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
}
}
}
} catch (Exception e) {
LOG.error("Fail to perform copy values operation", e);
//TODO: add tagging on failure
}
}
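
The list-copying branch in the hunk above can be tried outside Data Prepper. The following is a minimal sketch under stated assumptions: plain Java collections replace `Event`, a `Map<String, String>` of from-key to to-key replaces the `entries` configuration, `copy_when` conditions and the overwrite check are omitted, and the names `ListCopySketch` and `copyEntries` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of selectively copying keys from each item of a list.
final class ListCopySketch {

    static List<Map<String, Object>> copyEntries(final List<Map<String, Object>> sourceList,
                                                 final Map<String, String> keyMapping) {
        final List<Map<String, Object>> targetList = new ArrayList<>();
        for (final Map<String, Object> sourceItem : sourceList) {
            final Map<String, Object> targetItem = new LinkedHashMap<>();
            for (final Map.Entry<String, String> mapping : keyMapping.entrySet()) {
                // Keys missing from a source item are skipped rather than copied as null.
                if (sourceItem.containsKey(mapping.getKey())) {
                    targetItem.put(mapping.getValue(), sourceItem.get(mapping.getKey()));
                }
            }
            // One target item is added per source item, even if nothing was copied into it.
            targetList.add(targetItem);
        }
        return targetList;
    }

    public static void main(final String[] args) {
        final List<Map<String, Object>> fruits = List.of(
                Map.of("name", "apple", "color", "red", "shape", "round"),
                Map.of("name", "banana", "color", "yellow", "shape", "curved"));
        final Map<String, String> keyMapping = Map.of("name", "fruit_name", "color", "fruit_color");
        System.out.println(copyEntries(fruits, keyMapping));
    }
}
```

As in the diff, the source list is left untouched and a new list is built, so writing the result to the same key as the source amounts to an overwrite.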

@@ -7,13 +7,14 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class CopyValueProcessorConfig {
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("from_key")
@@ -61,7 +62,33 @@ public Entry() {
@Valid
private List<Entry> entries;

@JsonProperty("from_list")
private String fromList;

@JsonProperty("to_list")
private String toList;

@JsonProperty("overwrite_if_to_list_exists")
private boolean overwriteIfToListExists = false;

@AssertTrue(message = "Both from_list and to_list should be specified when copying entries between lists.")
boolean isBothFromListAndToListProvided() {
return (fromList == null && toList == null) || (fromList != null && toList != null);
}

public List<Entry> getEntries() {
return entries;
}

public String getFromList() {
return fromList;
}

public String getToList() {
return toList;
}

public boolean getOverwriteIfToListExists() {
return overwriteIfToListExists;
}
}
@@ -5,15 +5,16 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Arrays;
import java.util.Collections;
@@ -26,6 +27,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@@ -39,6 +41,13 @@ public class CopyValueProcessorTests {
@Mock
private ExpressionEvaluator expressionEvaluator;

@BeforeEach
void setUp() {
lenient().when(mockConfig.getFromList()).thenReturn(null);
lenient().when(mockConfig.getToList()).thenReturn(null);
lenient().when(mockConfig.getOverwriteIfToListExists()).thenReturn(false);
}

@Test
public void testSingleCopyProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, null)));
@@ -186,6 +195,106 @@ public void testKey_is_not_copied_when_copyWhen_returns_false() {
assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage"));
}

@Test
public void testCopyEntriesFromList() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("newlist");
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
createEntry("name", "fruit_name", true, null),
createEntry("color", "fruit_color", true, null)
));

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("newlist"), is(true));
assertThat(resultRecord.getData().get("newlist", List.class), is(List.of(
Map.of("fruit_name", "apple", "fruit_color", "red"),
Map.of("fruit_name", "orange", "fruit_color", "orange"),
Map.of("fruit_name", "banana", "fruit_color", "yellow")
)));
}

@Test
public void testCopyEntriesFromListNotOverwriteByDefault() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("mylist");

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("newlist"), is(false));
}

@Test
public void testCopyEntriesFromListOverwritesExistingList() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("mylist");
when(mockConfig.getOverwriteIfToListExists()).thenReturn(true);
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
createEntry("name", "fruit_name", true, null),
createEntry("color", "fruit_color", true, null)
));

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("mylist"), is(true));
assertThat(resultRecord.getData().get("mylist", List.class), is(List.of(
Map.of("fruit_name", "apple", "fruit_color", "red"),
Map.of("fruit_name", "orange", "fruit_color", "orange"),
Map.of("fruit_name", "banana", "fruit_color", "yellow")
)));
}

@Test
public void testCopyEntriesFromListWithWhenConditions() {
when(mockConfig.getFromList()).thenReturn("mylist");
when(mockConfig.getToList()).thenReturn("mylist");
when(mockConfig.getOverwriteIfToListExists()).thenReturn(true);
final String copyWhen = UUID.randomUUID().toString();

when(mockConfig.getEntries()).thenReturn(createListOfEntries(
createEntry("name", "fruit_name", true, null),
createEntry("color", "fruit_color", true, copyWhen)
));

final CopyValueProcessor processor = createObjectUnderTest();
final Record<Event> record = getEventWithLists(List.of(
Map.of("name", "apple", "color", "red", "shape", "round"),
Map.of("name", "orange", "color", "orange", "shape", "round"),
Map.of("name", "banana", "color", "yellow", "shape", "curved")
));
when(expressionEvaluator.evaluateConditional(copyWhen, record.getData())).thenReturn(false);
final List<Record<Event>> resultRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
final Record<Event> resultRecord = resultRecords.get(0);

assertThat(resultRecord.getData().containsKey("mylist"), is(true));
assertThat(resultRecord.getData().get("mylist", List.class), is(List.of(
Map.of("fruit_name", "apple"),
Map.of("fruit_name", "orange"),
Map.of("fruit_name", "banana")
)));
}

private CopyValueProcessor createObjectUnderTest() {
return new CopyValueProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}
@@ -204,6 +313,11 @@ private Record<Event> getEvent(String message) {
return buildRecordWithEvent(testData);
}

private Record<Event> getEventWithLists(List<Map<String, Object>> testList) {
final Map<String, Object> testData = Map.of("mylist", testList);
return buildRecordWithEvent(testData);
}

private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)