Skip to content

Commit

Permalink
fix flushing upon empty batches with ordered execution
Browse files Browse the repository at this point in the history
when running a pipeline with ordered execution, flushes on the pipeline
were no longer being called when compute is called with an empty batch, causing
issues with the aggregate filter, for example, not being able to push events on
timeout.
  • Loading branch information
colinsurprenant authored and elasticsearch-bot committed Aug 26, 2020
1 parent b6b85e1 commit 726061d
Showing 1 changed file with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,23 +306,35 @@ private boolean isOutput(final Vertex vertex) {

public final class CompiledOrderedExecution extends CompiledExecution {

@SuppressWarnings({"unchecked"}) private final RubyArray<RubyEvent> EMPTY_ARRAY = RubyUtil.RUBY.newEmptyArray();

@Override
public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
compute(batch.events(), flush, shutdown);
}

@Override
public void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
// send batch one-by-one as single-element batches down the filters
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
for (final RubyEvent e : batch) {
filterBatch.set(0, e);
final Collection<RubyEvent> result = compiledFilters.compute(filterBatch, flush, shutdown);
copyNonCancelledEvents(result, outputBatch);
compiledFilters.clear();
if (!batch.isEmpty()) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
// send batch one-by-one as single-element batches down the filters
for (final RubyEvent e : batch) {
filterBatch.set(0, e);
_compute(filterBatch, outputBatch, flush, shutdown);
}
compiledOutputs.compute(outputBatch, flush, shutdown);
} else if (flush || shutdown) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
_compute(EMPTY_ARRAY, outputBatch, flush, shutdown);
compiledOutputs.compute(outputBatch, flush, shutdown);
}
compiledOutputs.compute(outputBatch, flush, shutdown);
}

private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEvent> outputBatch, final boolean flush, final boolean shutdown) {
final Collection<RubyEvent> result = compiledFilters.compute(batch, flush, shutdown);
copyNonCancelledEvents(result, outputBatch);
compiledFilters.clear();
}
}

Expand Down

0 comments on commit 726061d

Please sign in to comment.