Skip to content

Commit 9efbcc0

Browse files
committed
Make this plugin a 'threadsafe' output (requires LS >= 2.2.0)
Fixes logstash-plugins#377 Before acceptance this will need elastic/elasticsearch-ruby#281 Merged in and used for the ruby client
1 parent 54a3a99 commit 9efbcc0

16 files changed

+58
-366
lines changed

lib/logstash/outputs/elasticsearch.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
require "logstash/outputs/base"
55
require "logstash/json"
66
require "concurrent"
7-
require "stud/buffer"
87
require "socket" # for Socket.gethostname
98
require "thread" # for safe queueing
109
require "uri" # for escaping user input
@@ -51,6 +50,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
5150
require "logstash/outputs/elasticsearch/common_configs"
5251
require "logstash/outputs/elasticsearch/common"
5352

53+
# This is needed to properly pool connections
54+
declare_threadsafe!
55+
5456
# Protocol agnostic (i.e. non-http, non-java specific) configs go here
5557
include(LogStash::Outputs::ElasticSearch::CommonConfigs)
5658

@@ -135,7 +137,6 @@ def build_client
135137
def close
136138
@stopping.make_true
137139
@client.stop_sniffing!
138-
@buffer.stop
139140
end
140141

141142
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }

lib/logstash/outputs/elasticsearch/buffer.rb

Lines changed: 0 additions & 124 deletions
This file was deleted.

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
require "logstash/outputs/elasticsearch/template_manager"
2-
require "logstash/outputs/elasticsearch/buffer"
32

43
module LogStash; module Outputs; class ElasticSearch;
54
module Common
@@ -13,16 +12,11 @@ def register
1312
setup_hosts # properly sets @hosts
1413
build_client
1514
install_template
16-
setup_buffer_and_handler
1715
check_action_validity
1816

1917
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts)
2018
end
2119

22-
def receive(event)
23-
@buffer << event_action_tuple(event)
24-
end
25-
2620
# Receive an array of events and immediately attempt to index them (no buffering)
2721
def multi_receive(events)
2822
events.each_slice(@flush_size) do |slice|
@@ -37,10 +31,6 @@ def event_action_tuple(event)
3731
[action, params, event]
3832
end
3933

40-
def flush
41-
@buffer.flush
42-
end
43-
4434
def setup_hosts
4535
@hosts = Array(@hosts)
4636
if @hosts.empty?
@@ -53,12 +43,6 @@ def install_template
5343
TemplateManager.install_template(self)
5444
end
5545

56-
def setup_buffer_and_handler
57-
@buffer = ::LogStash::Outputs::ElasticSearch::Buffer.new(@logger, @flush_size, @idle_flush_time) do |actions|
58-
retrying_submit(actions)
59-
end
60-
end
61-
6246
def check_action_validity
6347
raise LogStash::ConfigurationError, "No action specified!" unless @action
6448

lib/logstash/outputs/elasticsearch/common_configs.rb

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,11 @@ def self.included(mod)
8888
mod.config :port, :obsolete => "Please use the 'hosts' setting instead. Hosts entries can be in 'host:port' format."
8989

9090
# This plugin uses the bulk index API for improved indexing performance.
91-
# In Logstashes >= 2.2 this setting defines the maximum sized bulk request Logstash will make
91+
# This setting defines the maximum sized bulk request Logstash will make
9292
# You you may want to increase this to be in line with your pipeline's batch size.
9393
# If you specify a number larger than the batch size of your pipeline it will have no effect,
9494
# save for the case where a filter increases the size of an inflight batch by outputting
9595
# events.
96-
#
97-
# In Logstashes <= 2.1 this plugin uses its own internal buffer of events.
98-
# This config option sets that size. In these older logstashes this size may
99-
# have a significant impact on heap usage, whereas in 2.2+ it will never increase it.
100-
# To make efficient bulk API calls, we will buffer a certain number of
101-
# events before flushing that out to Elasticsearch. This setting
102-
# controls how many events will be buffered before sending a batch
103-
# of events. Increasing the `flush_size` has an effect on Logstash's heap size.
104-
# Remember to also increase the heap size using `LS_HEAP_SIZE` if you are sending big documents
105-
# or have increased the `flush_size` to a higher value.
10696
mod.config :flush_size, :validate => :number, :default => 500
10797

10898
# The amount of time since last flush before a flush is forced.

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def start_sniffing!
5757
if options[:sniffing]
5858
@sniffer_thread = Thread.new do
5959
loop do
60-
@request_mutex.synchronize { sniff! }
60+
sniff!
6161
sleep (options[:sniffing_delay].to_f || 30)
6262
end
6363
end
@@ -70,8 +70,12 @@ def stop_sniffing!
7070

7171
def sniff!
7272
client.transport.reload_connections! if options[:sniffing]
73-
hosts_by_name = client.transport.hosts.map {|h| h["name"]}.sort
74-
@logger.debug({"count" => hosts_by_name.count, "hosts" => hosts_by_name})
73+
74+
if @logger.debug?
75+
hosts_by_name = client.transport.hosts.map {|h| h["name"]}.sort
76+
@logger.debug({"count" => hosts_by_name.count, "hosts" => hosts_by_name})
77+
end
78+
7579
rescue StandardError => e
7680
@logger.error("Error while sniffing connection",
7781
:message => e.message,

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Gem::Specification.new do |s|
2424
s.add_runtime_dependency 'elasticsearch', ['>= 1.0.13', '~> 1.0']
2525
s.add_runtime_dependency 'stud', ['>= 0.0.17', '~> 0.0']
2626
s.add_runtime_dependency 'cabin', ['~> 0.6']
27-
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
27+
s.add_runtime_dependency "logstash-core", ">= 2.2.0", "< 3.0.0"
2828

2929
s.add_development_dependency 'ftw', '~> 0.0.42'
3030
s.add_development_dependency 'logstash-codec-plain'

spec/integration/outputs/create_spec.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ def get_es_output(action, id = nil)
2929
it "should create new documents with or without id" do
3030
subject = get_es_output("create", "id123")
3131
subject.register
32-
subject.receive(LogStash::Event.new("message" => "sample message here"))
33-
subject.flush
32+
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
3433
@es.indices.refresh
3534
# Wait or fail until everything's indexed.
3635
Stud::try(3.times) do
@@ -42,8 +41,7 @@ def get_es_output(action, id = nil)
4241
it "should create new documents without id" do
4342
subject = get_es_output("create")
4443
subject.register
45-
subject.receive(LogStash::Event.new("message" => "sample message here"))
46-
subject.flush
44+
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
4745
@es.indices.refresh
4846
# Wait or fail until everything's indexed.
4947
Stud::try(3.times) do

spec/integration/outputs/index_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
before do
1212
subject.register
1313
event_count.times do
14-
subject.receive(LogStash::Event.new("message" => "Hello World!", "type" => type))
14+
subject.multi_receive([LogStash::Event.new("message" => "Hello World!", "type" => type)])
1515
end
1616
end
1717

spec/integration/outputs/parent_spec.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
ftw.put!("#{index_url}/#{type}_parent/test", :body => pdoc.to_json)
2020

2121
subject.register
22-
event_count.times do
23-
subject.receive(LogStash::Event.new("link_to" => "test", "message" => "Hello World!", "type" => type))
24-
end
22+
subject.multi_receive(event_count.times.map { LogStash::Event.new("link_to" => "test", "message" => "Hello World!", "type" => type) })
2523
end
2624

2725

0 commit comments

Comments
 (0)