support config blob file limit size
lidezhu committed Aug 8, 2022
1 parent 97fa91f commit 1553a85
Showing 10 changed files with 244 additions and 315 deletions.
7 changes: 4 additions & 3 deletions dbms/src/Interpreters/Settings.h
@@ -316,10 +316,11 @@ struct Settings
     M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage") \
 \
     M(SettingDouble, dt_storage_blob_heavy_gc_valid_rate, 0.2, "Max valid rate of deciding a blob can be compact") \
+    M(SettingDouble, dt_storage_blob_file_limit_size, 268435456, "Max size of a single blob file. Some blob files may exceed this limit due to large writes, and changing this config only affects newly created blob files.") \
     M(SettingDouble, dt_storage_blob_block_alignment_bytes, 0, "Blob IO alignment size") \
-    M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not") \
-    M(SettingDouble, dt_block_slots_scale, 1.0, "Block slots limit of a read request") \
-    M(SettingDouble, dt_active_segments_scale, 1.0, "Acitve segments limit of a read request") \
+    M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not") \
+    M(SettingDouble, dt_block_slots_scale, 1.0, "Block slots limit of a read request") \
+    M(SettingDouble, dt_active_segments_scale, 1.0, "Active segments limit of a read request") \
 \
     M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
     M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
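For reference, the new default of 268435456 bytes is exactly 256 MiB. A one-line check (illustrative only, not part of the commit):

    // Illustrative only: the default of dt_storage_blob_file_limit_size is 256 MiB.
    static_assert(268435456ULL == 256ULL * 1024 * 1024, "default blob file limit is 256 MiB");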
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/ConfigSettings.cpp
@@ -36,6 +36,7 @@ void mergeConfigFromSettings(const DB::Settings & settings, PageStorage::Config

     // V3 settings which are exported to the global setting
     config.blob_heavy_gc_valid_rate = settings.dt_storage_blob_heavy_gc_valid_rate;
+    config.blob_file_limit_size = settings.dt_storage_blob_file_limit_size;
     config.blob_block_alignment_bytes = settings.dt_storage_blob_block_alignment_bytes;
 }

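For orientation, a minimal sketch of the flow this one-liner completes, using hypothetical stand-in types (the real DB::Settings and PageStorage::Config carry many more fields): the user-visible dt_storage_* setting is copied into the storage-layer config that BlobStore later reads as config.file_limit_size.

    #include <cstdint>

    // Hypothetical stand-ins, not the real TiFlash declarations.
    struct SettingsSketch { double dt_storage_blob_file_limit_size = 268435456; };
    struct BlobConfigSketch { uint64_t file_limit_size = 268435456; };

    // Mirrors the shape of mergeConfigFromSettings() above.
    void mergeSketch(const SettingsSketch & settings, BlobConfigSketch & config)
    {
        config.file_limit_size = static_cast<uint64_t>(settings.dt_storage_blob_file_limit_size);
    }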
150 changes: 57 additions & 93 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -100,14 +100,9 @@ void BlobStore::registerPaths()
             Poco::File blob(fmt::format("{}/{}", path, blob_name));
             auto blob_size = blob.getSize();
             delegator->addPageFileUsedSize({blob_id, 0}, blob_size, path, true);
-            if (blob_size > config.file_limit_size)
-            {
-                blob_stats.createBigPageStatNotChecking(blob_id, lock_stats);
-            }
-            else
-            {
-                blob_stats.createStatNotChecking(blob_id, lock_stats);
-            }
+            blob_stats.createStatNotChecking(blob_id,
+                                             std::max(blob_size, config.file_limit_size.get()),
+                                             lock_stats);
         }
         else
         {
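The restore path above replaces the old normal/big split with a single rule: a blob file found on disk is tracked with a capacity of at least the configured limit, and with its real size if a past large write made it bigger. A compact restatement (simplified signature, illustrative only):

    #include <algorithm>
    #include <cstdint>

    // Sketch of the restore-time capacity rule (names simplified): oversized
    // files keep their full on-disk size as capacity so the space map covers
    // the whole file; everything else gets the configured limit.
    uint64_t restoredCapacity(uint64_t blob_size_on_disk, uint64_t file_limit_size)
    {
        return std::max(blob_size_on_disk, file_limit_size);
    }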
@@ -130,7 +125,7 @@ FileUsageStatistics BlobStore::getFileUsageStatistics() const
     for (const auto & stat : stats)
     {
         // We can access these types without any locking.
-        if (stat->isReadOnly() || stat->isBigBlob())
+        if (stat->isReadOnly())
         {
             usage.total_disk_size += stat->sm_total_size;
             usage.total_valid_size += stat->sm_valid_size;
@@ -236,6 +231,9 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr

     const size_t all_page_data_size = wb.getTotalDataSize();

+    // If the WriteBatch is too big, we will split the writes in the WriteBatch across different `BlobFile`s.
+    // This can avoid allocating a big buffer for writing data and can smooth memory usage.
+    // TODO: maybe add a different config for this behavior?
     if (all_page_data_size > config.file_limit_size)
     {
         return handleLargeWrite(wb, write_limiter);
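The new comment describes chunking a large WriteBatch instead of buffering it whole; a rough sketch of the chunking arithmetic, as a hypothetical helper (not the actual handleLargeWrite signature):

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    // Sketch only: carve a batch of all_page_data_size bytes into chunks of at
    // most file_limit_size, so one bounded buffer can be reused per chunk
    // instead of allocating all_page_data_size bytes at once.
    std::vector<size_t> planChunks(size_t all_page_data_size, size_t file_limit_size)
    {
        std::vector<size_t> chunk_sizes;
        while (all_page_data_size > 0)
        {
            const size_t chunk = std::min(all_page_data_size, file_limit_size);
            chunk_sizes.push_back(chunk);
            all_page_data_size -= chunk;
        }
        return chunk_sizes;
    }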
@@ -412,7 +410,7 @@ void BlobStore::remove(const PageEntriesV3 & del_entries)
         }
     }

-    // After we remove postion of blob, we need recalculate the blob.
+    // After we remove a position from a blob, we need to recalculate the blob.
     for (const auto & blob_id : blob_updated)
     {
         const auto & stat = blob_stats.blobIdToStat(blob_id,
@@ -443,40 +441,33 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)

    auto lock_stat = [size, this, &stat]() {
        auto lock_stats = blob_stats.lock();
-        if (size > config.file_limit_size)
-        {
-            auto blob_file_id = blob_stats.chooseBigStat(lock_stats);
-            stat = blob_stats.createBigStat(blob_file_id, lock_stats);
-
-            return stat->lock();
-        }
-        else
-        {
-            BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
-            std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
-            if (stat == nullptr)
-            {
-                // No valid stat for puting data with `size`, create a new one
-                stat = blob_stats.createStat(blob_file_id, lock_stats);
-            }
-
-            // We must get the lock from BlobStat under the BlobStats lock
-            // to ensure that BlobStat updates are serialized.
-            // Otherwise it may cause stat to fail to get the span for writing
-            // and throwing exception.
-            return stat->lock();
-        }
+        BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
+        std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
+        if (stat == nullptr)
+        {
+            // No valid stat for putting data with `size`, create a new one
+            stat = blob_stats.createStat(blob_file_id,
+                                         std::max(size, config.file_limit_size.get()),
+                                         lock_stats);
+        }
+
+        // We must get the lock from BlobStat under the BlobStats lock
+        // to ensure that BlobStat updates are serialized.
+        // Otherwise stat may fail to get the span for writing
+        // and throw an exception.
+        return stat->lock();
    }();

    // We need to assume that this insert will reduce max_cap.
    // Because other threads may also be waiting for BlobStats to chooseStat during this time.
    // If max_cap is not reduced, it may cause the same BlobStat to accept multiple buffers and exceed its max_cap.
    // After the BlobStore records the buffer size, max_caps will also get an accurate update.
    // So there won't be a problem in reducing max_caps here.
-    auto old_max_cap = stat->sm_max_caps;
    assert(stat->sm_max_caps >= size);
    stat->sm_max_caps -= size;

-    // Get Postion from single stat
+    auto old_max_cap = stat->sm_max_caps;
+    // Get Position from single stat
    BlobFileOffset offset = stat->getPosFromStat(size, lock_stat);

    // Can't insert into this spacemap
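The max_caps bookkeeping above is a reservation made while the BlobStats lock is still held; condensed into a sketch (hypothetical type, simplified):

    #include <cassert>
    #include <cstdint>

    // Deduct `size` from the chosen stat's spare capacity before other writers
    // can run chooseStat() again, so the same spare bytes are never promised
    // twice; the space map corrects the capacity after the write lands.
    struct StatCapacitySketch
    {
        uint64_t sm_max_caps = 0;

        void reserve(uint64_t size)
        {
            assert(sm_max_caps >= size); // guaranteed by chooseStat() under the lock
            sm_max_caps -= size;
        }
    };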
@@ -500,7 +491,10 @@ void BlobStore::removePosFromStats(BlobFileId blob_id, BlobFileOffset offset, si
    const auto & stat = blob_stats.blobIdToStat(blob_id);
    {
        auto lock = stat->lock();
-        need_remove_stat = stat->removePosFromStat(offset, size, lock);
+        auto remaining_valid_size = stat->removePosFromStat(offset, size, lock);
+        // A BlobFile which is read-only or whose capacity is larger than config.file_limit_size won't be reused for other writes,
+        // so it's safe and necessary to remove it here.
+        need_remove_stat = ((stat->isReadOnly() || stat->file_total_caps > config.file_limit_size) && remaining_valid_size == 0);
    }

    // We don't need to hold the BlobStat lock (also can't do that).
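The removal condition above can be stated in isolation; a minimal sketch with the inputs spelled out (hypothetical helper mirroring the diff, not the real API):

    #include <cstdint>

    // A stat is destroyed only when it can never be written again (read-only,
    // or oversized so chooseStat() skips it) and nothing valid remains in it.
    bool shouldRemoveStat(bool is_read_only,
                          uint64_t file_total_caps,
                          uint64_t file_limit_size,
                          uint64_t remaining_valid_size)
    {
        return (is_read_only || file_total_caps > file_limit_size)
            && remaining_valid_size == 0;
    }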
@@ -851,11 +845,10 @@ struct BlobStoreGCInfo
{
    String toString() const
    {
-        return fmt::format("{}. {}. {}. {}. {}.",
+        return fmt::format("{}. {}. {}. {}.",
                           toTypeString("Read-Only Blob", 0),
                           toTypeString("No GC Blob", 1),
                           toTypeString("Full GC Blob", 2),
-                           toTypeString("Big Blob", 3),
                           toTypeTruncateString("Truncated Blob"));
    }

@@ -874,11 +867,6 @@ struct BlobStoreGCInfo
        blob_gc_info[2].emplace_back(std::make_pair(blob_id, valid_rate));
    }

-    void appendToBigBlob(const BlobFileId blob_id, double valid_rate)
-    {
-        blob_gc_info[3].emplace_back(std::make_pair(blob_id, valid_rate));
-    }
-
    void appendToTruncatedBlob(const BlobFileId blob_id, UInt64 origin_size, UInt64 truncated_size, double valid_rate)
    {
        blob_gc_truncate_info.emplace_back(std::make_tuple(blob_id, origin_size, truncated_size, valid_rate));
@@ -888,8 +876,7 @@ struct BlobStoreGCInfo
    // 1. read only blob
    // 2. no need gc blob
    // 3. full gc blob
-    // 4. big blob
-    std::vector<std::pair<BlobFileId, double>> blob_gc_info[4];
+    std::vector<std::pair<BlobFileId, double>> blob_gc_info[3];

    std::vector<std::tuple<BlobFileId, UInt64, UInt64, double>> blob_gc_truncate_info;

@@ -976,13 +963,6 @@ std::vector<BlobFileId> BlobStore::getGCStats()
            continue;
        }

-        if (stat->isBigBlob())
-        {
-            blobstore_gc_info.appendToBigBlob(stat->id, stat->sm_valid_rate);
-            LOG_FMT_TRACE(log, "Current [blob_id={}] is big-blob", stat->id);
-            continue;
-        }
-
        auto lock = stat->lock();
        auto right_margin = stat->smap->getUsedBoundary();

@@ -1108,10 +1088,22 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
        }
    };

-    const auto config_file_limit = config.file_limit_size.get();
-    // If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s to
+    auto alloc_size = config.file_limit_size.get();
+    // If `total_page_size` is greater than the file limit size, we will try to write the page data into multiple `BlobFile`s to
    // make the memory consumption smooth during GC.
-    auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size;
+    if (total_page_size > alloc_size)
+    {
+        size_t biggest_page_size = 0;
+        for (const auto & [file_id, versioned_pageid_entry_list] : entries_need_gc)
+        {
+            for (const auto & [page_id, versioned, entry] : versioned_pageid_entry_list)
+            {
+                biggest_page_size = std::max(biggest_page_size, entry.size);
+            }
+        }
+        alloc_size = biggest_page_size;
+    }

    BlobFileOffset remaining_page_size = total_page_size - alloc_size;

    char * data_buf = static_cast<char *>(alloc(alloc_size));
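The buffer-sizing rule above, condensed into a hypothetical helper (simplified inputs; the real code iterates entries_need_gc directly): when the relocated data fits under the file limit, one buffer of the limit size suffices; otherwise the reusable buffer is sized to the single biggest page, since a page is never split across blob files.

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    // Sketch of gc()'s alloc_size computation above.
    size_t gcAllocSize(size_t total_page_size, size_t file_limit_size, const std::vector<size_t> & page_sizes)
    {
        size_t alloc_size = file_limit_size;
        if (total_page_size > alloc_size)
        {
            size_t biggest_page_size = 0;
            for (const size_t s : page_sizes)
                biggest_page_size = std::max(biggest_page_size, s);
            alloc_size = biggest_page_size;
        }
        return alloc_size;
    }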
@@ -1142,7 +1134,6 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
            /// After writing data into the current blob file, we reuse the original buffer for future writes.
            if (offset_in_data + entry.size > alloc_size)
            {
-                assert(alloc_size == config_file_limit);
                assert(file_offset_begin == 0);
                // Remove the span that is not actually used
                if (offset_in_data != alloc_size)
@@ -1159,7 +1150,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
                offset_in_data = 0;

                // Acquire a span from stats for remaining data
-                auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size);
+                auto next_alloc_size = (remaining_page_size > alloc_size ? alloc_size : remaining_page_size);
                remaining_page_size -= next_alloc_size;
                std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size);
            }
@@ -1281,7 +1272,7 @@ std::lock_guard<std::mutex> BlobStore::BlobStats::lock() const
    return std::lock_guard(lock_stats);
}

-BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
+BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, UInt64 max_caps, const std::lock_guard<std::mutex> & guard)
{
    // A new blob file id won't be bigger than roll_id
    if (blob_file_id > roll_id)
@@ -1307,7 +1298,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
    }

    // Create a stat without checking whether the file_id exists
-    auto stat = createStatNotChecking(blob_file_id, guard);
+    auto stat = createStatNotChecking(blob_file_id, max_caps, guard);

    // Roll to the next new blob id
    if (blob_file_id == roll_id)
@@ -1318,38 +1309,13 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
    return stat;
}

-BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
+BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, UInt64 max_caps, const std::lock_guard<std::mutex> &)
{
-    LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}]", blob_file_id);
+    LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}] with capacity {}", blob_file_id, max_caps);
    BlobStatPtr stat = std::make_shared<BlobStat>(
        blob_file_id,
        static_cast<SpaceMap::SpaceMapType>(config.spacemap_type.get()),
-        config.file_limit_size);
-
-    PageFileIdAndLevel id_lvl{blob_file_id, 0};
-    stats_map[delegator->choosePath(id_lvl)].emplace_back(stat);
-    return stat;
-}
-
-BlobStatPtr BlobStore::BlobStats::createBigStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
-{
-    auto stat = createBigPageStatNotChecking(blob_file_id, guard);
-    // Roll to the next new blob id
-    if (blob_file_id == roll_id)
-    {
-        roll_id++;
-    }
-
-    return stat;
-}
-
-BlobStatPtr BlobStore::BlobStats::createBigPageStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
-{
-    LOG_FMT_INFO(log, "Created a new big BlobStat [blob_id={}]", blob_file_id);
-    BlobStatPtr stat = std::make_shared<BlobStat>(
-        blob_file_id,
-        SpaceMap::SpaceMapType::SMAP64_BIG,
-        config.file_limit_size);
+        max_caps);

    PageFileIdAndLevel id_lvl{blob_file_id, 0};
    stats_map[delegator->choosePath(id_lvl)].emplace_back(stat);
@@ -1414,9 +1380,11 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
    for (const auto & stat : stats_iter->second)
    {
        auto lock = stat->lock(); // TODO: will it bring performance regression?
+        // Only a BlobFile whose total capacity is no larger than config.file_limit_size can be reused for further writes
        if (stat->isNormal()
            && stat->sm_max_caps >= buf_size
-            && stat->sm_valid_rate < smallest_valid_rate)
+            && stat->sm_valid_rate < smallest_valid_rate
+            && stat->file_total_caps <= config.file_limit_size)
        {
            smallest_valid_rate = stat->sm_valid_rate;
            stat_ptr = stat;
@@ -1449,11 +1417,6 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
    return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID);
}

-BlobFileId BlobStore::BlobStats::chooseBigStat(const std::lock_guard<std::mutex> &) const
-{
-    return roll_id;
-}
-
BlobStatPtr BlobStore::BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_not_exist)
{
    auto guard = lock();
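The reuse filter added to chooseStat() above, restated with simplified fields (hypothetical stand-in for BlobStat):

    #include <cstdint>

    // Only the fields the new candidate check reads.
    struct StatViewSketch
    {
        bool is_normal;
        uint64_t sm_max_caps;     // spare writable capacity
        double sm_valid_rate;     // valid bytes / total bytes
        uint64_t file_total_caps; // total capacity of the backing file
    };

    // Prefer the emptiest stat that still has room, and never reuse a file
    // whose capacity already exceeds the configured limit.
    bool isReusableCandidate(const StatViewSketch & s, uint64_t buf_size, double smallest_valid_rate, uint64_t file_limit_size)
    {
        return s.is_normal
            && s.sm_max_caps >= buf_size
            && s.sm_valid_rate < smallest_valid_rate
            && s.file_total_caps <= file_limit_size;
    }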
@@ -1512,6 +1475,7 @@ BlobFileOffset BlobStore::BlobStats::BlobStat::getPosFromStat(size_t buf_size, c
    UInt64 max_cap = 0;
    bool expansion = true;

+    // TODO: check the detailed algorithm and what `expansion` means
    std::tie(offset, max_cap, expansion) = smap->searchInsertOffset(buf_size);
    ProfileEvents::increment(expansion ? ProfileEvents::PSV3MBlobExpansion : ProfileEvents::PSV3MBlobReused);

@@ -1544,12 +1508,12 @@ BlobFileOffset BlobStore::BlobStats::BlobStat::getPosFromStat(size_t buf_size, c
    return offset;
}

-bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &)
+size_t BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &)
{
    if (!smap->markFree(offset, buf_size))
    {
        smap->logDebugString();
-        throw Exception(fmt::format("Remove postion from BlobStat failed, invalid position [offset={}] [buf_size={}] [blob_id={}]",
+        throw Exception(fmt::format("Remove position from BlobStat failed, invalid position [offset={}] [buf_size={}] [blob_id={}]",
                                    offset,
                                    buf_size,
                                    id),
@@ -1558,7 +1522,7 @@ bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si

    sm_valid_size -= buf_size;
    sm_valid_rate = sm_valid_size * 1.0 / sm_total_size;
-    return ((isReadOnly() || isBigBlob()) && sm_valid_size == 0);
+    return sm_valid_size;
}

void BlobStore::BlobStats::BlobStat::restoreSpaceMap(BlobFileOffset offset, size_t buf_size)