Skip to content

Commit

Permalink
Fix large write causing TiFlash crash (release-6.5) (#7345) (#7377)
Browse files Browse the repository at this point in the history
close #7316
  • Loading branch information
ti-chi-bot authored Apr 25, 2023
1 parent f7c6c60 commit d49dc8b
Show file tree
Hide file tree
Showing 14 changed files with 565 additions and 182 deletions.
34 changes: 30 additions & 4 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

namespace DB
{
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;
#define APPLY_FOR_FAILPOINTS_ONCE(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
Expand Down Expand Up @@ -95,7 +94,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_ingest_via_replace) \
M(unblock_query_init_after_write) \
M(exception_in_merged_task_init) \
M(force_fail_in_flush_region_data)
M(force_fail_in_flush_region_data) \
M(exception_after_large_write_exceed)


#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
Expand Down Expand Up @@ -145,6 +145,8 @@ APPLY_FOR_RANDOM_FAILPOINTS(M)
} // namespace FailPoints

#ifdef FIU_ENABLE
std::unordered_map<String, std::any> FailPointHelper::fail_point_val;
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;
class FailPointChannel : private boost::noncopyable
{
public:
Expand Down Expand Up @@ -202,13 +204,17 @@ void FailPointHelper::enablePauseFailPoint(const String & fail_point_name, UInt6
throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}

void FailPointHelper::enableFailPoint(const String & fail_point_name)
void FailPointHelper::enableFailPoint(const String & fail_point_name, std::optional<std::any> v)
{
#define SUB_M(NAME, flags) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
if (v.has_value()) \
{ \
fail_point_val.try_emplace(FailPoints::NAME, v.value()); \
} \
return; \
}

Expand All @@ -226,6 +232,10 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>()); \
if (v.has_value()) \
{ \
fail_point_val.try_emplace(FailPoints::NAME, v.value()); \
} \
return; \
}

Expand All @@ -241,6 +251,16 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}

/// Fetch the value recorded in `fail_point_val` for the given fail point.
///
/// @param fail_point_name  Key to look up in `fail_point_val`.
/// @return A copy of the stored `std::any` when an entry exists for the name,
///         otherwise `std::nullopt`.
std::optional<std::any>
FailPointHelper::getFailPointVal(const String & fail_point_name)
{
    const auto found = fail_point_val.find(fail_point_name);
    // Guard clause: nothing was recorded under this name.
    if (found == fail_point_val.end())
        return std::nullopt;
    return found->second;
}

void FailPointHelper::disableFailPoint(const String & fail_point_name)
{
if (auto iter = fail_point_wait_channels.find(fail_point_name); iter != fail_point_wait_channels.end())
Expand All @@ -250,6 +270,7 @@ void FailPointHelper::disableFailPoint(const String & fail_point_name)
iter->second->notifyAll();
fail_point_wait_channels.erase(iter);
}
fail_point_val.erase(fail_point_name);
fiu_disable(fail_point_name.c_str());
}

Expand Down Expand Up @@ -303,7 +324,12 @@ class FailPointChannel
{
};

void FailPointHelper::enableFailPoint(const String &) {}
void FailPointHelper::enableFailPoint(const String &, std::optional<std::any>) {}

/// Stub lookup: the fail point name is ignored and the result is always empty.
/// NOTE(review): appears to be the variant built when FIU_ENABLE is not defined — confirm.
///
/// @return Always an empty optional (equivalent to `std::nullopt`).
std::optional<std::any> FailPointHelper::getFailPointVal(const String & /*fail_point_name*/)
{
    return {};
}

/// No-op stub: both arguments are ignored and the body is empty.
/// NOTE(review): appears to be the variant built when FIU_ENABLE is not defined — confirm.
void FailPointHelper::enablePauseFailPoint(const String &, UInt64) {}

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
#include <fiu-local.h>
#include <fiu.h>

#include <any>
#include <unordered_map>

namespace Poco
{
class Logger;
namespace Util
{
class LayeredConfiguration;
Expand All @@ -48,7 +48,9 @@ class FailPointChannel;
class FailPointHelper
{
public:
static void enableFailPoint(const String & fail_point_name);
static void enableFailPoint(const String & fail_point_name, std::optional<std::any> v = std::nullopt);

static std::optional<std::any> getFailPointVal(const String & fail_point_name);

static void enablePauseFailPoint(const String & fail_point_name, UInt64 time);

Expand All @@ -67,6 +69,9 @@ class FailPointHelper
static void enableRandomFailPoint(const String & fail_point_name, double rate);

private:
#ifdef FIU_ENABLE
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
static std::unordered_map<String, std::any> fail_point_val;
#endif
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ namespace DB
M(tiflash_storage_read_thread_seconds, "Bucketed histogram of read thread", Histogram, \
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/MemoryReadWriteBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ReadBufferFromMemoryWriteBuffer : public ReadBuffer
return setChunk();
}

~ReadBufferFromMemoryWriteBuffer()
~ReadBufferFromMemoryWriteBuffer() override
{
for (const auto & range : chunk_list)
free(range.begin(), range.size());
Expand Down Expand Up @@ -138,7 +138,7 @@ void MemoryWriteBuffer::addChunk()
}
}

Position begin = reinterpret_cast<Position>(alloc(next_chunk_size));
auto * begin = reinterpret_cast<Position>(alloc(next_chunk_size));
chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size);
total_chunks_size += next_chunk_size;

Expand Down
12 changes: 0 additions & 12 deletions dbms/src/Storages/Page/V3/Blob/BlobStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ class BlobStats
READ_ONLY = 2
};

/// Render a BlobStatType as a human-readable label.
///
/// @param type  The blob stat type to describe.
/// @return "normal" for NORMAL, "read only" for READ_ONLY, and "Invalid"
///         for any other value.
static String blobTypeToString(BlobStatType type)
{
    if (type == BlobStatType::NORMAL)
        return "normal";
    if (type == BlobStatType::READ_ONLY)
        return "read only";
    // Any value not matched above falls back to the generic label.
    return "Invalid";
}

struct BlobStat
{
const BlobFileId id;
Expand Down
Loading

0 comments on commit d49dc8b

Please sign in to comment.