Skip to content

Commit

Permalink
Merge pull request #4178 from daipom/make-sure-to-reset-local-unpacker
Browse files Browse the repository at this point in the history
MessagePackFactory: Make sure to reset local unpacker to prevent received broken data from affecting other receiving data
  • Loading branch information
ashie authored May 16, 2023
2 parents 0a6d706 + 3c4fbf0 commit dc7e4b2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
7 changes: 6 additions & 1 deletion lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ def self.thread_local_msgpack_packer
end

def self.thread_local_msgpack_unpacker
Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker
unpacker = Thread.current[:local_msgpack_unpacker]
if unpacker.nil?
return Thread.current[:local_msgpack_unpacker] = MessagePackFactory.engine_factory.unpacker
end
unpacker.reset
unpacker
end
end
end
32 changes: 32 additions & 0 deletions test/test_msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,36 @@ class MessagePackFactoryTest < Test::Unit::TestCase
assert mp.msgpack_factory
assert mp.msgpack_factory
end

sub_test_case 'thread_local_msgpack_packer' do
test 'packer is cached' do
packer1 = Fluent::MessagePackFactory.thread_local_msgpack_packer
packer2 = Fluent::MessagePackFactory.thread_local_msgpack_packer
assert_equal packer1, packer2
end
end

sub_test_case 'thread_local_msgpack_unpacker' do
test 'unpacker is cached' do
unpacker1 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
unpacker2 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
assert_equal unpacker1, unpacker2
end

# We need to reset the buffer every time so that received incomplete data
# must not affect data from other senders.
test 'reset the internal buffer of unpacker every time' do
unpacker1 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
unpacker1.feed_each("\xA6foo") do |result|
flunk("This callback must not be called since the data is uncomplete.")
end

records = []
unpacker2 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
unpacker2.feed_each("\xA3foo") do |result|
records.append(result)
end
assert_equal ["foo"], records
end
end
end

0 comments on commit dc7e4b2

Please sign in to comment.