From 112b28149e7ddc96cd29a89eecaf52598e096ee2 Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Thu, 10 Sep 2015 16:35:35 +0100 Subject: [PATCH] rename teardown to close remove edge case when running from rubinius add do_stop and do_close to avoid using super --- .../static/include/pluginbody.asciidoc | 14 +++++----- lib/logstash/codecs/base.rb | 2 +- lib/logstash/filters/base.rb | 2 +- lib/logstash/inputs/base.rb | 12 +++++++-- lib/logstash/pipeline.rb | 14 +++------- lib/logstash/plugin.rb | 15 ++++++++--- spec/core/pipeline_spec.rb | 26 +++++++++---------- spec/filters/base_spec.rb | 2 +- 8 files changed, 48 insertions(+), 39 deletions(-) diff --git a/docs/asciidoc/static/include/pluginbody.asciidoc b/docs/asciidoc/static/include/pluginbody.asciidoc index 77cf2cb558d..89def49dd3f 100644 --- a/docs/asciidoc/static/include/pluginbody.asciidoc +++ b/docs/asciidoc/static/include/pluginbody.asciidoc @@ -725,24 +725,24 @@ endif::receive_method[] // Teardown is now in the base class... can be pruned? // ///////////////////////////////////////////////////////////////////////////// -// If teardown_method is defined (should only be for input or output plugin page) +// If close_method is defined (should only be for input or output plugin page) // ///////////////////////////////////////////////////////////////////////////// -// ifdef::teardown_method[] +// ifdef::close_method[] // [float] -// ==== `teardown` Method +// ==== `close` Method // [source,ruby] // [subs="attributes"] // ---------------------------------- // public -// def teardown +// def close // @udp.close if @udp && !@udp.closed? // end // ---------------------------------- -// The `teardown` method is not present in all input or output plugins. It is +// The `close` method is not present in all input or output plugins. It is // called when a shutdown happens to ensure that sockets, files, connections and // threads are all closed down properly. If your plugin uses these connections, -// you should include a teardown method. -// endif::teardown_method[] +// you should include a close method. +// endif::close_method[] ==== Building the Plugin diff --git a/lib/logstash/codecs/base.rb b/lib/logstash/codecs/base.rb index 662f054dfde..25fad9da702 100644 --- a/lib/logstash/codecs/base.rb +++ b/lib/logstash/codecs/base.rb @@ -28,7 +28,7 @@ def encode(event) end # def encode public - def teardown; end; + def close; end; # @param block [Proc(event, data)] the callback proc passing the original event and the encoded event public diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 61bf7887554..9b08f0b974f 100644 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -235,7 +235,7 @@ def filter?(event) end public - def teardown + def close # Nothing to do by default. end end # class LogStash::Filters::Base diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index bde8d3bff4d..b72490bae45 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -101,12 +101,20 @@ def tag(newtag) @tags << newtag end # def tag - # if you override stop, don't forget to call super - # as the first action public + # override stop if you need to do more than do_stop to + # enforce the input plugin to return from `run`. + # e.g. a tcp plugin might need to close the tcp socket + # so blocking read operation aborts def stop + # override if necessary + end + + public + def do_stop @logger.debug("stopping", :plugin => self) @stop_called.make_true + stop end # stop? should never be overriden diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index f63f87f8e04..44eeb2fcf46 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -99,12 +99,6 @@ def run def wait_inputs @input_threads.each(&:join) - rescue Interrupt - # rbx does weird things during do SIGINT that I haven't debugged - # so we catch Interrupt here and signal a shutdown. For some reason the - # signal handler isn't invoked it seems? I dunno, haven't looked much into - # it. - shutdown end def shutdown_filters @@ -192,7 +186,7 @@ def inputworker(plugin) sleep(1) retry ensure - plugin.teardown + plugin.do_close end end # def inputworker @@ -220,7 +214,7 @@ def filterworker @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace) end - @filters.each(&:teardown) + @filters.each(&:do_close) end # def filterworker def outputworker @@ -234,7 +228,7 @@ def outputworker end ensure @outputs.each do |output| - output.worker_plugins.each(&:teardown) + output.worker_plugins.each(&:do_close) end end # def outputworker @@ -251,7 +245,7 @@ def shutdown InflightEventsReporter.logger = @logger InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs) - @inputs.each(&:stop) + @inputs.each(&:do_stop) end # def shutdown def plugin(plugin_type, name, *args) diff --git a/lib/logstash/plugin.rb b/lib/logstash/plugin.rb index 1b194f21082..e4ed6171ecc 100644 --- a/lib/logstash/plugin.rb +++ b/lib/logstash/plugin.rb @@ -28,12 +28,19 @@ def initialize(params=nil) @logger = Cabin::Channel.get(LogStash) end - # Subclasses should implement this teardown method if you need to perform any - # special tasks during shutdown (like flushing, etc.) - # if you override teardown, don't forget to call super + # close is called during shutdown, after the plugin worker + # main task terminates public - def teardown + def do_close @logger.debug("closing", :plugin => self) + close + end + + # Subclasses should implement this close method if you need to perform any + # special tasks during shutdown (like flushing, etc.) + public + def close + # .. end def to_s diff --git a/spec/core/pipeline_spec.rb b/spec/core/pipeline_spec.rb index 89ad07ef4bd..d0021d4a396 100644 --- a/spec/core/pipeline_spec.rb +++ b/spec/core/pipeline_spec.rb @@ -11,7 +11,7 @@ def register def run(queue) end - def teardown + def close end end @@ -27,7 +27,7 @@ def encode(event) event end - def teardown + def close end end @@ -35,11 +35,11 @@ class DummyOutput < LogStash::Outputs::Base config_name "dummyoutput" milestone 2 - attr_reader :num_teardowns + attr_reader :num_closes def initialize(params={}) super - @num_teardowns = 0 + @num_closes = 0 end def register @@ -48,8 +48,8 @@ def register def receive(event) end - def teardown - @num_teardowns += 1 + def close + @num_closes += 1 end end @@ -59,7 +59,7 @@ class TestPipeline < LogStash::Pipeline describe LogStash::Pipeline do -context "teardown" do +context "close" do before(:each) do allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput) @@ -93,24 +93,24 @@ class TestPipeline < LogStash::Pipeline eos } - context "output teardown" do - it "should call teardown of output without output-workers" do + context "output close" do + it "should call close of output without output-workers" do pipeline = TestPipeline.new(test_config_without_output_workers) pipeline.run expect(pipeline.outputs.size ).to eq(1) expect(pipeline.outputs.first.worker_plugins.size ).to eq(1) - expect(pipeline.outputs.first.worker_plugins.first.num_teardowns ).to eq(1) + expect(pipeline.outputs.first.worker_plugins.first.num_closes ).to eq(1) end - it "should call output teardown correctly with output workers" do + it "should call output close correctly with output workers" do pipeline = TestPipeline.new(test_config_with_output_workers) pipeline.run expect(pipeline.outputs.size ).to eq(1) - expect(pipeline.outputs.first.num_teardowns).to eq(0) + expect(pipeline.outputs.first.num_closes).to eq(0) pipeline.outputs.first.worker_plugins.each do |plugin| - expect(plugin.num_teardowns ).to eq(1) + expect(plugin.num_closes ).to eq(1) end end end diff --git a/spec/filters/base_spec.rb b/spec/filters/base_spec.rb index 321b72965d1..0dfa31480b8 100644 --- a/spec/filters/base_spec.rb +++ b/spec/filters/base_spec.rb @@ -24,7 +24,7 @@ def filter(event) end it "should provide class public API" do - [:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method| + [:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :close].each do |method| expect(subject).to respond_to(method) end end