Fix large writes causing TiFlash to crash
JaySon-Huang committed Apr 23, 2023
1 parent f7c6c60 commit f8842c8
Showing 10 changed files with 513 additions and 141 deletions.
33 changes: 30 additions & 3 deletions dbms/src/Common/FailPoint.cpp
@@ -95,7 +95,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_ingest_via_replace) \
M(unblock_query_init_after_write) \
M(exception_in_merged_task_init) \
M(force_fail_in_flush_region_data)
M(force_fail_in_flush_region_data) \
M(exception_after_large_write_exceed)


#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
@@ -145,6 +146,8 @@ APPLY_FOR_RANDOM_FAILPOINTS(M)
} // namespace FailPoints

#ifdef FIU_ENABLE
std::unordered_map<String, std::any> FailPointHelper::fail_point_val;
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;
class FailPointChannel : private boost::noncopyable
{
public:
@@ -202,13 +205,17 @@ void FailPointHelper::enablePauseFailPoint(const String & fail_point_name, UInt6
throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}

void FailPointHelper::enableFailPoint(const String & fail_point_name)
void FailPointHelper::enableFailPoint(const String & fail_point_name, std::optional<std::any> v)
{
#define SUB_M(NAME, flags) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
if (v.has_value()) \
{ \
fail_point_val.try_emplace(FailPoints::NAME, v.value()); \
} \
return; \
}

@@ -226,6 +233,10 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>()); \
if (v.has_value()) \
{ \
fail_point_val.try_emplace(FailPoints::NAME, v.value()); \
} \
return; \
}

@@ -241,6 +252,16 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}

std::optional<std::any>
FailPointHelper::getFailPointVal(const String & fail_point_name)
{
if (auto iter = fail_point_val.find(fail_point_name); iter != fail_point_val.end())
{
return iter->second;
}
return std::nullopt;
}

void FailPointHelper::disableFailPoint(const String & fail_point_name)
{
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter != fail_point_wait_channels.end())
@@ -250,6 +271,7 @@ void FailPointHelper::disableFailPoint(const String & fail_point_name)
iter->second->notifyAll();
fail_point_wait_channels.erase(iter);
}
fail_point_val.erase(fail_point_name);
fiu_disable(fail_point_name.c_str());
}

@@ -303,7 +325,12 @@ class FailPointChannel
{
};

void FailPointHelper::enableFailPoint(const String &) {}
void FailPointHelper::enableFailPoint(const String &, std::optional<std::any>) {}

std::optional<std::any> FailPointHelper::getFailPointVal(const String &)
{
return std::nullopt;
}

void FailPointHelper::enablePauseFailPoint(const String &, UInt64) {}

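The pair of static maps added above turns a fail point into a parameterized hook: a test attaches an arbitrary `std::any` when arming the point, the instrumented code reads it back with `getFailPointVal`, and `disableFailPoint` erases it. A minimal sketch of the round trip (not part of this commit), using the `exception_after_large_write_exceed` point added here and an arbitrary example threshold of 1024 bytes:

```cpp
// Minimal sketch: arming a fail point with a value and reading it back.
// The 1024-byte bound is an arbitrary example value, not from the commit.
#include <Common/FailPoint.h>

#include <any>
#include <optional>

namespace DB
{
namespace FailPoints
{
extern const char exception_after_large_write_exceed[];
}

void failPointRoundTripExample()
{
    // Attach a size bound when enabling the fail point.
    FailPointHelper::enableFailPoint(
        FailPoints::exception_after_large_write_exceed,
        std::make_optional<std::any>(static_cast<size_t>(1024)));

    // The instrumented code fetches the bound, typically inside fiu_do_on(...).
    if (auto v = FailPointHelper::getFailPointVal(FailPoints::exception_after_large_write_exceed); v)
    {
        auto bound = std::any_cast<size_t>(v.value());
        (void)bound; // e.g. throw once more than `bound` bytes have been written
    }

    // Disabling also erases the stored value (see disableFailPoint above).
    FailPointHelper::disableFailPoint(FailPoints::exception_after_large_write_exceed);
}
} // namespace DB
```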
9 changes: 7 additions & 2 deletions dbms/src/Common/FailPoint.h
@@ -19,11 +19,11 @@
#include <fiu-local.h>
#include <fiu.h>

#include <any>
#include <unordered_map>

namespace Poco
{
class Logger;
namespace Util
{
class LayeredConfiguration;
@@ -48,7 +48,9 @@ class FailPointChannel;
class FailPointHelper
{
public:
static void enableFailPoint(const String & fail_point_name);
static void enableFailPoint(const String & fail_point_name, std::optional<std::any> v = std::nullopt);

static std::optional<std::any> getFailPointVal(const String & fail_point_name);

static void enablePauseFailPoint(const String & fail_point_name, UInt64 time);

@@ -67,6 +69,9 @@ class FailPointHelper
static void enableRandomFailPoint(const String & fail_point_name, double rate);

private:
#ifdef FIU_ENABLE
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
static std::unordered_map<String, std::any> fail_point_val;
#endif
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -261,7 +261,7 @@ namespace DB
M(tiflash_storage_read_thread_seconds, "Bucketed histogram of read thread", Histogram, \
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
4 changes: 2 additions & 2 deletions dbms/src/IO/MemoryReadWriteBuffer.cpp
@@ -49,7 +49,7 @@ class ReadBufferFromMemoryWriteBuffer : public ReadBuffer
return setChunk();
}

~ReadBufferFromMemoryWriteBuffer()
~ReadBufferFromMemoryWriteBuffer() override
{
for (const auto & range : chunk_list)
free(range.begin(), range.size());
@@ -138,7 +138,7 @@ void MemoryWriteBuffer::addChunk()
}
}

Position begin = reinterpret_cast<Position>(alloc(next_chunk_size));
auto * begin = reinterpret_cast<Position>(alloc(next_chunk_size));
chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size);
total_chunks_size += next_chunk_size;

12 changes: 0 additions & 12 deletions dbms/src/Storages/Page/V3/Blob/BlobStat.h
@@ -39,18 +39,6 @@ class BlobStats
READ_ONLY = 2
};

static String blobTypeToString(BlobStatType type)
{
switch (type)
{
case BlobStatType::NORMAL:
return "normal";
case BlobStatType::READ_ONLY:
return "read only";
}
return "Invalid";
}

struct BlobStat
{
const BlobFileId id;
147 changes: 106 additions & 41 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -32,6 +32,7 @@
#include <Storages/Page/WriteBatch.h>
#include <boost_wrapper/string_split.h>
#include <common/logger_useful.h>
#include <fiu.h>

#include <boost/algorithm/string/classification.hpp>
#include <ext/scope_guard.h>
@@ -58,6 +59,7 @@ extern const int CHECKSUM_DOESNT_MATCH;
namespace FailPoints
{
extern const char force_change_all_blobs_to_read_only[];
extern const char exception_after_large_write_exceed[];
} // namespace FailPoints

namespace PS::V3
@@ -66,6 +68,8 @@ static constexpr bool BLOBSTORE_CHECKSUM_ON_READ = true;

using BlobStatPtr = BlobStats::BlobStatPtr;
using ChecksumClass = Digest::CRC64;
static_assert(!std::is_same_v<ChecksumClass, Digest::XXH3>, "The checksum must support streaming checksum");
static_assert(!std::is_same_v<ChecksumClass, Digest::City128>, "The checksum must support streaming checksum");

/**********************
* BlobStore methods *
@@ -159,7 +163,7 @@ FileUsageStatistics BlobStore::getFileUsageStatistics() const
return usage;
}

PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter)
{
PageEntriesEdit edit;
for (auto & write : wb.getWrites())
@@ -168,53 +172,114 @@ PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch & wb, const WriteLimi
{
case WriteBatchWriteType::PUT:
{
const auto [blob_id, offset_in_file] = getPosFromStats(write.size);
auto blob_file = getBlobFile(blob_id);
ChecksumClass digest;
PageEntryV3 entry;

auto [blob_id, offset_in_file] = getPosFromStats(write.size);

entry.file_id = blob_id;
entry.size = write.size;
entry.tag = write.tag;
entry.offset = offset_in_file;
// padding size won't work on big write batch
entry.padded_size = 0;

BufferBase::Buffer data_buf = write.read_buffer->buffer();

digest.update(data_buf.begin(), write.size);
entry.checksum = digest.checksum();

UInt64 field_begin, field_end;

for (size_t i = 0; i < write.offsets.size(); ++i)
{
ChecksumClass field_digest;
field_begin = write.offsets[i].first;
field_end = (i == write.offsets.size() - 1) ? write.size : write.offsets[i + 1].first;

field_digest.update(data_buf.begin() + field_begin, field_end - field_begin);
write.offsets[i].second = field_digest.checksum();
}

if (!write.offsets.empty())
// swap from WriteBatch instead of copying
PageFieldOffsetChecksums field_offset_and_checksum;
field_offset_and_checksum.swap(write.offsets);

ChecksumClass field_digest;
size_t cur_field_index = 0;
UInt64 cur_field_begin = 0, cur_field_end = 0;
if (!field_offset_and_checksum.empty())
{
// we can swap from WriteBatch instead of copying
entry.field_offsets.swap(write.offsets);
cur_field_begin = field_offset_and_checksum[cur_field_index].first;
cur_field_end = (cur_field_index == field_offset_and_checksum.size() - 1) ? write.size : field_offset_and_checksum[cur_field_index + 1].first;
}

try
{
auto blob_file = getBlobFile(blob_id);
blob_file->write(data_buf.begin(), offset_in_file, write.size, write_limiter);
UInt64 buffer_begin_in_page = 0, buffer_end_in_page = 0;

while (true)
{
// The write batch data size is large, so we do NOT copy the data into a temporary buffer, in
// order to keep the memory usage of tiflash smooth. Instead, we process the data in the
// ReadBuffer in a streaming manner.
BufferBase::Buffer data_buf = write.read_buffer->buffer();
buffer_end_in_page = buffer_begin_in_page + data_buf.size();

// TODO: Add a static check to make sure the checksum supports streaming
digest.update(data_buf.begin(), data_buf.size()); // the checksum of the whole page

// the checksum of each field
if (!field_offset_and_checksum.empty())
{
while (true)
{
auto field_begin_in_buf = cur_field_begin <= buffer_begin_in_page ? 0 : cur_field_begin - buffer_begin_in_page;
auto field_length_in_buf = cur_field_end > buffer_end_in_page ? data_buf.size() - field_begin_in_buf : cur_field_end - buffer_begin_in_page - field_begin_in_buf;
field_digest.update(data_buf.begin() + field_begin_in_buf, field_length_in_buf);

/*
* This piece of buffer does not contain all the data of the current field; break the loop
* PageBegin PageEnd
* │ │----------- Buffer Range -----------│ │
* │ │------------- Current Field --------------│ |
* ↑ ↑ Update field checksum
*/
if (cur_field_end > buffer_end_in_page)
break;

/*
* This piece of buffer contains all the data of the current field: update
* its checksum, then keep computing the checksums of the following fields
* until this piece of buffer no longer contains all the data of a field.
* PageBegin PageEnd
* │ │----------- Buffer Range -----------│ │
* │ │---- Field i ----│---- Field j ----│--- Field k ---│ |
* ↑ ↑ ↑ ↑ Update field checksum
*/
field_offset_and_checksum[cur_field_index].second = field_digest.checksum();

// all fields' checksums are computed, break the loop
if (cur_field_index >= field_offset_and_checksum.size() - 1)
break;

field_digest = ChecksumClass(); // reset
cur_field_index += 1;
cur_field_begin = field_offset_and_checksum[cur_field_index].first;
cur_field_end = (cur_field_index == field_offset_and_checksum.size() - 1) ? write.size : field_offset_and_checksum[cur_field_index + 1].first;
}
}

blob_file->write(data_buf.begin(), offset_in_file + buffer_begin_in_page, data_buf.size(), write_limiter);
buffer_begin_in_page += data_buf.size();

fiu_do_on(FailPoints::exception_after_large_write_exceed, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::exception_after_large_write_exceed); v)
{
auto failpoint_bound = std::any_cast<size_t>(v.value());
if (buffer_end_in_page > failpoint_bound)
{
throw Exception(ErrorCodes::FAIL_POINT_ERROR, "failpoint throw exception buffer_end={} failpoint_bound={}", buffer_end_in_page, failpoint_bound);
}
}
});

if (!write.read_buffer->next())
break;
}
}
catch (DB::Exception & e)
{
// If exception happens, remove the allocated space in BlobStat
removePosFromStats(blob_id, offset_in_file, write.size);
LOG_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] write failed.", blob_id, offset_in_file, write.size);
LOG_ERROR(log, "large write failed, blob_id={} offset_in_file={} size={} msg={}", blob_id, offset_in_file, write.size, e.message());
throw e;
}

const auto entry = PageEntryV3{
.file_id = blob_id,
.size = write.size,
.padded_size = 0, // padding size won't work on large write batch
.tag = write.tag,
.offset = offset_in_file,
.checksum = digest.checksum(),
.field_offsets = std::move(field_offset_and_checksum),
};

edit.put(wb.getFullPageId(write.page_id), entry);
break;
}
@@ -285,7 +350,8 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
// This can avoid allocating a big buffer for writing data and can smooth memory usage.
if (all_page_data_size > config.file_limit_size)
{
return handleLargeWrite(wb, write_limiter);
LOG_INFO(log, "handling large write, all_page_data_size={}", all_page_data_size);
return handleLargeWrite(std::move(wb), write_limiter);
}

char * buffer = static_cast<char *>(alloc(all_page_data_size));
@@ -398,7 +464,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
catch (DB::Exception & e)
{
removePosFromStats(blob_id, offset_in_file, actually_allocated_size);
LOG_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed [error={}]", blob_id, offset_in_file, all_page_data_size, actually_allocated_size, e.message());
LOG_ERROR(log, "write failed, blob_id={} offset_in_file={} size={} actually_allocated_size={} msg={}", blob_id, offset_in_file, all_page_data_size, actually_allocated_size, e.message());
throw e;
}

@@ -642,8 +708,8 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
auto field_checksum = digest.checksum();
if (unlikely(entry.size != 0 && field_checksum != expect_checksum))
{
throw Exception(
fmt::format("Reading with fields meet checksum not match "
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
"Reading with fields meet checksum not match "
"[page_id={}] [expected=0x{:X}] [actual=0x{:X}] "
"[field_index={}] [field_offset={}] [field_size={}] "
"[entry={}] [file={}]",
Expand All @@ -654,8 +720,7 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
beg_offset,
size_to_read,
toDebugString(entry),
blob_file->getPath()),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
blob_file->getPath());
}
}

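The core of the new large-write path above is that both the whole-page checksum and every field checksum are folded in while the ReadBuffer is consumed chunk by chunk, so no page-sized buffer is ever allocated; this is also why the static_asserts demand a checksum that supports incremental (streaming) updates. A standalone sketch of that windowing logic, with a toy digest standing in for the internal Digest::CRC64:

```cpp
// Standalone sketch (not TiFlash code) of the per-field streaming checksum
// in handleLargeWrite. A toy digest replaces Digest::CRC64; only the
// buffer/field windowing logic is illustrated.
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

struct ToyDigest
{
    uint64_t state = 0;
    void update(const char * data, size_t len)
    {
        for (size_t i = 0; i < len; ++i)
            state = state * 131 + static_cast<unsigned char>(data[i]);
    }
    uint64_t checksum() const { return state; }
};

// fields: (offset_in_page, checksum_out), offsets ascending and starting at 0;
// chunks arrive in order and together cover exactly [0, page_size).
void streamFieldChecksums(
    std::vector<std::pair<uint64_t, uint64_t>> & fields,
    const std::vector<std::vector<char>> & chunks,
    uint64_t page_size)
{
    if (fields.empty())
        return;

    ToyDigest field_digest;
    size_t idx = 0;
    uint64_t field_begin = fields[idx].first;
    uint64_t field_end = (idx + 1 == fields.size()) ? page_size : fields[idx + 1].first;

    uint64_t buf_begin = 0; // position of the current chunk within the page
    for (const auto & chunk : chunks)
    {
        const uint64_t buf_end = buf_begin + chunk.size();
        while (true)
        {
            // Intersect the current field [field_begin, field_end) with the
            // current chunk [buf_begin, buf_end) and feed the overlap.
            const uint64_t begin_in_buf = field_begin <= buf_begin ? 0 : field_begin - buf_begin;
            const uint64_t len_in_buf = field_end > buf_end
                ? chunk.size() - begin_in_buf
                : field_end - buf_begin - begin_in_buf;
            field_digest.update(chunk.data() + begin_in_buf, len_in_buf);

            if (field_end > buf_end)
                break; // field continues into the next chunk

            fields[idx].second = field_digest.checksum();
            if (idx + 1 == fields.size())
                break; // all field checksums computed

            field_digest = ToyDigest{}; // reset for the next field
            ++idx;
            field_begin = fields[idx].first;
            field_end = (idx + 1 == fields.size()) ? page_size : fields[idx + 1].first;
        }
        buf_begin = buf_end;
    }
}
```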
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
@@ -87,7 +87,7 @@ class BlobStore : private Allocator<false>
private:
#endif

PageEntriesEdit handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);
PageEntriesEdit handleLargeWrite(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter = nullptr);

BlobFilePtr read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);

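Taking the WriteBatch by rvalue reference makes the transfer of ownership explicit: handleLargeWrite swaps the field offsets out of each write, so the caller's batch must be treated as consumed. A tiny illustration of this sink-parameter pattern, with hypothetical types rather than the real WriteBatch:

```cpp
// Minimal illustration (hypothetical types) of why handleLargeWrite takes
// the batch by rvalue reference: it consumes parts of the batch in place.
#include <utility>
#include <vector>

struct Batch
{
    std::vector<int> offsets;
};

void consume(Batch && b)
{
    std::vector<int> taken;
    taken.swap(b.offsets); // steal instead of copy, as the real code does
    // ... use `taken`; `b` is left empty by design ...
}

void caller()
{
    Batch wb{{1, 2, 3}};
    consume(std::move(wb)); // the std::move documents that wb is given up
    // wb.offsets is now empty; do not reuse it.
}
```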