Skip to content

Commit

Permalink
stop inputs upon a worker error before terminating the pipeline (#12339)
Browse files Browse the repository at this point in the history
This addresses an incomplete fix in #12019 starting in 7.8.1 where upon catching a worker exception (to avoid crashing the whole logstash per #12306) the input plugin(s) are not terminated prior to closing the pipeline leading to the input plugin(s) continuing execution and failing with IllegalStateException & Tried to write to a closed queue since closing the pipeline also correctly closes the queue.
  • Loading branch information
colinsurprenant authored Oct 13, 2020
1 parent 7ab7f85 commit d03d3ce
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 21 deletions.
58 changes: 49 additions & 9 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ module LogStash; class JavaPipeline < JavaBasePipeline

attr_reader \
:worker_threads,
:input_threads,
:events_consumed,
:events_filtered,
:started_at,
:thread

MAX_INFLIGHT_WARN_THRESHOLD = 10_000
SECOND = 1
MEMORY = "memory".freeze

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@logger = self.logger
Expand All @@ -46,7 +49,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)

@worker_threads = []

@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == "memory"
@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == MEMORY

@events_filtered = java.util.concurrent.atomic.LongAdder.new
@events_consumed = java.util.concurrent.atomic.LongAdder.new
Expand Down Expand Up @@ -134,9 +137,7 @@ def start
run
@finished_run.make_true
rescue => e
# no need to log at ERROR level since this log will be redundant to the log in
# the worker loop thread global rescue clause
logger.debug("Pipeline terminated by worker error", error_log_params.call(e))
logger.error("Pipeline error", error_log_params.call(e))
ensure
# we must trap any exception here to make sure the following @finished_execution
# is always set to true regardless of any exception before in the close method call
Expand All @@ -146,6 +147,7 @@ def start
logger.error("Pipeline close error, ignoring", error_log_params.call(e))
end
@finished_execution.make_true
@logger.info("Pipeline terminated", "pipeline.id" => pipeline_id)
end
end

Expand Down Expand Up @@ -331,13 +333,38 @@ def resolve_cluster_uuids
def monitor_inputs_and_workers
twait = ThreadsWait.new(*(@input_threads + @worker_threads))

while !@input_threads.empty?
loop do
break if @input_threads.empty?

terminated_thread = twait.next_wait

if @input_threads.delete(terminated_thread).nil?
# this is a worker thread termination
# delete it from @worker_threads so that wait_for_workers does not wait for it
# this is an abnormal worker thread termination, we need to terminate the pipeline

@worker_threads.delete(terminated_thread)
raise("Worker thread terminated in pipeline.id: #{pipeline_id}")

# before terminating the pipeline we need to close the inputs
stop_inputs

# wait 10 seconds for all input threads to terminate
wait_input_threads_termination(10 * SECOND) do
@logger.warn("Waiting for input plugin to close", default_logging_keys)
sleep(1)
end

if inputs_running? && settings.get("queue.type") == MEMORY
# if there are still input threads alive they are probably blocked pushing on the memory queue
# because no worker is present to consume from the ArrayBlockingQueue
# if this is taking too long we have a problem
wait_input_threads_termination(10 * SECOND) do
dropped_batch = filter_queue_client.read_batch
@logger.error("Dropping events to unblock input plugin", default_logging_keys(:count => dropped_batch.filteredSize)) if dropped_batch.filteredSize > 0
end
end

raise("Unable to stop input plugin(s)") if inputs_running?

break
end
end

Expand Down Expand Up @@ -420,7 +447,6 @@ def shutdown
stop_inputs
wait_for_shutdown
clear_pipeline_metrics
@logger.info("Pipeline terminated", "pipeline.id" => pipeline_id)
end # def shutdown

def wait_for_shutdown
Expand Down Expand Up @@ -581,4 +607,18 @@ def preserve_event_order?(pipeline_workers)

false
end

def wait_input_threads_termination(timeout_seconds, &block)
start = Time.now
seconds = 0
while inputs_running? && (seconds < timeout_seconds)
block.call
seconds = Time.now - start
end
end

def inputs_running?
@input_threads.any?(&:alive?)
end

end; end
86 changes: 74 additions & 12 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require 'support/pipeline/pipeline_helpers'
require "stud/try"
require 'timeout'
require "thread"

class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"
Expand All @@ -38,18 +39,31 @@ def close
end
end

class DummyInputGenerator < LogStash::Inputs::Base
config_name "dummyinputgenerator"
class DummyManualInputGenerator < LogStash::Inputs::Base
config_name "dummymanualinputgenerator"
milestone 2

attr_accessor :keep_running

def initialize(*args)
super(*args)
@keep_running = Concurrent::AtomicBoolean.new(false)
@queue = nil
end

def register
end

def run(queue)
queue << Logstash::Event.new while !stop?
@queue = queue
while !stop? || @keep_running.true?
queue << LogStash::Event.new
sleep(0.5)
end
end

def close
def push_once
@queue << LogStash::Event.new
end
end

Expand Down Expand Up @@ -237,36 +251,84 @@ def flush(options)
end
end

context "a crashing worker" do
context "a crashing worker terminates the pipeline and all inputs and workers" do
subject { mock_java_pipeline_from_string(config, pipeline_settings_obj) }

let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1 } }
let(:config) do
<<-EOS
input { generator {} }
input { dummymanualinputgenerator {} }
filter { dummycrashingfilter {} }
output { dummyoutput {} }
EOS
end
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new }
let(:dummyinput) { DummyManualInputGenerator.new }

before :each do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(DummyManualInputGenerator).to receive(:new).with(any_args).and_return(dummyinput)

allow(LogStash::Plugin).to receive(:lookup).with("input", "dummymanualinputgenerator").and_return(DummyManualInputGenerator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummycrashingfilter").and_return(DummyCrashingFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end

after :each do
subject.shutdown
context "a crashing worker using memory queue" do
let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1, "queue.type" => "memory"} }

it "does not raise in the main thread, terminates the run thread and finishes execution" do
# first make sure we keep the input plugin in the run method for now
dummyinput.keep_running.make_true

expect { subject.start }.to_not raise_error

# wait until there is no more worker thread since we have a single worker that should have died
wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey

# at this point the input plugin should have been asked to stop
wait(5).for {dummyinput.stop?}.to be_truthy

# allow the input plugin to exit the run method now
dummyinput.keep_running.make_false

# the pipeline thread should terminate normally
expect { subject.thread.join }.to_not raise_error
expect(subject.finished_execution?).to be_truthy

# when the pipeline has exited, no input threads should be alive
wait(5).for {subject.input_threads.any?(&:alive?)}.to be_falsey
end
end

context "a crashing worker using persisted queue" do
let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1, "queue.type" => "persisted"} }

it "does not raise in the main thread, terminates the run thread and finishes execution" do
expect { subject.start && subject.thread.join }.to_not raise_error
# first make sure we keep the input plugin in the run method for now
dummyinput.keep_running.make_true

expect { subject.start }.to_not raise_error

# wait until there is no more worker thread since we have a single worker that should have died
wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey

# at this point the input plugin should have been asked to stop
wait(5).for {dummyinput.stop?}.to be_truthy

# allow the input plugin to exit the run method now
dummyinput.keep_running.make_false

# the pipeline thread should terminate normally
expect { subject.thread.join }.to_not raise_error
expect(subject.finished_execution?).to be_truthy

# when the pipeline has exited, no input threads should be alive
wait(5).for {subject.input_threads.any?(&:alive?)}.to be_falsey

expect{dummyinput.push_once}.to raise_error(/Tried to write to a closed queue/)
end
end
end

describe "defaulting the pipeline workers based on thread safety" do
before(:each) do
Expand Down

0 comments on commit d03d3ce

Please sign in to comment.