Skip to content

Commit

Permalink
Merge pull request #2559 from ganmacs/cache-msgpack
Browse files Browse the repository at this point in the history
Cache msgpack packer/unpacker
  • Loading branch information
repeatedly authored Aug 16, 2019
2 parents 975ca8b + b8021e9 commit c6c6c03
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 33 deletions.
48 changes: 24 additions & 24 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,30 +50,30 @@ def slice(index, num)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def each(&block)
def each(unapcker: nil, &block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def to_msgpack_stream(time_int: false)
return to_msgpack_stream_forced_integer if time_int
out = msgpack_packer
def to_msgpack_stream(time_int: false, packer: nil)
return to_msgpack_stream_forced_integer(packer: packer) if time_int
out = packer || msgpack_packer
each {|time,record|
out.write([time,record])
}
out.to_s
out.full_pack
end

def to_compressed_msgpack_stream(time_int: false)
packed = to_msgpack_stream(time_int: time_int)
def to_compressed_msgpack_stream(time_int: false, packer: nil)
packed = to_msgpack_stream(time_int: time_int, packer: packer)
compress(packed)
end

def to_msgpack_stream_forced_integer
out = msgpack_packer
def to_msgpack_stream_forced_integer(packer: nil)
out = packer || msgpack_packer
each {|time,record|
out.write([time.to_i,record])
}
out.to_s
out.full_pack
end
end

Expand Down Expand Up @@ -107,7 +107,7 @@ def slice(index, num)
end
end

def each(&block)
def each(unpacker: nil, &block)
block.call(@time, @record)
nil
end
Expand Down Expand Up @@ -143,7 +143,7 @@ def slice(index, num)
ArrayEventStream.new(@entries.slice(index, num))
end

def each(&block)
def each(unpacker: nil, &block)
@entries.each(&block)
nil
end
Expand Down Expand Up @@ -190,7 +190,7 @@ def slice(index, num)
MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num))
end

def each(&block)
def each(unpacker: nil, &block)
time_array = @time_array
record_array = @record_array
for i in 0..time_array.length-1
Expand Down Expand Up @@ -234,11 +234,11 @@ def repeatable?
true
end

def ensure_unpacked!
def ensure_unpacked!(unpacker: nil)
return if @unpacked_times && @unpacked_records
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
(unpacker || msgpack_unpacker).feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
end
Expand All @@ -254,15 +254,15 @@ def slice(index, num)
MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num))
end

def each(&block)
def each(unpacker: nil, &block)
if @unpacked_times
@unpacked_times.each_with_index do |time, i|
block.call(time, @unpacked_records[i])
end
else
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
(unpacker || msgpack_unpacker).feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
block.call(time, record)
Expand All @@ -272,7 +272,7 @@ def each(&block)
nil
end

def to_msgpack_stream(time_int: false)
def to_msgpack_stream(time_int: false, packer: nil)
# time_int is always ignored because @data is always packed binary in this class
@data
end
Expand All @@ -290,17 +290,17 @@ def empty?
super
end

def ensure_unpacked!
def ensure_unpacked!(unpacker: nil)
ensure_decompressed!
super
end

def each(&block)
def each(unpacker: nil, &block)
ensure_decompressed!
super
end

def to_msgpack_stream(time_int: false)
def to_msgpack_stream(time_int: false, packer: nil)
ensure_decompressed!
super
end
Expand All @@ -322,15 +322,15 @@ module ChunkMessagePackEventStreamer
include MessagePackFactory::Mixin
# chunk.extend(ChunkEventStreamer)
# => chunk.each{|time, record| ... }
def each(&block)
def each(unpacker: nil, &block)
open do |io|
msgpack_unpacker(io).each(&block)
(unpacker || msgpack_unpacker(io)).each(&block)
end
nil
end
alias :msgpack_each :each

def to_msgpack_stream(time_int: false)
def to_msgpack_stream(time_int: false, packer: nil)
# time_int is always ignored because data is already packed and written in chunk
read
end
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'fluent/match'
require 'fluent/event'
require 'fluent/filter'
require 'fluent/msgpack_factory'

module Fluent
#
Expand Down Expand Up @@ -182,7 +183,7 @@ def filter_stream(tag, es)

def optimized_filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
filtered_record = record
filtered_time = time

Expand Down
8 changes: 8 additions & 0 deletions lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,13 @@ def self.init
factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
@@engine_factory = factory
end

def self.thread_local_msgpack_packer
Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer
end

def self.thread_local_msgpack_unpacker
Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker
end
end
end
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def write_metadata(update: true)
c: @created_at,
m: (update ? Fluent::Clock.real_now : @modified_at),
})
bin = msgpack_packer.pack(data).to_s
bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack
size = [bin.bytesize].pack('N')
@meta.seek(0, IO::SEEK_SET)
@meta.write(BUFFER_HEADER + size + bin)
Expand Down
15 changes: 8 additions & 7 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fluent/plugin/base'
require 'fluent/plugin/buffer'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/msgpack_factory'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
Expand Down Expand Up @@ -912,10 +913,10 @@ def write_guard(&block)
end
end

FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream }
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true) }
FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }

def generate_format_proc
if @buffer && @buffer.compress == :gzip
Expand All @@ -937,7 +938,7 @@ def generate_format_proc
def handle_stream_with_custom_format(tag, es, enqueue: false)
meta_and_data = {}
records = 0
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= []
res = format(tag, time, record)
Expand All @@ -957,7 +958,7 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
format_proc = generate_format_proc
meta_and_data = {}
records = 0
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= MultiEventStream.new
meta_and_data[meta].add(time, record)
Expand All @@ -977,7 +978,7 @@ def handle_stream_simple(tag, es, enqueue: false)
if @custom_format
records = 0
data = []
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
res = format(tag, time, record)
if res
data << res
Expand Down

0 comments on commit c6c6c03

Please sign in to comment.