-
Notifications
You must be signed in to change notification settings - Fork 194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance copy_values processor to selectively copy entries from lists #4085
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,41 +13,84 @@ | |
import org.opensearch.dataprepper.model.processor.AbstractProcessor; | ||
import org.opensearch.dataprepper.model.processor.Processor; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
@DataPrepperPlugin(name = "copy_values", pluginType = Processor.class, pluginConfigurationType = CopyValueProcessorConfig.class) | ||
public class CopyValueProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||
private static final Logger LOG = LoggerFactory.getLogger(CopyValueProcessor.class); | ||
private final CopyValueProcessorConfig config; | ||
private final List<CopyValueProcessorConfig.Entry> entries; | ||
|
||
private final ExpressionEvaluator expressionEvaluator; | ||
|
||
@DataPrepperPluginConstructor | ||
public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { | ||
super(pluginMetrics); | ||
this.config = config; | ||
this.entries = config.getEntries(); | ||
this.expressionEvaluator = expressionEvaluator; | ||
} | ||
|
||
@Override | ||
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) { | ||
for(final Record<Event> record : records) { | ||
final Event recordEvent = record.getData(); | ||
for(CopyValueProcessorConfig.Entry entry : entries) { | ||
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) { | ||
continue; | ||
} | ||
try { | ||
final Event recordEvent = record.getData(); | ||
if (config.getFromList() != null || config.getToList() != null) { | ||
// Copying entries between lists | ||
if (recordEvent.containsKey(config.getToList()) && !config.getOverwriteIfToListExists()) { | ||
continue; | ||
} | ||
|
||
if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { | ||
continue; | ||
} | ||
final List<Map<String, Object>> sourceList = recordEvent.get(config.getFromList(), List.class); | ||
final List<Map<String, Object>> targetList = new ArrayList<>(); | ||
|
||
final Map<CopyValueProcessorConfig.Entry, Boolean> whenConditions = new HashMap<>(); | ||
for (final CopyValueProcessorConfig.Entry entry : entries) { | ||
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) { | ||
whenConditions.put(entry, Boolean.FALSE); | ||
} else { | ||
whenConditions.put(entry, Boolean.TRUE); | ||
} | ||
} | ||
for (final Map<String, Object> sourceField : sourceList) { | ||
final Map<String, Object> targetItem = new HashMap<>(); | ||
for (final CopyValueProcessorConfig.Entry entry : entries) { | ||
if (!whenConditions.get(entry) || !sourceField.containsKey(entry.getFromKey())) { | ||
continue; | ||
} | ||
targetItem.put(entry.getToKey(), sourceField.get(entry.getFromKey())); | ||
} | ||
targetList.add(targetItem); | ||
} | ||
recordEvent.put(config.getToList(), targetList); | ||
} else { | ||
// Copying individual entries | ||
for (final CopyValueProcessorConfig.Entry entry : entries) { | ||
if (Objects.nonNull(entry.getCopyWhen()) && !expressionEvaluator.evaluateConditional(entry.getCopyWhen(), recordEvent)) { | ||
continue; | ||
} | ||
|
||
if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { | ||
continue; | ||
} | ||
|
||
if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { | ||
final Object source = recordEvent.get(entry.getFromKey(), Object.class); | ||
recordEvent.put(entry.getToKey(), source); | ||
if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest refactor L77-85 into a private method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good one. Made the change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also opened an issue for adding metrics: #4088 |
||
final Object source = recordEvent.get(entry.getFromKey(), Object.class); | ||
recordEvent.put(entry.getToKey(), source); | ||
} | ||
} | ||
} | ||
} catch (Exception e) { | ||
LOG.error("Fail to perform copy values operation", e); | ||
//TODO: add tagging on failure | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this check correct? What is the expected behavior if
getFromList()
returns null butgetToList()
is not null?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a validation in
CopyValueProcessorConfig
that checksfrom_list
andto_list
should be both null or both non-null. So I assume this situation thatgetFromList()
returns null butgetToList()
is not null won't happen.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding documentation, since we are adopting the new process to use the documentation website and remove readme (see #2740). I have this issue created to update over there: opensearch-project/documentation-website#6390