Skip to content

Commit 7dde2de

Browse files
author
Guy Boertje
authored
Merge pull request logstash-plugins#385 from dchauviere/issue_239
Fix logstash-plugins#239 - interpolated action "update" don't get params "upsert" and…
2 parents 92c52f9 + 5272a72 commit 7dde2de

File tree

2 files changed

+78
-64
lines changed

2 files changed

+78
-64
lines changed

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ module Common
1010
DOC_CONFLICT_CODE = 409
1111

1212
# When you use external versioning, you are communicating that you want
13-
# to ignore conflicts. More obviously, since an external version is a
13+
# to ignore conflicts. More obviously, since an external version is a
1414
# constant part of the incoming document, we should not retry, as retrying
15-
# will never succeed.
15+
# will never succeed.
1616
VERSION_TYPES_PERMITTING_CONFLICT = ["external", "external_gt", "external_gte"]
1717

1818
def register
@@ -36,8 +36,45 @@ def multi_receive(events)
3636

3737
# Convert the event into a 3-tuple of action, params, and event
3838
def event_action_tuple(event)
39-
params = event_action_params(event)
39+
4040
action = event.sprintf(@action)
41+
42+
params = {
43+
:_id => @document_id ? event.sprintf(@document_id) : nil,
44+
:_index => event.sprintf(@index),
45+
:_type => get_event_type(event),
46+
:_routing => @routing ? event.sprintf(@routing) : nil
47+
}
48+
49+
if @pipeline
50+
params[:pipeline] = event.sprintf(@pipeline)
51+
end
52+
53+
if @parent
54+
if @join_field
55+
join_value = event.get(@join_field)
56+
parent_value = event.sprintf(@parent)
57+
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
58+
params[:_routing] = event.sprintf(@parent)
59+
else
60+
params[:parent] = event.sprintf(@parent)
61+
end
62+
end
63+
64+
if action == 'update'
65+
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
66+
params[:_script] = event.sprintf(@script) if @script != ""
67+
params[:_retry_on_conflict] = @retry_on_conflict
68+
end
69+
70+
if @version
71+
params[:version] = event.sprintf(@version)
72+
end
73+
74+
if @version_type
75+
params[:version_type] = event.sprintf(@version_type)
76+
end
77+
4178
[action, params, event]
4279
end
4380

@@ -160,49 +197,6 @@ def submit(actions)
160197
actions_to_retry
161198
end
162199

163-
# get the action parameters for the given event
164-
def event_action_params(event)
165-
type = get_event_type(event)
166-
167-
params = {
168-
:_id => @document_id ? event.sprintf(@document_id) : nil,
169-
:_index => event.sprintf(@index),
170-
:_type => type,
171-
:_routing => @routing ? event.sprintf(@routing) : nil
172-
}
173-
174-
if @pipeline
175-
params[:pipeline] = event.sprintf(@pipeline)
176-
end
177-
178-
if @parent
179-
if @join_field
180-
join_value = event.get(@join_field)
181-
parent_value = event.sprintf(@parent)
182-
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
183-
params[:_routing] = event.sprintf(@parent)
184-
else
185-
params[:parent] = event.sprintf(@parent)
186-
end
187-
end
188-
189-
if @action == 'update'
190-
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
191-
params[:_script] = event.sprintf(@script) if @script != ""
192-
params[:_retry_on_conflict] = @retry_on_conflict
193-
end
194-
195-
if @version
196-
params[:version] = event.sprintf(@version)
197-
end
198-
199-
if @version_type
200-
params[:version_type] = event.sprintf(@version_type)
201-
end
202-
203-
params
204-
end
205-
206200
# Determine the correct value for the 'type' field for the given event
207201
DEFAULT_EVENT_TYPE="doc".freeze
208202
def get_event_type(event)

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
let(:manticore_urls) { subject.client.pool.urls }
4141
let(:manticore_url) { manticore_urls.first }
42-
42+
4343
describe "getting a document type" do
4444
it "should default to 'doc'" do
4545
expect(subject.send(:get_event_type, LogStash::Event.new)).to eql("doc")
@@ -88,33 +88,33 @@
8888
end
8989
end
9090
end
91-
91+
9292
describe "with auth" do
9393
let(:user) { "myuser" }
9494
let(:password) { ::LogStash::Util::Password.new("mypassword") }
95-
95+
9696
shared_examples "an authenticated config" do
9797
it "should set the URL auth correctly" do
9898
expect(manticore_url.user).to eq user
9999
end
100100
end
101-
101+
102102
context "as part of a URL" do
103103
let(:options) {
104104
super.merge("hosts" => ["http://#{user}:#{password.value}@localhost:9200"])
105105
}
106-
106+
107107
include_examples("an authenticated config")
108108
end
109-
109+
110110
context "as a hash option" do
111111
let(:options) {
112112
super.merge!(
113113
"user" => user,
114114
"password" => password
115115
)
116116
}
117-
117+
118118
include_examples("an authenticated config")
119119
end
120120
end
@@ -218,7 +218,7 @@
218218

219219
context "429 errors" do
220220
let(:event) { ::LogStash::Event.new("foo" => "bar") }
221-
let(:error) do
221+
let(:error) do
222222
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
223223
429, double("url").as_null_object, double("request body"), double("response body")
224224
)
@@ -252,7 +252,7 @@
252252
end
253253
end
254254
end
255-
255+
256256
context "with timeout set" do
257257
let(:listener) { Flores::Random.tcp_listener }
258258
let(:port) { listener[2] }
@@ -290,6 +290,16 @@
290290
end
291291
end
292292

293+
context "with a sprintf action equals to update" do
294+
let(:options) { {"action" => "%{myactionfield}", "upsert" => '{"message": "some text"}' } }
295+
296+
let(:event) { LogStash::Event.new("myactionfield" => "update", "message" => "blah") }
297+
298+
it "should obtain specific action's params from event_action_tuple" do
299+
expect(subject.event_action_tuple(event)[1]).to include(:_upsert)
300+
end
301+
end
302+
293303
context "with an invalid action" do
294304
let(:options) { {"action" => "SOME Garbaaage"} }
295305
let(:do_register) { false } # this is what we want to test, so we disable the before(:each) call
@@ -301,8 +311,8 @@
301311
end
302312

303313
describe "SSL end to end" do
304-
let(:manticore_double) do
305-
double("manticoreX#{self.inspect}")
314+
let(:manticore_double) do
315+
double("manticoreX#{self.inspect}")
306316
end
307317

308318
before(:each) do
@@ -311,18 +321,18 @@
311321
allow(manticore_double).to receive(:head).with(any_args).and_return(response_double)
312322
allow(manticore_double).to receive(:get).with(any_args).and_return(response_double)
313323
allow(manticore_double).to receive(:close)
314-
324+
315325
allow(::Manticore::Client).to receive(:new).and_return(manticore_double)
316326
subject.register
317327
end
318-
328+
319329
shared_examples("an encrypted client connection") do
320330
it "should enable SSL in manticore" do
321331
expect(subject.client.pool.urls.map(&:scheme).uniq).to eql(['https'])
322332
end
323333
end
324334

325-
335+
326336
context "With the 'ssl' option" do
327337
let(:options) { {"ssl" => true}}
328338

@@ -337,24 +347,34 @@
337347

338348
describe "retry_on_conflict" do
339349
let(:num_retries) { 123 }
340-
let(:event) { LogStash::Event.new("message" => "blah") }
350+
let(:event) { LogStash::Event.new("myactionfield" => "update", "message" => "blah") }
341351
let(:options) { { 'retry_on_conflict' => num_retries } }
342352

343353
context "with a regular index" do
344354
let(:options) { super.merge("action" => "index") }
345355

346-
it "should interpolate the requested action value when creating an event_action_tuple" do
356+
it "should not set the retry_on_conflict parameter when creating an event_action_tuple" do
347357
action, params, event_data = subject.event_action_tuple(event)
348358
expect(params).not_to include({:_retry_on_conflict => num_retries})
349359
end
350360
end
351361

352362
context "using a plain update" do
353-
let(:options) { super.merge("action" => "update", "retry_on_conflict" => num_retries, "document_id" => 1) }
363+
let(:options) { super.merge("action" => "update", "retry_on_conflict" => num_retries, "document_id" => 1) }
354364

355-
it "should interpolate the requested action value when creating an event_action_tuple" do
365+
it "should set the retry_on_conflict parameter when creating an event_action_tuple" do
366+
action, params, event_data = subject.event_action_tuple(event)
367+
expect(params).to include({:_retry_on_conflict => num_retries})
368+
end
369+
end
370+
371+
context "with a sprintf action that resolves to update" do
372+
let(:options) { super.merge("action" => "%{myactionfield}", "retry_on_conflict" => num_retries, "document_id" => 1) }
373+
374+
it "should set the retry_on_conflict parameter when creating an event_action_tuple" do
356375
action, params, event_data = subject.event_action_tuple(event)
357376
expect(params).to include({:_retry_on_conflict => num_retries})
377+
expect(action).to eq("update")
358378
end
359379
end
360380
end

0 commit comments

Comments
 (0)