From dbca0b36a17f1e4f14caf2620ebc5ae643b57375 Mon Sep 17 00:00:00 2001
From: Colin Surprenant
Date: Fri, 27 Mar 2020 15:36:23 -0400
Subject: [PATCH] separate filter & output execution, rebatch after filter when
ordered (#11710)
---
.../lib/logstash/config/lir_serializer.rb | 8 +
.../logstash/ackedqueue/AckedReadBatch.java | 36 ++--
.../logstash/config/ir/CompiledPipeline.java | 157 ++++++++++++++----
.../org/logstash/config/ir/PipelineIR.java | 6 +
.../config/ir/compiler/DatasetCompiler.java | 133 +++++++++++----
.../org/logstash/config/ir/graph/Graph.java | 2 +-
.../config/ir/graph/SeparatorVertex.java | 64 +++++++
.../logstash/execution/MemoryReadBatch.java | 29 ++--
.../org/logstash/execution/QueueBatch.java | 8 +-
.../org/logstash/execution/WorkerLoop.java | 27 +--
10 files changed, 351 insertions(+), 119 deletions(-)
create mode 100644 logstash-core/src/main/java/org/logstash/config/ir/graph/SeparatorVertex.java
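The core mechanism of this patch is the ordered execution path: filters and outputs are compiled into separate Datasets, each event is pushed through the filters in its own single-element batch, and the surviving events are re-batched so the outputs run once over the whole batch. Before the diffs, here is a minimal stand-alone sketch of that rebatch-after-filter strategy; the names are hypothetical and cancelled-event handling is omitted, so this is an illustration, not the Logstash code below.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.UnaryOperator;

    final class OrderedRebatchSketch {
        // "filters" stands in for the compiled filter Dataset of this patch.
        static <E> List<E> filterThenRebatch(List<E> batch, UnaryOperator<List<E>> filters) {
            final List<E> outputBatch = new ArrayList<>(batch.size());
            final List<E> single = new ArrayList<>(1);
            for (final E event : batch) {
                single.clear();
                single.add(event);                          // one event per pass preserves order
                outputBatch.addAll(filters.apply(single));  // this event's results stay together
            }
            return outputBatch;                             // outputs get one re-batched call
        }
    }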
diff --git a/logstash-core/lib/logstash/config/lir_serializer.rb b/logstash-core/lib/logstash/config/lir_serializer.rb
index e19049eef46..9826d3217f7 100644
--- a/logstash-core/lib/logstash/config/lir_serializer.rb
+++ b/logstash-core/lib/logstash/config/lir_serializer.rb
@@ -63,6 +63,8 @@ def vertex(v)
if_vertex(v)
when :queue
queue_vertex(v)
+ when :separator
+ separator_vertex(v)
end
decorate_vertex(v, hashified_vertex)
@@ -75,6 +77,8 @@ def vertex_type(v)
:if
elsif v.java_kind_of?(org.logstash.config.ir.graph.QueueVertex)
:queue
+ elsif v.java_kind_of?(org.logstash.config.ir.graph.SeparatorVertex)
+ :separator
else
raise "Unexpected vertex type! #{v}"
end
@@ -106,6 +110,10 @@ def queue_vertex(v)
{}
end
+ def separator_vertex(v)
+ {}
+ end
+
def edge(e)
e_json = {
"from" => e.from.id,
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java
index dbc8c686a06..74a2dafec8d 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java
@@ -22,12 +22,10 @@
import org.jruby.RubyArray;
import org.jruby.RubyHash;
-import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
-import org.logstash.ext.JrubyEventExtLibrary;
+import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
import java.io.IOException;
import java.util.Collection;
@@ -42,12 +40,19 @@ public final class AckedReadBatch implements QueueBatch {
private RubyHash generated;
- public static AckedReadBatch create(final JRubyAckedQueueExt queue, final int size,
- final long timeout) {
+ public static AckedReadBatch create(
+ final JRubyAckedQueueExt queue,
+ final int size,
+ final long timeout)
+ {
return new AckedReadBatch(queue, size, timeout);
}
- private AckedReadBatch(final JRubyAckedQueueExt queue, final int size, final long timeout) {
+ private AckedReadBatch(
+ final JRubyAckedQueueExt queue,
+ final int size,
+ final long timeout)
+ {
AckedBatch batch;
try {
batch = queue.readBatch(size, timeout);
@@ -65,7 +70,7 @@ private AckedReadBatch(final JRubyAckedQueueExt queue, final int size, final lon
}
@Override
- public void merge(final IRubyObject event) {
+ public void merge(final RubyEvent event) {
if (!event.isNil() && !originals.containsKey(event)) {
generated.put(event, RUBY.getTrue());
}
@@ -75,14 +80,12 @@ public void merge(final IRubyObject event) {
@Override
public RubyArray to_a() {
final RubyArray result = RUBY.newArray(filteredSize());
- for (final JrubyEventExtLibrary.RubyEvent event
- : (Collection) originals.keys()) {
+ for (final RubyEvent event : (Collection<RubyEvent>) originals.keys()) {
if (!MemoryReadBatch.isCancelled(event)) {
result.append(event);
}
}
- for (final JrubyEventExtLibrary.RubyEvent event
- : (Collection) generated.keys()) {
+ for (final RubyEvent event : (Collection<RubyEvent>) generated.keys()) {
if (!MemoryReadBatch.isCancelled(event)) {
result.append(event);
}
@@ -90,6 +93,17 @@ public RubyArray to_a() {
return result;
}
+ @SuppressWarnings({"unchecked"})
+ @Override
+ public Collection<RubyEvent> collection() {
+ // This returns only the originals and does not filter out cancelled
+ // events, because it is only used in the WorkerLoop where only
+ // non-cancelled originals exist. We should revisit this AckedReadBatch
+ // implementation and get rid of the dual originals/generated idea;
+ // MemoryReadBatch does not use such a strategy.
+ return originals.directKeySet();
+ }
+
public void close() throws IOException {
if (ackedBatch != null) {
ackedBatch.close();
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
index 7ea3f636e55..015cbed05b8 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
@@ -17,12 +17,12 @@
* under the License.
*/
-
package org.logstash.config.ir;
import co.elastic.logstash.api.Codec;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
@@ -38,11 +38,13 @@
import org.logstash.config.ir.compiler.EventCondition;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.compiler.SplitDataset;
+import org.logstash.config.ir.graph.SeparatorVertex;
import org.logstash.config.ir.graph.IfVertex;
import org.logstash.config.ir.graph.PluginVertex;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.config.ir.imperative.PluginStatement;
-import org.logstash.ext.JrubyEventExtLibrary;
+import org.logstash.execution.QueueBatch;
+import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.secret.store.SecretStore;
@@ -57,6 +59,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.logstash.config.ir.compiler.Utils.copyNonCancelledEvents;
+
/**
* Compiled Logstash Pipeline Configuration.
* This class represents an executable pipeline, compiled from the configured topology that is
@@ -136,14 +140,27 @@ public Collection inputs() {
}
/**
- * This method contains the actual compilation of the {@link Dataset} representing the
- * underlying pipeline from the Queue to the outputs.
- * @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
+ * Perform the actual compilation of the {@link Dataset} representing the
+ * underlying pipeline from the Queue to the outputs using the
+ * unordered execution model.
+ * @return CompiledPipeline.CompiledExecution the compiled pipeline
*/
- public Dataset buildExecution() {
- return new CompiledPipeline.CompiledExecution().toDataset();
+ public CompiledPipeline.CompiledExecution buildExecution() {
+ return buildExecution(false);
}
+ /**
+ * Perform the actual compilation of the {@link Dataset} representing the
+ * underlying pipeline from the Queue to the outputs using the ordered or
+ * unordered execution model.
+ * @param orderedExecution determines whether to build an execution that enforces order or not
+ * @return CompiledPipeline.CompiledExecution the compiled pipeline
+ */
+ public CompiledPipeline.CompiledExecution buildExecution(boolean orderedExecution) {
+ return orderedExecution
+ ? new CompiledPipeline.CompiledOrderedExecution()
+ : new CompiledPipeline.CompiledUnorderedExecution();
+ }
/**
* Sets up all outputs learned from {@link PipelineIR}.
@@ -294,12 +311,58 @@ private boolean isOutput(final Vertex vertex) {
return outputs.containsKey(vertex.getId());
}
+ public final class CompiledOrderedExecution extends CompiledExecution {
+
+ @Override
+ public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
+ compute(batch.collection(), flush, shutdown);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public void compute(final RubyArray batch, final boolean flush, final boolean shutdown) {
+ compute((Collection<RubyEvent>) batch, flush, shutdown);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
+ final RubyArray outputBatch = RubyUtil.RUBY.newArray();
+ // send batch events one-by-one as single-element batches down the filters
+ final RubyArray filterBatch = RubyUtil.RUBY.newArray(1);
+ for (final RubyEvent e : batch) {
+ filterBatch.set(0, e);
+ final Collection<RubyEvent> result = compiledFilters.compute(filterBatch, flush, shutdown);
+ copyNonCancelledEvents(result, outputBatch);
+ compiledFilters.clear();
+ }
+ compiledOutputs.compute(outputBatch, flush, shutdown);
+ }
+ }
+
+ public final class CompiledUnorderedExecution extends CompiledExecution {
+
+ @Override
+ public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
+ compute(batch.to_a(), flush, shutdown);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public void compute(final RubyArray batch, final boolean flush, final boolean shutdown) {
+ final RubyArray outputBatch = RubyUtil.RUBY.newArray();
+ final Collection<RubyEvent> result = compiledFilters.compute(batch, flush, shutdown);
+ copyNonCancelledEvents(result, outputBatch);
+ compiledFilters.clear();
+ compiledOutputs.compute(outputBatch, flush, shutdown);
+ }
+ }
+
/**
* Instances of this class represent a fully compiled pipeline execution. Note that this class
* has a separate lifecycle from {@link CompiledPipeline} because it holds per (worker-thread)
* state and thus needs to be instantiated once per thread.
*/
- private final class CompiledExecution {
+ public abstract class CompiledExecution {
/**
* Compiled {@link IfVertex}, indexed by their ID as returned by {@link Vertex#getId()}.
@@ -312,35 +375,51 @@ private final class CompiledExecution {
*/
private final Map<String, Dataset> plugins = new HashMap<>(50);
- private final Dataset compiled;
+ protected final Dataset compiledFilters;
+ protected final Dataset compiledOutputs;
CompiledExecution() {
- compiled = compile();
+ compiledFilters = compileFilters();
+ compiledOutputs = compileOutputs();
}
- Dataset toDataset() {
- return compiled;
+ public abstract void compute(final QueueBatch batch, final boolean flush, final boolean shutdown);
+
+ @SuppressWarnings({"rawtypes"})
+ public abstract void compute(final RubyArray batch, final boolean flush, final boolean shutdown);
+
+ /**
+ * Instantiates the graph of compiled filter section {@link Dataset}.
+ * @return Compiled {@link Dataset} representing the filter section of the pipeline.
+ */
+ private Dataset compileFilters() {
+ final Vertex separator = pipelineIR.getGraph()
+ .vertices()
+ .filter(v -> v instanceof SeparatorVertex)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Missing Filter End Vertex"));
+ return DatasetCompiler.terminalFilterDataset(flatten(Collections.emptyList(), separator));
}
/**
- * Instantiates the graph of compiled {@link Dataset}.
- * @return Compiled {@link Dataset} representing the pipeline.
+ * Instantiates the graph of compiled output section {@link Dataset}.
+ * @return Compiled {@link Dataset} representing the output section of the pipeline.
*/
- private Dataset compile() {
+ private Dataset compileOutputs() {
final Collection<Vertex> outputNodes = pipelineIR.getGraph()
.allLeaves().filter(CompiledPipeline.this::isOutput)
.collect(Collectors.toList());
if (outputNodes.isEmpty()) {
return Dataset.IDENTITY;
} else {
- return DatasetCompiler.terminalDataset(outputNodes.stream().map(
- leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf))
- ).collect(Collectors.toList()));
+ return DatasetCompiler.terminalOutputDataset(outputNodes.stream()
+ .map(leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf)))
+ .collect(Collectors.toList()));
}
}
/**
- * Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
+ * Build a {@link Dataset} representing the {@link RubyEvent}s after
* the application of the given filter.
* @param vertex Vertex of the filter to create this {@link Dataset} for
* @param datasets All the datasets that have children passing into this filter
@@ -353,7 +432,8 @@ private Dataset filterDataset(final Vertex vertex, final Collection dat
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.filterDataset(
flatten(datasets, vertex),
- filters.get(vertexId));
+ filters.get(vertexId)
+ );
LOGGER.debug("Compiled filter\n {} \n into \n {}", vertex, prepared);
plugins.put(vertexId, prepared.instantiate());
@@ -363,7 +443,7 @@ private Dataset filterDataset(final Vertex vertex, final Collection dat
}
/**
- * Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
+ * Build a {@link Dataset} representing the {@link RubyEvent}s after
* the application of the given output.
* @param vertex Vertex of the output to create this {@link Dataset} for
* @param datasets All the datasets that have children passing into this output
@@ -377,7 +457,8 @@ private Dataset outputDataset(final Vertex vertex, final Collection dat
DatasetCompiler.outputDataset(
flatten(datasets, vertex),
outputs.get(vertexId),
- outputs.size() == 1);
+ outputs.size() == 1
+ );
LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);
plugins.put(vertexId, prepared.instantiate());
@@ -388,14 +469,17 @@ private Dataset outputDataset(final Vertex vertex, final Collection dat
/**
* Split the given {@link Dataset}s and return the dataset half of their elements that contains
- * the {@link JrubyEventExtLibrary.RubyEvent} that fulfil the given {@link EventCondition}.
+ * the {@link RubyEvent}s that fulfil the given {@link EventCondition}.
* @param datasets Datasets that are the parents of the datasets to split
* @param condition Condition that must be fulfilled
* @param vertex Vertex id to cache the resulting {@link Dataset} under
* @return The half of the datasets' contents that fulfils the condition
*/
- private SplitDataset split(final Collection datasets,
- final EventCondition condition, final Vertex vertex) {
+ private SplitDataset split(
+ final Collection<Dataset> datasets,
+ final EventCondition condition,
+ final Vertex vertex)
+ {
final String vertexId = vertex.getId();
SplitDataset conditional = iffs.get(vertexId);
if (conditional == null) {
@@ -425,9 +509,13 @@ private SplitDataset split(final Collection datasets,
* @param start Vertex to compile children for
* @return Datasets originating from given {@link Vertex}
*/
- private Collection flatten(final Collection datasets,
- final Vertex start) {
- final Collection result = compileDependencies(start, datasets,
+ private Collection<Dataset> flatten(
+ final Collection<Dataset> datasets,
+ final Vertex start)
+ {
+ final Collection<Dataset> result = compileDependencies(
+ start,
+ datasets,
start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
);
return result.isEmpty() ? datasets : result;
@@ -440,8 +528,11 @@ private Collection flatten(final Collection datasets,
* @param dependencies Dependencies of {@code start}
* @return Datasets compiled from vertex children
*/
- private Collection compileDependencies(final Vertex start,
- final Collection datasets, final Stream dependencies) {
+ private Collection<Dataset> compileDependencies(
+ final Vertex start,
+ final Collection<Dataset> datasets,
+ final Stream<Vertex> dependencies)
+ {
return dependencies.map(
dependency -> {
if (isFilter(dependency)) {
@@ -460,13 +551,15 @@ private Collection compileDependencies(final Vertex start,
// It is important that we double check that we are actually dealing with the
// positive/left branch of the if condition
if (ifvert.outgoingBooleanEdgesByType(true)
- .anyMatch(edge -> Objects.equals(edge.getTo(), start))) {
+ .anyMatch(edge -> Objects.equals(edge.getTo(), start)))
+ {
return ifDataset;
} else {
return ifDataset.right();
}
}
- }).collect(Collectors.toList());
+ }
+ ).collect(Collectors.toList());
}
}
}
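Putting the CompiledPipeline changes together: each worker thread builds its own CompiledExecution and hands it whole batches, and the ordered variant re-batches internally. A usage sketch, where preserveEventOrder, batch and isFlush come from the WorkerLoop changes at the end of this patch:

    // once per worker thread:
    final CompiledPipeline.CompiledExecution execution =
        pipeline.buildExecution(preserveEventOrder);

    // per batch read from the queue:
    execution.compute(batch, isFlush, false);

    // ordered path, assuming a filter that emits one clone e' per event e:
    //   [e1] -> filters -> [e1, e1']   appended to outputBatch
    //   [e2] -> filters -> [e2, e2']   appended to outputBatch
    // the outputs then see [e1, e1', e2, e2'], i.e. generated events stay
    // adjacent to the event that produced them.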
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/PipelineIR.java b/logstash-core/src/main/java/org/logstash/config/ir/PipelineIR.java
index f7b89aad000..0c0b15ef277 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/PipelineIR.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/PipelineIR.java
@@ -28,6 +28,7 @@
import org.logstash.config.ir.graph.PluginVertex;
import org.logstash.config.ir.graph.QueueVertex;
import org.logstash.config.ir.graph.Vertex;
+import org.logstash.config.ir.graph.SeparatorVertex;
public final class PipelineIR implements Hashable {
@@ -42,7 +43,9 @@ public QueueVertex getQueue() {
}
private final Graph graph;
+
private final QueueVertex queue;
+
// Temporary until we have LIR execution
// Then we will no longer need this property here
private final String originalSource;
@@ -63,6 +66,9 @@ public PipelineIR(Graph inputSection, Graph filterSection, Graph outputSection,
// Now we connect the queue to the root of the filter section
tempGraph = tempGraph.chain(filterSection);
+ // Connect the filter section to the filter-end SeparatorVertex, separating it from the output section
+ tempGraph = tempGraph.chain(new SeparatorVertex("filter_to_output"));
+
// Finally, connect the filter out node to all the outputs
this.graph = tempGraph.chain(outputSection);
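After this change the compiled graph therefore always contains exactly one SeparatorVertex marking the filter/output boundary, which compileFilters() above looks up as the terminal of the filter section:

    input section -> QueueVertex -> filter section -> SeparatorVertex("filter_to_output") -> output section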
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
index 74bd7eb4639..2e30d678ca7 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java
@@ -53,8 +53,10 @@ private DatasetCompiler() {
// Utility Class
}
- public static ComputeStepSyntaxElement splitDataset(final Collection parents,
- final EventCondition condition) {
+ public static ComputeStepSyntaxElement<SplitDataset> splitDataset(
+ final Collection<Dataset> parents,
+ final EventCondition condition)
+ {
final ClassFields fields = new ClassFields();
final ValueSyntaxElement ifData = fields.add(new ArrayList<>());
final ValueSyntaxElement elseData = fields.add(new ArrayList<>());
@@ -104,8 +106,10 @@ public static ComputeStepSyntaxElement splitDataset(final Collecti
* @param plugin Filter Plugin
* @return Dataset representing the filter plugin
*/
- public static ComputeStepSyntaxElement filterDataset(final Collection parents,
- final AbstractFilterDelegatorExt plugin) {
+ public static ComputeStepSyntaxElement<Dataset> filterDataset(
+ final Collection<Dataset> parents,
+ final AbstractFilterDelegatorExt plugin)
+ {
final ClassFields fields = new ClassFields();
final ValueSyntaxElement outputBuffer = fields.add(new ArrayList<>());
final Closure clear = Closure.wrap();
@@ -113,10 +117,12 @@ public static ComputeStepSyntaxElement filterDataset(final Collection parentFields =
- parents.stream().map(fields::add).collect(Collectors.toList());
- @SuppressWarnings("rawtypes")
- final RubyArray inputBuffer = RubyUtil.RUBY.newArray();
+ final Collection<ValueSyntaxElement> parentFields = parents
+ .stream()
+ .map(fields::add)
+ .collect(Collectors.toList());
+ @SuppressWarnings("rawtypes") final RubyArray inputBuffer = RubyUtil.RUBY.newArray();
clear.add(clearSyntax(parentFields));
final ValueSyntaxElement inputBufferField = fields.add(inputBuffer);
compute = withInputBuffering(
@@ -128,37 +134,82 @@ public static ComputeStepSyntaxElement filterDataset(final CollectionBuilds a terminal {@link Dataset} from the given parent {@link Dataset}s.
+ * Builds a terminal {@link Dataset} for the filters from the given parent {@link Dataset}s.
* If the given set of parent {@link Dataset} is empty, the trivial pass-through
* dataset {@link Dataset#IDENTITY} is returned.
- * @param parents Parent {@link Dataset} to sum and terminate
+ * @param parents Parent {@link Dataset} to sum
* @return Dataset representing the sum of given parent {@link Dataset}
*/
- public static Dataset terminalDataset(final Collection parents) {
+ public static Dataset terminalFilterDataset(final Collection<Dataset> parents) {
+ if (parents.isEmpty()) {
+ return Dataset.IDENTITY;
+ }
+
final int count = parents.size();
- final Dataset result;
- if (count > 1) {
- final ClassFields fields = new ClassFields();
- final Collection parentFields =
- parents.stream().map(fields::add).collect(Collectors.toList());
- result = compileOutput(
- Closure.wrap(
- parentFields.stream().map(DatasetCompiler::computeDataset)
- .toArray(MethodLevelSyntaxElement[]::new)
- ).add(clearSyntax(parentFields)), Closure.EMPTY, fields
- ).instantiate();
- } else if (count == 1) {
+ if (count == 1) {
// No need for a terminal dataset here: if there is only a single parent node we can
// call it directly.
- result = parents.iterator().next();
- } else {
+ return parents.iterator().next();
+ }
+
+ final ClassFields fields = new ClassFields();
+ final Collection<ValueSyntaxElement> parentFields = parents
+ .stream()
+ .map(fields::add)
+ .collect(Collectors.toList());
+ @SuppressWarnings("rawtypes") final RubyArray inputBuffer = RubyUtil.RUBY.newArray();
+ final ValueSyntaxElement inputBufferField = fields.add(inputBuffer);
+ final ValueSyntaxElement outputBufferField = fields.add(new ArrayList<>());
+ final Closure clear = Closure.wrap().add(clearSyntax(parentFields));
+ final Closure compute = withInputBuffering(
+ Closure.wrap(
+ // pass-through filter
+ buffer(outputBufferField, inputBufferField)
+ ),
+ parentFields,
+ inputBufferField
+ );
+
+ return prepare(withOutputBuffering(compute, clear, outputBufferField, fields)).instantiate();
+ }
+
+ /**
+ * Builds a terminal {@link Dataset} for the outputs from the given parent {@link Dataset}s.
+ * If the given set of parent {@link Dataset} is empty an
+ * {@link IllegalArgumentException} is thrown.
+ * @param parents Parent {@link Dataset} to sum and terminate
+ * @return Dataset representing the sum of given parent {@link Dataset}
+ */
+ public static Dataset terminalOutputDataset(final Collection<Dataset> parents) {
+ if (parents.isEmpty()) {
throw new IllegalArgumentException(
- "Cannot create Terminal Dataset for an empty number of parent datasets"
+ "Cannot create terminal output dataset for an empty number of parent datasets"
);
}
- return result;
+
+ final int count = parents.size();
+ if (count == 1) {
+ // No need for a terminal dataset here: if there is only a single parent node we can
+ // call it directly.
+ return parents.iterator().next();
+ }
+
+ final ClassFields fields = new ClassFields();
+ final Collection<ValueSyntaxElement> parentFields = parents
+ .stream()
+ .map(fields::add)
+ .collect(Collectors.toList());
+ final Closure compute = Closure.wrap(parentFields
+ .stream()
+ .map(DatasetCompiler::computeDataset)
+ .toArray(MethodLevelSyntaxElement[]::new)
+ ).add(clearSyntax(parentFields));
+
+ return compileOutput(compute, Closure.EMPTY, fields).instantiate();
}
/**
@@ -177,8 +228,11 @@ public static Dataset terminalDataset(final Collection parents) {
* @param terminal Set to true if this output is the only output in the pipeline
* @return Output Dataset
*/
- public static ComputeStepSyntaxElement outputDataset(final Collection parents,
- final AbstractOutputDelegatorExt output, final boolean terminal) {
+ public static ComputeStepSyntaxElement<Dataset> outputDataset(
+ final Collection<Dataset> parents,
+ final AbstractOutputDelegatorExt output,
+ final boolean terminal)
+ {
final ClassFields fields = new ClassFields();
final Closure clearSyntax;
final Closure computeSyntax;
@@ -215,14 +269,19 @@ public static ComputeStepSyntaxElement outputDataset(final Collection compileOutput(final Closure syn
);
}
- private static MethodLevelSyntaxElement buffer(final ValueSyntaxElement resultBuffer,
- final ValueSyntaxElement argument) {
+ private static MethodLevelSyntaxElement buffer(
+ final ValueSyntaxElement resultBuffer,
+ final ValueSyntaxElement argument)
+ {
return resultBuffer.call("addAll", argument);
}
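For intuition, the two new terminal datasets behave roughly like the following hand-written equivalents. This is only an approximation of the code generated by DatasetCompiler, with field names assumed; the real generated filter terminal also skips cancelled events while buffering.

    // terminalFilterDataset with N > 1 parents: buffer the parents' events and
    // pass them through unchanged; no filter plugin is invoked.
    public Collection<RubyEvent> compute(RubyArray batch, boolean flush, boolean shutdown) {
        if (done) {
            return outputBuffer;              // withOutputBuffering: compute once per batch
        }
        for (final Dataset parent : parents) {
            inputBuffer.addAll(parent.compute(batch, flush, shutdown));
        }
        outputBuffer.addAll(inputBuffer);     // pass-through "filter"
        inputBuffer.clear();
        done = true;
        return outputBuffer;
    }

    // terminalOutputDataset with N > 1 parents: trigger every output subtree and
    // clear; each output dataset consumes its events as a side effect.
    public Collection<RubyEvent> compute(RubyArray batch, boolean flush, boolean shutdown) {
        for (final Dataset parent : parents) {
            parent.compute(batch, flush, shutdown);
        }
        for (final Dataset parent : parents) {
            parent.clear();                   // clearSyntax(parentFields)
        }
        return null;                          // terminal: the result is unused
    }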
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java
index f41558ad7b1..e94aa1e3a58 100644
--- a/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java
+++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/Graph.java
@@ -432,7 +432,7 @@ public Stream edges() {
public String uniqueHash() {
return Util.digest(this.vertices().
- filter(v -> !(v instanceof QueueVertex)). // has no metadata
+ filter(v -> !(v instanceof QueueVertex) && !(v instanceof SeparatorVertex)). // these have no metadata
map(Vertex::getSourceWithMetadata).
map(SourceWithMetadata::uniqueHash).
sorted().
diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/SeparatorVertex.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/SeparatorVertex.java
new file mode 100644
index 00000000000..cb9a753ddba
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/SeparatorVertex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.logstash.config.ir.graph;
+
+import org.logstash.common.IncompleteSourceWithMetadataException;
+import org.logstash.common.Util;
+import org.logstash.config.ir.SourceComponent;
+import org.logstash.common.SourceWithMetadata;
+
+public final class SeparatorVertex extends Vertex {
+
+ public SeparatorVertex(String id) throws IncompleteSourceWithMetadataException {
+ super(new SourceWithMetadata("internal", id, 0, 0, "separator"), id);
+ }
+
+ @Override
+ public String toString() {
+ return this.getId();
+ }
+
+ @Override
+ public SeparatorVertex copy() {
+ try {
+ return new SeparatorVertex(this.getId());
+ } catch (IncompleteSourceWithMetadataException e) {
+ // Never happens
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean sourceComponentEquals(SourceComponent other) {
+ return other instanceof SeparatorVertex && ((SeparatorVertex) other).getId().equals(this.getId());
+ }
+
+ // Special vertices really have no metadata
+ @Override
+ public SourceWithMetadata getSourceWithMetadata() {
+ return null;
+ }
+
+ @Override
+ public String uniqueHash() {
+ return Util.digest("SEPARATOR_" + this.getId());
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/execution/MemoryReadBatch.java b/logstash-core/src/main/java/org/logstash/execution/MemoryReadBatch.java
index 7764fd4afa6..d7713a119d2 100644
--- a/logstash-core/src/main/java/org/logstash/execution/MemoryReadBatch.java
+++ b/logstash-core/src/main/java/org/logstash/execution/MemoryReadBatch.java
@@ -17,30 +17,27 @@
* under the License.
*/
-
package org.logstash.execution;
import org.jruby.RubyArray;
-import org.jruby.runtime.builtin.IRubyObject;
-import org.logstash.ext.JrubyEventExtLibrary;
-
+import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
+import java.util.Collection;
import java.util.LinkedHashSet;
-
import static org.logstash.RubyUtil.RUBY;
public final class MemoryReadBatch implements QueueBatch {
- private final LinkedHashSet<IRubyObject> events;
+ private final LinkedHashSet<RubyEvent> events;
- public MemoryReadBatch(final LinkedHashSet<IRubyObject> events) {
+ public MemoryReadBatch(final LinkedHashSet<RubyEvent> events) {
this.events = events;
}
- public static boolean isCancelled(final IRubyObject event) {
- return ((JrubyEventExtLibrary.RubyEvent) event).getEvent().isCancelled();
+ public static boolean isCancelled(final RubyEvent event) {
+ return event.getEvent().isCancelled();
}
- public static MemoryReadBatch create(LinkedHashSet<IRubyObject> events) {
+ public static MemoryReadBatch create(LinkedHashSet<RubyEvent> events) {
return new MemoryReadBatch(events);
}
@@ -52,7 +49,7 @@ public static MemoryReadBatch create() {
@SuppressWarnings({"rawtypes"})
public RubyArray to_a() {
final RubyArray result = RUBY.newArray(events.size());
- for (final IRubyObject event : events) {
+ for (final RubyEvent event : events) {
if (!isCancelled(event)) {
result.append(event);
}
@@ -61,7 +58,15 @@ public RubyArray to_a() {
}
@Override
- public void merge(final IRubyObject event) {
+ public Collection<RubyEvent> collection() {
+ // This does not filter out cancelled events because it is
+ // only used in the WorkerLoop, where there are no cancelled
+ // events yet.
+ return events;
+ }
+
+ @Override
+ public void merge(final RubyEvent event) {
events.add(event);
}
diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueBatch.java b/logstash-core/src/main/java/org/logstash/execution/QueueBatch.java
index 1ae819029c1..d303ff40428 100644
--- a/logstash-core/src/main/java/org/logstash/execution/QueueBatch.java
+++ b/logstash-core/src/main/java/org/logstash/execution/QueueBatch.java
@@ -17,17 +17,17 @@
* under the License.
*/
-
package org.logstash.execution;
import org.jruby.RubyArray;
-import org.jruby.runtime.builtin.IRubyObject;
-
+import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;
import java.io.IOException;
+import java.util.Collection;
public interface QueueBatch {
int filteredSize();
@SuppressWarnings({"rawtypes"}) RubyArray to_a();
- void merge(IRubyObject event);
+ Collection<RubyEvent> collection();
+ void merge(RubyEvent event);
void close() throws IOException;
}
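The contract intended for the two accessors, sketched against a hypothetical batch obtained from a read client:

    final QueueBatch batch = readClient.readBatch();        // hypothetical call site
    final Collection<RubyEvent> view = batch.collection();  // raw originals: no copy and no
                                                            // cancelled-filtering (pre-filter use)
    final RubyArray snapshot = batch.to_a();                // copies originals plus generated
                                                            // events, skipping cancelled ones

CompiledOrderedExecution reads collection() to avoid an array copy per batch, while CompiledUnorderedExecution keeps using to_a().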
diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
index 5c1266e3ac0..16dd457d84e 100644
--- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
+++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
@@ -17,24 +17,19 @@
* under the License.
*/
-
package org.logstash.execution;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.jruby.RubyArray;
-import org.jruby.runtime.ThreadContext;
-import org.logstash.RubyUtil;
import org.logstash.config.ir.CompiledPipeline;
-import org.logstash.config.ir.compiler.Dataset;
public final class WorkerLoop implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(WorkerLoop.class);
- private final Dataset execution;
+ private final CompiledPipeline.CompiledExecution execution;
private final QueueReadClient readClient;
@@ -65,7 +60,7 @@ public WorkerLoop(
{
this.consumedCounter = consumedCounter;
this.filteredCounter = filteredCounter;
- this.execution = pipeline.buildExecution();
+ this.execution = pipeline.buildExecution(preserveEventOrder);
this.drainQueue = drainQueue;
this.readClient = readClient;
this.flushRequested = flushRequested;
@@ -84,7 +79,7 @@ public void run() {
consumedCounter.add(batch.filteredSize());
final boolean isFlush = flushRequested.compareAndSet(true, false);
readClient.startMetrics(batch);
- compute(batch, isFlush, false);
+ execution.compute(batch, isFlush, false);
int filteredCount = batch.filteredSize();
filteredCounter.add(filteredCount);
readClient.addOutputMetrics(filteredCount);
@@ -98,7 +93,7 @@ public void run() {
// for this we need to create a new empty batch to contain the final flushed events
final QueueBatch batch = readClient.newBatch();
readClient.startMetrics(batch);
- compute(batch, true, true);
+ execution.compute(batch, true, true);
readClient.closeBatch(batch);
} catch (final Exception ex) {
LOGGER.error(
@@ -109,20 +104,6 @@ public void run() {
}
}
- @SuppressWarnings("unchecked")
- private void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
- if (preserveEventOrder) {
- // send batch events one-by-one as single-element batches
- @SuppressWarnings({"rawtypes"}) final RubyArray singleElementBatch = RubyUtil.RUBY.newArray(1);
- batch.to_a().forEach((e) -> {
- singleElementBatch.set(0, e);
- execution.compute(singleElementBatch, flush, shutdown);
- });
- } else {
- execution.compute(batch.to_a(), flush, shutdown);
- }
- }
-
private boolean isDraining() {
return drainQueue && !readClient.isEmpty();
}
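With the ordering decision moved into CompiledPipeline, the per-batch body of WorkerLoop.run() reduces to a straight delegation. A heavily simplified sketch of the resulting flow, assembled from the hunks above:

    final QueueBatch batch = readClient.readBatch();
    consumedCounter.add(batch.filteredSize());
    final boolean isFlush = flushRequested.compareAndSet(true, false);
    readClient.startMetrics(batch);
    execution.compute(batch, isFlush, false);  // ordering is now the execution's concern
    final int filteredCount = batch.filteredSize();
    filteredCounter.add(filteredCount);
    readClient.addOutputMetrics(filteredCount);
    readClient.closeBatch(batch);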