Fixed #144 and #145: parallel range download may write past the end of the target output stream.
JasonYang-MSFT authored and vinjiang committed Feb 7, 2018
1 parent eae40c9 commit 7e9cd85
Showing 4 changed files with 142 additions and 18 deletions.
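
The core of the change is a length clamp: a requested length that is "max", or that runs past the end of the blob or file, is reduced to the bytes actually available from the requested offset. A minimal sketch of that arithmetic, using illustrative names rather than the SDK's own types:

#include <cstdint>
#include <iostream>
#include <limits>

// Clamp a requested (offset, length) range so it never extends past the end of
// the source; mirrors the condition added in cloud_blob.cpp and cloud_file.cpp.
std::uint64_t clamp_download_length(std::uint64_t source_size,
                                    std::uint64_t offset,
                                    std::uint64_t requested_length)
{
    const std::uint64_t remaining = source_size - offset; // bytes available from offset
    if (requested_length >= std::numeric_limits<std::uint64_t>::max() ||
        requested_length > remaining)
    {
        return remaining; // "max" means "to the end"; over-long requests are clamped
    }
    return requested_length;
}

int main()
{
    const std::uint64_t blob_size = 100ULL * 1024 * 1024;                     // 100 MB source
    std::cout << clamp_download_length(blob_size, 10, blob_size * 2) << "\n"; // clamped to blob_size - 10
    std::cout << clamp_download_length(blob_size, 10,
        std::numeric_limits<std::uint64_t>::max()) << "\n";                   // also blob_size - 10
    return 0;
}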
12 changes: 7 additions & 5 deletions Microsoft.WindowsAzure.Storage/src/cloud_blob.cpp
@@ -598,7 +598,8 @@ namespace azure { namespace storage {
// download the rest of the data in parallel.
utility::size64_t target_offset = offset;
utility::size64_t target_length = length;
if (target_length >= std::numeric_limits<utility::size64_t>::max())
if (target_length >= std::numeric_limits<utility::size64_t>::max()
|| target_length > instance->properties().size() - offset)
{
target_length = instance->properties().size() - offset;
}
@@ -617,7 +618,7 @@ namespace azure { namespace storage {
modified_condition.set_if_match_etag(instance->properties().etag());
}

return pplx::task_from_result().then([instance, target, target_offset, target_length, single_blob_download_threshold, modified_condition, options, context]()
return pplx::task_from_result().then([instance, offset, target, target_offset, target_length, single_blob_download_threshold, modified_condition, options, context]()
{
auto semaphore = std::make_shared<core::async_semaphore>(options.parallelism_factor());
// lock to the target ostream
@@ -636,12 +637,13 @@
{
current_length = target_offset + target_length - current_offset;
}
semaphore->lock_async().then([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, target, smallest_offset, current_offset, current_length, modified_condition, options, context]()
semaphore->lock_async().then([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, offset, target, smallest_offset, current_offset, current_length, modified_condition, options, context]()
{
concurrency::streams::container_buffer<std::vector<uint8_t>> buffer;
auto segment_ostream = buffer.create_ostream();
// if transactional MD5 is enabled, it will be checked inside each download_single_range_to_stream_async.
instance->download_single_range_to_stream_async(segment_ostream, current_offset, current_length, modified_condition, options, context).then([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task<void> download_task)
instance->download_single_range_to_stream_async(segment_ostream, current_offset, current_length, modified_condition, options, context)
.then([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task<void> download_task)
{
segment_ostream.close().then([download_task](pplx::task<void> close_task)
{
@@ -655,7 +657,7 @@
if (target.can_seek())
{
pplx::extensibility::scoped_rw_lock_t guard(mutex);
target.streambuf().seekpos(current_offset, std::ios_base::out);
target.streambuf().seekpos(current_offset - offset, std::ios_base::out);
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::transactional_md5_block_size;
released = true;
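
The other half of the fix is where each downloaded segment lands: the caller's output stream represents only the requested range, so a segment at absolute blob position current_offset has to be written at current_offset - offset within that stream, not at current_offset itself. A minimal sketch of that translation, assuming a plain byte buffer in place of the SDK's concurrency streams:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    const std::uint64_t offset = 1000;      // requested start within the blob
    const std::uint64_t length = 16;        // requested range length
    std::vector<char> target(length, '.');  // stands in for the caller's output stream

    // One downloaded segment: 8 bytes located at absolute blob offset 1008.
    const std::uint64_t current_offset = 1008;
    const char segment[] = "ABCDEFGH";

    // Before the fix the seek position was current_offset (1008), far past the
    // 16-byte target; after the fix it is current_offset - offset (8).
    const std::uint64_t relative_pos = current_offset - offset;
    std::memcpy(target.data() + relative_pos, segment, sizeof(segment) - 1);

    std::cout << std::string(target.begin(), target.end()) << "\n"; // "........ABCDEFGH"
    return 0;
}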
12 changes: 7 additions & 5 deletions Microsoft.WindowsAzure.Storage/src/cloud_file.cpp
@@ -613,7 +613,8 @@ namespace azure { namespace storage {
// download the rest of the data in parallel.
utility::size64_t target_offset = offset;
utility::size64_t target_length = length;
if (target_length >= std::numeric_limits<utility::size64_t>::max())
if (target_length >= std::numeric_limits<utility::size64_t>::max()
|| target_length > instance->properties().size() - offset)
{
target_length = instance->properties().size() - offset;
}
@@ -626,7 +627,7 @@
target_offset += single_file_download_threshold;
target_length -= single_file_download_threshold;

return pplx::task_from_result().then([instance, target, target_offset, target_length, single_file_download_threshold, condition, options, context]()
return pplx::task_from_result().then([instance, offset, target, target_offset, target_length, single_file_download_threshold, condition, options, context]()
{
auto semaphore = std::make_shared<core::async_semaphore>(options.parallelism_factor());
// lock to the target ostream
@@ -645,12 +646,13 @@
{
current_length = target_offset + target_length - current_offset;
}
semaphore->lock_async().then([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, target, smallest_offset, current_offset, current_length, condition, options, context]()
semaphore->lock_async().then([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, offset, target, smallest_offset, current_offset, current_length, condition, options, context]()
{
concurrency::streams::container_buffer<std::vector<uint8_t>> buffer;
auto segment_ostream = buffer.create_ostream();
// if transactional MD5 is enabled, it will be checked inside each download_single_range_to_stream_async.
instance->download_single_range_to_stream_async(segment_ostream, current_offset, current_length, condition, options, context, false, true).then([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task<void> download_task)
instance->download_single_range_to_stream_async(segment_ostream, current_offset, current_length, condition, options, context)
.then([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task<void> download_task)
{
segment_ostream.close().then([download_task](pplx::task<void> close_task)
{
@@ -664,7 +666,7 @@
if (target.can_seek())
{
pplx::extensibility::scoped_rw_lock_t guard(mutex);
target.streambuf().seekpos(current_offset, std::ios_base::out);
target.streambuf().seekpos(current_offset - offset, std::ios_base::out);
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::transactional_md5_block_size;
released = true;
72 changes: 70 additions & 2 deletions Microsoft.WindowsAzure.Storage/tests/cloud_blob_test.cpp
@@ -729,6 +729,10 @@ SUITE(Blob)
option.set_parallelism_factor(2);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

@@ -752,6 +756,10 @@ SUITE(Blob)
option.set_parallelism_factor(2);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

@@ -765,7 +773,13 @@ SUITE(Blob)
CHECK(download_buffer.collection().size() == target_length);
CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin()));
}
}

/// <summary>
/// Test parallel download with offset
/// </summary>
TEST_FIXTURE(blob_test_base, parallel_download_with_offset)
{
// blob with size larger than 32MB.
// With offset not zero.
{
@@ -776,6 +790,10 @@ SUITE(Blob)
option.set_parallelism_factor(2);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

@@ -789,7 +807,7 @@ SUITE(Blob)

check_parallelism(context, 2);
CHECK(blob.properties().size() == target_length);
CHECK(download_buffer.collection().size() == target_length);
CHECK(download_buffer.collection().size() == actual_length);
CHECK(std::equal(data.begin() + actual_offset, data.end(), download_buffer.collection().begin()));
}

@@ -803,6 +821,10 @@ SUITE(Blob)
option.set_parallelism_factor(2);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

@@ -811,11 +833,49 @@ SUITE(Blob)
concurrency::streams::container_buffer<std::vector<uint8_t>> download_buffer;

utility::size64_t actual_offset = rand() % 255 + 1;
utility::size64_t actual_length = target_length - actual_offset;
blob.download_range_to_stream(download_buffer.create_ostream(), actual_offset, std::numeric_limits<utility::size64_t>::max(), azure::storage::access_condition(), option, context);

check_parallelism(context, 2);
CHECK(blob.properties().size() == target_length);
CHECK(download_buffer.collection().size() == target_length);
CHECK(download_buffer.collection().size() == actual_length);
CHECK(std::equal(data.begin() + actual_offset, data.end(), download_buffer.collection().begin()));
}
}

/// <summary>
/// Test parallel download with length too large
/// </summary>
TEST_FIXTURE(blob_test_base, parallel_download_with_length_too_large)
{
// blob with size larger than 32MB.
// With offset not zero and a requested length longer than the remaining data.
{
auto blob_name = get_random_string(20);
auto blob = m_container.get_block_blob_reference(blob_name);
size_t target_length = 100 * 1024 * 1024;
azure::storage::blob_request_options option;
option.set_parallelism_factor(10);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

// download target blob in parallel.
azure::storage::operation_context context;
concurrency::streams::container_buffer<std::vector<uint8_t>> download_buffer;

utility::size64_t actual_offset = rand() % 255 + 1;
utility::size64_t actual_length = target_length - actual_offset;
blob.download_range_to_stream(download_buffer.create_ostream(), actual_offset, actual_length * 2, azure::storage::access_condition(), option, context);

check_parallelism(context, 10);
CHECK(blob.properties().size() == target_length);
CHECK(download_buffer.collection().size() == actual_length);
CHECK(std::equal(data.begin() + actual_offset, data.end(), download_buffer.collection().begin()));
}
}
@@ -833,6 +893,10 @@ SUITE(Blob)
option.set_use_transactional_md5(true);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

@@ -857,6 +921,10 @@ SUITE(Blob)
option.set_use_transactional_md5(true);
std::vector<uint8_t> data;
data.resize(target_length);
for (size_t i = 0; i < target_length; ++i)
{
data[i] = i % 255;
}
concurrency::streams::container_buffer<std::vector<uint8_t>> upload_buffer(data);
blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context);

