preserve input event ordering with single pipeline worker
danhermann committed Aug 6, 2019
1 parent 1882ce0 commit 34bd774
Showing 8 changed files with 151 additions and 40 deletions.
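In outline, the change works like this: each event records a monotonically increasing sequence number when it is instantiated, and, when ordering is enabled, the generated output stage sorts each batch by that number before events reach the outputs. A minimal standalone sketch of the idea follows; the Event class and all names in it are illustrative stand-ins, not Logstash's API.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Minimal model of the commit's approach: tag each event with a creation
// sequence, let filters reorder the batch, then sort by sequence before output.
final class OrderingSketch {
    private static final AtomicLong SEQUENCE_GENERATOR = new AtomicLong(0L);

    static final class Event {
        final long sequence = SEQUENCE_GENERATOR.incrementAndGet();
        final String payload;
        Event(String payload) { this.payload = payload; }
    }

    public static void main(String[] args) {
        List<Event> batch = new ArrayList<>(List.of(
            new Event("first"), new Event("second"), new Event("third")));
        Collections.shuffle(batch); // stands in for filters reordering events
        batch.sort(Comparator.comparingLong(e -> e.sequence)); // restore input order
        batch.forEach(e -> System.out.println(e.sequence + ": " + e.payload));
    }
}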
logstash-core/lib/logstash/java_pipeline.rb (2 changes: 1 addition & 1 deletion)

@@ -238,7 +238,7 @@ def start_workers
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
org.logstash.execution.WorkerLoop.new(
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
- @flushRequested, @flushing, @shutdownRequested, @drain_queue).run
+ @flushRequested, @flushing, @shutdownRequested, @drain_queue, pipeline_workers == 1).run
end
@worker_threads << thread
end
logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java

@@ -97,10 +97,12 @@ public Collection<IRubyObject> inputs() {
/**
* This method contains the actual compilation of the {@link Dataset} representing the
* underlying pipeline from the Queue to the outputs.
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Compiled {@link Dataset} representation of the underlying {@link PipelineIR} topology
*/
- public Dataset buildExecution() {
-     return new CompiledPipeline.CompiledExecution().toDataset();
+ public Dataset buildExecution(boolean orderedEvents) {
+     return new CompiledPipeline.CompiledExecution(orderedEvents).toDataset();
}

/**
@@ -247,8 +249,8 @@ private final class CompiledExecution {

private final Dataset compiled;

- CompiledExecution() {
-     compiled = compile();
+ CompiledExecution(boolean orderedEvents) {
+     compiled = compile(orderedEvents);
}

Dataset toDataset() {
@@ -257,17 +259,19 @@ Dataset toDataset() {

/**
* Instantiates the graph of compiled {@link Dataset}.
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Compiled {@link Dataset} representing the pipeline.
*/
- private Dataset compile() {
+ private Dataset compile(boolean orderedEvents) {
final Collection<Vertex> outputNodes = pipelineIR.getGraph()
.allLeaves().filter(CompiledPipeline.this::isOutput)
.collect(Collectors.toList());
if (outputNodes.isEmpty()) {
return Dataset.IDENTITY;
} else {
return DatasetCompiler.terminalDataset(outputNodes.stream().map(
- leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf))
+ leaf -> outputDataset(leaf, flatten(Collections.emptyList(), leaf), orderedEvents)
).collect(Collectors.toList()));
}
}
@@ -299,16 +303,20 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
* the application of the given output.
* @param vertex Vertex of the output to create this {@link Dataset} for
* @param datasets All the datasets that have children passing into this output
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Output {@link Dataset}
*/
- private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets) {
+ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> datasets,
+     final boolean orderedEvents) {
final String vertexId = vertex.getId();

if (!plugins.containsKey(vertexId)) {
final ComputeStepSyntaxElement<Dataset> prepared =
- DatasetCompiler.outputDataset(flatten(datasets, vertex),
+ DatasetCompiler.outputDataset(flatten(datasets, vertex, orderedEvents),
outputs.get(vertexId),
- outputs.size() == 1);
+ outputs.size() == 1,
+ orderedEvents);
LOGGER.debug("Compiled output\n {} \n into \n {}", vertex, prepared);
plugins.put(vertexId, prepared.instantiate());
}
@@ -318,18 +326,20 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat

/**
* Split the given {@link Dataset}s and return the dataset half of their elements that contains
- the {@link JrubyEventExtLibrary.RubyEvent} that fulfil the given {@link EventCondition}.
+ the {@link JrubyEventExtLibrary.RubyEvent} that fulfill the given {@link EventCondition}.
* @param datasets Datasets that are the parents of the datasets to split
* @param condition Condition that must be fulfilled
* @param vertex Vertex id to cache the resulting {@link Dataset} under
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return The half of the datasets contents that fulfils the condition
*/
- private SplitDataset split(final Collection<Dataset> datasets,
-     final EventCondition condition, final Vertex vertex) {
+ private SplitDataset split(final Collection<Dataset> datasets, final EventCondition condition,
+     final Vertex vertex, final boolean orderedEvents) {
final String key = vertex.getId();
SplitDataset conditional = iffs.get(key);
if (conditional == null) {
- final Collection<Dataset> dependencies = flatten(datasets, vertex);
+ final Collection<Dataset> dependencies = flatten(datasets, vertex, orderedEvents);
conditional = iffs.get(key);
// Check that compiling the dependencies did not already instantiate the conditional
// by requiring its else branch.
@@ -354,39 +364,49 @@ private SplitDataset split(final Collection<Dataset> datasets,
* a {code filter} or and {code if} statement).
* @param datasets Nodes from the last already compiled level
* @param start Vertex to compile children for
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Datasets originating from given {@link Vertex}
*/
- private Collection<Dataset> flatten(final Collection<Dataset> datasets,
-     final Vertex start) {
+ private Collection<Dataset> flatten(final Collection<Dataset> datasets, final Vertex start,
+     final boolean orderedEvents) {
final Collection<Dataset> result = compileDependencies(start, datasets,
- start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex)
+ start.incomingVertices().filter(v -> isFilter(v) || isOutput(v) || v instanceof IfVertex),
+ orderedEvents
);
return result.isEmpty() ? datasets : result;
}

+ private Collection<Dataset> flatten(final Collection<Dataset> datasets, final Vertex start) {
+     return flatten(datasets, start, false);
+ }

/**
* Compiles all child vertices for a given vertex.
* @param datasets Datasets from previous stage
* @param start Start Vertex that got expanded
* @param dependencies Dependencies of {@code start}
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Datasets compiled from vertex children
*/
- private Collection<Dataset> compileDependencies(final Vertex start,
-     final Collection<Dataset> datasets, final Stream<Vertex> dependencies) {
+ private Collection<Dataset> compileDependencies(final Vertex start, final Collection<Dataset> datasets,
+     final Stream<Vertex> dependencies, boolean orderedEvents) {
return dependencies.map(
dependency -> {
if (isFilter(dependency)) {
return filterDataset(dependency, datasets);
} else if (isOutput(dependency)) {
- return outputDataset(dependency, datasets);
+ return outputDataset(dependency, datasets, orderedEvents);
} else {
// We know that it's an if vertex since the the input children are either
// output, filter or if in type.
final IfVertex ifvert = (IfVertex) dependency;
final SplitDataset ifDataset = split(
datasets,
conditionalCompiler.buildCondition(ifvert.getBooleanExpression()),
- dependency
+ dependency,
+ orderedEvents
);
// It is important that we double check that we are actually dealing with the
// positive/left branch of the if condition
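The javadoc added throughout this file repeats one caveat: ordering is guaranteed only with a single pipeline worker. The reason is that sorting happens per batch, while separate workers drain and flush their batches independently. A small runnable sketch of the effect, with hypothetical names and plain executors standing in for pipeline workers:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Why one worker matters: each worker sorts only its own batch, so with two
// workers the batches themselves may still reach the outputs out of order.
final class WorkerOrderingSketch {
    public static void main(String[] args) throws InterruptedException {
        for (int workers : new int[] {1, 2}) {
            System.out.println("pipeline.workers=" + workers);
            ExecutorService pool = Executors.newFixedThreadPool(workers);
            for (List<Integer> batch : List.of(List.of(1, 2), List.of(3, 4))) {
                pool.submit(() -> System.out.println("  batch " + batch));
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            // One worker prints [1, 2] then [3, 4]; two workers may swap them.
        }
    }
}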
logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java

@@ -4,6 +4,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+ import java.util.List;
import java.util.stream.Collectors;
import org.jruby.RubyArray;
import org.jruby.RubyHash;
@@ -155,13 +156,16 @@ public static Dataset terminalDataset(final Collection<Dataset> parents) {
* @param parents Parent Datasets
* @param output Output Plugin (of Ruby type OutputDelegator)
* @param terminal Set to true if this output is the only output in the pipeline
+ * @param orderedEvents When true, generates code to process events in order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Output Dataset
*/
public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<Dataset> parents,
- final AbstractOutputDelegatorExt output, final boolean terminal) {
+ final AbstractOutputDelegatorExt output, final boolean terminal, final boolean orderedEvents) {
final ClassFields fields = new ClassFields();
final Closure clearSyntax;
final Closure computeSyntax;

if (parents.isEmpty()) {
clearSyntax = Closure.EMPTY;
computeSyntax = Closure.wrap(invokeOutput(fields.add(output), BATCH_ARG));
@@ -181,7 +185,7 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
final ValueSyntaxElement inputBuffer = fields.add(buffer);
computeSyntax = withInputBuffering(
Closure.wrap(invokeOutput(fields.add(output), inputBuffer), inlineClear),
- parentFields, inputBuffer
+ parentFields, inputBuffer, orderedEvents
);
}
return compileOutput(computeSyntax, clearSyntax, fields);
@@ -235,15 +239,32 @@ private static ComputeStepSyntaxElement<Dataset> prepare(final DatasetCompiler.C
* @param compute Closure to execute
* @param parents Parents to buffer results for
* @param inputBuffer Buffer to store results in
+ * @param sorted When true, generates code to sort events by sequence number before sending
+ * to outputs in order to preserve input event order. Event ordering is
+ * guaranteed <i>only</i> with a single pipeline worker.
* @return Closure wrapped by buffering parent results and clearing them
*/
- private static Closure withInputBuffering(final Closure compute,
-     final Collection<ValueSyntaxElement> parents, final ValueSyntaxElement inputBuffer) {
+ private static Closure withInputBuffering(final Closure compute, final Collection<ValueSyntaxElement> parents,
+     final ValueSyntaxElement inputBuffer, final boolean sorted) {
return Closure.wrap(
parents.stream().map(par -> SyntaxFactory.value("org.logstash.config.ir.compiler.Utils")
.call("copyNonCancelledEvents", computeDataset(par), inputBuffer)
).toArray(MethodLevelSyntaxElement[]::new)
).add(compute).add(clear(inputBuffer));
.call("copyNonCancelledEvents", computeDataset(par), inputBuffer))
.toArray(MethodLevelSyntaxElement[]::new))
.add(sorted ? eventSorter(inputBuffer) : Closure.EMPTY)
.add(compute)
.add(clear(inputBuffer));
}

+ private static Closure withInputBuffering(final Closure compute, final Collection<ValueSyntaxElement> parents,
+     final ValueSyntaxElement inputBuffer) {
+     return withInputBuffering(compute, parents, inputBuffer, false);
+ }

+ private static Closure eventSorter(ValueSyntaxElement inputBuffer) {
+     return Closure.wrap(SyntaxFactory.value("java.util.Collections")
+         .call("sort",
+             inputBuffer,
+             SyntaxFactory.value("new org.logstash.config.ir.compiler.Utils.EventSequenceComparator()")));
+ }

/**
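The eventSorter helper above emits syntax that compiles into a Collections.sort call on the input buffer, and withInputBuffering splices it between the buffer-copy step and the output invocation. A rough model of that composition using plain runnables, assuming the generated steps run in the order buffer, optional sort, compute, clear (the step types are illustrative stand-ins for the Closure syntax tree):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Model of withInputBuffering's pipeline: buffer parent results, optionally
// sort them (the sorted/orderedEvents flag), run the output, clear the buffer.
final class BufferingSketch {
    public static void main(String[] args) {
        boolean sorted = true; // corresponds to the sorted/orderedEvents flag
        List<Long> inputBuffer = new ArrayList<>(List.of(3L, 1L, 2L));

        Runnable sortStep = () -> inputBuffer.sort(Comparator.naturalOrder());
        Runnable noop = () -> {}; // analogue of Closure.EMPTY
        Runnable compute = () -> System.out.println("output sees " + inputBuffer);
        Runnable clear = inputBuffer::clear;

        for (Runnable step : List.of(sorted ? sortStep : noop, compute, clear)) {
            step.run();
        }
    }
}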
logstash-core/src/main/java/org/logstash/config/ir/compiler/Utils.java

@@ -3,6 +3,7 @@
import org.logstash.ext.JrubyEventExtLibrary;

import java.util.Collection;
+ import java.util.Comparator;
import java.util.List;

/**
@@ -32,4 +33,16 @@ public static void filterEvents(Collection<JrubyEventExtLibrary.RubyEvent> input
}
}

+ /**
+ * Comparator for events based on the sequence of their instantiation. Used to maintain input event
+ * ordering with a single pipeline worker.
+ */
+ public static class EventSequenceComparator implements Comparator<JrubyEventExtLibrary.RubyEvent> {
+
+     @Override
+     public int compare(JrubyEventExtLibrary.RubyEvent o1, JrubyEventExtLibrary.RubyEvent o2) {
+         return Long.compare(o1.sequence(), o2.sequence());
+     }
+ }

}
logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java

@@ -40,10 +40,11 @@ public final class WorkerLoop implements Runnable {
public WorkerLoop(final CompiledPipeline pipeline, final QueueReadClient readClient,
final LongAdder filteredCounter, final LongAdder consumedCounter,
final AtomicBoolean flushRequested, final AtomicBoolean flushing,
- final AtomicBoolean shutdownRequested, final boolean drainQueue) {
+ final AtomicBoolean shutdownRequested, final boolean drainQueue,
+ final boolean orderedEvents) {
this.consumedCounter = consumedCounter;
this.filteredCounter = filteredCounter;
- this.execution = pipeline.buildExecution();
+ this.execution = pipeline.buildExecution(orderedEvents);
this.drainQueue = drainQueue;
this.readClient = readClient;
this.flushRequested = flushRequested;
logstash-core/src/main/java/org/logstash/ext/JrubyEventExtLibrary.java

@@ -42,6 +42,8 @@ public static final class RubyEvent extends RubyObject {
*/
private final int hash = nextHash();

+ private final long sequence = SEQUENCE_GENERATOR.get();

private Event event;

public RubyEvent(final Ruby runtime, final RubyClass klass) {
@@ -277,6 +279,10 @@ public IRubyObject ruby_set_timestamp(ThreadContext context, IRubyObject value)
return value;
}

+ public long sequence() {
+     return sequence;
+ }

@Override
public int hashCode() {
return hash;

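SEQUENCE_GENERATOR itself is declared outside the lines shown in the hunk above. A standalone analogue of the sequence field, under the assumption that the generator is a shared static counter advanced once per event, so that creation order is recoverable from sequence values:

import java.util.concurrent.atomic.AtomicLong;

// Standalone analogue of RubyEvent's sequence field. SEQUENCE_GENERATOR's
// declaration is not shown in the diff; this sketch assumes a shared static
// counter advanced once per instantiation, so later events compare greater
// under an EventSequenceComparator-style ordering.
final class SequencedEvent {
    private static final AtomicLong SEQUENCE_GENERATOR = new AtomicLong(0L);

    private final long sequence = SEQUENCE_GENERATOR.incrementAndGet();

    public long sequence() {
        return sequence;
    }

    public static void main(String[] args) {
        SequencedEvent first = new SequencedEvent();
        SequencedEvent second = new SequencedEvent();
        System.out.println(first.sequence() < second.sequence()); // prints true
    }
}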