Skip to content
This repository was archived by the owner on Feb 2, 2021. It is now read-only.

Fix duplicate bytes #25

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_azure-storage-append-blob.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def append_blob(content, metadata)
begin
size = [content.length - position, AZURE_BLOCK_SIZE_LIMIT].min
log.debug "azure_storage_append_blob: append_blob.chunk: content[#{position}..#{position + size}]"
@bs.append_blob_block(@azure_container, @azure_storage_path, content[position..position + size])
@bs.append_blob_block(@azure_container, @azure_storage_path, content[position..position + size - 1])
position += size
break if position >= content.length
rescue Azure::Core::Http::HTTPError => e
Expand Down
73 changes: 60 additions & 13 deletions test/plugin/test_out_azure_storage_append_blob.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@ class AzureStorageAppendBlobOutTest < Test::Unit::TestCase
path log
).freeze

def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::AzureStorageAppendBlobOut).configure(conf)
def create_driver(conf: CONFIG, service: nil)
d = Fluent::Test::Driver::Output.new(Fluent::Plugin::AzureStorageAppendBlobOut).configure(conf)
d.instance.instance_variable_set(:@bs, service)
d.instance.instance_variable_set(:@azure_storage_path, 'storage_path')
d
end

sub_test_case 'test config' do
test 'config should reject with no azure container' do
assert_raise Fluent::ConfigError do
create_driver(%(
create_driver conf: %(
azure_storage_account test_storage_account
azure_storage_access_key MY_FAKE_SECRET
time_slice_format %Y%m%d-%H
time_slice_wait 10m
path log
))
)
end
end

Expand All @@ -54,7 +57,7 @@ def create_driver(conf = CONFIG)
end

test 'config with managed identity enabled should set instance variables' do
d = create_driver(MSI_CONFIG)
d = create_driver conf: MSI_CONFIG
assert_equal 'test_storage_account', d.instance.azure_storage_account
assert_equal 'test_container', d.instance.azure_container
assert_equal true, d.instance.use_msi
Expand All @@ -68,7 +71,7 @@ def create_driver(conf = CONFIG)
sub_test_case 'test path slicing' do
test 'test path_slicing' do
config = CONFIG.clone.gsub(/path\slog/, 'path log/%Y/%m/%d')
d = create_driver(config)
d = create_driver conf: config
path_slicer = d.instance.instance_variable_get(:@path_slicer)
path = d.instance.instance_variable_get(:@path)
slice = path_slicer.call(path)
Expand All @@ -78,7 +81,7 @@ def create_driver(conf = CONFIG)
test 'path slicing utc' do
config = CONFIG.clone.gsub(/path\slog/, 'path log/%Y/%m/%d')
config << "\nutc\n"
d = create_driver(config)
d = create_driver conf: config
path_slicer = d.instance.instance_variable_get(:@path_slicer)
path = d.instance.instance_variable_get(:@path)
slice = path_slicer.call(path)
Expand All @@ -104,6 +107,12 @@ def initialize(status=404)
class FakeBlobService
def initialize(status)
@response = Azure::Core::Http::HttpResponse.new(FakeResponse.new(status))
@blocks = []
end
attr_reader :blocks

def append_blob_block(container, path, data)
@blocks.append(data)
end

def get_container_properties(container)
Expand All @@ -115,23 +124,61 @@ def get_container_properties(container)

sub_test_case 'test container_exists' do
test 'container 404 returns false' do
d = create_driver
d.instance.instance_variable_set(:@bs, FakeBlobService.new(404))
d = create_driver service: FakeBlobService.new(404)
assert_false d.instance.container_exists? "anything"
end

test 'existing container returns true' do
d = create_driver
d.instance.instance_variable_set(:@bs, FakeBlobService.new(200))
d = create_driver service: FakeBlobService.new(200)
assert_true d.instance.container_exists? "anything"
end

test 'unexpected exception raises' do
d = create_driver
d.instance.instance_variable_set(:@bs, FakeBlobService.new(500))
d = create_driver service: FakeBlobService.new(500)
assert_raise_kind_of Azure::Core::Http::HTTPError do
d.instance.container_exists? "anything"
end
end
end

# Override the block size limit so that mocked requests do not require huge buffers
class Fluent::Plugin::AzureStorageAppendBlobOut
AZURE_BLOCK_SIZE_LIMIT = 10
end

sub_test_case 'test append blob buffering' do
def fake_appended_blocks(content)
# run the append on the fake blob service, return a list of append request buffers
svc = FakeBlobService.new(200)
d = create_driver service: svc
d.instance.send(:append_blob, content, nil)
svc.blocks
end

test 'short buffer appends once' do
content = '123456789'
blocks = fake_appended_blocks content
assert_equal [content], blocks
end

test 'single character appends once' do
content = '1'
blocks = fake_appended_blocks content
assert_equal [content], blocks
end

test 'empty appends once' do
content = ''
blocks = fake_appended_blocks content
assert_equal [''], blocks
end

test 'long buffer appends multiple times' do
limit = Fluent::Plugin::AzureStorageAppendBlobOut::AZURE_BLOCK_SIZE_LIMIT
buf_1 = 'a' * limit
buf_2 = 'a' * 3
blocks = fake_appended_blocks buf_1 + buf_2
assert_equal [buf_1, buf_2], blocks
end
end
end