Skip to content

Commit 5ace218

Browse files
committed
Make this plugin a 'threadsafe' output (requires LS >= 2.2.0)
Fixes logstash-plugins#377 Before acceptance this will need elastic/elasticsearch-ruby#281 Merged in and used for the ruby client
1 parent 54a3a99 commit 5ace218

File tree

15 files changed

+54
-363
lines changed

15 files changed

+54
-363
lines changed

lib/logstash/outputs/elasticsearch/buffer.rb

Lines changed: 0 additions & 124 deletions
This file was deleted.

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
require "logstash/outputs/elasticsearch/template_manager"
2-
require "logstash/outputs/elasticsearch/buffer"
32

43
module LogStash; module Outputs; class ElasticSearch;
54
module Common
@@ -13,16 +12,11 @@ def register
1312
setup_hosts # properly sets @hosts
1413
build_client
1514
install_template
16-
setup_buffer_and_handler
1715
check_action_validity
1816

1917
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts)
2018
end
2119

22-
def receive(event)
23-
@buffer << event_action_tuple(event)
24-
end
25-
2620
# Receive an array of events and immediately attempt to index them (no buffering)
2721
def multi_receive(events)
2822
events.each_slice(@flush_size) do |slice|
@@ -37,10 +31,6 @@ def event_action_tuple(event)
3731
[action, params, event]
3832
end
3933

40-
def flush
41-
@buffer.flush
42-
end
43-
4434
def setup_hosts
4535
@hosts = Array(@hosts)
4636
if @hosts.empty?
@@ -53,12 +43,6 @@ def install_template
5343
TemplateManager.install_template(self)
5444
end
5545

56-
def setup_buffer_and_handler
57-
@buffer = ::LogStash::Outputs::ElasticSearch::Buffer.new(@logger, @flush_size, @idle_flush_time) do |actions|
58-
retrying_submit(actions)
59-
end
60-
end
61-
6246
def check_action_validity
6347
raise LogStash::ConfigurationError, "No action specified!" unless @action
6448

lib/logstash/outputs/elasticsearch/common_configs.rb

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,11 @@ def self.included(mod)
8888
mod.config :port, :obsolete => "Please use the 'hosts' setting instead. Hosts entries can be in 'host:port' format."
8989

9090
# This plugin uses the bulk index API for improved indexing performance.
91-
# In Logstashes >= 2.2 this setting defines the maximum sized bulk request Logstash will make
91+
# This setting defines the maximum sized bulk request Logstash will make
9292
# You you may want to increase this to be in line with your pipeline's batch size.
9393
# If you specify a number larger than the batch size of your pipeline it will have no effect,
9494
# save for the case where a filter increases the size of an inflight batch by outputting
9595
# events.
96-
#
97-
# In Logstashes <= 2.1 this plugin uses its own internal buffer of events.
98-
# This config option sets that size. In these older logstashes this size may
99-
# have a significant impact on heap usage, whereas in 2.2+ it will never increase it.
100-
# To make efficient bulk API calls, we will buffer a certain number of
101-
# events before flushing that out to Elasticsearch. This setting
102-
# controls how many events will be buffered before sending a batch
103-
# of events. Increasing the `flush_size` has an effect on Logstash's heap size.
104-
# Remember to also increase the heap size using `LS_HEAP_SIZE` if you are sending big documents
105-
# or have increased the `flush_size` to a higher value.
10696
mod.config :flush_size, :validate => :number, :default => 500
10797

10898
# The amount of time since last flush before a flush is forced.

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,12 @@ def stop_sniffing!
7070

7171
def sniff!
7272
client.transport.reload_connections! if options[:sniffing]
73-
hosts_by_name = client.transport.hosts.map {|h| h["name"]}.sort
74-
@logger.debug({"count" => hosts_by_name.count, "hosts" => hosts_by_name})
73+
74+
if @logger.debug?
75+
hosts_by_name = client.transport.hosts.map {|h| h["name"]}.sort
76+
@logger.debug({"count" => hosts_by_name.count, "hosts" => hosts_by_name})
77+
end
78+
7579
rescue StandardError => e
7680
@logger.error("Error while sniffing connection",
7781
:message => e.message,

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Gem::Specification.new do |s|
2424
s.add_runtime_dependency 'elasticsearch', ['>= 1.0.13', '~> 1.0']
2525
s.add_runtime_dependency 'stud', ['>= 0.0.17', '~> 0.0']
2626
s.add_runtime_dependency 'cabin', ['~> 0.6']
27-
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
27+
s.add_runtime_dependency "logstash-core", ">= 2.2.0", "< 3.0.0"
2828

2929
s.add_development_dependency 'ftw', '~> 0.0.42'
3030
s.add_development_dependency 'logstash-codec-plain'

spec/integration/outputs/create_spec.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ def get_es_output(action, id = nil)
2929
it "should create new documents with or without id" do
3030
subject = get_es_output("create", "id123")
3131
subject.register
32-
subject.receive(LogStash::Event.new("message" => "sample message here"))
33-
subject.flush
32+
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
3433
@es.indices.refresh
3534
# Wait or fail until everything's indexed.
3635
Stud::try(3.times) do
@@ -42,8 +41,7 @@ def get_es_output(action, id = nil)
4241
it "should create new documents without id" do
4342
subject = get_es_output("create")
4443
subject.register
45-
subject.receive(LogStash::Event.new("message" => "sample message here"))
46-
subject.flush
44+
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
4745
@es.indices.refresh
4846
# Wait or fail until everything's indexed.
4947
Stud::try(3.times) do

spec/integration/outputs/index_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
before do
1212
subject.register
1313
event_count.times do
14-
subject.receive(LogStash::Event.new("message" => "Hello World!", "type" => type))
14+
subject.multi_receive([LogStash::Event.new("message" => "Hello World!", "type" => type)])
1515
end
1616
end
1717

spec/integration/outputs/parent_spec.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
ftw.put!("#{index_url}/#{type}_parent/test", :body => pdoc.to_json)
2020

2121
subject.register
22-
event_count.times do
23-
subject.receive(LogStash::Event.new("link_to" => "test", "message" => "Hello World!", "type" => type))
24-
end
22+
subject.multi_receive(event_count.times.map { LogStash::Event.new("link_to" => "test", "message" => "Hello World!", "type" => type) })
2523
end
2624

2725

spec/integration/outputs/retry_spec.rb

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,7 @@ def mock_actions_with_response(*resp)
5555
mock_actions_with_response({"errors" => false})
5656
expect(subject).to receive(:submit).with([action1, action2]).once.and_call_original
5757
subject.register
58-
subject.receive(event1)
59-
subject.receive(event2)
60-
subject.flush
61-
sleep(2)
58+
subject.multi_receive([event1, event2])
6259
end
6360

6461
it "retry exceptions within the submit body" do
@@ -73,8 +70,7 @@ def mock_actions_with_response(*resp)
7370
end
7471
end
7572

76-
subject.receive(event1)
77-
subject.flush
73+
subject.multi_receive([event1])
7874
end
7975

8076
it "should retry actions with response status of 503" do
@@ -86,12 +82,7 @@ def mock_actions_with_response(*resp)
8682
expect(subject).to receive(:submit).with([action2]).ordered.once.and_call_original
8783

8884
subject.register
89-
subject.receive(event1)
90-
subject.receive(event1)
91-
subject.receive(event1)
92-
subject.receive(event2)
93-
subject.flush
94-
sleep(3)
85+
subject.multi_receive([event1, event1, event1, event2])
9586
end
9687

9788
it "should retry actions with response status of 429" do
@@ -101,9 +92,7 @@ def mock_actions_with_response(*resp)
10192
{"errors" => false})
10293
expect(subject).to receive(:submit).with([action1]).twice.and_call_original
10394

104-
subject.receive(event1)
105-
subject.flush
106-
sleep(3)
95+
subject.multi_receive([event1])
10796
end
10897

10998
it "should retry an event infinitely until a non retryable status occurs" do
@@ -115,49 +104,38 @@ def mock_actions_with_response(*resp)
115104
{"errors" => true, "statuses" => [500]})
116105
expect(subject).to receive(:submit).with([action1]).exactly(6).times.and_call_original
117106
subject.register
118-
subject.receive(event1)
119-
subject.flush
120-
sleep(5)
107+
subject.multi_receive([event1])
121108
end
122109

123110
it "non-retryable errors like mapping errors (400) should be dropped and not be retried (unfortunately)" do
124111
subject.register
125-
subject.receive(invalid_event)
126112
expect(subject).to receive(:submit).once.and_call_original
113+
subject.multi_receive([invalid_event])
127114
subject.close
128115

129116
@es.indices.refresh
130-
sleep(5)
131-
Stud::try(10.times) do
132-
r = @es.search
133-
insist { r["hits"]["total"] } == 0
134-
end
117+
r = @es.search
118+
expect(r["hits"]["total"]).to eql(0)
135119
end
136120

137121
it "successful requests should not be appended to retry queue" do
138-
subject.register
139-
subject.receive(event1)
140122
expect(subject).to receive(:submit).once.and_call_original
123+
124+
subject.register
125+
subject.multi_receive([event1])
141126
subject.close
142127
@es.indices.refresh
143-
sleep(5)
144-
Stud::try(10.times) do
145-
r = @es.search
146-
insist { r["hits"]["total"] } == 1
147-
end
128+
r = @es.search
129+
expect(r["hits"]["total"]).to eql(1)
148130
end
149131

150132
it "should only index proper events" do
151133
subject.register
152-
subject.receive(invalid_event)
153-
subject.receive(event1)
134+
subject.multi_receive([invalid_event, event1])
154135
subject.close
155136

156137
@es.indices.refresh
157-
sleep(5)
158-
Stud::try(10.times) do
159-
r = @es.search
160-
insist { r["hits"]["total"] } == 1
161-
end
138+
r = @es.search
139+
expect(r["hits"]["total"]).to eql(1)
162140
end
163141
end

spec/integration/outputs/routing_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
before do
1313
subject.register
1414
event_count.times do
15-
subject.receive(LogStash::Event.new("message" => "Hello World!", "type" => type))
15+
subject.multi_receive([LogStash::Event.new("message" => "Hello World!", "type" => type)])
1616
end
1717
end
1818

0 commit comments

Comments
 (0)