Skip to content

Fix #239 - interpolated action "update" don't get params "upsert" and… #385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 40 additions & 46 deletions lib/logstash/outputs/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ module Common
DOC_CONFLICT_CODE = 409

# When you use external versioning, you are communicating that you want
# to ignore conflicts. More obviously, since an external version is a
Copy link
Contributor Author

@dchauviere dchauviere Feb 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guyboertje my editor seems to cleanup unneeded space, if its a problem for merging (or just reading changes), I can restore them

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to worry in this case.

# to ignore conflicts. More obviously, since an external version is a
# constant part of the incoming document, we should not retry, as retrying
# will never succeed.
# will never succeed.
VERSION_TYPES_PERMITTING_CONFLICT = ["external", "external_gt", "external_gte"]

def register
Expand All @@ -36,8 +36,45 @@ def multi_receive(events)

# Convert the event into a 3-tuple of action, params, and event
def event_action_tuple(event)
params = event_action_params(event)

action = event.sprintf(@action)

params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
:_type => get_event_type(event),
:_routing => @routing ? event.sprintf(@routing) : nil
}

if @pipeline
params[:pipeline] = event.sprintf(@pipeline)
end

if @parent
if @join_field
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[:_routing] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end

if action == 'update'
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[:_retry_on_conflict] = @retry_on_conflict
end

if @version
params[:version] = event.sprintf(@version)
end

if @version_type
params[:version_type] = event.sprintf(@version_type)
end

[action, params, event]
end

Expand Down Expand Up @@ -160,49 +197,6 @@ def submit(actions)
actions_to_retry
end

# get the action parameters for the given event
def event_action_params(event)
type = get_event_type(event)

params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
:_type => type,
:_routing => @routing ? event.sprintf(@routing) : nil
}

if @pipeline
params[:pipeline] = event.sprintf(@pipeline)
end

if @parent
if @join_field
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[:_routing] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end

if @action == 'update'
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[:_retry_on_conflict] = @retry_on_conflict
end

if @version
params[:version] = event.sprintf(@version)
end

if @version_type
params[:version_type] = event.sprintf(@version_type)
end

params
end

# Determine the correct value for the 'type' field for the given event
DEFAULT_EVENT_TYPE="doc".freeze
def get_event_type(event)
Expand Down
56 changes: 38 additions & 18 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

let(:manticore_urls) { subject.client.pool.urls }
let(:manticore_url) { manticore_urls.first }

describe "getting a document type" do
it "should default to 'doc'" do
expect(subject.send(:get_event_type, LogStash::Event.new)).to eql("doc")
Expand Down Expand Up @@ -88,33 +88,33 @@
end
end
end

describe "with auth" do
let(:user) { "myuser" }
let(:password) { ::LogStash::Util::Password.new("mypassword") }

shared_examples "an authenticated config" do
it "should set the URL auth correctly" do
expect(manticore_url.user).to eq user
end
end

context "as part of a URL" do
let(:options) {
super.merge("hosts" => ["http://#{user}:#{password.value}@localhost:9200"])
}

include_examples("an authenticated config")
end

context "as a hash option" do
let(:options) {
super.merge!(
"user" => user,
"password" => password
)
}

include_examples("an authenticated config")
end
end
Expand Down Expand Up @@ -218,7 +218,7 @@

context "429 errors" do
let(:event) { ::LogStash::Event.new("foo" => "bar") }
let(:error) do
let(:error) do
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
429, double("url").as_null_object, double("request body"), double("response body")
)
Expand Down Expand Up @@ -252,7 +252,7 @@
end
end
end

context "with timeout set" do
let(:listener) { Flores::Random.tcp_listener }
let(:port) { listener[2] }
Expand Down Expand Up @@ -290,6 +290,16 @@
end
end

context "with a sprintf action equals to update" do
let(:options) { {"action" => "%{myactionfield}", "upsert" => '{"message": "some text"}' } }

let(:event) { LogStash::Event.new("myactionfield" => "update", "message" => "blah") }

it "should obtain specific action's params from event_action_tuple" do
expect(subject.event_action_tuple(event)[1]).to include(:_upsert)
end
end

context "with an invalid action" do
let(:options) { {"action" => "SOME Garbaaage"} }
let(:do_register) { false } # this is what we want to test, so we disable the before(:each) call
Expand All @@ -301,8 +311,8 @@
end

describe "SSL end to end" do
let(:manticore_double) do
double("manticoreX#{self.inspect}")
let(:manticore_double) do
double("manticoreX#{self.inspect}")
end

before(:each) do
Expand All @@ -311,18 +321,18 @@
allow(manticore_double).to receive(:head).with(any_args).and_return(response_double)
allow(manticore_double).to receive(:get).with(any_args).and_return(response_double)
allow(manticore_double).to receive(:close)

allow(::Manticore::Client).to receive(:new).and_return(manticore_double)
subject.register
end

shared_examples("an encrypted client connection") do
it "should enable SSL in manticore" do
expect(subject.client.pool.urls.map(&:scheme).uniq).to eql(['https'])
end
end


context "With the 'ssl' option" do
let(:options) { {"ssl" => true}}

Expand All @@ -337,24 +347,34 @@

describe "retry_on_conflict" do
let(:num_retries) { 123 }
let(:event) { LogStash::Event.new("message" => "blah") }
let(:event) { LogStash::Event.new("myactionfield" => "update", "message" => "blah") }
let(:options) { { 'retry_on_conflict' => num_retries } }

context "with a regular index" do
let(:options) { super.merge("action" => "index") }

it "should interpolate the requested action value when creating an event_action_tuple" do
it "should not set the retry_on_conflict parameter when creating an event_action_tuple" do
action, params, event_data = subject.event_action_tuple(event)
expect(params).not_to include({:_retry_on_conflict => num_retries})
end
end

context "using a plain update" do
let(:options) { super.merge("action" => "update", "retry_on_conflict" => num_retries, "document_id" => 1) }
let(:options) { super.merge("action" => "update", "retry_on_conflict" => num_retries, "document_id" => 1) }

it "should interpolate the requested action value when creating an event_action_tuple" do
it "should set the retry_on_conflict parameter when creating an event_action_tuple" do
action, params, event_data = subject.event_action_tuple(event)
expect(params).to include({:_retry_on_conflict => num_retries})
end
end

context "with a sprintf action that resolves to update" do
let(:options) { super.merge("action" => "%{myactionfield}", "retry_on_conflict" => num_retries, "document_id" => 1) }

it "should set the retry_on_conflict parameter when creating an event_action_tuple" do
action, params, event_data = subject.event_action_tuple(event)
expect(params).to include({:_retry_on_conflict => num_retries})
expect(action).to eq("update")
end
end
end
Expand Down