Skip to content

Commit f9e080b

Browse files
committed
Remove scan option
Use `{ "sort": [ "_doc" ] }` in the query instead. Remove the `scan` config option, its `search_type` handling, and the scan-specific spec. Fixes logstash-plugins#50. Fixes logstash-plugins#51.
1 parent 732cb26 commit f9e080b

File tree

4 files changed

+19
-87
lines changed

4 files changed

+19
-87
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
## 4.0.0
2+
- Remove `scan` from list of options as this is no longer allowed in
3+
Elasticsearch 5.0.
4+
- Change default query to sort by \_doc, as this replicates the `scan`
5+
behavior
6+
- Improve documentation to show sort by \_doc, and how to add it to custom
7+
queries.
8+
19
## 3.0.2
210
- Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99
311

lib/logstash/inputs/elasticsearch.rb

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# # Read all documents from Elasticsearch matching the given query
1313
# elasticsearch {
1414
# hosts => "localhost"
15-
# query => '{ "query": { "match": { "statuscode": 200 } } }'
15+
# query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
1616
# }
1717
# }
1818
#
@@ -23,7 +23,8 @@
2323
# "match": {
2424
# "statuscode": 200
2525
# }
26-
# }
26+
# },
27+
# "sort": [ "_doc" ]
2728
# }'
2829
#
2930
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
@@ -40,20 +41,16 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
4041
config :index, :validate => :string, :default => "logstash-*"
4142

4243
# The query to be executed. Read the Elasticsearch query DSL documentation
43-
# for more info
44+
# for more info
4445
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
45-
config :query, :validate => :string, :default => '{"query": { "match_all": {} } }'
46-
47-
# Enable the Elasticsearch "scan" search type. This will disable
48-
# sorting but increase speed and performance.
49-
config :scan, :validate => :boolean, :default => true
46+
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
5047

5148
# This allows you to set the maximum number of hits returned per scroll.
5249
config :size, :validate => :number, :default => 1000
5350

5451
# This parameter controls the keepalive time in seconds of the scrolling
5552
# request and initiates the scrolling process. The timeout applies per
56-
# round trip (i.e. between the previous scan scroll request, to the next).
53+
# round trip (i.e. between the previous scroll request, to the next).
5754
config :scroll, :validate => :string, :default => "1m"
5855

5956
# If set, include Elasticsearch document information such as index, type, and
@@ -117,8 +114,6 @@ def register
117114
:size => @size
118115
}
119116

120-
@options[:search_type] = 'scan' if @scan
121-
122117
transport_options = {}
123118

124119
if @user && @password
@@ -146,14 +141,8 @@ def run(output_queue)
146141
# get first wave of data
147142
r = @client.search(@options)
148143

149-
# since 'scan' doesn't return data on the search call, do an extra scroll
150-
if @scan
151-
r = process_next_scroll(output_queue, r['_scroll_id'])
152-
has_hits = r['has_hits']
153-
else # not a scan, process the response
154-
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
155-
has_hits = r['hits']['hits'].any?
156-
end
144+
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
145+
has_hits = r['hits']['hits'].any?
157146

158147
while has_hits && !stop?
159148
r = process_next_scroll(output_queue, r['_scroll_id'])

logstash-input-elasticsearch.gemspec

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-elasticsearch'
4-
s.version = '3.0.2'
4+
s.version = '4.0.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Read from an Elasticsearch cluster, based on search query results"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -28,4 +28,3 @@ Gem::Specification.new do |s|
2828

2929
s.add_development_dependency 'logstash-devutils'
3030
end
31-

spec/inputs/elasticsearch_spec.rb

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
input {
2929
elasticsearch {
3030
hosts => ["localhost"]
31-
scan => false
3231
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
3332
}
3433
}
@@ -74,63 +73,6 @@
7473
insist { event.get("message") } == [ "ohayo" ]
7574
end
7675

77-
it "should retrieve json event from elasticseach with scan" do
78-
config = %q[
79-
input {
80-
elasticsearch {
81-
hosts => ["localhost"]
82-
scan => true
83-
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
84-
}
85-
}
86-
]
87-
88-
scan_response = {
89-
"_scroll_id" => "DcrY3G1xff6SB",
90-
}
91-
92-
scroll_responses = [
93-
{
94-
"_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
95-
"took" => 27,
96-
"timed_out" => false,
97-
"_shards" => {
98-
"total" => 169,
99-
"successful" => 169,
100-
"failed" => 0
101-
},
102-
"hits" => {
103-
"total" => 1,
104-
"max_score" => 1.0,
105-
"hits" => [ {
106-
"_index" => "logstash-2014.10.12",
107-
"_type" => "logs",
108-
"_id" => "C5b2xLQwTZa76jBmHIbwHQ",
109-
"_score" => 1.0,
110-
"_source" => { "message" => ["ohayo"] }
111-
} ]
112-
}
113-
},
114-
{
115-
"_scroll_id" => "r453Wc1jh0caLJhSDg",
116-
"hits" => { "hits" => [] }
117-
}
118-
]
119-
120-
client = Elasticsearch::Client.new
121-
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
122-
expect(client).to receive(:search).with(any_args).and_return(scan_response)
123-
expect(client).to receive(:scroll).with({ :body => "DcrY3G1xff6SB", :scroll => "1m" }).and_return(scroll_responses.first)
124-
expect(client).to receive(:scroll).with({ :body=> "cXVlcnlUaGVuRmV0Y2g", :scroll => "1m" }).and_return(scroll_responses.last)
125-
126-
event = input(config) do |pipeline, queue|
127-
queue.pop
128-
end
129-
130-
insist { event }.is_a?(LogStash::Event)
131-
insist { event.get("message") } == [ "ohayo" ]
132-
end
133-
13476
context "with Elasticsearch document information" do
13577
let!(:response) do
13678
{
@@ -150,7 +92,7 @@
15092
"_type" => "logs",
15193
"_id" => "C5b2xLQwTZa76jBmHIbwHQ",
15294
"_score" => 1.0,
153-
"_source" => {
95+
"_source" => {
15496
"message" => ["ohayo"],
15597
"metadata_with_hash" => { "awesome" => "logstash" },
15698
"metadata_with_string" => "a string"
@@ -181,7 +123,6 @@
181123
input {
182124
elasticsearch {
183125
hosts => ["localhost"]
184-
scan => false
185126
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
186127
docinfo => true
187128
}
@@ -196,7 +137,6 @@
196137
input {
197138
elasticsearch {
198139
hosts => ["localhost"]
199-
scan => false
200140
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
201141
docinfo => true
202142
docinfo_target => '#{metadata_field}'
@@ -213,15 +153,14 @@
213153
expect(event.get("[#{metadata_field}][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
214154
expect(event.get("[#{metadata_field}][awesome]")).to eq("logstash")
215155
end
216-
156+
217157
it 'thows an exception if the `docinfo_target` exist but is not of type hash' do
218158
metadata_field = 'metadata_with_string'
219159

220160
config_metadata_with_string = %Q[
221161
input {
222162
elasticsearch {
223163
hosts => ["localhost"]
224-
scan => false
225164
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
226165
docinfo => true
227166
docinfo_target => '#{metadata_field}'
@@ -253,7 +192,6 @@
253192
input {
254193
elasticsearch {
255194
hosts => ["localhost"]
256-
scan => false
257195
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
258196
docinfo => true
259197
docinfo_target => 'meta'
@@ -275,7 +213,6 @@
275213
input {
276214
elasticsearch {
277215
hosts => ["localhost"]
278-
scan => false
279216
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
280217
docinfo => true
281218
docinfo_fields => #{fields}
@@ -299,7 +236,6 @@
299236
input {
300237
elasticsearch {
301238
hosts => ["localhost"]
302-
scan => false
303239
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
304240
}
305241
}

0 commit comments

Comments
 (0)