monitor worker threads exceptions to not crash logstash, just the failed pipeline

The worker threads were not correctly monitored for worker loop exceptions, so any such exception crashed Logstash entirely, even when multiple pipelines were running. Now only the failed pipeline is terminated. If pipeline reloading is enabled, it is possible to edit the config and have that failed pipeline reloaded.
colinsurprenant authored and elasticsearch-bot committed Jul 13, 2020
1 parent d706e50 commit 62519ac
Showing 5 changed files with 100 additions and 29 deletions.
4 changes: 3 additions & 1 deletion logstash-core/lib/logstash/agent.rb
@@ -200,7 +200,9 @@ def converge_state_and_update

converge_result
rescue => e
logger.error("An exception happened when converging configuration", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
attributes = {:exception => e.class, :message => e.message}
attributes.merge!({:backtrace => e.backtrace}) if logger.debug?
logger.error("An exception happened when converging configuration", attributes)
end

# Calculate the Logstash uptime in milliseconds
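The agent.rb change above attaches the backtrace only when debug logging is enabled, keeping ERROR output compact while preserving full detail for debugging. A minimal standalone sketch of the same pattern, using Ruby's stdlib Logger rather than Logstash's logging framework (names and messages here are illustrative, not Logstash code):

require "logger"

log = Logger.new($stdout)
log.level = Logger::INFO

begin
  raise "converge failure"                                # simulate a converge error
rescue => e
  attributes = {:exception => e.class, :message => e.message}
  attributes[:backtrace] = e.backtrace if log.debug?      # backtrace only at debug level
  log.error("An exception happened when converging configuration #{attributes}")
end

Switching log.level to Logger::DEBUG makes the same rescue include the backtrace.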
73 changes: 51 additions & 22 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -17,6 +17,7 @@

require "thread"
require "concurrent"
require "thwait"
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
@@ -119,19 +120,31 @@ def start
@finished_run.make_false

@thread = Thread.new do
error_log_params = ->(e) {
default_logging_keys(
:exception => e,
:backtrace => e.backtrace,
"pipeline.sources" => pipeline_source_details
)
}

begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
ThreadContext.put("pipeline.id", pipeline_id)
run
@finished_run.make_true
rescue => e
close
pipeline_log_params = default_logging_keys(
:exception => e,
:backtrace => e.backtrace,
"pipeline.sources" => pipeline_source_details)
logger.error("Pipeline aborted due to error", pipeline_log_params)
# no need to log at ERROR level since this log would be redundant with the log emitted in
# the worker loop thread's global rescue clause
logger.debug("Pipeline terminated by worker error", error_log_params.call(e))
ensure
# we must trap any exception here to make sure the following @finished_execution
# is always set to true regardless of any exception raised by the close method call
begin
close
rescue => e
logger.error("Pipeline close error, ignoring", error_log_params.call(e))
end
@finished_execution.make_true
end
end
@@ -176,21 +189,18 @@ def run

transition_to_running
start_flusher # Launches a non-blocking thread for flush events
wait_inputs
transition_to_stopped

@logger.debug("Input plugins stopped! Will shutdown filter/output workers.", default_logging_keys)

shutdown_flusher
shutdown_workers
begin
monitor_inputs_and_workers
ensure
transition_to_stopped

close
shutdown_flusher
shutdown_workers

close
end
@logger.debug("Pipeline has been shutdown", default_logging_keys)

# exit code
return 0
end # def run
end

def transition_to_running
@running.make_true
@@ -279,7 +289,16 @@ def start_workers
thread = Thread.new do
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
worker_loop.run
begin
worker_loop.run
rescue => e
# WorkerLoop.run() catches any Java Exception and re-throws it as an IllegalStateException with the
# original exception as the cause
@logger.error(
"Pipeline worker error, the pipeline will be stopped",
default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace)
)
end
end
@worker_threads << thread
end
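The rescue above logs e.cause rather than e because, as the comment notes, WorkerLoop wraps the original error in an IllegalStateException. A minimal pure-Ruby sketch of that unwrapping via Exception#cause (illustrative only, no JRuby/Java interop involved):

def worker_loop_run
  raise "crashing filter"                      # the original error
rescue
  raise "worker loop failed"                   # re-raise a wrapper; Ruby records the original as #cause
end

begin
  worker_loop_run
rescue => e
  puts "wrapper:  #{e.class}: #{e.message}"               # RuntimeError: worker loop failed
  puts "original: #{e.cause.class}: #{e.cause.message}"   # RuntimeError: crashing filter
end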
@@ -309,10 +328,20 @@ def resolve_cluster_uuids
end.to_a.compact
end

def wait_inputs
@input_threads.each do |thread|
thread.join # Thread or java.lang.Thread (both have #join)
def monitor_inputs_and_workers
twait = ThreadsWait.new(*(@input_threads + @worker_threads))

while !@input_threads.empty?
terminated_thread = twait.next_wait
if @input_threads.delete(terminated_thread).nil?
# this is a worker thread termination
# delete it from @worker_threads so that wait_for_workers does not wait for it
@worker_threads.delete(terminated_thread)
raise("Worker thread terminated in pipeline.id: #{pipeline_id}")
end
end

@logger.debug("Input plugins stopped! Will shutdown filter/output workers.", default_logging_keys)
end

def start_inputs
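monitor_inputs_and_workers above relies on ThreadsWait#next_wait from the thwait library, which blocks until any watched thread terminates and returns that thread: if the returned thread is not an input thread it must be a worker that stopped, so the method raises, and the ensure block in run still performs shutdown and close for just this pipeline. A minimal standalone sketch of next_wait (illustrative only, not Logstash code):

require "thwait"

input  = Thread.new { sleep 2 }                 # stands in for a long-running input thread
worker = Thread.new do                          # stands in for a worker whose loop fails
  begin
    raise "crashing filter"
  rescue => e
    puts "worker error logged: #{e.message}"    # the real worker rescue logs and lets the thread end
  end
end

twait = ThreadsWait.new(input, worker)
terminated = twait.next_wait                    # blocks until any watched thread terminates
puts terminated.equal?(worker)                  # => true: the failed worker terminates first
input.kill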
6 changes: 4 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
@@ -402,8 +402,10 @@ def filter_batch(events)
#
# Users need to check their configuration or see if there is a bug in the
# plugin.
@logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
@logger.error(
"Pipeline worker error, the pipeline will be stopped",
default_logging_keys("exception" => e.message, "backtrace" => e.backtrace)
)

raise e
end
42 changes: 42 additions & 0 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
@@ -88,6 +88,17 @@ def threadsafe?() false; end
def close() end
end

class DummyCrashingFilter < LogStash::Filters::Base
config_name "dummycrashingfilter"
milestone 2

def register; end

def filter(event)
raise("crashing filter")
end
end

class DummySafeFilter < LogStash::Filters::Base
config_name "dummysafefilter"
milestone 2
@@ -226,6 +237,37 @@ def flush(options)
end
end

context "a crashing worker" do
subject { mock_java_pipeline_from_string(config, pipeline_settings_obj) }

let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1 } }
let(:config) do
<<-EOS
input { generator {} }
filter { dummycrashingfilter {} }
output { dummyoutput {} }
EOS
end
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new }

before :each do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummycrashingfilter").and_return(DummyCrashingFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
end

after :each do
subject.shutdown
end

it "does not raise in the main thread, terminates the run thread and finishes execution" do
expect { subject.start && subject.thread.join }.to_not raise_error
expect(subject.finished_execution?).to be_truthy
end
end

describe "defaulting the pipeline workers based on thread safety" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
4 changes: 0 additions & 4 deletions logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java
@@ -98,10 +98,6 @@ public void run() {
execution.compute(batch, true, true);
readClient.closeBatch(batch);
} catch (final Exception ex) {
LOGGER.error(
"Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
ex
);
throw new IllegalStateException(ex);
}
}
