Skip to content

Commit

Permalink
initial commit for large block blob support
Browse files Browse the repository at this point in the history
  • Loading branch information
hanzhumsft authored and vinjiang committed Jan 5, 2017
1 parent bfe654e commit 455e349
Show file tree
Hide file tree
Showing 18 changed files with 120 additions and 60 deletions.
3 changes: 3 additions & 0 deletions BreakingChanges.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Azure Storage Client Library for C++
History of Breaking Changes

Breaking Changes in vNext:
- Default Rest API version is 2016-05-31.

Breaking Changes in v2.5:
- Upgraded Casablanca dependency to 2.9.1

Expand Down
4 changes: 4 additions & 0 deletions Changelog.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Azure Storage Client Library for C++
History of Changes

Changes in vNext:
- Default Rest API version is 2016-05-31.
- Raised the maximum block size to 100MB and the maximum single blob upload threshold to 256MB.

Changes in v2.6:
- Supported parallel download for blobs and files
- Supported installation from Vcpkg
Expand Down
20 changes: 10 additions & 10 deletions Microsoft.WindowsAzure.Storage/includes/was/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -1531,8 +1531,8 @@ namespace azure { namespace storage {
m_disable_content_md5_validation(false),
m_parallelism_factor(1),
m_single_blob_upload_threshold(protocol::default_single_blob_upload_threshold),
m_stream_write_size(protocol::max_block_size),
m_stream_read_size(protocol::max_block_size),
m_stream_write_size(protocol::default_stream_write_size),
m_stream_read_size(protocol::default_stream_read_size),
m_absorb_conditional_errors_on_retry(false)
{
}
Expand Down Expand Up @@ -1662,7 +1662,7 @@ namespace azure { namespace storage {
/// Gets the maximum size of a blob in bytes that may be uploaded as a single blob.
/// </summary>
/// <returns>The maximum size of a blob, in bytes, that may be uploaded as a single blob,
/// ranging from between 1 and 64 MB inclusive.</returns>
/// ranging from between 1 and 256 MB inclusive.</returns>
utility::size64_t single_blob_upload_threshold_in_bytes() const
{
return m_single_blob_upload_threshold;
Expand All @@ -1672,10 +1672,10 @@ namespace azure { namespace storage {
/// Sets the maximum size of a blob in bytes that may be uploaded as a single blob.
/// </summary>
/// <param name="value">The maximum size of a blob, in bytes, that may be uploaded as a single blob,
/// ranging from between 1 and 64 MB inclusive.</param>
/// ranging from between 1 and 256 MB inclusive.</param>
void set_single_blob_upload_threshold_in_bytes(utility::size64_t value)
{
utility::assert_in_bounds<utility::size64_t>(_XPLATSTR("value"), value, 1 * 1024 * 1024, 64 * 1024 * 1024);
utility::assert_in_bounds<utility::size64_t>(_XPLATSTR("value"), value, 1 * 1024 * 1024, 256 * 1024 * 1024);
m_single_blob_upload_threshold = value;
}

Expand Down Expand Up @@ -1704,7 +1704,7 @@ namespace azure { namespace storage {
/// Gets the minimum number of bytes to buffer when reading from a blob stream.
/// </summary>
/// <returns>The minimum number of bytes to buffer, being at least 16KB.</returns>
size_t stream_read_size_in_bytes() const
option_with_default<size_t> stream_read_size_in_bytes() const
{
return m_stream_read_size;
}
Expand All @@ -1722,19 +1722,19 @@ namespace azure { namespace storage {
/// <summary>
/// Gets the minimum number of bytes to buffer when writing to a blob stream.
/// </summary>
/// <returns>The minimum number of bytes to buffer, ranging from between 16 KB and 4 MB inclusive.</returns>
size_t stream_write_size_in_bytes() const
/// <returns>The minimum number of bytes to buffer, ranging from between 16 KB and 100 MB inclusive.</returns>
option_with_default<size_t> stream_write_size_in_bytes() const
{
return m_stream_write_size;
}

/// <summary>
/// Sets the minimum number of bytes to buffer when writing to a blob stream.
/// </summary>
/// <param name="value">The minimum number of bytes to buffer, ranging from between 16 KB and 4 MB inclusive.</param>
/// <param name="value">The minimum number of bytes to buffer, ranging from between 16 KB and 100 MB inclusive.</param>
void set_stream_write_size_in_bytes(size_t value)
{
utility::assert_in_bounds<size_t>(_XPLATSTR("value"), value, 16 * 1024, 4 * 1024 * 1024);
utility::assert_in_bounds<size_t>(_XPLATSTR("value"), value, 16 * 1024, 100 * 1024 * 1024);
m_stream_write_size = value;
}

Expand Down
9 changes: 9 additions & 0 deletions Microsoft.WindowsAzure.Storage/includes/was/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ namespace azure { namespace storage {
merge(value.m_has_value ? (const T&)value : fallback_value);
}

/// <summary>
/// Indicates whether a specified value is set.
/// </summary>
/// <returns>A boolean indicating whether a specified value is set.</returns>
bool has_value() const
{
return m_has_value;
}

private:

T m_value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ DAT(ms_header_time_next_visible, _XPLATSTR("x-ms-time-next-visible"))
DAT(ms_header_share_quota, _XPLATSTR("x-ms-share-quota"))

// header values
DAT(header_value_storage_version, _XPLATSTR("2015-12-11"))
DAT(header_value_storage_version, _XPLATSTR("2016-05-31"))
DAT(header_value_true, _XPLATSTR("true"))
DAT(header_value_false, _XPLATSTR("false"))
DAT(header_value_locked, _XPLATSTR("locked"))
Expand Down Expand Up @@ -331,6 +331,7 @@ DAT(error_blob_type_mismatch, "Blob type of the blob reference doesn't match blo
DAT(error_closed_stream, "Cannot access a closed stream.")
DAT(error_lease_id_on_source, "A lease condition cannot be specified on the source of a copy.")
DAT(error_incorrect_length, "Incorrect number of bytes received.")
DAT(error_blob_over_max_block_limit, "The total blocks required for this upload exceeds the maximum block limit. Please increase the block size if applicable and ensure the Blob size is not greater than the maximum Blob size limit.")
DAT(error_md5_mismatch, "Calculated MD5 does not match existing property.")
DAT(error_missing_md5, "MD5 does not exist. If you do not want to force validation, please disable use_transactional_md5.")
DAT(error_sas_missing_credentials, "Cannot create Shared Access Signature unless Shared Key credentials are used.")
Expand Down
15 changes: 12 additions & 3 deletions Microsoft.WindowsAzure.Storage/includes/wascore/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@
namespace azure { namespace storage { namespace protocol {

// size constants
const size_t max_block_size = 4 * 1024 * 1024;
const size_t single_block_size = 4 * 1024 * 1024;
const size_t max_block_number = 50000;
const size_t max_block_size = 100 * 1024 * 1024;
const utility::size64_t max_block_blob_size = static_cast<utility::size64_t>(max_block_number) * max_block_size;
const size_t max_append_block_size = 4 * 1024 * 1024;
const size_t max_page_size = 4 * 1024 * 1024;
const size_t max_range_size = 4 * 1024 * 1024;
const utility::size64_t max_single_blob_upload_threshold = 256 * 1024 * 1024;

const size_t default_stream_write_size = 4 * 1024 * 1024;
const size_t default_stream_read_size = 4 * 1024 * 1024;
const size_t default_buffer_size = 64 * 1024;
const utility::size64_t default_single_blob_upload_threshold = 32 * 1024 * 1024;
const utility::size64_t default_single_blob_upload_threshold = 128 * 1024 * 1024;
const utility::size64_t default_single_blob_download_threshold = 32 * 1024 * 1024;
const utility::size64_t default_single_block_download_threshold = 4 * 1024 * 1024;
const size_t transactional_md5_block_size = 4 * 1024 * 1024;

// duration constants
const std::chrono::seconds default_retry_interval(3);
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.WindowsAzure.Storage/includes/wascore/filestream.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace azure { namespace storage { namespace core {
: m_file(file), m_file_length(length), m_condition(access_condition), m_options(options), m_context(context),
m_semaphore(options.parallelism_factor()),m_current_file_offset(0)
{
m_buffer_size = protocol::max_block_size;
m_next_buffer_size = protocol::max_block_size;
m_buffer_size = protocol::max_range_size;
m_next_buffer_size = protocol::max_range_size;

if (m_options.use_transactional_md5())
{
Expand Down
13 changes: 5 additions & 8 deletions Microsoft.WindowsAzure.Storage/src/authentication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,11 @@ namespace azure { namespace storage { namespace protocol {
if ((key_size > ms_header_prefix_size) &&
std::equal(ms_header_prefix, ms_header_prefix + ms_header_prefix_size, key, [](const utility::char_t &c1, const utility::char_t &c2) {return c1 == c2;}))
{
if (!it->second.empty())
{
utility::string_t transformed_key(key);
std::transform(transformed_key.begin(), transformed_key.end(), transformed_key.begin(), core::utility_char_tolower);
m_result.append(transformed_key);
m_result.append(_XPLATSTR(":"));
append(it->second);
}
utility::string_t transformed_key(key);
std::transform(transformed_key.begin(), transformed_key.end(), transformed_key.begin(), core::utility_char_tolower);
m_result.append(transformed_key);
m_result.append(_XPLATSTR(":"));
append(it->second);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.WindowsAzure.Storage/src/cloud_append_blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ namespace azure { namespace storage {
properties->update_append_blob_committed_block_count(parsed_properties);
return utility::conversions::scan_string<int64_t>(protocol::get_header_value(response.headers(), protocol::ms_header_blob_append_offset));
});
return core::istream_descriptor::create(block_data, needs_md5).then([command, context, content_md5, modified_options, condition] (core::istream_descriptor request_body) -> pplx::task<int64_t>
return core::istream_descriptor::create(block_data, needs_md5, std::numeric_limits<utility::size64_t>::max(), protocol::max_append_block_size).then([command, context, content_md5, modified_options, condition] (core::istream_descriptor request_body) -> pplx::task<int64_t>
{
const utility::string_t& md5 = content_md5.empty() ? request_body.content_md5() : content_md5;
command->set_build_request(std::bind(protocol::append_block, md5, condition, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.WindowsAzure.Storage/src/cloud_blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,9 @@ namespace azure { namespace storage {
auto smallest_offset = std::make_shared<utility::size64_t>(target_offset);
auto condition_variable = std::make_shared<std::condition_variable>();
std::mutex condition_variable_mutex;
for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::single_block_size)
for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::transactional_md5_block_size)
{
utility::size64_t current_length = protocol::single_block_size;
utility::size64_t current_length = protocol::transactional_md5_block_size;
if (current_offset + current_length > target_offset + target_length)
{
current_length = target_offset + target_length - current_offset;
Expand All @@ -649,7 +649,7 @@ namespace azure { namespace storage {
pplx::extensibility::scoped_rw_lock_t guard(mutex);
target.streambuf().seekpos(current_offset, std::ios_base::out);
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::single_block_size;
*smallest_offset += protocol::transactional_md5_block_size;
released = true;
semaphore->unlock();
}
Expand All @@ -660,7 +660,7 @@ namespace azure { namespace storage {
if (*smallest_offset == current_offset)
{
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::single_block_size;
*smallest_offset += protocol::transactional_md5_block_size;
condition_variable->notify_all();
released = true;
semaphore->unlock();
Expand All @@ -686,7 +686,7 @@ namespace azure { namespace storage {
if (*smallest_offset == current_offset)
{
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::single_block_size;
*smallest_offset += protocol::transactional_md5_block_size;
}
else if (*smallest_offset > current_offset)
{
Expand Down
24 changes: 23 additions & 1 deletion Microsoft.WindowsAzure.Storage/src/cloud_block_blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ namespace azure { namespace storage {
protocol::preprocess_response_void(response, result, context);
properties->update_etag_and_last_modified(protocol::blob_response_parsers::parse_blob_properties(response));
});
return core::istream_descriptor::create(source, modified_options.store_blob_content_md5(), length).then([command, context, properties, metadata, condition, modified_options] (core::istream_descriptor request_body) -> pplx::task<void>
return core::istream_descriptor::create(source, modified_options.store_blob_content_md5(), length, protocol::max_single_blob_upload_threshold).then([command, context, properties, metadata, condition, modified_options] (core::istream_descriptor request_body) -> pplx::task<void>
{
if (!request_body.content_md5().empty())
{
Expand All @@ -191,6 +191,28 @@ namespace azure { namespace storage {
});
}

// Verify that an upload of known length fits within the maximum number of blocks a block blob may hold.
// - If the caller explicitly set the stream write size, or the blob is larger than the maximum block
//   blob size, the upload cannot succeed and a storage_exception is thrown.
// - Otherwise the (defaulted) stream write size is scaled up so the upload needs at most
//   protocol::max_block_number blocks.
// A length of numeric_limits<size64_t>::max() means "unknown length", so no check is possible.
if (length != std::numeric_limits<utility::size64_t>::max())
{
    // Integer ceiling division: exact for any length, unlike a round-trip through double.
    const size_t current_block_size = modified_options.stream_write_size_in_bytes();
    const utility::size64_t total_blocks = length / current_block_size + (length % current_block_size != 0 ? 1 : 0);

    if (total_blocks > protocol::max_block_number)
    {
        if (modified_options.stream_write_size_in_bytes().has_value() || length > protocol::max_block_blob_size)
        {
            throw storage_exception(protocol::error_blob_over_max_block_limit);
        }

        // Round the quotient UP: rounding down would leave a remainder that needs one block more than
        // the limit. length <= max_block_blob_size here, so the addition cannot overflow.
        const utility::size64_t scaled_block_size = (length + protocol::max_block_number - 1) / protocol::max_block_number;
        modified_options.set_stream_write_size_in_bytes(static_cast<size_t>(scaled_block_size));
    }
}

return open_write_async(condition, modified_options, context).then([source, length] (concurrency::streams::ostream blob_stream) -> pplx::task<void>
{
return core::stream_copy_async(source, blob_stream, length).then([blob_stream] (utility::size64_t) -> pplx::task<void>
Expand Down
12 changes: 6 additions & 6 deletions Microsoft.WindowsAzure.Storage/src/cloud_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ namespace azure { namespace storage {
properties->update_etag_and_last_modified(modified_properties);
properties->m_content_md5 = modified_properties.content_md5();
});
return core::istream_descriptor::create(stream, needs_md5, std::numeric_limits<utility::size64_t>::max(), protocol::max_block_size).then([command, context, start_offset, content_md5, modified_options](core::istream_descriptor request_body)->pplx::task<void>
return core::istream_descriptor::create(stream, needs_md5, std::numeric_limits<utility::size64_t>::max(), protocol::max_range_size).then([command, context, start_offset, content_md5, modified_options](core::istream_descriptor request_body)->pplx::task<void>
{
const utility::string_t& md5 = content_md5.empty() ? request_body.content_md5() : content_md5;
auto end_offset = start_offset + request_body.length() - 1;
Expand Down Expand Up @@ -630,9 +630,9 @@ namespace azure { namespace storage {
auto smallest_offset = std::make_shared<utility::size64_t>(target_offset);
auto condition_variable = std::make_shared<std::condition_variable>();
std::mutex condition_variable_mutex;
for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::single_block_size)
for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::transactional_md5_block_size)
{
utility::size64_t current_length = protocol::single_block_size;
utility::size64_t current_length = protocol::transactional_md5_block_size;
if (current_offset + current_length > target_offset + target_length)
{
current_length = target_offset + target_length - current_offset;
Expand All @@ -658,7 +658,7 @@ namespace azure { namespace storage {
pplx::extensibility::scoped_rw_lock_t guard(mutex);
target.streambuf().seekpos(current_offset, std::ios_base::out);
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::single_block_size;
*smallest_offset += protocol::transactional_md5_block_size;
released = true;
semaphore->unlock();
}
Expand All @@ -669,7 +669,7 @@ namespace azure { namespace storage {
if (*smallest_offset == current_offset)
{
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::single_block_size;
*smallest_offset += protocol::transactional_md5_block_size;
condition_variable->notify_all();
released = true;
semaphore->unlock();
Expand All @@ -695,7 +695,7 @@ namespace azure { namespace storage {
if (*smallest_offset == current_offset)
{
target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait();
*smallest_offset += protocol::single_block_size;
*smallest_offset += protocol::transactional_md5_block_size;
}
else if (*smallest_offset > current_offset)
{
Expand Down
Loading

0 comments on commit 455e349

Please sign in to comment.