Route badly interpolated dynamic index to DLQ #1084
Changes from all commits
0fd1044
d5282fd
175ae5a
35b0a57
05e68bf
f381cff
b6b8981
6379134
6a66e14
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -320,6 +320,7 @@ This plugin supports the following configuration options plus the | |
| <<plugins-{type}s-{plugin}-data_stream_sync_fields>> |<<boolean,boolean>>|No | ||
| <<plugins-{type}s-{plugin}-data_stream_type>> |<<string,string>>|No | ||
| <<plugins-{type}s-{plugin}-dlq_custom_codes>> |<<number,number>>|No | ||
| <<plugins-{type}s-{plugin}-dlq_on_failed_indexname_interpolation>> |<<boolean,boolean>>|No | ||
| <<plugins-{type}s-{plugin}-doc_as_upsert>> |<<boolean,boolean>>|No | ||
| <<plugins-{type}s-{plugin}-document_id>> |<<string,string>>|No | ||
| <<plugins-{type}s-{plugin}-document_type>> |<<string,string>>|No | ||
|
@@ -533,6 +534,14 @@ This list is an addition to the ordinary error codes considered for this feature | |
It's considered a configuration error to re-use the same predefined codes for success, DLQ or conflict. | ||
The option accepts a list of natural numbers corresponding to HTTP errors codes. | ||
|
||
[id="plugins-{type}s-{plugin}-dlq_on_failed_indexname_interpolation"] | ||
===== `dlq_on_failed_indexname_interpolation` | ||
|
||
* Value type is <<boolean,boolean>> | ||
* Default value is `true`. | ||
|
||
If enabled, events whose index name still contains unresolved placeholders after interpolation are routed to the dead letter queue (DLQ) instead of being indexed. | ||
Review comment: should be rephrased if we align on
||
|
||
[id="plugins-{type}s-{plugin}-doc_as_upsert"] | ||
===== `doc_as_upsert` | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -261,6 +261,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base | |||||||||||||
# The option accepts a list of natural numbers corresponding to HTTP errors codes. | ||||||||||||||
config :dlq_custom_codes, :validate => :number, :list => true, :default => [] | ||||||||||||||
|
||||||||||||||
# If enabled, events whose index name still contains unresolved placeholders after interpolation go into the dead letter queue. | ||||||||||||||
config :dlq_on_failed_indexname_interpolation, :validate => :boolean, :default => true | ||||||||||||||
|
||||||||||||||
attr_reader :client | ||||||||||||||
attr_reader :default_index | ||||||||||||||
attr_reader :default_ilm_rollover_alias | ||||||||||||||
|
@@ -362,11 +365,43 @@ def config_init(params) | |||||||||||||
# Receive an array of events and immediately attempt to index them (no buffering) | ||||||||||||||
def multi_receive(events) | ||||||||||||||
wait_for_successful_connection if @after_successful_connection_done | ||||||||||||||
retrying_submit map_events(events) | ||||||||||||||
events_mapped = safe_interpolation_map_events(events) | ||||||||||||||
Review comment: What do you think about rejecting the events whose index name is unresolved and returning only the successful events?
||||||||||||||
retrying_submit(events_mapped.successful_events) | ||||||||||||||
unless events_mapped.failed_events.empty? | ||||||||||||||
@logger.error("Can't map some events, needs to be handled by DLQ #{events_mapped.failed_events}") | ||||||||||||||
send_failed_resolutions_to_dlq(events_mapped.failed_events) | ||||||||||||||
end | ||||||||||||||
end | ||||||||||||||
|
||||||||||||||
# @param: Arrays of EventActionTuple | ||||||||||||||
private | ||||||||||||||
def send_failed_resolutions_to_dlq(failed_action_tuples) | ||||||||||||||
failed_action_tuples.each do |action| | ||||||||||||||
handle_dlq_status(action, "warn", "Could not resolve dynamic index") | ||||||||||||||
end | ||||||||||||||
end | ||||||||||||||
|
||||||||||||||
MapEventsResult = Struct.new(:successful_events, :failed_events) | ||||||||||||||
|
||||||||||||||
private | ||||||||||||||
def safe_interpolation_map_events(events) | ||||||||||||||
successful_events = [] # list of LogStash::Outputs::ElasticSearch::EventActionTuple | ||||||||||||||
failed_events = [] # list of LogStash::Event | ||||||||||||||
events.each do |event| | ||||||||||||||
begin | ||||||||||||||
successful_events << @event_mapper.call(event) | ||||||||||||||
rescue IndexInterpolationError | ||||||||||||||
action = event.sprintf(@action || 'index') | ||||||||||||||
event_action_tuple = EventActionTuple.new(action, [], event) | ||||||||||||||
failed_events << event_action_tuple | ||||||||||||||
end | ||||||||||||||
end | ||||||||||||||
MapEventsResult.new(successful_events, failed_events) | ||||||||||||||
|
||||||||||||||
end | ||||||||||||||
|
||||||||||||||
public | ||||||||||||||
def map_events(events) | ||||||||||||||
events.map(&@event_mapper) | ||||||||||||||
safe_interpolation_map_events(events).successful_events | ||||||||||||||
end | ||||||||||||||
|
||||||||||||||
def wait_for_successful_connection | ||||||||||||||
|
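Note on the hunk above: the batch is split into mapped actions and events whose mapping raised, so one unresolved index name does not abort the whole batch. A minimal standalone sketch of that partitioning, assuming plain hashes in place of `LogStash::Event`; the names `UnresolvedIndexError`, `partition_events` and the `mapper` lambda are hypothetical stand-ins, not part of the plugin:

```ruby
MapEventsResult = Struct.new(:successful_events, :failed_events)

# Hypothetical error type standing in for IndexInterpolationError.
class UnresolvedIndexError < ArgumentError; end

# Applies `mapper` to each event; events whose mapping raises are collected
# separately instead of aborting the whole batch.
def partition_events(events, mapper)
  successful = []
  failed = []
  events.each do |event|
    begin
      successful << mapper.call(event)
    rescue UnresolvedIndexError
      failed << event
    end
  end
  MapEventsResult.new(successful, failed)
end

# Hypothetical mapper: events are plain hashes here, not LogStash::Event.
mapper = lambda do |event|
  raise UnresolvedIndexError, "no index_name" unless event.key?("index_name")
  "#{event['index_name']}_dynamic"
end

result = partition_events([{ "index_name" => "test" }, { "message" => "hi" }], mapper)
```

Here `result.successful_events` holds the mapped value for the first event and `result.failed_events` holds the second event untouched, which is the shape `multi_receive` relies on before calling the DLQ path.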
@@ -441,12 +476,24 @@ def initialize(action, params, event, event_data = nil) | |||||||||||||
|
||||||||||||||
end | ||||||||||||||
|
||||||||||||||
class IndexInterpolationError < ArgumentError | ||||||||||||||
attr_reader :bad_formatted_index | ||||||||||||||
|
||||||||||||||
def initialize(bad_formatted_index) | ||||||||||||||
super("Badly formatted index, after interpolation still contains placeholder: [#{bad_formatted_index}]") | ||||||||||||||
|
||||||||||||||
@bad_formatted_index = bad_formatted_index | ||||||||||||||
end | ||||||||||||||
end | ||||||||||||||
|
||||||||||||||
# @return Hash (initial) parameters for given event | ||||||||||||||
# @private shared event params factory between index and data_stream mode | ||||||||||||||
def common_event_params(event) | ||||||||||||||
sprintf_index = @event_target.call(event) | ||||||||||||||
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation | ||||||||||||||
params = { | ||||||||||||||
:_id => @document_id ? event.sprintf(@document_id) : nil, | ||||||||||||||
:_index => @event_target.call(event), | ||||||||||||||
:_index => sprintf_index, | ||||||||||||||
routing_field_name => @routing ? event.sprintf(@routing) : nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
|
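Note on the check in `common_event_params`: the index name is interpolated first, and an error is raised only if a `%{...}` placeholder survives. The sketch below illustrates that idea in isolation. The `interpolate` helper is a deliberately simplified, hypothetical replacement for `event.sprintf` (it only handles `%{[field]}` references), and `resolve_index!` is an assumed name, not a method of the plugin:

```ruby
# Mirrors the shape of the error class added in this diff.
class IndexInterpolationError < ArgumentError
  attr_reader :bad_formatted_index

  def initialize(bad_formatted_index)
    super("Badly formatted index, after interpolation still contains placeholder: [#{bad_formatted_index}]")
    @bad_formatted_index = bad_formatted_index
  end
end

# Simplified, hypothetical interpolation: replaces %{[field]} with the field
# value and leaves the placeholder untouched when the field is missing.
def interpolate(template, fields)
  template.gsub(/%\{\[(\w+)\]\}/) { |m| fields.fetch(Regexp.last_match(1), m) }
end

PLACEHOLDER = /%\{.*?\}/

# Returns the resolved index name, or raises if a placeholder is left over.
def resolve_index!(template, fields)
  index = interpolate(template, fields)
  raise IndexInterpolationError, index if index.match?(PLACEHOLDER)
  index
end
```

With `{ "index_name" => "test" }` the template `%{[index_name]}_dynamic` resolves normally; with an empty hash the placeholder is left in place and the error carries the offending string in `bad_formatted_index`.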
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -206,19 +206,23 @@ def next_sleep_interval(current_interval) | |
doubled > @retry_max_interval ? @retry_max_interval : doubled | ||
end | ||
|
||
def handle_dlq_status(message, action, status, response) | ||
def handle_dlq_response(message, action, status, response) | ||
_, action_params = action.event, [action[0], action[1], action[2]] | ||
|
||
# TODO: Change this to send a map with { :status => status, :action => action } in the future | ||
detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}" | ||
|
||
log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn | ||
|
||
handle_dlq_status(action, log_level, detailed_message) | ||
Review comment: Special thanks for including
||
end | ||
|
||
def handle_dlq_status(action, log_level, message) | ||
# To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) | ||
if @dlq_writer | ||
event, action = action.event, [action[0], action[1], action[2]] | ||
# TODO: Change this to send a map with { :status => status, :action => action } in the future | ||
@dlq_writer.write(event, "#{message} status: #{status}, action: #{action}, response: #{response}") | ||
@dlq_writer.write(action.event, "#{message}") | ||
else | ||
if dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' | ||
level = :error | ||
else | ||
level = :warn | ||
end | ||
@logger.send level, message, status: status, action: action, response: response | ||
@logger.send log_level, message | ||
end | ||
end | ||
|
||
|
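Note on the refactored `handle_dlq_status`: it writes to the DLQ when a writer is configured and otherwise logs at the caller-supplied level and drops the event. A minimal sketch of that fallback outside Logstash, assuming an in-memory writer and the standard library `Logger`; `InMemoryDlqWriter` and `route_failed_event` are hypothetical names used only for illustration:

```ruby
require "logger"
require "stringio"

# Hypothetical in-memory DLQ writer; the real writer is provided by Logstash.
class InMemoryDlqWriter
  attr_reader :entries

  def initialize
    @entries = []
  end

  def write(event, reason)
    @entries << [event, reason]
  end
end

# Writes to the DLQ when one is configured, otherwise logs at the given level
# and drops the event.
def route_failed_event(event, log_level, message, dlq_writer:, logger:)
  if dlq_writer
    dlq_writer.write(event, message)
  else
    logger.public_send(log_level, message)
  end
end

writer = InMemoryDlqWriter.new
log_output = StringIO.new
logger = Logger.new(log_output)

# First call goes to the DLQ, second call falls back to logging.
route_failed_event({ "message" => "hi" }, :warn, "Could not resolve dynamic index", dlq_writer: writer, logger: logger)
route_failed_event({ "message" => "hi" }, :warn, "Could not resolve dynamic index", dlq_writer: nil, logger: logger)
```

The sketch uses `public_send` rather than `send` so that only public logger methods can be selected by the level argument; that is a design preference of this example, not something the diff requires.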
@@ -269,7 +273,7 @@ def submit(actions) | |
@logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error) | ||
next | ||
elsif @dlq_codes.include?(status) | ||
handle_dlq_status("Could not index event to Elasticsearch.", action, status, response) | ||
handle_dlq_response("Could not index event to Elasticsearch.", action, status, response) | ||
@document_level_metrics.increment(:non_retryable_failures) | ||
next | ||
else | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||
require_relative "../../../spec/es_spec_helper" | ||||||
require "logstash/outputs/elasticsearch" | ||||||
require 'cgi' | ||||||
|
||||||
describe "TARGET_BULK_BYTES", :integration => true do | ||||||
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } | ||||||
|
@@ -45,16 +46,57 @@ | |||||
end | ||||||
end | ||||||
|
||||||
describe "indexing" do | ||||||
def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false); require 'open3' | ||||||
cmd = "curl -s -v --show-error #{curl_opts} -X #{method.to_s.upcase} -k #{url}" | ||||||
begin | ||||||
out, err, status = Open3.capture3(cmd) | ||||||
rescue Errno::ENOENT | ||||||
fail "curl not available, make sure curl binary is installed and available on $PATH" | ||||||
end | ||||||
|
||||||
if status.success? | ||||||
http_status = err.match(/< HTTP\/1.1 (\d+)/)[1] || '0' # < HTTP/1.1 200 OK\r\n | ||||||
|
||||||
if http_status.strip[0].to_i > 2 | ||||||
error = (LogStash::Json.load(out)['error']) rescue nil | ||||||
if error | ||||||
if retrieve_err_payload | ||||||
return error | ||||||
else | ||||||
fail "#{cmd.inspect} received an error: #{http_status}\n\n#{error.inspect}" | ||||||
end | ||||||
else | ||||||
warn out | ||||||
fail "#{cmd.inspect} unexpected response: #{http_status}\n\n#{err}" | ||||||
end | ||||||
end | ||||||
|
||||||
LogStash::Json.load(out) | ||||||
else | ||||||
warn out | ||||||
fail "#{cmd.inspect} process failed: #{status}\n\n#{err}" | ||||||
end | ||||||
end | ||||||
|
||||||
describe "indexing with sprintf resolution", :integration => true do | ||||||
let(:message) { "Hello from #{__FILE__}" } | ||||||
let(:event) { LogStash::Event.new("message" => message, "type" => type) } | ||||||
let(:index) { 10.times.collect { rand(10).to_s }.join("") } | ||||||
let(:index) { "%{[index_name]}_dynamic" } | ||||||
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" } | ||||||
let(:event_count) { 1 + rand(2) } | ||||||
let(:config) { "not implemented" } | ||||||
let(:event_count) { 1 } | ||||||
let(:user) { "simpleuser" } | ||||||
let(:password) { "abc123" } | ||||||
let(:config) do | ||||||
{ | ||||||
"hosts" => [ get_host_port ], | ||||||
"user" => user, | ||||||
"password" => password, | ||||||
"index" => index | ||||||
} | ||||||
end | ||||||
let(:events) { event_count.times.map { event }.to_a } | ||||||
subject { LogStash::Outputs::ElasticSearch.new(config) } | ||||||
|
||||||
let(:es_url) { "http://#{get_host_port}" } | ||||||
let(:index_url) { "#{es_url}/#{index}" } | ||||||
|
||||||
|
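Note on the status parsing in `curl_and_get_json_response`: `err.match(...)[1] || '0'` raises `NoMethodError` when the pattern does not match, so the `'0'` fallback is never reached in that case. A possible nil-safe variant is sketched below, assuming curl's verbose output prefixes response headers with `< ` and may report a protocol version other than `1.1`; `http_status_from` and `error_status?` are hypothetical helper names:

```ruby
# Extracts the HTTP status code from curl's verbose stderr output.
# Returns '0' when no status line is present.
def http_status_from(verbose_output)
  verbose_output[/^< HTTP\/[\d.]+ (\d+)/, 1] || '0'
end

# Same rule as the spec helper: a leading digit above 2 is treated as an error.
def error_status?(http_status)
  http_status[0].to_i > 2
end
```

`String#[]` with a regexp and a capture index returns `nil` on no match, which lets the `|| '0'` fallback take effect.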
@@ -63,33 +105,65 @@ | |||||
let(:es_admin) { 'admin' } # default user added in ES -> 8.x requires auth credentials for /_refresh etc | ||||||
let(:es_admin_pass) { 'elastic' } | ||||||
|
||||||
def curl_and_get_json_response(url, method: :get); require 'open3' | ||||||
cmd = "curl -s -v --show-error #{curl_opts} -X #{method.to_s.upcase} -k #{url}" | ||||||
begin | ||||||
out, err, status = Open3.capture3(cmd) | ||||||
rescue Errno::ENOENT | ||||||
fail "curl not available, make sure curl binary is installed and available on $PATH" | ||||||
end | ||||||
let(:initial_events) { [] } | ||||||
|
||||||
if status.success? | ||||||
http_status = err.match(/< HTTP\/1.1 (\d+)/)[1] || '0' # < HTTP/1.1 200 OK\r\n | ||||||
let(:do_register) { true } | ||||||
|
||||||
if http_status.strip[0].to_i > 2 | ||||||
error = (LogStash::Json.load(out)['error']) rescue nil | ||||||
if error | ||||||
fail "#{cmd.inspect} received an error: #{http_status}\n\n#{error.inspect}" | ||||||
else | ||||||
warn out | ||||||
fail "#{cmd.inspect} unexpected response: #{http_status}\n\n#{err}" | ||||||
end | ||||||
end | ||||||
before do | ||||||
subject.register if do_register | ||||||
subject.multi_receive(initial_events) if initial_events | ||||||
end | ||||||
|
||||||
LogStash::Json.load(out) | ||||||
else | ||||||
warn out | ||||||
fail "#{cmd.inspect} process failed: #{status}\n\n#{err}" | ||||||
after do | ||||||
subject.do_close | ||||||
end | ||||||
|
||||||
let(:event) { LogStash::Event.new("message" => message, "type" => type, "index_name" => "test") } | ||||||
|
||||||
it "should index successfully when field is resolved" do | ||||||
expected_index_name = "test_dynamic" | ||||||
subject.multi_receive(events) | ||||||
|
||||||
# curl_and_get_json_response "#{es_url}/_refresh", method: :post | ||||||
|
||||||
result = curl_and_get_json_response "#{es_url}/#{expected_index_name}" | ||||||
|
||||||
expect(result[expected_index_name]).not_to be(nil) | ||||||
end | ||||||
|
||||||
context "when dynamic field doesn't resolve the index_name" do | ||||||
let(:event) { LogStash::Event.new("message" => message, "type" => type) } | ||||||
let(:dlq_writer) { double('DLQ writer') } | ||||||
before { subject.instance_variable_set('@dlq_writer', dlq_writer) } | ||||||
|
||||||
it "doesn't create an index name with unresolved placeholders" do | ||||||
|
||||||
expect(dlq_writer).to receive(:write).once.with(event, /Could not resolve dynamic index/) | ||||||
subject.multi_receive(events) | ||||||
|
||||||
escaped_index_name = CGI.escape("%{[index_name]}_dynamic") | ||||||
result = curl_and_get_json_response "#{es_url}/#{escaped_index_name}", retrieve_err_payload: true | ||||||
expect(result["root_cause"].first()["type"]).to eq("index_not_found_exception") | ||||||
end | ||||||
end | ||||||
end | ||||||
|
||||||
describe "indexing" do | ||||||
let(:message) { "Hello from #{__FILE__}" } | ||||||
let(:event) { LogStash::Event.new("message" => message, "type" => type) } | ||||||
let(:index) { 10.times.collect { rand(10).to_s }.join("") } | ||||||
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" } | ||||||
let(:event_count) { 1 + rand(2) } | ||||||
let(:config) { "not implemented" } | ||||||
let(:events) { event_count.times.map { event }.to_a } | ||||||
subject { LogStash::Outputs::ElasticSearch.new(config) } | ||||||
|
||||||
let(:es_url) { "http://#{get_host_port}" } | ||||||
let(:index_url) { "#{es_url}/#{index}" } | ||||||
|
||||||
let(:curl_opts) { nil } | ||||||
|
||||||
let(:es_admin) { 'admin' } # default user added in ES -> 8.x requires auth credentials for /_refresh etc | ||||||
let(:es_admin_pass) { 'elastic' } | ||||||
|
||||||
let(:initial_events) { [] } | ||||||
|
||||||
|
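Note on the `CGI.escape` call in the unresolved-placeholder spec: the literal index name contains `%`, `{`, `[`, `]` and `}`, which need percent-encoding before being placed in a URL path. A short sketch of what the escaping produces, using only the standard library:

```ruby
require "cgi"

unresolved_index = "%{[index_name]}_dynamic"

# Percent-encode the reserved characters so the name can be used in a URL path.
escaped = CGI.escape(unresolved_index)

# Decoding restores the original literal, placeholder included.
round_trip = CGI.unescape(escaped)
```

Without the escaping, the raw `%{` sequence would not form a valid percent-encoded URL, so the lookup that the spec expects to end in `index_not_found_exception` might fail for a different reason.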
Review comment: From the explanation, `dlq_on_unresolved_index_names` would be a good name to me. We interpolate the index name in order to resolve it, so the end goal seems to be resolving the index name.