Skip to content

Commit

Permalink
rename teardown to close
Browse files Browse the repository at this point in the history
remove edge case when running from rubinius

add do_stop and do_close to avoid using super
  • Loading branch information
jsvd authored and colinsurprenant committed Sep 18, 2015
1 parent 5adf2b0 commit 112b281
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 39 deletions.
14 changes: 7 additions & 7 deletions docs/asciidoc/static/include/pluginbody.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -725,24 +725,24 @@ endif::receive_method[]

// Teardown is now in the base class... can be pruned?
// /////////////////////////////////////////////////////////////////////////////
// If teardown_method is defined (should only be for input or output plugin page)
// If close_method is defined (should only be for input or output plugin page)
// /////////////////////////////////////////////////////////////////////////////
// ifdef::teardown_method[]
// ifdef::close_method[]
// [float]
// ==== `teardown` Method
// ==== `close` Method
// [source,ruby]
// [subs="attributes"]
// ----------------------------------
// public
// def teardown
// def close
// @udp.close if @udp && !@udp.closed?
// end
// ----------------------------------
// The `teardown` method is not present in all input or output plugins. It is
// The `close` method is not present in all input or output plugins. It is
// called when a shutdown happens to ensure that sockets, files, connections and
// threads are all closed down properly. If your plugin uses these connections,
// you should include a teardown method.
// endif::teardown_method[]
// you should include a close method.
// endif::close_method[]

==== Building the Plugin

Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/codecs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def encode(event)
end # def encode

public
def teardown; end;
def close; end;

# @param block [Proc(event, data)] the callback proc passing the original event and the encoded event
public
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/filters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def filter?(event)
end

public
def teardown
def close
# Nothing to do by default.
end
end # class LogStash::Filters::Base
12 changes: 10 additions & 2 deletions lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,20 @@ def tag(newtag)
@tags << newtag
end # def tag

# if you override stop, don't forget to call super
# as the first action
public
# override stop if you need to do more than do_stop to
# enforce the input plugin to return from `run`.
# e.g. a tcp plugin might need to close the tcp socket
# so blocking read operation aborts
def stop
# override if necessary
end

public
def do_stop
@logger.debug("stopping", :plugin => self)
@stop_called.make_true
stop
end

# stop? should never be overriden
Expand Down
14 changes: 4 additions & 10 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,6 @@ def run

def wait_inputs
@input_threads.each(&:join)
rescue Interrupt
# rbx does weird things during do SIGINT that I haven't debugged
# so we catch Interrupt here and signal a shutdown. For some reason the
# signal handler isn't invoked it seems? I dunno, haven't looked much into
# it.
shutdown
end

def shutdown_filters
Expand Down Expand Up @@ -192,7 +186,7 @@ def inputworker(plugin)
sleep(1)
retry
ensure
plugin.teardown
plugin.do_close
end
end # def inputworker

Expand Down Expand Up @@ -220,7 +214,7 @@ def filterworker
@logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
end

@filters.each(&:teardown)
@filters.each(&:do_close)
end # def filterworker

def outputworker
Expand All @@ -234,7 +228,7 @@ def outputworker
end
ensure
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
output.worker_plugins.each(&:do_close)
end
end # def outputworker

Expand All @@ -251,7 +245,7 @@ def shutdown
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs)

@inputs.each(&:stop)
@inputs.each(&:do_stop)
end # def shutdown

def plugin(plugin_type, name, *args)
Expand Down
15 changes: 11 additions & 4 deletions lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ def initialize(params=nil)
@logger = Cabin::Channel.get(LogStash)
end

# Subclasses should implement this teardown method if you need to perform any
# special tasks during shutdown (like flushing, etc.)
# if you override teardown, don't forget to call super
# close is called during shutdown, after the plugin worker
# main task terminates
public
def teardown
def do_close
@logger.debug("closing", :plugin => self)
close
end

# Subclasses should implement this close method if you need to perform any
# special tasks during shutdown (like flushing, etc.)
public
def close
# ..
end

def to_s
Expand Down
26 changes: 13 additions & 13 deletions spec/core/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def register
def run(queue)
end

def teardown
def close
end
end

Expand All @@ -27,19 +27,19 @@ def encode(event)
event
end

def teardown
def close
end
end

class DummyOutput < LogStash::Outputs::Base
config_name "dummyoutput"
milestone 2

attr_reader :num_teardowns
attr_reader :num_closes

def initialize(params={})
super
@num_teardowns = 0
@num_closes = 0
end

def register
Expand All @@ -48,8 +48,8 @@ def register
def receive(event)
end

def teardown
@num_teardowns += 1
def close
@num_closes += 1
end
end

Expand All @@ -59,7 +59,7 @@ class TestPipeline < LogStash::Pipeline

describe LogStash::Pipeline do

context "teardown" do
context "close" do

before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
Expand Down Expand Up @@ -93,24 +93,24 @@ class TestPipeline < LogStash::Pipeline
eos
}

context "output teardown" do
it "should call teardown of output without output-workers" do
context "output close" do
it "should call close of output without output-workers" do
pipeline = TestPipeline.new(test_config_without_output_workers)
pipeline.run

expect(pipeline.outputs.size ).to eq(1)
expect(pipeline.outputs.first.worker_plugins.size ).to eq(1)
expect(pipeline.outputs.first.worker_plugins.first.num_teardowns ).to eq(1)
expect(pipeline.outputs.first.worker_plugins.first.num_closes ).to eq(1)
end

it "should call output teardown correctly with output workers" do
it "should call output close correctly with output workers" do
pipeline = TestPipeline.new(test_config_with_output_workers)
pipeline.run

expect(pipeline.outputs.size ).to eq(1)
expect(pipeline.outputs.first.num_teardowns).to eq(0)
expect(pipeline.outputs.first.num_closes).to eq(0)
pipeline.outputs.first.worker_plugins.each do |plugin|
expect(plugin.num_teardowns ).to eq(1)
expect(plugin.num_closes ).to eq(1)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/filters/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def filter(event)
end

it "should provide class public API" do
[:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :teardown].each do |method|
[:register, :filter, :multi_filter, :execute, :threadsafe?, :filter_matched, :filter?, :close].each do |method|
expect(subject).to respond_to(method)
end
end
Expand Down

0 comments on commit 112b281

Please sign in to comment.