Skip to content

Commit

Permalink
ensure input plugin close is called upon termination or pipeline reload
Browse files Browse the repository at this point in the history
  • Loading branch information
colinsurprenant authored and elasticsearch-bot committed Aug 21, 2020
1 parent b47cdc3 commit e8d1073
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 35 deletions.
46 changes: 30 additions & 16 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -378,32 +378,33 @@ def inputworker(plugin)
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace))
@logger.debug(
"Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
return
end

# otherwise, report error and restart
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
default_logging_keys(
:plugin => plugin.inspect,
:error => e.message,
:exception => e.class,
:stacktrace => e.backtrace.join("\n"))))
@logger.error(I18n.t(
"logstash.pipeline.worker-error-debug",
default_logging_keys(
:plugin => plugin.inspect,
:error => e.message,
:exception => e.class,
:stacktrace => e.backtrace.join("\n"))))

# Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again
sleep(1)
begin
plugin.do_close
rescue => close_exception
@logger.debug("Input plugin raised exception while closing, ignoring",
default_logging_keys(:plugin => plugin.class.config_name, :exception => close_exception.message,
:backtrace => close_exception.backtrace))
end
close_plugin_and_ignore(plugin)
retry
ensure
close_plugin_and_ignore(plugin)
end
end # def inputworker
end

# initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread
Expand Down Expand Up @@ -519,6 +520,19 @@ def inspect

private

def close_plugin_and_ignore(plugin)
begin
plugin.do_close
rescue => e
@logger.warn(
"plugin raised exception while closing, ignoring",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
end
end

# @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception
def init_worker_loop
begin
Expand Down
46 changes: 30 additions & 16 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -474,32 +474,33 @@ def inputworker(plugin)
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace))
@logger.debug(
"Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
return
end

# otherwise, report error and restart
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
default_logging_keys(
:plugin => plugin.inspect,
:error => e.message,
:exception => e.class,
:stacktrace => e.backtrace.join("\n"))))
@logger.error(I18n.t(
"logstash.pipeline.worker-error-debug",
default_logging_keys(
:plugin => plugin.inspect,
:error => e.message,
:exception => e.class,
:stacktrace => e.backtrace.join("\n"))))

# Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again
sleep(1)
begin
plugin.do_close
rescue => close_exception
@logger.debug("Input plugin raised exception while closing, ignoring",
default_logging_keys(:plugin => plugin.class.config_name, :exception => close_exception.message,
:backtrace => close_exception.backtrace))
end
close_plugin_and_ignore(plugin)
retry
ensure
close_plugin_and_ignore(plugin)
end
end # def inputworker
end

# initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread
Expand Down Expand Up @@ -654,6 +655,19 @@ def inspect

private

def close_plugin_and_ignore(plugin)
begin
plugin.do_close
rescue => e
@logger.warn(
"plugin raised exception while closing, ignoring",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
end
end

def maybe_setup_out_plugins
if @outputs_registered.make_true
register_plugins(@outputs)
Expand Down
6 changes: 4 additions & 2 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,14 @@ def flush(options)
eos
}

context "output close" do
context "input and output close" do
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) }
let(:output) { pipeline.outputs.first }
let(:input) { pipeline.inputs.first }

it "should call close of output without output-workers" do
it "should call close of input and output without output-workers" do
expect(output).to receive(:do_close).once
expect(input).to receive(:do_close).once
pipeline.start
pipeline.shutdown
end
Expand Down
5 changes: 4 additions & 1 deletion logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -371,18 +371,21 @@ def flush(options)
eos
}

context "output close" do
context "inputs and output close" do
let(:pipeline) { mock_pipeline_from_string(test_config_without_output_workers) }
let(:output) { pipeline.outputs.first }
let(:input) { pipeline.inputs.first }

before do
allow(output).to receive(:do_close)
allow(input).to receive(:do_close)
end

it "should call close of output without output-workers" do
pipeline.start
pipeline.shutdown
expect(output).to have_received(:do_close).once
expect(input).to have_received(:do_close).once
end
end
end
Expand Down

0 comments on commit e8d1073

Please sign in to comment.