From 34bd7744bd18bc02b9b373b3ca59085fe8a654b2 Mon Sep 17 00:00:00 2001
From: Dan Hermann
Date: Thu, 1 Aug 2019 16:58:30 -0500
Subject: [PATCH] preserve input event ordering with single pipeline worker

---
 logstash-core/lib/logstash/java_pipeline.rb   |  2 +-
 .../logstash/config/ir/CompiledPipeline.java  | 60 +++++++++++------
 .../config/ir/compiler/DatasetCompiler.java   | 35 ++++++++--
 .../logstash/config/ir/compiler/Utils.java    | 13 ++++
 .../org/logstash/execution/WorkerLoop.java    |  5 +-
 .../logstash/ext/JrubyEventExtLibrary.java    |  6 ++
 .../config/ir/CompiledPipelineTest.java       | 65 ++++++++++++++++---
 .../ir/compiler/DatasetCompilerTest.java      |  5 +-
 8 files changed, 151 insertions(+), 40 deletions(-)

diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb
index 1360dd5e04d..b848c6a8e15 100644
--- a/logstash-core/lib/logstash/java_pipeline.rb
+++ b/logstash-core/lib/logstash/java_pipeline.rb
@@ -238,7 +238,7 @@ def start_workers
         Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
         org.logstash.execution.WorkerLoop.new(
           lir_execution, filter_queue_client, @events_filtered, @events_consumed,
-          @flushRequested, @flushing, @shutdownRequested, @drain_queue).run
+          @flushRequested, @flushing, @shutdownRequested, @drain_queue, pipeline_workers == 1).run
       end
       @worker_threads << thread
     end
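The Ruby change above is the only place where ordering is switched on: `pipeline_workers == 1` is evaluated once when the workers are started and handed to `WorkerLoop` as a new boolean, so ordered execution can only be requested when a single worker thread is configured. With two or more workers, batches are processed concurrently on separate threads and no cross-batch ordering guarantee is possible, which is why every javadoc added below repeats the single-worker caveat.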
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
index 9cc2546ab28..e988aadacb1 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
@@ -97,10 +97,12 @@ public Collection<IRubyObject> inputs() {
     /**
      * This method contains the actual compilation of the {@link Dataset} representing the
      * underlying pipeline from the Queue to the outputs.
+     * @param orderedEvents When true, generates code to process events in order. Event ordering is
+     *                      guaranteed only with a single pipeline worker.
      * @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
      */
-    public Dataset buildExecution() {
-        return new CompiledPipeline.CompiledExecution().toDataset();
+    public Dataset buildExecution(boolean orderedEvents) {
+        return new CompiledPipeline.CompiledExecution(orderedEvents).toDataset();
     }
 
     /**
@@ -247,8 +249,8 @@ private final class CompiledExecution {
 
         private final Dataset compiled;
 
-        CompiledExecution() {
-            compiled = compile();
+        CompiledExecution(boolean orderedEvents) {
+            compiled = compile(orderedEvents);
         }
 
         Dataset toDataset() {
@@ -257,9 +259,11 @@ Dataset toDataset() {
         /**
          * Instantiates the graph of compiled {@link Dataset}.
+         * @param orderedEvents When true, generates code to process events in order. Event ordering is
+         *                      guaranteed only with a single pipeline worker.
          * @return Compiled {@link Dataset} representing the pipeline.
          */
-        private Dataset compile() {
+        private Dataset compile(boolean orderedEvents) {
             final Collection<Vertex> outputNodes = pipelineIR.getGraph()
                 .allLeaves().filter(CompiledPipeline.this::isOutput)
                 .collect(Collectors.toList());
@@ -267,7 +271,7 @@ private Dataset compile() {
                 return Dataset.IDENTITY;
             } else {
                 return DatasetCompiler.terminalDataset(outputNodes.stream().map(
-                    leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf))
+                    leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf), orderedEvents)
                 ).collect(Collectors.toList()));
             }
         }
@@ -299,16 +303,20 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
          * the application of the given output.
          * @param vertex Vertex of the output to create this {@link Dataset} for
         * @param datasets All the datasets that have children passing into this output
+         * @param orderedEvents When true, generates code to process events in order. Event ordering is
+         *                      guaranteed only with a single pipeline worker.
          * @return Output {@link Dataset}
          */
-        private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets) {
+        private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets,
+                                      final boolean orderedEvents) {
             final String vertexId = vertex.getId();
 
             if (!plugins.containsKey(vertexId)) {
                 final ComputeStepSyntaxElement<Dataset> prepared =
-                    DatasetCompiler.outputDataset(flatten(datasets, vertex),
+                    DatasetCompiler.outputDataset(flatten(datasets, vertex, orderedEvents),
                         outputs.get(vertexId),
-                        outputs.size() == 1);
+                        outputs.size() == 1,
+                        orderedEvents);
                 LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);
                 plugins.put(vertexId, prepared.instantiate());
             }
@@ -318,18 +326,20 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat
         /**
          * Split the given {@link Dataset}s and return the dataset half of their elements that contains
-         * the {@link JrubyEventExtLibrary.RubyEvent} that fulfil the given {@link EventCondition}.
+         * the {@link JrubyEventExtLibrary.RubyEvent} that fulfill the given {@link EventCondition}.
          * @param datasets Datasets that are the parents of the datasets to split
         * @param condition Condition that must be fulfilled
         * @param vertex Vertex id to cache the resulting {@link Dataset} under
+         * @param orderedEvents When true, generates code to process events in order. Event ordering is
+         *                      guaranteed only with a single pipeline worker.
         * @return The half of the datasets contents that fulfils the condition
         */
-        private SplitDataset split(final Collection<Dataset> datasets,
-            final EventCondition condition, final Vertex vertex) {
+        private SplitDataset split(final Collection<Dataset> datasets, final EventCondition condition,
+                                   final Vertex vertex, final boolean orderedEvents) {
             final String key = vertex.getId();
             SplitDataset conditional = iffs.get(key);
             if (conditional == null) {
-                final Collection<Dataset> dependencies = flatten(datasets, vertex);
+                final Collection<Dataset> dependencies = flatten(datasets, vertex, orderedEvents);
                 conditional = iffs.get(key);
                 // Check that compiling the dependencies did not already instantiate the conditional
                 // by requiring its else branch.
@@ -354,31 +364,40 @@ private SplitDataset split(final Collection<Dataset> datasets,
          * a {code filter} or and {code if} statement).
          * @param datasets Nodes from the last already compiled level
          * @param start Vertex to compile children for
+         * @param orderedEvents When true, generates code to process events in order. Event ordering is
+         *                      guaranteed only with a single pipeline worker.
          * @return Datasets originating from given {@link Vertex}
          */
-        private Collection<Dataset> flatten(final Collection<Dataset> datasets,
-            final Vertex start) {
+        private Collection<Dataset> flatten(final Collection<Dataset> datasets, final Vertex start,
+                                            final boolean orderedEvents) {
             final Collection<Dataset> result = compileDependencies(start, datasets,
-                start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
+                start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex),
+                orderedEvents
             );
             return result.isEmpty() ? datasets : result;
         }
 
+        private Collection<Dataset> flatten(final Collection<Dataset> datasets, final Vertex start) {
+            return flatten(datasets, start, false);
+        }
+
         /**
          * Compiles all child vertices for a given vertex.
          * @param datasets Datasets from previous stage
          * @param start Start Vertex that got expanded
          * @param dependencies Dependencies of {@code start}
+         * @param orderedEvents When true, generates code to process events in order. Event ordering is
+         *                      guaranteed only with a single pipeline worker.
          * @return Datasets compiled from vertex children
          */
-        private Collection<Dataset> compileDependencies(final Vertex start,
-            final Collection<Dataset> datasets, final Stream<Vertex> dependencies) {
+        private Collection<Dataset> compileDependencies(final Vertex start, final Collection<Dataset> datasets,
+            final Stream<Vertex> dependencies, boolean orderedEvents) {
             return dependencies.map(
                 dependency -> {
                     if (isFilter(dependency)) {
                         return filterDataset(dependency, datasets);
                     } else if (isOutput(dependency)) {
-                        return outputDataset(dependency, datasets);
+                        return outputDataset(dependency, datasets, orderedEvents);
                     } else {
                         // We know that it's an if vertex since the the input children are either
                         // output, filter or if in type.
@@ -386,7 +405,8 @@ private Collection<Dataset> compileDependencies(final Vertex start,
                         final SplitDataset ifDataset = split(
                             datasets,
                             conditionalCompiler.buildCondition(ifvert.getBooleanExpression()),
-                            dependency
+                            dependency,
+                            orderedEvents
                         );
                         // It is important that we double check that we are actually dealing with the
                         // positive/left branch of the if condition
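Since Java has no default parameter values, the patch keeps the old two-argument `flatten` as a thin overload that forwards `false`, so call sites that never care about ordering (such as `filterDataset`) stay untouched while `outputDataset`, `split`, and `compileDependencies` receive the real flag. A minimal sketch of that overload-as-default pattern, with hypothetical names rather than the pipeline classes:

    // Hypothetical illustration of the overload-as-default-argument pattern used above.
    final class Defaults {
        static String render(final String text, final boolean ordered) {
            return ordered ? "[ordered] " + text : text;
        }

        // old signature kept so existing callers compile and keep their behavior
        static String render(final String text) {
            return render(text, false);
        }

        public static void main(final String[] args) {
            System.out.println(render("batch"));        // unordered, as before
            System.out.println(render("batch", true));  // new opt-in behavior
        }
    }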
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
index 28717e9832c..78333b47bbe 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
@@ -4,6 +4,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.Collectors;
 import org.jruby.RubyArray;
 import org.jruby.RubyHash;
@@ -155,13 +156,16 @@ public static Dataset terminalDataset(final Collection<Dataset> parents) {
     * @param parents Parent Datasets
     * @param output Output Plugin (of Ruby type OutputDelegator)
     * @param terminal Set to true if this output is the only output in the pipeline
+    * @param orderedEvents When true, generates code to process events in order. Event ordering is
+    *                      guaranteed only with a single pipeline worker.
     * @return Output Dataset
     */
    public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<Dataset> parents,
-        final AbstractOutputDelegatorExt output, final boolean terminal) {
+        final AbstractOutputDelegatorExt output, final boolean terminal, final boolean orderedEvents) {
        final ClassFields fields = new ClassFields();
        final Closure clearSyntax;
        final Closure computeSyntax;
+
        if (parents.isEmpty()) {
            clearSyntax = Closure.EMPTY;
            computeSyntax = Closure.wrap(invokeOutput(fields.add(output), BATCH_ARG));
@@ -181,7 +185,7 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<
            final ValueSyntaxElement inputBuffer = fields.add(buffer);
            computeSyntax = withInputBuffering(
                Closure.wrap(invokeOutput(fields.add(output), inputBuffer), inlineClear),
-                parentFields, inputBuffer
+                parentFields, inputBuffer, orderedEvents
            );
        }
@@ ... @@ prepare(final DatasetCompiler.C
     * @param compute Closure to execute
     * @param parents Parents to buffer results for
     * @param inputBuffer Buffer to store results in
+    * @param sorted When true, generates code to sort events by sequence number before sending
+    *               to outputs in order to preserve input event order. Event ordering is
+    *               guaranteed only with a single pipeline worker.
     * @return Closure wrapped by buffering parent results and clearing them
     */
-    private static Closure withInputBuffering(final Closure compute,
-        final Collection<Dataset> parents, final ValueSyntaxElement inputBuffer) {
+    private static Closure withInputBuffering(final Closure compute, final Collection<Dataset> parents,
+        final ValueSyntaxElement inputBuffer, final boolean sorted) {
        return Closure.wrap(
            parents.stream().map(par -> SyntaxFactory.value("org.logstash.config.ir.compiler.Utils")
-                .call("copyNonCancelledEvents", computeDataset(par), inputBuffer)
-            ).toArray(MethodLevelSyntaxElement[]::new)
-        ).add(compute).add(clear(inputBuffer));
+                .call("copyNonCancelledEvents", computeDataset(par), inputBuffer))
+                .toArray(MethodLevelSyntaxElement[]::new))
+            .add(sorted ? eventSorter(inputBuffer) : Closure.EMPTY)
+            .add(compute)
+            .add(clear(inputBuffer));
+    }
+
+    private static Closure withInputBuffering(final Closure compute, final Collection<Dataset> parents,
+        final ValueSyntaxElement inputBuffer) {
+        return withInputBuffering(compute, parents, inputBuffer, false);
+    }
+
+    private static Closure eventSorter(ValueSyntaxElement inputBuffer) {
+        return Closure.wrap(SyntaxFactory.value("java.util.Collections")
+            .call("sort",
+                inputBuffer,
+                SyntaxFactory.value("new org.logstash.config.ir.compiler.Utils.EventSequenceComparator()")));
    }
 
    /**
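The generated code is easiest to read as plain Java. The sketch below is a hand-written approximation of what an ordered output step now does: buffer every parent's surviving events, sort the buffer by sequence (the step contributed by `eventSorter`), hand the batch to the output, then clear the buffer. All types and method names here are illustrative stand-ins; the real generated code calls `Utils.copyNonCancelledEvents`, sorts with `Utils.EventSequenceComparator`, and invokes the output delegator:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // Illustrative only: approximates the compute body emitted when orderedEvents is true.
    final class OrderedOutputSketch {
        interface Event { long sequence(); }
        interface ParentStep { List<Event> compute(); }
        interface OutputStep { void receive(List<Event> events); }

        static final Comparator<Event> BY_SEQUENCE =
            (a, b) -> Long.compare(a.sequence(), b.sequence());

        static void computeOrdered(final Collection<ParentStep> parents, final OutputStep output) {
            final List<Event> inputBuffer = new ArrayList<>();
            for (final ParentStep parent : parents) {
                inputBuffer.addAll(parent.compute()); // copyNonCancelledEvents in the real code
            }
            Collections.sort(inputBuffer, BY_SEQUENCE); // the new step added by eventSorter()
            output.receive(inputBuffer);                // invokeOutput in the real code
            inputBuffer.clear();                        // clear(inputBuffer) in the real code
        }
    }

The sort runs once per batch and after the copy step, so cancelled events are already gone by the time order is restored.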
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/Utils.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/Utils.java
index b47d268fd05..55567656dea 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/Utils.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/Utils.java
@@ -3,6 +3,7 @@
 import org.logstash.ext.JrubyEventExtLibrary;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 
 /**
@@ -32,4 +33,16 @@ public static void filterEvents(Collection<JrubyEventExtLibrary.RubyEvent> input
         }
     }
 
+    /**
+     * Comparator for events based on the sequence of their instantiation. Used to maintain input event
+     * ordering with a single pipeline worker.
+     */
+    public static class EventSequenceComparator implements Comparator<JrubyEventExtLibrary.RubyEvent> {
+
+        @Override
+        public int compare(JrubyEventExtLibrary.RubyEvent o1, JrubyEventExtLibrary.RubyEvent o2) {
+            return Long.compare(o1.sequence(), o2.sequence());
+        }
+    }
+
 }
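Two details worth noting in the comparator: it delegates to `Long.compare` instead of returning a subtraction of the two sequence values, which would be vulnerable to numeric overflow, and it imposes a total order that matches instantiation order exactly, because every `RubyEvent` draws its sequence from a shared monotonic generator at construction time (see the `JrubyEventExtLibrary` hunk below).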
diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
index c3825ddf65d..43e633fd930 100644
--- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
+++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
@@ -40,10 +40,11 @@ public final class WorkerLoop implements Runnable {
     public WorkerLoop(final CompiledPipeline pipeline, final QueueReadClient readClient,
         final LongAdder filteredCounter, final LongAdder consumedCounter,
         final AtomicBoolean flushRequested, final AtomicBoolean flushing,
-        final AtomicBoolean shutdownRequested, final boolean drainQueue) {
+        final AtomicBoolean shutdownRequested, final boolean drainQueue,
+        final boolean orderedEvents) {
         this.consumedCounter = consumedCounter;
         this.filteredCounter = filteredCounter;
-        this.execution = pipeline.buildExecution();
+        this.execution = pipeline.buildExecution(orderedEvents);
         this.drainQueue = drainQueue;
         this.readClient = readClient;
         this.flushRequested = flushRequested;
diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyEventExtLibrary.java b/logstash-core/src/main/java/org/logstash/ext/JrubyEventExtLibrary.java
index dcd1b6b0ed6..5da5508ffcd 100644
--- a/logstash-core/src/main/java/org/logstash/ext/JrubyEventExtLibrary.java
+++ b/logstash-core/src/main/java/org/logstash/ext/JrubyEventExtLibrary.java
@@ -42,6 +42,8 @@ public static final class RubyEvent extends RubyObject {
          */
         private final int hash = nextHash();
 
+        private final long sequence = SEQUENCE_GENERATOR.get();
+
         private Event event;
 
         public RubyEvent(final Ruby runtime, final RubyClass klass) {
@@ -277,6 +279,10 @@ public IRubyObject ruby_set_timestamp(ThreadContext context, IRubyObject value)
             return value;
         }
 
+        public long sequence() {
+            return sequence;
+        }
+
         @Override
         public int hashCode() {
             return hash;
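The `sequence` field is initialized with `SEQUENCE_GENERATOR.get()` rather than an explicit increment, presumably because the `hash` initializer just above already advances the same generator and Java runs field initializers in declaration order; that reading is an inference from the context, not something the patch states. A self-contained sketch of the underlying idea, with assumed names and the increment made explicit:

    import java.util.concurrent.atomic.AtomicLong;

    // Standalone sketch (assumed names): stamping each instance with a
    // creation-order sequence that a comparator can later sort on.
    final class StampedEvent {
        private static final AtomicLong SEQUENCE_GENERATOR = new AtomicLong();

        private final long sequence = SEQUENCE_GENERATOR.incrementAndGet();

        long sequence() {
            return sequence;
        }

        public static void main(final String[] args) {
            final StampedEvent first = new StampedEvent();
            final StampedEvent second = new StampedEvent();
            // creation order is totally ordered, no matter how the
            // instances are later shuffled into batches
            assert first.sequence() < second.sequence();
        }
    }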
diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java
index 4017509b8d0..d5ed67b7623 100644
--- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java
+++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java
@@ -14,6 +14,7 @@
 import java.util.function.Supplier;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.jruby.RubyArray;
 import org.jruby.RubyInteger;
 import org.jruby.RubyString;
 import org.jruby.runtime.builtin.IRubyObject;
@@ -35,6 +36,10 @@
 import co.elastic.logstash.api.Input;
 import co.elastic.logstash.api.Context;
 
+import static org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 /**
  * Tests for {@link CompiledPipeline}.
  */
@@ -107,7 +112,7 @@ public void buildsTrivialPipeline() throws Exception {
                 Collections.emptyMap(),
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
@@ -128,7 +133,7 @@ public void buildsStraightPipeline() throws Exception {
                 Collections.singletonMap("mockfilter", () -> IDENTITY_FILTER),
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
@@ -159,12 +164,56 @@
                 filters,
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
     }
 
+    @Test
+    public void preservesInputEventInstantiationOrder() throws Exception {
+        testEventOrder(true, new int[]{0, 2, 1});
+    }
+
+    @Test
+    public void ignoresInputEventInstantiationOrder() throws Exception {
+        testEventOrder(false, null);
+    }
+
+    private void testEventOrder(boolean sortedEvents, int[] expectedOrdering) throws Exception {
+        final RubyEvent event1 = RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
+        event1.getEvent().setField("message", 1);
+        final RubyEvent event2 = RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
+        event2.getEvent().setField("message", 2);
+        event2.getEvent().tag("foo");
+        final RubyEvent event3 = RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
+        event3.getEvent().setField("message", 3);
+
+        // arrange them out-of-order in the input batch to test ordering
+        RubyArray inputBatch = RubyUtil.RUBY.newArray(event1, event3, event2);
+
+        final PipelineIR pipelineIR = ConfigCompiler.configToPipelineIR(
+            "input {mockinput{}} " +
+                "filter { if \"foo\" not in [tags] { mockfilter { } } }" +
+                "output {mockoutput{}}", false
+        );
+        new CompiledPipeline(
+            pipelineIR,
+            new CompiledPipelineTest.MockPluginFactory(
+                Collections.singletonMap("mockinput", () -> null),
+                Collections.singletonMap("mockfilter", () -> IDENTITY_FILTER),
+                Collections.singletonMap("mockoutput", mockOutputSupplier())
+            )
+        ).buildExecution(sortedEvents).compute(inputBatch, false, false);
+        final Collection<RubyEvent> outputEvents = EVENT_SINKS.get(runId);
+        assertThat(outputEvents.size(), is(3));
+
+        RubyEvent[] outputArray = outputEvents.toArray(new RubyEvent[0]);
+        for (int k = 0; k < outputArray.length; k++) {
+            assertThat(outputArray[k], is(inputBatch.get(expectedOrdering == null ?
+                k : expectedOrdering[k])));
+        }
+    }
+
     @Test
     public void correctlyCompilesEquals() throws Exception {
         final String eq = "==";
@@ -273,7 +322,7 @@ public void equalityCheckOnCompositeField() throws Exception {
                 filters,
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
@@ -297,7 +346,7 @@ public void conditionalWithNullField() throws Exception {
                 filters,
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
@@ -322,7 +371,7 @@ public void conditionalNestedMetaFieldPipeline() throws Exception {
                 filters,
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
@@ -355,7 +404,7 @@ public void moreThan255Parents() throws Exception {
                 filters,
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
            )
-        ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false);
+        ).buildExecution(false).compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
         MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true));
@@ -400,7 +449,8 @@ private void verifyComparison(final boolean expected, final String conditional,
                 Collections.singletonMap("mockaddfilter", () -> ADD_FIELD_FILTER),
                 Collections.singletonMap("mockoutput", mockOutputSupplier())
             )
-        ).buildExecution()
+        ).buildExecution(false)
             .compute(RubyUtil.RUBY.newArray(testEvent), false, false);
         final Collection<JrubyEventExtLibrary.RubyEvent> outputEvents = EVENT_SINKS.get(runId);
         MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1));
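Reading the new tests: the three events are instantiated as event1, event2, event3 but batched as [event1, event3, event2], and event2 is tagged so the conditional sends it through a different branch than the other two. With `buildExecution(true)` the sink must see instantiation order (event1, event2, event3), which corresponds to batch indices {0, 2, 1}, the `expectedOrdering` passed by `preservesInputEventInstantiationOrder`; with `buildExecution(false)` the batch order must come through unchanged (`expectedOrdering == null` falls back to index `k`). The branch split matters: it shows ordering is restored even when events reach the output over different paths through the pipeline.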
diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
index 23124545669..10119419215 100644
--- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
+++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
@@ -1,6 +1,5 @@
 package org.logstash.config.ir.compiler;
 
-import java.util.Collections;
 import org.jruby.RubyArray;
 import org.junit.Test;
 import org.logstash.Event;
@@ -9,6 +8,8 @@
 import org.logstash.config.ir.PipelineTestUtil;
 import org.logstash.ext.JrubyEventExtLibrary;
 
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -24,7 +25,7 @@ public void compilesOutputDataset() {
         assertThat(
             DatasetCompiler.outputDataset(
                 Collections.emptyList(), PipelineTestUtil.buildOutput(events -> {}),
-                true
+                true, false
             ).instantiate().compute(RubyUtil.RUBY.newArray(), false, false),
             nullValue()
         );
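For orientation, here is how the new entry point fits together at runtime, in the style of the tests above. This is a sketch, not code from the patch; the package locations are inferred from the file paths in the diff, and `true` corresponds to what `WorkerLoop` passes when `pipeline_workers == 1`:

    import org.jruby.RubyArray;
    import org.logstash.config.ir.CompiledPipeline;
    import org.logstash.config.ir.compiler.Dataset;

    // Sketch mirroring the tests above: compile once per worker thread, then run batches.
    final class OrderedRunSketch {
        static void runBatch(final CompiledPipeline pipeline, final RubyArray batch) {
            final Dataset execution = pipeline.buildExecution(true); // true: single worker only
            execution.compute(batch, false, false);                  // flush=false, shutdown=false
        }
    }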