Skip to content

Commit

Permalink
Aggrerate processor : add option to allow raw events (#4598)
Browse files Browse the repository at this point in the history
* Aggregate Processor: Add support to allow raw events

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Modify test to check for aggregated tag

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka authored Jun 10, 2024
1 parent 0584573 commit ad92aa2
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
private boolean localMode = false;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;
private final boolean outputUnaggregatedEvents;
private final String aggregatedEventsTag;

@DataPrepperPluginConstructor
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -59,7 +61,9 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
final IdentificationKeysHasher identificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.aggregateProcessorConfig = aggregateProcessorConfig;
this.aggregatedEventsTag = aggregateProcessorConfig.getAggregatedEventsTag();
this.aggregateGroupManager = aggregateGroupManager;
this.outputUnaggregatedEvents = aggregateProcessorConfig.getOutputUnaggregatedEvents();
this.expressionEvaluator = expressionEvaluator;
this.identificationKeysHasher = identificationKeysHasher;
this.aggregateAction = loadAggregateAction(pluginFactory);
Expand Down Expand Up @@ -92,6 +96,9 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null;
if (!concludeGroupEvents.isEmpty()) {
concludeGroupEvents.stream().forEach((event) -> {
if (aggregatedEventsTag != null) {
event.getMetadata().addTags(List.of(aggregatedEventsTag));
}
recordsOut.add(new Record(event));
actionConcludeGroupEventsOutCounter.increment();
});
Expand All @@ -116,11 +123,17 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final Event aggregateActionResponseEvent = handleEventResponse.getEvent();

if (aggregateActionResponseEvent != null) {
if (aggregatedEventsTag != null) {
aggregateActionResponseEvent.getMetadata().addTags(List.of(aggregatedEventsTag));
}
recordsOut.add(new Record<>(aggregateActionResponseEvent, record.getMetadata()));
handleEventsOut++;
} else {
handleEventsDropped++;
}
if (outputUnaggregatedEvents) {
recordsOut.add(record);
}
}

actionHandleEventsOutCounter.increment(handleEventsOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

Expand All @@ -32,6 +33,12 @@ public class AggregateProcessorConfig {
@NotNull
private Boolean localMode = false;

@JsonProperty("output_unaggregated_events")
private Boolean outputUnaggregatedEvents = false;

@JsonProperty("aggregated_events_tag")
private String aggregatedEventsTag;

@JsonProperty("aggregate_when")
private String whenCondition;

Expand All @@ -47,10 +54,23 @@ public String getWhenCondition() {
return whenCondition;
}

public String getAggregatedEventsTag() {
return aggregatedEventsTag;
}

public Boolean getOutputUnaggregatedEvents() {
return outputUnaggregatedEvents;
}

public Boolean getLocalMode() {
return localMode;
}

@AssertTrue(message="Aggragated Events Tag must be set when output_unaggregated_events is set")
boolean isValidConfig() {
return (!outputUnaggregatedEvents || (outputUnaggregatedEvents && aggregatedEventsTag != null));
}

public PluginModel getAggregateAction() { return aggregateAction; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void setup() {

pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString());

when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(false);
when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(identificationKeys);
when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration);
when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
Expand Down Expand Up @@ -445,6 +446,50 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio
assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY));
}

@RepeatedTest(value = 2)
void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(true);
String tag = UUID.randomUUID().toString();
when(aggregateProcessorConfig.getAggregatedEventsTag()).thenReturn(tag);
CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString());
aggregateAction = new CountAggregateAction(countAggregateActionConfig);
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
.thenReturn(aggregateAction);
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
eventBatch = getBatchOfEvents(true);

final AggregateProcessor objectUnderTest = createObjectUnderTest();

final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS);

for (int i = 0; i < NUM_THREADS; i++) {
executorService.execute(() -> {
final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(eventBatch);
assertThat(recordsOut.size(), equalTo(NUM_EVENTS_PER_BATCH));
countDownLatch.countDown();
});
}
// wait longer so that the raw events are processed.
Thread.sleep(2*GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));

Collection<Record<Event>> results = objectUnderTest.doExecute(new ArrayList<Record<Event>>());
assertThat(results.size(), equalTo(1));

Map<String, Object> expectedEventMap = new HashMap<>(getEventMap(testValue));
expectedEventMap.put(DEFAULT_COUNT_KEY, NUM_THREADS * NUM_EVENTS_PER_BATCH);

final Record<Event> record = (Record<Event>)results.toArray()[0];
assertTrue(record.getData().getMetadata().hasTags(List.of(tag)));
expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v)));
assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY));
}


@RepeatedTest(value = 2)
void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuchFieldException, IllegalAccessException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -124,6 +125,7 @@ private AggregateProcessor createObjectUnderTest() {
@BeforeEach
void setUp() {
when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration);
when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(false);
when(aggregateProcessorConfig.getLocalMode()).thenReturn(false);
when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
Expand Down Expand Up @@ -400,6 +402,33 @@ void handleEvent_returning_with_event_adds_event_to_records_out() {

verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}
@Test
void handleEvent_returning_with_event_adds_event_to_records_out_with_output_unaggregated_events() {
when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(true);
String tag = UUID.randomUUID().toString();
when(aggregateProcessorConfig.getAggregatedEventsTag()).thenReturn(tag);
final AggregateProcessor objectUnderTest = createObjectUnderTest();
final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry = new AbstractMap.SimpleEntry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>(identificationKeysMap, aggregateGroup);
when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.singletonList(groupEntry));
when(aggregateActionResponse.getEvent()).thenReturn(event);
when(aggregateActionSynchronizer.concludeGroup(identificationKeysMap, aggregateGroup, false)).thenReturn(new AggregateActionOutput(List.of()));

final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(new Record<>(event)));

assertThat(recordsOut.size(), equalTo(2));
assertThat(recordsOut.get(0), notNullValue());
assertThat(recordsOut.get(0).getData(), equalTo(event));
assertThat(recordsOut.get(1), notNullValue());
assertThat(recordsOut.get(1).getData(), equalTo(event));
Event receivedEvent = recordsOut.get(1).getData();
assertTrue(receivedEvent.getMetadata().hasTags(List.of(tag)));

verify(actionHandleEventsOutCounter).increment(1);
verify(actionHandleEventsDroppedCounter).increment(0);
verifyNoInteractions(actionConcludeGroupEventsOutCounter);

verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void concludeGroup_returning_with_no_event_does_not_add_event_to_records_out() {
Expand Down

0 comments on commit ad92aa2

Please sign in to comment.