Fix large write cause tiflash crash (#7335) (#7352)
close #7316
ti-chi-bot authored Apr 28, 2023
1 parent c91ca2a commit 17d72e9
Showing 8 changed files with 472 additions and 126 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
@@ -100,7 +100,8 @@ namespace DB
M(force_use_dmfile_format_v3) \
M(force_set_mocked_s3_object_mtime) \
M(force_stop_background_checkpoint_upload) \
M(skip_seek_before_read_dmfile)
M(skip_seek_before_read_dmfile) \
M(exception_after_large_write_exceed)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
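
The new exception_after_large_write_exceed failpoint carries a value (a byte bound) rather than acting as a plain on/off switch; BlobStore reads that value back through FailPointHelper::getFailPointVal in the large-write path below. As a rough sketch of how a test could arm it, assuming FailPointHelper::enableFailPoint accepts a value to store (the threshold, blob_store, and large_wb here are illustrative, not code from this commit):

// Illustrative test fragment: make the streaming large write throw once more
// than 64 MiB has been written, then check that the space reserved in BlobStat
// is rolled back by removePosFromStats.
FailPointHelper::enableFailPoint(
    FailPoints::exception_after_large_write_exceed,
    static_cast<size_t>(64 * 1024 * 1024)); // stored as size_t, read back via std::any_cast<size_t>
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::exception_after_large_write_exceed); });
ASSERT_THROW(blob_store.write(std::move(large_wb), nullptr), DB::Exception);
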
4 changes: 2 additions & 2 deletions dbms/src/IO/MemoryReadWriteBuffer.cpp
@@ -49,7 +49,7 @@ class ReadBufferFromMemoryWriteBuffer : public ReadBuffer
return setChunk();
}

~ReadBufferFromMemoryWriteBuffer()
~ReadBufferFromMemoryWriteBuffer() override
{
for (const auto & range : chunk_list)
free(range.begin(), range.size());
@@ -138,7 +138,7 @@ void MemoryWriteBuffer::addChunk()
}
}

Position begin = reinterpret_cast<Position>(alloc(next_chunk_size));
auto * begin = reinterpret_cast<Position>(alloc(next_chunk_size));
chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size);
total_chunks_size += next_chunk_size;

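
For context on the loop added in handleLargeWrite below: MemoryWriteBuffer keeps its payload as a linked list of chunks, and the ReadBuffer obtained from it exposes one chunk at a time, so the writer cannot assume buffer() covers the whole page. A minimal sketch of the chunk-by-chunk consumption idiom the new code relies on (process() is a hypothetical callback, not part of this commit):

// Consume a DB::ReadBuffer chunk by chunk instead of materializing the whole
// payload in one contiguous allocation.
size_t consumed = 0;
while (true)
{
    BufferBase::Buffer chunk = read_buffer->buffer(); // the currently loaded chunk only
    process(chunk.begin(), chunk.size(), /*offset_in_page=*/consumed);
    consumed += chunk.size();
    if (!read_buffer->next()) // advance to the next chunk; false when exhausted
        break;
}
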
12 changes: 0 additions & 12 deletions dbms/src/Storages/Page/V3/Blob/BlobStat.h
@@ -37,18 +37,6 @@ class BlobStats
READ_ONLY = 2
};

static String blobTypeToString(BlobStatType type)
{
switch (type)
{
case BlobStatType::NORMAL:
return "normal";
case BlobStatType::READ_ONLY:
return "read only";
}
return "Invalid";
}

struct BlobStat
{
const BlobFileId id;
148 changes: 107 additions & 41 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -33,6 +33,7 @@
#include <Storages/PathPool.h>
#include <boost_wrapper/string_split.h>
#include <common/logger_useful.h>
#include <fiu.h>

#include <ext/scope_guard.h>
#include <iterator>
@@ -58,6 +59,7 @@ extern const int CHECKSUM_DOESNT_MATCH;
namespace FailPoints
{
extern const char force_change_all_blobs_to_read_only[];
extern const char exception_after_large_write_exceed[];
} // namespace FailPoints

namespace PS::V3
@@ -66,6 +68,8 @@ static constexpr bool BLOBSTORE_CHECKSUM_ON_READ = true;

using BlobStatPtr = BlobStats::BlobStatPtr;
using ChecksumClass = Digest::CRC64;
static_assert(!std::is_same_v<ChecksumClass, Digest::XXH3>, "The checksum must support streaming checksum");
static_assert(!std::is_same_v<ChecksumClass, Digest::City128>, "The checksum must support streaming checksum");

/**********************
* BlobStore methods *
@@ -165,7 +169,7 @@ FileUsageStatistics BlobStore<Trait>::getFileUsageStatistics() const

template <typename Trait>
typename BlobStore<Trait>::PageEntriesEdit
BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch && wb, const WriteLimiterPtr & write_limiter)
{
PageEntriesEdit edit;
for (auto & write : wb.getMutWrites())
@@ -175,52 +179,114 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL
case WriteBatchWriteType::PUT:
case WriteBatchWriteType::UPDATE_DATA_FROM_REMOTE:
{
const auto [blob_id, offset_in_file] = getPosFromStats(write.size);
auto blob_file = getBlobFile(blob_id);
ChecksumClass digest;
PageEntryV3 entry;

auto [blob_id, offset_in_file] = getPosFromStats(write.size);

entry.file_id = blob_id;
entry.size = write.size;
entry.tag = write.tag;
entry.offset = offset_in_file;
// padding size won't work on big write batch
entry.padded_size = 0;

BufferBase::Buffer data_buf = write.read_buffer->buffer();

digest.update(data_buf.begin(), write.size);
entry.checksum = digest.checksum();

UInt64 field_begin, field_end;

for (size_t i = 0; i < write.offsets.size(); ++i)
// swap from WriteBatch instead of copying
PageFieldOffsetChecksums field_offset_and_checksum;
field_offset_and_checksum.swap(write.offsets);

ChecksumClass field_digest;
size_t cur_field_index = 0;
UInt64 cur_field_begin = 0, cur_field_end = 0;
if (!field_offset_and_checksum.empty())
{
ChecksumClass field_digest;
field_begin = write.offsets[i].first;
field_end = (i == write.offsets.size() - 1) ? write.size : write.offsets[i + 1].first;

field_digest.update(data_buf.begin() + field_begin, field_end - field_begin);
write.offsets[i].second = field_digest.checksum();
}

if (!write.offsets.empty())
{
// we can swap from WriteBatch instead of copying
entry.field_offsets.swap(write.offsets);
cur_field_begin = field_offset_and_checksum[cur_field_index].first;
cur_field_end = (cur_field_index == field_offset_and_checksum.size() - 1) ? write.size : field_offset_and_checksum[cur_field_index + 1].first;
}

try
{
auto blob_file = getBlobFile(blob_id);
blob_file->write(data_buf.begin(), offset_in_file, write.size, write_limiter);
UInt64 buffer_begin_in_page = 0, buffer_end_in_page = 0;

while (true)
{
// The write batch data size is large, so we do NOT copy the data into a temporary buffer;
// that keeps the memory usage of tiflash smooth. Instead, we process the data in the
// ReadBuffer in a streaming manner.
BufferBase::Buffer data_buf = write.read_buffer->buffer();
buffer_end_in_page = buffer_begin_in_page + data_buf.size();

// TODO: Add static check to make sure the checksum supports streaming
digest.update(data_buf.begin(), data_buf.size()); // the checksum of the whole page

// the checksum of each field
if (!field_offset_and_checksum.empty())
{
while (true)
{
auto field_begin_in_buf = cur_field_begin <= buffer_begin_in_page ? 0 : cur_field_begin - buffer_begin_in_page;
auto field_length_in_buf = cur_field_end > buffer_end_in_page ? data_buf.size() - field_begin_in_buf : cur_field_end - buffer_begin_in_page - field_begin_in_buf;
field_digest.update(data_buf.begin() + field_begin_in_buf, field_length_in_buf);

/*
* This piece of buffer does not contain all the data of the current field, break the loop
* PageBegin PageEnd
* │ │----------- Buffer Range -----------│ │
* │ │------------- Current Field --------------│ |
* ↑ ↑ Update field checksum
*/
if (cur_field_end > buffer_end_in_page)
break;

/*
* This piece of buffer contains all the data of the current field: update its
* checksum and continue with the next field until this piece of buffer no
* longer contains all the data of a field.
* PageBegin PageEnd
* │ │----------- Buffer Range -----------│ │
* │ │---- Field i ----│---- Field j ----│--- Field k ---│ |
* ↑ ↑ ↑ ↑ Update field checksum
*/
field_offset_and_checksum[cur_field_index].second = field_digest.checksum();

// all fields' checksums are done, break the loop
if (cur_field_index >= field_offset_and_checksum.size() - 1)
break;

field_digest = ChecksumClass(); // reset
cur_field_index += 1;
cur_field_begin = field_offset_and_checksum[cur_field_index].first;
cur_field_end = (cur_field_index == field_offset_and_checksum.size() - 1) ? write.size : field_offset_and_checksum[cur_field_index + 1].first;
}
}

blob_file->write(data_buf.begin(), offset_in_file + buffer_begin_in_page, data_buf.size(), write_limiter);
buffer_begin_in_page += data_buf.size();

fiu_do_on(FailPoints::exception_after_large_write_exceed, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::exception_after_large_write_exceed); v)
{
auto failpoint_bound = std::any_cast<size_t>(v.value());
if (buffer_end_in_page > failpoint_bound)
{
throw Exception(ErrorCodes::FAIL_POINT_ERROR, "failpoint throw exception buffer_end={} write_end={}", buffer_end_in_page, failpoint_bound);
}
}
});

if (!write.read_buffer->next())
break;
}
}
catch (DB::Exception & e)
{
// If exception happens, remove the allocated space in BlobStat
removePosFromStats(blob_id, offset_in_file, write.size);
LOG_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] write failed.", blob_id, offset_in_file, write.size);
LOG_ERROR(log, "large write failed, blob_id={} offset_in_file={} size={} msg={}", blob_id, offset_in_file, write.size, e.message());
throw e;
}

const auto entry = PageEntryV3{
.file_id = blob_id,
.size = write.size,
.padded_size = 0, // padding size won't work on large write batch
.tag = write.tag,
.offset = offset_in_file,
.checksum = digest.checksum(),
.checkpoint_info = OptionalCheckpointInfo{},
.field_offsets = std::move(field_offset_and_checksum),
};
if (write.type == WriteBatchWriteType::PUT)
{
edit.put(wb.getFullPageId(write.page_id), entry);
@@ -358,7 +424,8 @@ BlobStore<Trait>::write(typename Trait::WriteBatch && wb, const WriteLimiterPtr
// This can avoid allocating a big buffer for writing data and can smooth memory usage.
if (all_page_data_size > config.file_limit_size)
{
return handleLargeWrite(wb, write_limiter);
LOG_INFO(log, "handling large write, all_page_data_size={}", all_page_data_size);
return handleLargeWrite(std::move(wb), write_limiter);
}

char * buffer = static_cast<char *>(alloc(all_page_data_size));
@@ -508,7 +575,7 @@ BlobStore<Trait>::write(typename Trait::WriteBatch && wb, const WriteLimiterPtr
catch (DB::Exception & e)
{
removePosFromStats(blob_id, offset_in_file, actually_allocated_size);
LOG_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed [error={}]", blob_id, offset_in_file, all_page_data_size, actually_allocated_size, e.message());
LOG_ERROR(log, "write failed, blob_id={} offset_in_file={} size={} actually_allocated_size={} msg={}", blob_id, offset_in_file, all_page_data_size, actually_allocated_size, e.message());
throw e;
}

@@ -762,8 +829,8 @@ BlobStore<Trait>::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_lim
auto field_checksum = digest.checksum();
if (unlikely(entry.size != 0 && field_checksum != expect_checksum))
{
throw Exception(
fmt::format("Reading with fields meet checksum not match "
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
"Reading with fields meet checksum not match "
"[page_id={}] [expected=0x{:X}] [actual=0x{:X}] "
"[field_index={}] [field_offset={}] [field_size={}] "
"[entry={}] [file={}]",
Expand All @@ -774,8 +841,7 @@ BlobStore<Trait>::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_lim
beg_offset,
size_to_read,
entry,
blob_file->getPath()),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
blob_file->getPath());
}
}

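
The heart of the fix is in handleLargeWrite above: the page-level and per-field checksums are no longer computed over one contiguous buffer (which does not exist once the batch exceeds config.file_limit_size) but are folded in incrementally as each chunk of the ReadBuffer arrives, carrying the current field index across chunk boundaries; the new static_asserts on ChecksumClass guard that the chosen digest supports this streaming use. Because the bookkeeping is hard to follow inside the diff, here is a standalone sketch of the same boundary logic with stand-in types (StreamingChecksum is a toy digest, not Digest::CRC64):

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Toy streaming digest: the only property that matters is that update() may be
// called repeatedly before checksum() is read (as the static_asserts require).
struct StreamingChecksum
{
    uint64_t state = 0;
    void update(const char * data, size_t n)
    {
        for (size_t i = 0; i < n; ++i)
            state = state * 1099511628211ULL + static_cast<unsigned char>(data[i]);
    }
    uint64_t checksum() const { return state; }
};

// (field offset within the page, field checksum), as in PageFieldOffsetChecksums.
using FieldOffsetChecksums = std::vector<std::pair<uint64_t, uint64_t>>;

// Fold one chunk [chunk_begin, chunk_begin + chunk_size) of the page into the
// page digest and into every field that overlaps the chunk. cur_field and
// field_digest persist across calls; call once per chunk, in page order.
void updateChecksumsForChunk(
    const char * chunk,
    size_t chunk_size,
    uint64_t chunk_begin,
    uint64_t page_size,
    FieldOffsetChecksums & fields,
    StreamingChecksum & page_digest,
    StreamingChecksum & field_digest,
    size_t & cur_field)
{
    page_digest.update(chunk, chunk_size); // checksum of the whole page
    const uint64_t chunk_end = chunk_begin + chunk_size;
    while (cur_field < fields.size())
    {
        const uint64_t field_begin = fields[cur_field].first;
        const uint64_t field_end = (cur_field + 1 == fields.size()) ? page_size : fields[cur_field + 1].first;
        // The part of the current field that lies inside this chunk.
        const uint64_t begin_in_chunk = field_begin <= chunk_begin ? 0 : field_begin - chunk_begin;
        const uint64_t end_in_chunk = field_end > chunk_end ? chunk_size : field_end - chunk_begin;
        field_digest.update(chunk + begin_in_chunk, end_in_chunk - begin_in_chunk);
        if (field_end > chunk_end)
            break; // the rest of this field arrives with later chunks
        fields[cur_field].second = field_digest.checksum();
        field_digest = StreamingChecksum{}; // reset for the next field
        ++cur_field;
    }
}

Feeding the chunks through such a helper in page order yields the same per-field checksums as the old single-buffer loop, provided the digest is genuinely streaming (which the new static_asserts enforce), so PageEntryV3::field_offsets and the read-side verification stay unchanged.
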
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
@@ -98,7 +98,7 @@ class BlobStore : private Allocator<false>
private:
#endif

PageEntriesEdit handleLargeWrite(typename Trait::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);
PageEntriesEdit handleLargeWrite(typename Trait::WriteBatch && wb, const WriteLimiterPtr & write_limiter = nullptr);

BlobFilePtr read(const PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);

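
The declaration change mirrors the definition above: handleLargeWrite now takes the WriteBatch by rvalue reference because it consumes the batch (the field offsets are swapped out of each write), and write() hands its batch down with std::move(wb). A minimal illustration of that ownership hand-off with hypothetical types (not the TiFlash classes):

#include <cstdint>
#include <utility>
#include <vector>

struct Batch
{
    std::vector<std::pair<uint64_t, uint64_t>> offsets;
};

// Taking Batch&& signals that the callee may gut the batch; swap() steals the
// vector's storage instead of copying every (offset, checksum) pair.
void handleLarge(Batch && wb)
{
    std::vector<std::pair<uint64_t, uint64_t>> field_offset_and_checksum;
    field_offset_and_checksum.swap(wb.offsets);
    // ... compute checksums, then move field_offset_and_checksum into the page entry
}

void write(Batch && wb)
{
    handleLarge(std::move(wb)); // pass ownership down; wb must not be used afterwards
}
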
(Diffs for the remaining 3 changed files are not shown.)
