Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: limit memory usage of data sharing (#8567) #8573

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ namespace DB
F(type_sche_new_task, {"type", "sche_new_task"}), \
F(type_add_cache_succ, {"type", "add_cache_succ"}), \
F(type_add_cache_stale, {"type", "add_cache_stale"}), \
F(type_add_cache_reach_count_limit, {"type", "add_cache_reach_count_limit"}), \
F(type_add_cache_total_bytes_limit, {"type", "add_cache_total_bytes_limit"}), \
F(type_get_cache_miss, {"type", "get_cache_miss"}), \
F(type_get_cache_part, {"type", "get_cache_part"}), \
F(type_get_cache_hit, {"type", "get_cache_hit"}), \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ struct Settings
\
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
\
M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
Expand Down
20 changes: 18 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <utility>

namespace DB::DM
{
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
Expand Down Expand Up @@ -54,6 +56,20 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();

if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0)
{
// Disable data sharing.
max_sharing_column_count = 0;
}
else if (
shared_column_data_mem_tracker != nullptr
&& shared_column_data_mem_tracker->get() >= static_cast<Int64>(max_sharing_column_bytes_for_all))
{
// The memory used reaches the limitation by running queries, disable the data sharing for this DMFile
max_sharing_column_count = 0;
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
}

DMFileReader reader(
dmfile,
read_columns,
Expand All @@ -73,9 +89,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
rows_threshold_per_read,
read_one_pack_every_time,
tracing_id,
enable_read_thread,
max_sharing_column_count,
scan_context);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), enable_read_thread);
return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_count > 0);
}
} // namespace DB::DM
16 changes: 9 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ namespace DM
class DMFileBlockInputStream : public SkippableBlockInputStream
{
public:
explicit DMFileBlockInputStream(DMFileReader && reader_, bool enable_read_thread_)
explicit DMFileBlockInputStream(DMFileReader && reader_, bool enable_data_sharing_)
: reader(std::move(reader_))
, enable_read_thread(enable_read_thread_)
, enable_data_sharing(enable_data_sharing_)
{
if (enable_read_thread)
if (enable_data_sharing)
{
DMFileReaderPool::instance().add(reader);
}
}

~DMFileBlockInputStream()
{
if (enable_read_thread)
if (enable_data_sharing)
{
DMFileReaderPool::instance().del(reader);
}
Expand All @@ -64,7 +64,7 @@ class DMFileBlockInputStream : public SkippableBlockInputStream

private:
DMFileReader reader;
bool enable_read_thread;
bool enable_data_sharing;
};

using DMFileBlockInputStreamPtr = std::shared_ptr<DMFileBlockInputStream>;
Expand Down Expand Up @@ -153,7 +153,8 @@ class DMFileBlockInputStreamBuilder
enable_column_cache = settings.dt_enable_stable_column_cache;
aio_threshold = settings.min_bytes_to_use_direct_io;
max_read_buffer_size = settings.max_read_buffer_size;
enable_read_thread = settings.dt_enable_read_thread;
max_sharing_column_bytes_for_all = settings.dt_max_sharing_column_bytes_for_all;
max_sharing_column_count = settings.dt_max_sharing_column_count;
return *this;
}
DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_)
Expand Down Expand Up @@ -186,7 +187,8 @@ class DMFileBlockInputStreamBuilder
size_t max_read_buffer_size{};
size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD;
bool read_one_pack_every_time = false;
bool enable_read_thread = false;
size_t max_sharing_column_bytes_for_all = 0;
size_t max_sharing_column_count = 0;
String tracing_id;
};

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ DMFileReader::DMFileReader(
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
bool enable_col_sharing_cache,
size_t max_sharing_column_count,
const ScanContextPtr & scan_context_)
: dmfile(dmfile_)
, read_columns(read_columns_)
Expand Down Expand Up @@ -275,9 +275,9 @@ DMFileReader::DMFileReader(
const auto data_type = dmfile->getColumnStat(cd.id).type;
data_type->enumerateStreams(callback, {});
}
if (enable_col_sharing_cache)
if (max_sharing_column_count > 0)
{
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, log);
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, max_sharing_column_count, log);
for (const auto & cd : read_columns)
{
last_read_from_cache[cd.id] = false;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DMFileReader
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
bool enable_col_sharing_cache,
size_t max_sharing_column_count,
const ScanContextPtr & scan_context_);

Block getHeader() const { return toEmptyBlock(read_columns); }
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ class ColumnSharingCache
ColumnPtr col_data;
};

void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data)
void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data, size_t max_sharing_column_count)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment();
std::lock_guard lock(mtx);
if (packs.size() >= max_sharing_column_count)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_reach_count_limit).Increment();
return;
}
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment();
auto & value = packs[start_pack_id];
if (value.pack_count < pack_count)
{
Expand Down Expand Up @@ -121,9 +126,14 @@ class ColumnSharingCache
class ColumnSharingCacheMap
{
public:
ColumnSharingCacheMap(const std::string & dmfile_name_, const ColumnDefines & cds, LoggerPtr & log_)
ColumnSharingCacheMap(
const String & dmfile_name_,
const ColumnDefines & cds,
size_t max_sharing_column_count_,
LoggerPtr & log_)
: dmfile_name(dmfile_name_)
, stats(static_cast<int>(ColumnCacheStatus::_TOTAL_COUNT))
, max_sharing_column_count(max_sharing_column_count_)
, log(log_)
{
for (const auto & cd : cds)
Expand Down Expand Up @@ -152,7 +162,7 @@ class ColumnSharingCacheMap
{
return;
}
itr->second.add(start_pack_id, pack_count, col_data);
itr->second.add(start_pack_id, pack_count, col_data, max_sharing_column_count);
}

bool get(int64_t col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type)
Expand Down Expand Up @@ -207,6 +217,7 @@ class ColumnSharingCacheMap
std::string dmfile_name;
std::unordered_map<int64_t, ColumnSharingCache> cols;
std::vector<std::atomic<int64_t>> stats;
size_t max_sharing_column_count;
LoggerPtr log;
};

Expand Down
60 changes: 54 additions & 6 deletions dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ColumnSharingCache cache;

auto col = createColumn(8);
cache.add(1, 8, col);
cache.add(1, 8, col, std::numeric_limits<UInt64>::max());

ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
Expand All @@ -75,15 +75,15 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ASSERT_EQ(col4, nullptr);

auto col5 = createColumn(7);
cache.add(1, 7, col5);
cache.add(1, 7, col5, std::numeric_limits<UInt64>::max());
ColumnPtr col6;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col6, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col6->size(), 8 * TEST_PACK_ROWS);
compareColumn(col6, col, col6->size());

auto col7 = createColumn(9);
cache.add(1, 9, col7);
cache.add(1, 9, col7, std::numeric_limits<UInt64>::max());
ColumnPtr col8;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
Expand All @@ -96,13 +96,13 @@ TEST(ColumnSharingCacheTest, Del)
ColumnSharingCache cache;

auto col1 = createColumn(8);
cache.add(1, 8, col1);
cache.add(1, 8, col1, std::numeric_limits<UInt64>::max());

auto col2 = createColumn(8);
cache.add(9, 8, col2);
cache.add(9, 8, col2, std::numeric_limits<UInt64>::max());

auto col3 = createColumn(8);
cache.add(17, 8, col3);
cache.add(17, 8, col3, std::numeric_limits<UInt64>::max());

cache.del(10);

Expand All @@ -119,4 +119,52 @@ TEST(ColumnSharingCacheTest, Del)
compareColumn(col5, col2, col5->size());
}

TEST(ColumnSharingCacheTest, AddAndGetWithLimitation)
{
    ColumnSharingCache cache;

    auto base_col = createColumn(8);

    // With a limit of 0 the insertion is rejected, so the lookup misses.
    cache.add(1, 8, base_col, 0);
    ColumnPtr fetched;
    auto status = cache.get(1, 8, 8 * TEST_PACK_ROWS, fetched, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_MISS);
    ASSERT_EQ(fetched, nullptr);

    // With a limit of 1 the (currently empty) cache accepts the entry.
    cache.add(1, 8, base_col, 1);
    status = cache.get(1, 8, 8 * TEST_PACK_ROWS, fetched, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_HIT);
    ASSERT_EQ(fetched->size(), 8 * TEST_PACK_ROWS);
    compareColumn(fetched, base_col, fetched->size());

    // Asking for fewer packs than cached yields a copied prefix.
    ColumnPtr prefix;
    status = cache.get(1, 7, 7 * TEST_PACK_ROWS, prefix, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_COPY);
    ASSERT_EQ(prefix->size(), 7 * TEST_PACK_ROWS);
    compareColumn(prefix, base_col, prefix->size());

    // Asking for more packs than cached is only a partial match.
    ColumnPtr partial;
    status = cache.get(1, 9, 9 * TEST_PACK_ROWS, partial, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_PART);
    ASSERT_EQ(partial, nullptr);

    // A different start pack id is a plain miss.
    ColumnPtr other_start;
    status = cache.get(2, 8, 8 * TEST_PACK_ROWS, other_start, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_MISS);
    ASSERT_EQ(other_start, nullptr);

    auto wider_col = createColumn(9);
    // The cache already holds one entry, so a limit of 1 rejects this add.
    cache.add(1, 9, wider_col, 1);
    ColumnPtr wider;
    status = cache.get(1, 9, 9 * TEST_PACK_ROWS, wider, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_PART);
    ASSERT_EQ(wider, nullptr);

    // Raising the limit to 2 lets the wider entry be stored.
    cache.add(1, 9, wider_col, 2);
    status = cache.get(1, 8, 8 * TEST_PACK_ROWS, wider, TEST_DATA_TYPE);
    ASSERT_EQ(status, ColumnCacheStatus::GET_COPY);
    ASSERT_EQ(wider->size(), 8 * TEST_PACK_ROWS);
    compareColumn(wider, wider_col, wider->size());
}

} // namespace DB::DM::tests