separate filter & output execution, rebatch after filter when ordered
colinsurprenant authored Mar 27, 2020
1 parent aa93166 commit dbca0b3
Showing 10 changed files with 351 additions and 119 deletions.
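The commit title describes the change: filter execution and output execution become separate steps, and when the pipeline is ordered the batch is rebuilt from the filter results before the outputs run. Below is a minimal, hypothetical sketch of that control flow; the class and method names are illustrative, not the actual Logstash types.

```java
// Hypothetical sketch of "separate filter & output execution, rebatch after filter
// when ordered" -- not the real Logstash worker loop, just the control flow it names.
import java.util.ArrayList;
import java.util.List;

final class OrderedWorkerLoopSketch {
    interface Stage {
        List<String> process(List<String> events);
    }

    private final Stage filters;   // compiled filter section (assumed shape)
    private final Stage outputs;   // compiled output section (assumed shape)
    private final boolean ordered;

    OrderedWorkerLoopSketch(Stage filters, Stage outputs, boolean ordered) {
        this.filters = filters;
        this.outputs = outputs;
        this.ordered = ordered;
    }

    void runOnce(List<String> batch) {
        // Step 1: filter execution only.
        List<String> filtered = filters.process(batch);
        // Step 2: when ordered, rebatch -- build a fresh batch from the filter
        // results (which may include filter-generated events) so the outputs
        // see them in post-filter order; otherwise reuse the filtered list.
        List<String> outputBatch = ordered ? new ArrayList<>(filtered) : filtered;
        // Step 3: output execution as a separate step.
        outputs.process(outputBatch);
    }
}
```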
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/config/lir_serializer.rb
@@ -63,6 +63,8 @@ def vertex(v)
if_vertex(v)
when :queue
queue_vertex(v)
when :separator
separator_vertex(v)
end

decorate_vertex(v, hashified_vertex)
@@ -75,6 +77,8 @@ def vertex_type(v)
:if
elsif v.java_kind_of?(org.logstash.config.ir.graph.QueueVertex)
:queue
elsif v.java_kind_of?(org.logstash.config.ir.graph.SeparatorVertex)
:separator
else
raise "Unexpected vertex type! #{v}"
end
@@ -106,6 +110,10 @@ def queue_vertex(v)
{}
end

def separator_vertex(v)
{}
end

def edge(e)
e_json = {
"from" => e.from.id,
logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java
@@ -22,12 +22,10 @@

import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;

import java.io.IOException;
import java.util.Collection;
@@ -42,12 +40,19 @@ public final class AckedReadBatch implements QueueBatch {

private RubyHash generated;

public static AckedReadBatch create(final JRubyAckedQueueExt queue, final int size,
final long timeout) {
public static AckedReadBatch create(
final JRubyAckedQueueExt queue,
final int size,
final long timeout)
{
return new AckedReadBatch(queue, size, timeout);
}

private AckedReadBatch(final JRubyAckedQueueExt queue, final int size, final long timeout) {
private AckedReadBatch(
final JRubyAckedQueueExt queue,
final int size,
final long timeout)
{
AckedBatch batch;
try {
batch = queue.readBatch(size, timeout);
@@ -65,7 +70,7 @@ private AckedReadBatch(final JRubyAckedQueueExt queue, final int size, final long timeout) {
}

@Override
public void merge(final IRubyObject event) {
public void merge(final RubyEvent event) {
if (!event.isNil() && !originals.containsKey(event)) {
generated.put(event, RUBY.getTrue());
}
@@ -75,21 +80,30 @@ public void merge(final IRubyObject event) {
@Override
public RubyArray to_a() {
final RubyArray result = RUBY.newArray(filteredSize());
for (final JrubyEventExtLibrary.RubyEvent event
: (Collection<JrubyEventExtLibrary.RubyEvent>) originals.keys()) {
for (final RubyEvent event : (Collection<RubyEvent>) originals.keys()) {
if (!MemoryReadBatch.isCancelled(event)) {
result.append(event);
}
}
for (final JrubyEventExtLibrary.RubyEvent event
: (Collection<JrubyEventExtLibrary.RubyEvent>) generated.keys()) {
for (final RubyEvent event : (Collection<RubyEvent>) generated.keys()) {
if (!MemoryReadBatch.isCancelled(event)) {
result.append(event);
}
}
return result;
}

@SuppressWarnings({"unchecked"})
@Override
public Collection<RubyEvent> collection() {
// This only returns the originals and does not filter out cancelled
// events because it is only used in the WorkerLoop, where only
// non-cancelled originals exist. We should revisit this AckedReadBatch
// implementation and get rid of this dual original/generated idea.
// The MemoryReadBatch does not use such a strategy.
return originals.directKeySet();
}

public void close() throws IOException {
if (ackedBatch != null) {
ackedBatch.close();
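The AckedReadBatch diff above shows the dual bookkeeping its in-code comment refers to: events read from the queue are tracked as originals, events added later via merge() are tracked as generated, to_a() returns the non-cancelled members of both, and collection() exposes only the originals. A self-contained sketch of that bookkeeping, using hypothetical plain-Java types rather than the JRuby-backed classes in the real code:

```java
// Minimal, hypothetical sketch of AckedReadBatch's originals/generated bookkeeping.
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ReadBatchSketch {
    record Event(String id, boolean cancelled) {}

    private final Set<Event> originals = new LinkedHashSet<>();
    private final Set<Event> generated = new LinkedHashSet<>();

    ReadBatchSketch(List<Event> fromQueue) {
        originals.addAll(fromQueue);   // events read from the queue
    }

    // mirrors merge(RubyEvent): only track events not already in the batch
    void merge(Event event) {
        if (event != null && !originals.contains(event)) {
            generated.add(event);
        }
    }

    // mirrors to_a(): originals first, then generated, skipping cancelled events
    List<Event> toList() {
        List<Event> result = new ArrayList<>();
        for (Event e : originals) { if (!e.cancelled()) result.add(e); }
        for (Event e : generated) { if (!e.cancelled()) result.add(e); }
        return result;
    }

    // mirrors collection(): originals only, no cancellation filtering
    Set<Event> collection() {
        return originals;
    }
}
```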
