diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 2fe2a2525d..f25d8a21fc 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -50,30 +50,30 @@ def slice(index, num) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end - def each(&block) + def each(unapcker: nil, &block) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end - def to_msgpack_stream(time_int: false) - return to_msgpack_stream_forced_integer if time_int - out = msgpack_packer + def to_msgpack_stream(time_int: false, packer: nil) + return to_msgpack_stream_forced_integer(packer: packer) if time_int + out = packer || msgpack_packer each {|time,record| out.write([time,record]) } - out.to_s + out.full_pack end - def to_compressed_msgpack_stream(time_int: false) - packed = to_msgpack_stream(time_int: time_int) + def to_compressed_msgpack_stream(time_int: false, packer: nil) + packed = to_msgpack_stream(time_int: time_int, packer: packer) compress(packed) end - def to_msgpack_stream_forced_integer - out = msgpack_packer + def to_msgpack_stream_forced_integer(packer: nil) + out = packer || msgpack_packer each {|time,record| out.write([time.to_i,record]) } - out.to_s + out.full_pack end end @@ -107,7 +107,7 @@ def slice(index, num) end end - def each(&block) + def each(unpacker: nil, &block) block.call(@time, @record) nil end @@ -143,7 +143,7 @@ def slice(index, num) ArrayEventStream.new(@entries.slice(index, num)) end - def each(&block) + def each(unpacker: nil, &block) @entries.each(&block) nil end @@ -190,7 +190,7 @@ def slice(index, num) MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num)) end - def each(&block) + def each(unpacker: nil, &block) time_array = @time_array record_array = @record_array for i in 0..time_array.length-1 @@ -234,11 +234,11 @@ def repeatable? true end - def ensure_unpacked! + def ensure_unpacked!(unpacker: nil) return if @unpacked_times && @unpacked_records @unpacked_times = [] @unpacked_records = [] - msgpack_unpacker.feed_each(@data) do |time, record| + (unpacker || msgpack_unpacker).feed_each(@data) do |time, record| @unpacked_times << time @unpacked_records << record end @@ -254,7 +254,7 @@ def slice(index, num) MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num)) end - def each(&block) + def each(unpacker: nil, &block) if @unpacked_times @unpacked_times.each_with_index do |time, i| block.call(time, @unpacked_records[i]) @@ -262,7 +262,7 @@ def each(&block) else @unpacked_times = [] @unpacked_records = [] - msgpack_unpacker.feed_each(@data) do |time, record| + (unpacker || msgpack_unpacker).feed_each(@data) do |time, record| @unpacked_times << time @unpacked_records << record block.call(time, record) @@ -272,7 +272,7 @@ def each(&block) nil end - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because @data is always packed binary in this class @data end @@ -290,17 +290,17 @@ def empty? super end - def ensure_unpacked! + def ensure_unpacked!(unpacker: nil) ensure_decompressed! super end - def each(&block) + def each(unpacker: nil, &block) ensure_decompressed! super end - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) ensure_decompressed! super end @@ -322,15 +322,15 @@ module ChunkMessagePackEventStreamer include MessagePackFactory::Mixin # chunk.extend(ChunkEventStreamer) # => chunk.each{|time, record| ... } - def each(&block) + def each(unpacker: nil, &block) open do |io| - msgpack_unpacker(io).each(&block) + (unpacker || msgpack_unpacker(io)).each(&block) end nil end alias :msgpack_each :each - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because data is already packed and written in chunk read end diff --git a/lib/fluent/event_router.rb b/lib/fluent/event_router.rb index 23db13a66c..365fba4b94 100644 --- a/lib/fluent/event_router.rb +++ b/lib/fluent/event_router.rb @@ -17,6 +17,7 @@ require 'fluent/match' require 'fluent/event' require 'fluent/filter' +require 'fluent/msgpack_factory' module Fluent # @@ -182,7 +183,7 @@ def filter_stream(tag, es) def optimized_filter_stream(tag, es) new_es = MultiEventStream.new - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| filtered_record = record filtered_time = time diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index 54d0b9033e..0b34a59c04 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -58,5 +58,13 @@ def self.init factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) @@engine_factory = factory end + + def self.thread_local_msgpack_packer + Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer + end + + def self.thread_local_msgpack_unpacker + Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker + end end end diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 81245ad8d5..934ffd3c3f 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -254,7 +254,7 @@ def write_metadata(update: true) c: @created_at, m: (update ? Fluent::Clock.real_now : @modified_at), }) - bin = msgpack_packer.pack(data).to_s + bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack size = [bin.bytesize].pack('N') @meta.seek(0, IO::SEEK_SET) @meta.write(BUFFER_HEADER + size + bin) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 16f07ac8bb..5208f445aa 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -18,6 +18,7 @@ require 'fluent/plugin/base' require 'fluent/plugin/buffer' require 'fluent/plugin_helper/record_accessor' +require 'fluent/msgpack_factory' require 'fluent/log' require 'fluent/plugin_id' require 'fluent/plugin_helper' @@ -912,10 +913,10 @@ def write_guard(&block) end end - FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream } - FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream } - FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true) } - FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true) } + FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } def generate_format_proc if @buffer && @buffer.compress == :gzip @@ -937,7 +938,7 @@ def generate_format_proc def handle_stream_with_custom_format(tag, es, enqueue: false) meta_and_data = {} records = 0 - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= [] res = format(tag, time, record) @@ -957,7 +958,7 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) format_proc = generate_format_proc meta_and_data = {} records = 0 - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= MultiEventStream.new meta_and_data[meta].add(time, record) @@ -977,7 +978,7 @@ def handle_stream_simple(tag, es, enqueue: false) if @custom_format records = 0 data = [] - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| res = format(tag, time, record) if res data << res