Skip to content

add create/create_unless_exists actions support to node client (from pickypg) #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
#
# - index: indexes a document (an event from Logstash).
# - delete: deletes a document by id
# - create: indexes a document, fails if a document by that id already exists in the index.
# The following action is not supported by the HTTP protocol:
# - create_unless_exists: creates a document, fails if no id is provided
#
# For more details on actions, check out the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
config :action, :validate => :string, :default => "index"
Expand Down
17 changes: 16 additions & 1 deletion lib/logstash/outputs/elasticsearch/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,23 @@ def build_request(action, args, source)
when "delete"
request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
request.id(args[:_id])
when "create"
request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
request.id(args[:_id]) if args[:_id]
request.source(source)
request.opType("create")
when "create_unless_exists"
unless args[:_id].nil?
request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
request.id(args[:_id])
request.source(source)
request.opType("create")
else
raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.")
end
else
raise(LogStash::ConfigurationError, "action => '#{action_name}' is not currently supported.")
#when "update"
#when "create"
end # case action

request.type(args[:_type]) if args[:_type]
Expand Down
99 changes: 97 additions & 2 deletions spec/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,103 @@
end
end

describe "node client create actions", :elasticsearch => true do
require "logstash/outputs/elasticsearch"
require "elasticsearch"
let(:es) { Elasticsearch::Client.new }

# Builds a LogStash::Outputs::ElasticSearch output for the given action,
# configured for the "node" protocol on localhost and the "logstash-create"
# index. A "document_id" setting is included only when an id is supplied.
def get_es_output(action, id = nil)
  base_options = {
    "manage_template" => true,
    "index" => "logstash-create",
    "template_overwrite" => true,
    "protocol" => "node",
    "host" => "localhost",
    "action" => action
  }
  options = id.nil? ? base_options : base_options.merge("document_id" => id)
  LogStash::Outputs::ElasticSearch.new(options)
end

# Start each example from a clean Elasticsearch: remove every index
# template first, then every index.
before(:each) do
  es.indices.delete_template(:name => "*")
  begin
    es.indices.delete(:index => "*")
  rescue StandardError
    # Deleting can fail if there are no indexes; that failure is ignored.
    nil
  end
end

context "when action => create" do
it "should create new documents with or without id" do
subject = get_es_output("create", "id123")
subject.register
subject.receive(LogStash::Event.new("message" => "sample message here"))
subject.buffer_flush(:final => true)
es.indices.refresh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do a get here immediately to assert the existence of the doc (without a refresh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed; otherwise the document does not get indexed in time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aye, search needs a refresh, but get is real time :) Anyhow, we can leave this as is.

# Wait or fail until everything's indexed.
Stud::try(3.times) do
r = es.search
insist { r["hits"]["total"] } == 1
end
end

# With action => "create" and no document id configured, a single event
# should end up as exactly one searchable document.
it "should create new documents without id" do
  output = get_es_output("create")
  output.register
  output.receive(LogStash::Event.new("message" => "sample message here"))
  output.buffer_flush(:final => true)
  es.indices.refresh

  # Wait or fail until everything's indexed.
  Stud.try(3.times) do
    response = es.search
    insist { response["hits"]["total"] } == 1
  end
end
end

context "when action => create_unless_exists" do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test where a) we populate ES with a doc with the same ID and then b) try to create_unless_exists? that should fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.

the second record returns a 400 error code from ES because ID exists already.

Given how the pipeline (Stud::Buffer/submit) abstracts away the failure by catching the exception and logging it, it is difficult to expose this in tests. We can refactor this later on. Until then, the test checks that only one message makes it through; the other is dropped (error 400 is not a retryable 502/429).

# With action => "create_unless_exists" and an explicit document id, a single
# event should end up as exactly one searchable document.
it "should create new documents when specific id is specified" do
  output = get_es_output("create_unless_exists", "id123")
  output.register
  output.receive(LogStash::Event.new("message" => "sample message here"))
  output.buffer_flush(:final => true)
  es.indices.refresh

  # Wait or fail until everything's indexed.
  Stud.try(3.times) do
    response = es.search
    insist { response["hits"]["total"] } == 1
  end
end

# With action => "create_unless_exists" and no document id configured, the
# event must not be indexed: the search is expected to report zero hits.
it "should fail to create a document when no id is specified" do
  output = get_es_output("create_unless_exists")
  output.register
  output.receive(LogStash::Event.new("message" => "sample message here"))
  output.buffer_flush(:final => true)
  es.indices.refresh

  # Wait or fail until everything's indexed.
  Stud.try(3.times) do
    response = es.search
    insist { response["hits"]["total"] } == 0
  end
end

# Two events sent with the same configured document id under
# action => "create_unless_exists": only one document is expected to be
# searchable afterwards.
it "should unsuccessfully submit two records with the same document id" do
  output = get_es_output("create_unless_exists", "id123")
  output.register
  output.receive(LogStash::Event.new("message" => "sample message here"))
  output.receive(LogStash::Event.new("message" => "sample message here")) # 400 status failure (same id)
  output.buffer_flush(:final => true)
  es.indices.refresh

  # Wait or fail until everything's indexed.
  Stud.try(3.times) do
    response = es.search
    insist { response["hits"]["total"] } == 1
  end
end
end
end

describe "testing index_type", :elasticsearch => true do
describe "no type value" do
# Generate a random index name
Expand Down Expand Up @@ -381,8 +478,6 @@ def settings_with_index(index)
end

describe "failures in bulk class expected behavior", :elasticsearch => true do


let(:template) { '{"template" : "not important, will be updated by :index"}' }
let(:event1) { LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action1) { ["index", {:_id=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event1] }
Expand Down