Skip to content

Commit

Permalink
simplify batch classes, do not compute JE empty batches, refactor RE …
Browse files Browse the repository at this point in the history
…worker loop (elastic#11737)

cleanup RubyArray "rawtypes"
remove all LinkedHashSet from batch and queue classes
avoid processing empty batches in Java worker loop
cleanup AckedReadBatch and MemoryReadBatch
refactor Ruby worker loop similar to Java Execution to not use batch merge
remove QueueBatch merge and replace LinkedHashSet with ArrayList
  • Loading branch information
colinsurprenant authored Apr 2, 2020
1 parent 5de9b23 commit 5a25c6f
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 184 deletions.
52 changes: 29 additions & 23 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,17 @@ def worker_loop(batch_size, batch_delay)

batch = filter_queue_client.read_batch.to_java # metrics are started in read_batch
batch_size = batch.filteredSize
events = batch.to_a
if batch_size > 0
@events_consumed.add(batch_size)
filter_batch(batch)
events = filter_batch(events)
end
flush_filters_to_batch(batch, :final => false) if signal.flush?
if batch.filteredSize > 0
output_batch(batch, output_events_map)

if signal.flush?
events = flush_filters_to_batch(events, :final => false)
end
if events.size > 0
output_batch(events, output_events_map)
filter_queue_client.close_batch(batch)
end
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
Expand All @@ -380,18 +384,17 @@ def worker_loop(batch_size, batch_delay)
# for this we need to create a new empty batch to contain the final flushed events
batch = filter_queue_client.to_java.newBatch
filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
flush_filters_to_batch(batch, :final => true)
output_batch(batch, output_events_map)
events = batch.to_a
events = flush_filters_to_batch(events, :final => true)
output_batch(events, output_events_map)
filter_queue_client.close_batch(batch)
end

def filter_batch(batch)
filter_func(batch.to_a).each do |e|
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
filter_queue_client.add_filtered_metrics(batch.filtered_size)
@events_filtered.add(batch.filteredSize)
def filter_batch(events)
result = filter_func(events)
filter_queue_client.add_filtered_metrics(result.size)
@events_filtered.add(result.size)
result
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
# but if an exception is raised up to the worker thread they are considered
Expand All @@ -406,13 +409,15 @@ def filter_batch(batch)
end

# Take an array of events and send them to the correct output
def output_batch(batch, output_events_map)
def output_batch(events, output_events_map)
# Build a mapping of { output_plugin => [events...]}
batch.to_a.each do |event|
# We ask the AST to tell us which outputs to send each event to
# Then, we stick it in the correct bin
output_func(event).each do |output|
output_events_map[output].push(event)
events.each do |event|
unless event.cancelled?
# We ask the AST to tell us which outputs to send each event to
# Then, we stick it in the correct bin
output_func(event).each do |output|
output_events_map[output].push(event)
end
end
end
# Now that we have our output to event mapping we can just invoke each output
Expand All @@ -422,7 +427,7 @@ def output_batch(batch, output_events_map)
events.clear
end

filter_queue_client.add_output_metrics(batch.filtered_size)
filter_queue_client.add_output_metrics(events.size)
end

def resolve_cluster_uuids
Expand Down Expand Up @@ -600,15 +605,16 @@ def uptime
#
# @param batch [ReadClient::ReadBatch]
# @param options [Hash]
def flush_filters_to_batch(batch, options = {})
def flush_filters_to_batch(events, options = {})
result = events
flush_filters(options) do |event|
unless event.cancelled?
@logger.debug? and @logger.debug("Pushing flushed events", default_logging_keys(:event => event))
batch.merge(event)
result << event
end
end

@flushing.set(false)
result
end # flush_filters_to_batch

def plugin_threads_info
Expand Down
60 changes: 44 additions & 16 deletions logstash-core/spec/logstash/pipeline_pq_file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ def close
EOS
end

let(:pipeline_settings) { { "queue.type" => queue_type, "pipeline.workers" => worker_thread_count, "pipeline.id" => pipeline_id} }
let(:pipeline_settings) {{
"queue.type" => queue_type,
"pipeline.workers" => worker_thread_count,
"pipeline.id" => pipeline_id
}}

let(:pipeline_config) { mock_pipeline_config(pipeline_id, config, pipeline_settings_obj) }
subject { described_class.new(pipeline_config, metric) }

let(:counting_output) { PipelinePqFileOutput.new({ "id" => output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
Expand All @@ -95,7 +98,6 @@ def close
let(:number_of_events) { 100_000 }
let(:page_capacity) { 1 * 1024 * 512 } # 1 128
let(:max_bytes) { 1024 * 1024 * 1024 } # 1 gb
let(:queue_type) { "persisted" } # "memory" "memory_acked"
let(:times) { [] }

let(:pipeline_thread) do
Expand All @@ -105,6 +107,8 @@ def close
Thread.new { s.run }
end

let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }

before :each do
FileUtils.mkdir_p(this_queue_folder)

Expand Down Expand Up @@ -139,19 +143,43 @@ def close
# Dir.rm_rf(this_queue_folder)
end

let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }
shared_examples "a well behaved pipeline" do
it "populates the core metrics" do
_metric = collected_metric[:stats][:pipelines][:main][:events]
expect(_metric[:duration_in_millis].value).not_to be_nil
expect(_metric[:in].value).to eq(number_of_events)
expect(_metric[:filtered].value).to eq(number_of_events)
expect(_metric[:out].value).to eq(number_of_events)
STDOUT.puts " pipeline: #{subject.class}"
STDOUT.puts " queue.type: #{pipeline_settings_obj.get("queue.type")}"
STDOUT.puts " queue.page_capacity: #{pipeline_settings_obj.get("queue.page_capacity") / 1024}KB"
STDOUT.puts " queue.max_bytes: #{pipeline_settings_obj.get("queue.max_bytes") / 1024}KB"
STDOUT.puts " workers: #{worker_thread_count}"
STDOUT.puts " events: #{number_of_events}"
STDOUT.puts " took: #{times.first}s"
end
end

it "populates the pipelines core metrics" do
_metric = collected_metric[:stats][:pipelines][:main][:events]
expect(_metric[:duration_in_millis].value).not_to be_nil
expect(_metric[:in].value).to eq(number_of_events)
expect(_metric[:filtered].value).to eq(number_of_events)
expect(_metric[:out].value).to eq(number_of_events)
STDOUT.puts " queue.type: #{pipeline_settings_obj.get("queue.type")}"
STDOUT.puts " queue.page_capacity: #{pipeline_settings_obj.get("queue.page_capacity") / 1024}KB"
STDOUT.puts " queue.max_bytes: #{pipeline_settings_obj.get("queue.max_bytes") / 1024}KB"
STDOUT.puts " workers: #{worker_thread_count}"
STDOUT.puts " events: #{number_of_events}"
STDOUT.puts " took: #{times.first}s"
context "using PQ" do
let(:queue_type) { "persisted" } # "memory", "persisted"
context "with Ruby execution" do
subject { LogStash::Pipeline.new(pipeline_config, metric) }
it_behaves_like "a well behaved pipeline"
end
context "with Java execution" do
subject { LogStash::JavaPipeline.new(pipeline_config, metric) }
it_behaves_like "a well behaved pipeline"
end
end
context "using MQ" do
let(:queue_type) { "memory" } # "memory", "persisted"
context "with Ruby execution" do
subject { LogStash::Pipeline.new(pipeline_config, metric) }
it_behaves_like "a well behaved pipeline"
end
context "with Java execution" do
subject { LogStash::JavaPipeline.new(pipeline_config, metric) }
it_behaves_like "a well behaved pipeline"
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,16 @@
message = data.get("message")
expect(messages).to include(message)
messages.delete(message)
# read_batch.cancel("value-#{i}") if i > 2 # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
if message.match /value-[3-4]/
data.cancel
read_batch.merge(LogStash::Event.new({ "message" => message.gsub(/value/, 'generated') }))
end
end
# expect(read_batch.cancelled_size).to eq(2) # disabled for https://github.com/elastic/logstash/issues/6055
received = []
read_batch.to_a.each do |data|
received << data.get("message")
end
expect(received.size).to eq(3)
(0..2).each {|i| expect(received).to include("value-#{i}")}
(3..4).each {|i| expect(received).to include("generated-#{i}")}
end

it "handles Java proxied read-batch object" do
Expand Down
22 changes: 10 additions & 12 deletions logstash-core/src/main/java/org/logstash/ackedqueue/AckedBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
package org.logstash.ackedqueue;

import java.io.IOException;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyHash;
import java.util.ArrayList;
import java.util.Collection;
import org.logstash.Event;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;

import static org.logstash.RubyUtil.RUBY;

public final class AckedBatch {
private Batch batch;
Expand All @@ -36,14 +37,11 @@ public static AckedBatch create(Batch batch) {
return ackedBatch;
}

public RubyHash toRubyHash(final Ruby runtime) {
final RubyBoolean trueValue = runtime.getTrue();
final RubyHash result = RubyHash.newHash(runtime);
this.batch.getElements().forEach(e -> result.fastASet(
JrubyEventExtLibrary.RubyEvent.newRubyEvent(runtime, (Event) e),
trueValue
)
);
public Collection<RubyEvent> events() {
final ArrayList<RubyEvent> result = new ArrayList<>(this.batch.size());
for (final Queueable e : batch.getElements()) {
result.add(RubyEvent.newRubyEvent(RUBY, (Event) e));
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
package org.logstash.ackedqueue;

import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import static org.logstash.RubyUtil.RUBY;
Expand All @@ -36,74 +35,55 @@ public final class AckedReadBatch implements QueueBatch {

private AckedBatch ackedBatch;

private RubyHash originals;

private RubyHash generated;
private Collection<RubyEvent> events;

public static AckedReadBatch create(
final JRubyAckedQueueExt queue,
final int size,
final long timeout)
{
return new AckedReadBatch(queue, size, timeout);
}

private AckedReadBatch(
final JRubyAckedQueueExt queue,
final int size,
final long timeout)
{
AckedBatch batch;
try {
batch = queue.readBatch(size, timeout);
final AckedBatch batch = queue.readBatch(size, timeout);
return (batch == null) ? new AckedReadBatch() : new AckedReadBatch(batch);
} catch (IOException e) {
throw new IllegalStateException(e);
}
if (batch == null) {
originals = RubyHash.newHash(RUBY);
ackedBatch = null;
} else {
ackedBatch = batch;
originals = ackedBatch.toRubyHash(RUBY);
}
generated = RubyHash.newHash(RUBY);
}

@Override
public void merge(final RubyEvent event) {
if (!event.isNil() && !originals.containsKey(event)) {
generated.put(event, RUBY.getTrue());
}
public static AckedReadBatch create() {
return new AckedReadBatch();
}

private AckedReadBatch() {
ackedBatch = null;
events = new ArrayList<>();
}

private AckedReadBatch(AckedBatch batch) {
ackedBatch = batch;
events = batch.events();
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public RubyArray to_a() {
final RubyArray result = RUBY.newArray(filteredSize());
for (final RubyEvent event : (Collection<RubyEvent>) originals.keys()) {
if (!MemoryReadBatch.isCancelled(event)) {
result.append(event);
}
}
for (final RubyEvent event : (Collection<RubyEvent>) generated.keys()) {
if (!MemoryReadBatch.isCancelled(event)) {
result.append(event);
public RubyArray<RubyEvent> to_a() {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> result = RUBY.newArray(events.size());
for (final RubyEvent e : events) {
if (!MemoryReadBatch.isCancelled(e)) {
result.append(e);
}
}
return result;
}

@SuppressWarnings({"unchecked"})
@Override
public Collection<RubyEvent> collection() {
// This only returns the originals and does not filter cancelled one
// because it is only used in the WorkerLoop where only originals
// non-cancelled exists. We should revisit this AckedReadBatch
// implementation and get rid of this dual original/generated idea.
// The MemoryReadBatch does not use such a strategy.
return originals.directKeySet();
public Collection<RubyEvent> events() {
// This does not filter cancelled events because it is
// only used in the WorkerLoop where there are no cancelled
// events yet.
return events;
}

@Override
public void close() throws IOException {
if (ackedBatch != null) {
ackedBatch.close();
Expand All @@ -112,6 +92,6 @@ public void close() throws IOException {

@Override
public int filteredSize() {
return originals.size() + generated.size();
return events.size();
}
}
Loading

0 comments on commit 5a25c6f

Please sign in to comment.