Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions lib/fluent/plugin/out_redis_counter.rb
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,17 @@ def write(chunk)
MessagePack::Unpacker.new(io).each { |message|
(tag, time, record) = message
@patterns.select { |pattern|
pattern.is_match?(record)
}.each{ |pattern|
table[pattern.get_count_key(time, record)] += pattern.get_count_value(record)
if pattern.is_match?(record)
key = pattern.get_count_key(time, record)
if pattern.list_value_format
list = table[key]
list = table[key] = [] unless list.is_a?(Array)
list << pattern.get_list_value(time, record)
else
table[key] += pattern.get_count_value(record)
end
break if pattern.last
end
}
}
rescue EOFError
Expand All @@ -68,7 +76,13 @@ def write(chunk)
}.each_slice(@max_pipelining) { |items|
@redis.pipelined do
items.each do |key, value|
@redis.incrby(key, value)
if value.is_a?(Array)
@redis.lpush(key, value)
elsif value.is_a?(Float)
@redis.incrbyfloat(key, value)
else
@redis.incrby(key, value)
end
end
end
}
Expand All @@ -93,7 +107,7 @@ def key(record)
end

class Pattern
attr_reader :matches, :count_value, :count_value_key
attr_reader :matches, :count_value, :count_value_key, :last, :required_keys, :list_value_format

def initialize(conf_element)
if !conf_element.has_key?('count_key') && !conf_element.has_key?('count_key_format')
Expand Down Expand Up @@ -137,6 +151,24 @@ def initialize(conf_element)
name = key['match_'.size .. key.size]
@matches[name] = Regexp.new(value)
}

@last = conf_element.has_key?('last') ? conf_element['last'] == 'true' : false
@required_keys = []
if conf_element.has_key?('required_keys')
@required_keys = conf_element['required_keys'].split(',').map(&:strip).uniq
end
@list_value_format = nil
if conf_element.has_key?('list_value_format')
if conf_element.has_key?('localtime') && conf_element.has_key?('utc')
raise RedisCounterException, 'both "localtime" and "utc" are specified.'
end
is_localtime = true
if conf_element.has_key?('utc')
is_localtime = false
end
@list_value_format = [conf_element['list_value_format'], is_localtime]
@list_value_formatter = RecordValueFormatter.new(@list_value_format[0])
end
end

def is_match?(record)
Expand All @@ -145,6 +177,9 @@ def is_match?(record)
return false
end
}
@required_keys.each { |key|
return false unless record.has_key?(key)
}
return true
end

Expand All @@ -161,13 +196,23 @@ def get_count_key(time, record)
def get_count_value(record)
if @count_value_key
ret = record[@count_value_key] || 0
return ret.kind_of?(Integer) ? ret : 0
return (ret.kind_of?(Integer) || ret.kind_of?(Float)) ? ret : 0
else
if @count_value
return @count_value
end
end
end

def get_list_value(time, record)
if @list_value_format
list_value = @list_value_formatter.key(record)
formatter = TimeFormatter.new(list_value, @list_value_format[1])
formatter.format(time)
else
''
end
end
end
end
end
143 changes: 143 additions & 0 deletions test/plugin/out_redis_counter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,147 @@ def test_write_with_count_value_key
assert_equal '123', driver.instance.redis.get("item_sum_count:200"), "it should be ignore when count_value_key is not number"
end

def test_write_without_last
conf = %[
db_number 1
<pattern>
match_status ^2[0-9]{2}$
match_url ^https
count_key a
</pattern>
<pattern>
count_key b
count_value 2
</pattern>
]

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com"})
driver.run
assert_equal '1', driver.instance.redis.get("a")
assert_equal '2', driver.instance.redis.get("b")

driver = create_driver conf
driver.emit({"status" => "404", "url" => "https://foo.com/404"})
driver.run
assert_equal '1', driver.instance.redis.get("a")
assert_equal '4', driver.instance.redis.get("b")
end

def test_write_with_last
conf = %[
db_number 1
<pattern>
match_status ^2[0-9]{2}$
match_url ^https
count_key a
last true
</pattern>
<pattern>
count_key b
count_value 2
</pattern>
]

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com"})
driver.run
assert_equal '1', driver.instance.redis.get("a")
assert_nil driver.instance.redis.get("b")

driver = create_driver conf
driver.emit({"status" => "404", "url" => "https://foo.com/404"})
driver.run
assert_equal '1', driver.instance.redis.get("a")
assert_equal '2', driver.instance.redis.get("b")
end

def test_write_with_required_keys
conf = %[
db_number 1
<pattern>
required_keys x
match_status ^2[0-9]{2}$
match_url ^https
count_key a
</pattern>
<pattern>
required_keys x,y
match_status ^2[0-9]{2}$
match_url ^https
count_key b
</pattern>
]

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com", "x" => "foo"})
driver.run
assert_equal '1', driver.instance.redis.get("a")
assert_nil driver.instance.redis.get("b")

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com", "y" => "bar"})
driver.run
assert_equal '1', driver.instance.redis.get("a")
assert_nil driver.instance.redis.get("b")

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com", "x" => "foo", "y" => "bar"})
driver.run
assert_equal '2', driver.instance.redis.get("a")
assert_equal '1', driver.instance.redis.get("b")
end

def test_write_with_float_value
conf = %[
db_number 1
<pattern>
match_status ^2[0-9]{2}$
match_url ^https
count_key a
count_value_key x
</pattern>
]

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com", "x" => 5.0})
driver.run
assert_equal '5', driver.instance.redis.get("a")

driver = create_driver conf
driver.emit({"status" => "200", "url" => "https://foo.com", "x" => 6.78})
driver.run
assert_equal '11.78', driver.instance.redis.get("a")
end

def test_configure_list_value_format
driver = create_driver %[
<pattern>
count_key a
list_value_format %_{prefix}-foo-%Y%m%d%H%M%S-%_{type}-%_{customer_id}
utc
</pattern>
]
utc_time = Time.parse('2012-06-21 12:12:45 +0900').to_i
record = {'prefix' => 'pre', 'type' => 'bar', 'customer_id' => 321}
assert_equal 'pre-foo-20120621031245-bar-321', driver.instance.patterns[0].get_list_value(utc_time, record)
end

def test_write_with_list_value_format
conf = %[
<pattern>
count_key a
list_value_format %_{prefix}-foo-%Y%m%d%H%M%S-%_{type}-%_{customer_id}
utc
</pattern>
]

driver = create_driver conf
driver.emit({'prefix' => 'pre1', 'type' => 'bar', 'customer_id' => 321}, Time.gm(2013, 11, 3, 12, 34, 56))
driver.emit({'prefix' => 'pre2', 'type' => 'foo', 'customer_id' => 654}, Time.gm(2013, 11, 3, 19, 20, 21))
driver.run
assert_equal 'pre2-foo-20131103192021-foo-654', driver.instance.redis.lpop('a')
assert_equal 'pre1-foo-20131103123456-bar-321', driver.instance.redis.lpop('a')
assert_nil driver.instance.redis.lpop('a')
end
end