Skip to content

Commit b401631

Browse files
authored
Route badly interpolated dynamic index to DLQ (#1084)
* Added unit test to verify that a dynamic index name with an unresolved placeholder doesn't create an Elasticsearch index * Added scaffolding to avoid sending to ES events whose dynamic index names are not fully resolved * Fixed test to check that no index is created when a placeholder in the index name is not resolved * Minor, removed commented code * Route unresolved index name events to DLQ * Extracted common dlq sending method * Minor, removed commented code * Added feature flag to optionally disable the DLQ-ing of events with unresolved index names * Bumped version
1 parent bbba6ae commit b401631

File tree

7 files changed

+186
-51
lines changed

7 files changed

+186
-51
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.9.0
2+
- Feature: events whose dynamic index name cannot be fully resolved are now sent to the DLQ. This behavior can be explicitly disabled with the `dlq_on_failed_indexname_interpolation` setting [#1084](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1084)
3+
14
## 11.8.0
25
- Feature: Adds a new `dlq_custom_codes` option to customize DLQ codes [#1067](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1067)
36

docs/index.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ This plugin supports the following configuration options plus the
320320
| <<plugins-{type}s-{plugin}-data_stream_sync_fields>> |<<boolean,boolean>>|No
321321
| <<plugins-{type}s-{plugin}-data_stream_type>> |<<string,string>>|No
322322
| <<plugins-{type}s-{plugin}-dlq_custom_codes>> |<<number,number>>|No
323+
| <<plugins-{type}s-{plugin}-dlq_on_failed_indexname_interpolation>> |<<boolean,boolean>>|No
323324
| <<plugins-{type}s-{plugin}-doc_as_upsert>> |<<boolean,boolean>>|No
324325
| <<plugins-{type}s-{plugin}-document_id>> |<<string,string>>|No
325326
| <<plugins-{type}s-{plugin}-document_type>> |<<string,string>>|No
@@ -533,6 +534,14 @@ This list is an addition to the ordinary error codes considered for this feature
533534
It's considered a configuration error to re-use the same predefined codes for success, DLQ or conflict.
534535
The option accepts a list of natural numbers corresponding to HTTP errors codes.
535536

537+
[id="plugins-{type}s-{plugin}-dlq_on_failed_indexname_interpolation"]
538+
===== `dlq_on_failed_indexname_interpolation`
539+
540+
* Value type is <<boolean,boolean>>
541+
* Default value is `true`.
542+
543+
If enabled, events whose index name cannot be fully interpolated go into the dead letter queue instead of being indexed.
544+
536545
[id="plugins-{type}s-{plugin}-doc_as_upsert"]
537546
===== `doc_as_upsert`
538547

lib/logstash/outputs/elasticsearch.rb

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
261261
# The option accepts a list of natural numbers corresponding to HTTP errors codes.
262262
config :dlq_custom_codes, :validate => :number, :list => true, :default => []
263263

264+
# if enabled, failed index name interpolation events go into dead letter queue.
265+
config :dlq_on_failed_indexname_interpolation, :validate => :boolean, :default => true
266+
264267
attr_reader :client
265268
attr_reader :default_index
266269
attr_reader :default_ilm_rollover_alias
@@ -362,11 +365,43 @@ def config_init(params)
362365
# Receive an array of events and immediately attempt to index them (no buffering).
#
# Events are first mapped with safe_interpolation_map_events. The successfully
# mapped ones are submitted through retrying_submit; the ones that failed the
# mapping are logged at error level and passed to send_failed_resolutions_to_dlq.
#
# @param events [Array] events to index
def multi_receive(events)
  wait_for_successful_connection if @after_successful_connection_done
  events_mapped = safe_interpolation_map_events(events)
  retrying_submit(events_mapped.successful_events)
  unless events_mapped.failed_events.empty?
    @logger.error("Can't map some events, needs to be handled by DLQ #{events_mapped.failed_events}")
    send_failed_resolutions_to_dlq(events_mapped.failed_events)
  end
end
375+
376+
private

# Passes every failed action tuple to handle_dlq_status with the "warn" log
# level and the message "Could not resolve dynamic index".
#
# @param failed_action_tuples [Array<EventActionTuple>] tuples wrapping the events
#   that could not be mapped
def send_failed_resolutions_to_dlq(failed_action_tuples)
  failed_action_tuples.each do |action|
    handle_dlq_status(action, "warn", "Could not resolve dynamic index")
  end
end
383+
384+
# Outcome of mapping a batch: the mapped events and the ones that failed.
MapEventsResult = Struct.new(:successful_events, :failed_events)

private

# Maps each event through @event_mapper, setting aside the events whose mapping
# raises IndexInterpolationError instead of letting that error abort the batch.
# Any other error raised by the mapper propagates unchanged.
#
# @param events [Array] events to map
# @return [MapEventsResult] successful_events holds the mapper results;
#   failed_events holds one EventActionTuple (with empty params) per event
#   whose mapping raised IndexInterpolationError
def safe_interpolation_map_events(events)
  successful_events = [] # results returned by @event_mapper
  failed_events = [] # list of EventActionTuple built for the events that failed
  events.each do |event|
    begin
      successful_events << @event_mapper.call(event)
    rescue IndexInterpolationError
      # Only the exception class is listed: the exception object is not needed,
      # and anything else written after a comma would be treated as another
      # class to match rather than as a variable to bind.
      action = event.sprintf(@action || 'index')
      failed_events << EventActionTuple.new(action, [], event)
    end
  end
  MapEventsResult.new(successful_events, failed_events)
end
367401

402+
public

# Maps the given events and returns only the successfully mapped ones; events
# that safe_interpolation_map_events reports as failed are left out.
#
# @param events [Array] events to map
# @return [Array] the successful_events of safe_interpolation_map_events
def map_events(events)
  safe_interpolation_map_events(events).successful_events
end
371406

372407
def wait_for_successful_connection
@@ -441,12 +476,24 @@ def initialize(action, params, event, event_data = nil)
441476

442477
end
443478

479+
# Raised when an index name still contains a placeholder after interpolation.
class IndexInterpolationError < ArgumentError
  # @return [String] the index name that was left with an unresolved placeholder
  attr_reader :bad_formatted_index

  # @param bad_formatted_index [String] the index name obtained after interpolation
  def initialize(bad_formatted_index)
    super("Badly formatted index, after interpolation still contains placeholder: [#{bad_formatted_index}]")

    @bad_formatted_index = bad_formatted_index
  end
end
488+
444489
# @return Hash (initial) parameters for given event
445490
# @private shared event params factory between index and data_stream mode
446491
def common_event_params(event)
492+
sprintf_index = @event_target.call(event)
493+
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
447494
params = {
448495
:_id => @document_id ? event.sprintf(@document_id) : nil,
449-
:_index => @event_target.call(event),
496+
:_index => sprintf_index,
450497
routing_field_name => @routing ? event.sprintf(@routing) : nil
451498
}
452499

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -206,19 +206,23 @@ def next_sleep_interval(current_interval)
206206
doubled > @retry_max_interval ? @retry_max_interval : doubled
207207
end
208208

209-
def handle_dlq_status(message, action, status, response)
209+
# Builds the detailed message for an action rejected by Elasticsearch and
# forwards it to handle_dlq_status.
#
# The log level is :error when the response reports an
# 'invalid_index_name_exception' under index/error/type, :warn otherwise.
#
# @param message [String] prefix of the detailed message
# @param action [EventActionTuple] the failed action; its first three elements
#   are included in the detailed message
# @param status [Object] status reported for the action
# @param response [Hash] response received for the action
def handle_dlq_response(message, action, status, response)
  action_params = [action[0], action[1], action[2]]

  # TODO: Change this to send a map with { :status => status, :action => action } in the future
  detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}"

  log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn

  handle_dlq_status(action, log_level, detailed_message)
end
219+
220+
# Writes the action's event to the dead letter queue when a DLQ writer is
# available; otherwise logs the message at the requested level.
#
# @param action [EventActionTuple] the failed action; only its event is used
# @param log_level [Symbol, String] name of the logger method to call when
#   there is no DLQ writer
# @param message [String] reason written along with the event, or logged
def handle_dlq_status(action, log_level, message)
  # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
  if @dlq_writer
    @dlq_writer.write(action.event, message.to_s)
  else
    @logger.send log_level, message
  end
end
224228

@@ -269,7 +273,7 @@ def submit(actions)
269273
@logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error)
270274
next
271275
elsif @dlq_codes.include?(status)
272-
handle_dlq_status("Could not index event to Elasticsearch.", action, status, response)
276+
handle_dlq_response("Could not index event to Elasticsearch.", action, status, response)
273277
@document_level_metrics.increment(:non_retryable_failures)
274278
next
275279
else

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.8.0'
3+
s.version = '11.9.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/outputs/index_spec.rb

Lines changed: 101 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require_relative "../../../spec/es_spec_helper"
22
require "logstash/outputs/elasticsearch"
3+
require 'cgi'
34

45
describe "TARGET_BULK_BYTES", :integration => true do
56
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
@@ -45,16 +46,57 @@
4546
end
4647
end
4748

48-
describe "indexing" do
49+
# Runs curl against the given URL and returns the parsed JSON response body.
#
# The HTTP status is read from curl's verbose output on stderr. When no status
# line can be found there, the status is treated as '0' and the body is parsed
# as a regular response.
#
# @param url [String] URL to request
# @param method [Symbol] HTTP method, upper-cased and passed to curl's -X option
# @param retrieve_err_payload [Boolean] when true, a response with a status class
#   above 2xx returns the body's 'error' payload instead of failing
# @return [Object] the parsed JSON body, or its 'error' entry (see retrieve_err_payload)
# @raise [RuntimeError] when curl is missing, the curl process fails, or the
#   response is an error and retrieve_err_payload is false
def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false)
  require 'open3'

  cmd = "curl -s -v --show-error #{curl_opts} -X #{method.to_s.upcase} -k #{url}"
  begin
    out, err, status = Open3.capture3(cmd)
  rescue Errno::ENOENT
    fail "curl not available, make sure curl binary is installed and available on $PATH"
  end

  unless status.success?
    warn out
    fail "#{cmd.inspect} process failed: #{status}\n\n#{err}"
  end

  # Status line as printed by curl -v, e.g. "< HTTP/1.1 200 OK\r\n" or "< HTTP/2 200".
  # String#[] returns nil when nothing matches, so the '0' fallback is reachable.
  http_status = err[/< HTTP\/[\d.]+ (\d+)/, 1] || '0'

  if http_status.strip[0].to_i > 2
    # Best effort: an unparsable or unexpected body simply means "no error payload".
    error = begin
      LogStash::Json.load(out)['error']
    rescue StandardError
      nil
    end

    unless error
      warn out
      fail "#{cmd.inspect} unexpected response: #{http_status}\n\n#{err}"
    end
    return error if retrieve_err_payload

    fail "#{cmd.inspect} received an error: #{http_status}\n\n#{error.inspect}"
  end

  LogStash::Json.load(out)
end
80+
81+
describe "indexing with sprintf resolution", :integration => true do
4982
let(:message) { "Hello from #{__FILE__}" }
5083
let(:event) { LogStash::Event.new("message" => message, "type" => type) }
51-
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
84+
let (:index) { "%{[index_name]}_dynamic" }
5285
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
53-
let(:event_count) { 1 + rand(2) }
54-
let(:config) { "not implemented" }
86+
let(:event_count) { 1 }
87+
let(:user) { "simpleuser" }
88+
let(:password) { "abc123" }
89+
let(:config) do
90+
{
91+
"hosts" => [ get_host_port ],
92+
"user" => user,
93+
"password" => password,
94+
"index" => index
95+
}
96+
end
5597
let(:events) { event_count.times.map { event }.to_a }
5698
subject { LogStash::Outputs::ElasticSearch.new(config) }
57-
99+
58100
let(:es_url) { "http://#{get_host_port}" }
59101
let(:index_url) { "#{es_url}/#{index}" }
60102

@@ -63,33 +105,65 @@
63105
let(:es_admin) { 'admin' } # default user added in ES -> 8.x requires auth credentials for /_refresh etc
64106
let(:es_admin_pass) { 'elastic' }
65107

66-
def curl_and_get_json_response(url, method: :get); require 'open3'
67-
cmd = "curl -s -v --show-error #{curl_opts} -X #{method.to_s.upcase} -k #{url}"
68-
begin
69-
out, err, status = Open3.capture3(cmd)
70-
rescue Errno::ENOENT
71-
fail "curl not available, make sure curl binary is installed and available on $PATH"
72-
end
108+
let(:initial_events) { [] }
73109

74-
if status.success?
75-
http_status = err.match(/< HTTP\/1.1 (\d+)/)[1] || '0' # < HTTP/1.1 200 OK\r\n
110+
let(:do_register) { true }
76111

77-
if http_status.strip[0].to_i > 2
78-
error = (LogStash::Json.load(out)['error']) rescue nil
79-
if error
80-
fail "#{cmd.inspect} received an error: #{http_status}\n\n#{error.inspect}"
81-
else
82-
warn out
83-
fail "#{cmd.inspect} unexpected response: #{http_status}\n\n#{err}"
84-
end
85-
end
112+
before do
113+
subject.register if do_register
114+
subject.multi_receive(initial_events) if initial_events
115+
end
86116

87-
LogStash::Json.load(out)
88-
else
89-
warn out
90-
fail "#{cmd.inspect} process failed: #{status}\n\n#{err}"
117+
after do
118+
subject.do_close
119+
end
120+
121+
let(:event) { LogStash::Event.new("message" => message, "type" => type, "index_name" => "test") }
122+
123+
it "should index successfully when field is resolved" do
124+
expected_index_name = "test_dynamic"
125+
subject.multi_receive(events)
126+
127+
# curl_and_get_json_response "#{es_url}/_refresh", method: :post
128+
129+
result = curl_and_get_json_response "#{es_url}/#{expected_index_name}"
130+
131+
expect(result[expected_index_name]).not_to be(nil)
132+
end
133+
134+
context "when dynamic field doesn't resolve the index_name" do
135+
let(:event) { LogStash::Event.new("message" => message, "type" => type) }
136+
let(:dlq_writer) { double('DLQ writer') }
137+
before { subject.instance_variable_set('@dlq_writer', dlq_writer) }
138+
139+
it "should doesn't create an index name with unresolved placeholders" do
140+
expect(dlq_writer).to receive(:write).once.with(event, /Could not resolve dynamic index/)
141+
subject.multi_receive(events)
142+
143+
escaped_index_name = CGI.escape("%{[index_name]}_dynamic")
144+
result = curl_and_get_json_response "#{es_url}/#{escaped_index_name}", retrieve_err_payload: true
145+
expect(result["root_cause"].first()["type"]).to eq("index_not_found_exception")
91146
end
92147
end
148+
end
149+
150+
describe "indexing" do
151+
let(:message) { "Hello from #{__FILE__}" }
152+
let(:event) { LogStash::Event.new("message" => message, "type" => type) }
153+
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
154+
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
155+
let(:event_count) { 1 + rand(2) }
156+
let(:config) { "not implemented" }
157+
let(:events) { event_count.times.map { event }.to_a }
158+
subject { LogStash::Outputs::ElasticSearch.new(config) }
159+
160+
let(:es_url) { "http://#{get_host_port}" }
161+
let(:index_url) { "#{es_url}/#{index}" }
162+
163+
let(:curl_opts) { nil }
164+
165+
let(:es_admin) { 'admin' } # default user added in ES -> 8.x requires auth credentials for /_refresh etc
166+
let(:es_admin_pass) { 'elastic' }
93167

94168
let(:initial_events) { [] }
95169

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,7 @@
768768

769769
context 'handling elasticsearch document-level status meant for the DLQ' do
770770
let(:options) { { "manage_template" => false } }
771+
let(:action) { LogStash::Outputs::ElasticSearch::EventActionTuple.new(:action, :params, LogStash::Event.new("foo" => "bar")) }
771772

772773
context 'when @dlq_writer is nil' do
773774
before { subject.instance_variable_set '@dlq_writer', nil }
@@ -777,31 +778,28 @@
777778
it 'should log at ERROR level' do
778779
subject.instance_variable_set(:@logger, double("logger").as_null_object)
779780
mock_response = { 'index' => { 'error' => { 'type' => 'invalid_index_name_exception' } } }
780-
subject.handle_dlq_status("Could not index event to Elasticsearch.",
781-
[:action, :params, :event], :some_status, mock_response)
781+
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, :some_status, mock_response)
782782
end
783783
end
784784

785785
context 'when getting any other exception' do
786786
it 'should log at WARN level' do
787787
logger = double("logger").as_null_object
788788
subject.instance_variable_set(:@logger, logger)
789-
expect(logger).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response))
789+
expect(logger).to receive(:warn).with(a_string_including "Could not index event to Elasticsearch. status: some_status, action: [:action, :params, {")
790790
mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } }
791-
subject.handle_dlq_status("Could not index event to Elasticsearch.",
792-
[:action, :params, :event], :some_status, mock_response)
791+
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, :some_status, mock_response)
793792
end
794793
end
795794

796795
context 'when the response does not include [error]' do
797796
it 'should not fail, but just log a warning' do
798797
logger = double("logger").as_null_object
799798
subject.instance_variable_set(:@logger, logger)
800-
expect(logger).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response))
799+
expect(logger).to receive(:warn).with(a_string_including "Could not index event to Elasticsearch. status: some_status, action: [:action, :params, {")
801800
mock_response = { 'index' => {} }
802801
expect do
803-
subject.handle_dlq_status("Could not index event to Elasticsearch.",
804-
[:action, :params, :event], :some_status, mock_response)
802+
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, :some_status, mock_response)
805803
end.to_not raise_error
806804
end
807805
end
@@ -821,7 +819,7 @@
821819
expect(dlq_writer).to receive(:write).once.with(event, /Could not index/)
822820
mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } }
823821
action = LogStash::Outputs::ElasticSearch::EventActionTuple.new(:action, :params, event)
824-
subject.handle_dlq_status("Could not index event to Elasticsearch.", action, 404, mock_response)
822+
subject.handle_dlq_response("Could not index event to Elasticsearch.", action, 404, mock_response)
825823
end
826824
end
827825

0 commit comments

Comments
 (0)