|  | 
|  | 1 | +/* | 
|  | 2 | + * Copyright OpenSearch Contributors | 
|  | 3 | + * SPDX-License-Identifier: Apache-2.0 | 
|  | 4 | + * | 
|  | 5 | + * The OpenSearch Contributors require contributions made to | 
|  | 6 | + * this file be licensed under the Apache-2.0 license or a | 
|  | 7 | + * compatible open source license. | 
|  | 8 | + * | 
|  | 9 | + */ | 
|  | 10 | + | 
|  | 11 | +package org.opensearch.dataprepper.plugins.processor.splitevent; | 
|  | 12 | + | 
|  | 13 | +import org.opensearch.dataprepper.metrics.PluginMetrics; | 
|  | 14 | +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | 
|  | 15 | +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | 
|  | 16 | +import org.opensearch.dataprepper.model.event.DefaultEventHandle; | 
|  | 17 | +import org.opensearch.dataprepper.model.event.Event; | 
|  | 18 | +import org.opensearch.dataprepper.model.event.JacksonEvent; | 
|  | 19 | +import org.opensearch.dataprepper.model.processor.AbstractProcessor; | 
|  | 20 | +import org.opensearch.dataprepper.model.processor.Processor; | 
|  | 21 | +import org.opensearch.dataprepper.model.record.Record; | 
|  | 22 | + | 
|  | 23 | +import java.util.ArrayList; | 
|  | 24 | +import java.util.Collection; | 
|  | 25 | +import java.util.function.Function; | 
|  | 26 | +import java.util.regex.Pattern; | 
|  | 27 | + | 
|  | 28 | + | 
|  | 29 | +@DataPrepperPlugin(name = "split_event", pluginType = Processor.class, pluginConfigurationType = SplitEventProcessorConfig.class) | 
|  | 30 | +public class SplitEventProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{ | 
|  | 31 | +    final String delimiter; | 
|  | 32 | +    final String delimiterRegex; | 
|  | 33 | +    final String field; | 
|  | 34 | +    final Pattern pattern; | 
|  | 35 | +    private final Function<String, String[]> splitter; | 
|  | 36 | + | 
|  | 37 | +    @DataPrepperPluginConstructor | 
|  | 38 | +    public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventProcessorConfig config) { | 
|  | 39 | +        super(pluginMetrics); | 
|  | 40 | +        this.delimiter = config.getDelimiter(); | 
|  | 41 | +        this.delimiterRegex = config.getDelimiterRegex(); | 
|  | 42 | +        this.field = config.getField(); | 
|  | 43 | + | 
|  | 44 | +        if(delimiterRegex != null && !delimiterRegex.isEmpty() | 
|  | 45 | +                && delimiter != null && !delimiter.isEmpty()) { | 
|  | 46 | +            throw new IllegalArgumentException("delimiter and delimiter_regex cannot be defined at the same time"); | 
|  | 47 | +        } else if((delimiterRegex == null || delimiterRegex.isEmpty()) && | 
|  | 48 | +                (delimiter == null || delimiter.isEmpty())) { | 
|  | 49 | +            throw new IllegalArgumentException("delimiter or delimiter_regex needs to be defined"); | 
|  | 50 | +        } | 
|  | 51 | + | 
|  | 52 | +        if(delimiterRegex != null && !delimiterRegex.isEmpty()) { | 
|  | 53 | +            pattern = Pattern.compile(delimiterRegex); | 
|  | 54 | +            splitter = pattern::split; | 
|  | 55 | +        } else { | 
|  | 56 | +            splitter = inputString -> inputString.split(delimiter); | 
|  | 57 | +            pattern = null; | 
|  | 58 | +        } | 
|  | 59 | +    } | 
|  | 60 | + | 
|  | 61 | +    @Override | 
|  | 62 | +    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) { | 
|  | 63 | +        Collection<Record<Event>> newRecords = new ArrayList<>(); | 
|  | 64 | +        for(final Record<Event> record : records) { | 
|  | 65 | +            final Event recordEvent = record.getData(); | 
|  | 66 | + | 
|  | 67 | +            if (!recordEvent.containsKey(field)) { | 
|  | 68 | +                newRecords.add(record); | 
|  | 69 | +                continue; | 
|  | 70 | +            } | 
|  | 71 | +             | 
|  | 72 | +            final Object value = recordEvent.get(field, Object.class); | 
|  | 73 | + | 
|  | 74 | +            //split record according to delimiter | 
|  | 75 | +            final String[] splitValues = splitter.apply((String) value); | 
|  | 76 | + | 
|  | 77 | +           // when no splits or empty value use the original record | 
|  | 78 | +           if(splitValues.length <= 1) { | 
|  | 79 | +                newRecords.add(record); | 
|  | 80 | +                continue; | 
|  | 81 | +           } | 
|  | 82 | + | 
|  | 83 | +            //create new events for the splits  | 
|  | 84 | +            for (int i = 0; i < splitValues.length-1 ; i++) { | 
|  | 85 | +                Record newRecord = createNewRecordFromEvent(recordEvent, splitValues[i]); | 
|  | 86 | +                addToAcknowledgementSetFromOriginEvent((Event) newRecord.getData(), recordEvent); | 
|  | 87 | +                newRecords.add(newRecord); | 
|  | 88 | +            } | 
|  | 89 | + | 
|  | 90 | +            // Modify original event to hold the last split | 
|  | 91 | +            recordEvent.put(field, splitValues[splitValues.length-1]); | 
|  | 92 | +            newRecords.add(record); | 
|  | 93 | +        } | 
|  | 94 | +        return newRecords; | 
|  | 95 | +    } | 
|  | 96 | + | 
|  | 97 | +    protected Record createNewRecordFromEvent(final Event recordEvent, String splitValue) { | 
|  | 98 | +        Record newRecord; | 
|  | 99 | +        JacksonEvent newRecordEvent; | 
|  | 100 | + | 
|  | 101 | +        newRecordEvent = JacksonEvent.fromEvent(recordEvent); | 
|  | 102 | +        newRecordEvent.put(field,(Object) splitValue); | 
|  | 103 | +        newRecord = new Record<>(newRecordEvent); | 
|  | 104 | +        return newRecord; | 
|  | 105 | +    } | 
|  | 106 | + | 
|  | 107 | +    protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) { | 
|  | 108 | +        DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle(); | 
|  | 109 | +        if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { | 
|  | 110 | +            eventHandle.getAcknowledgementSet().add(recordEvent); | 
|  | 111 | +        } | 
|  | 112 | +    } | 
|  | 113 | + | 
|  | 114 | +    @Override | 
|  | 115 | +    public void prepareForShutdown() { | 
|  | 116 | +    } | 
|  | 117 | + | 
|  | 118 | +    @Override | 
|  | 119 | +    public boolean isReadyForShutdown() { | 
|  | 120 | +        return true; | 
|  | 121 | +    } | 
|  | 122 | + | 
|  | 123 | +    @Override | 
|  | 124 | +    public void shutdown() { | 
|  | 125 | +    } | 
|  | 126 | +} | 
0 commit comments