Skip to content

Commit

Permalink
[Enhancement] add sub level memory limit to preload state in lake pk …
Browse files Browse the repository at this point in the history
…table (StarRocks#39982)

Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Jan 29, 2024
1 parent e6a9b95 commit 562b5d5
Show file tree
Hide file tree
Showing 16 changed files with 43 additions and 22 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,8 @@ CONF_mInt64(lake_vacuum_retry_min_delay_ms, "10");
CONF_mBool(enable_primary_key_recover, "false");
CONF_mBool(lake_enable_compaction_async_write, "false");
CONF_mInt64(lake_pk_compaction_max_input_rowsets, "5");
// Used for control memory usage of update state cache and compaction state cache
CONF_mInt32(lake_pk_preload_memory_limit_percent, "30");

CONF_mBool(dependency_librdkafka_debug_enable, "false");

Expand Down
13 changes: 8 additions & 5 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ UpdateManager::UpdateManager(LocationProvider* location_provider, MemTracker* me
_location_provider(location_provider),
_pk_index_shards(config::pk_index_map_shard_size) {
_update_mem_tracker = mem_tracker;
_update_state_mem_tracker = std::make_unique<MemTracker>(-1, "lake_rowset_update_state", mem_tracker);
const int64_t update_mem_limit = _update_mem_tracker->limit();
const int64_t preload_mem_limit =
update_mem_limit * std::max(std::min(100, config::lake_pk_preload_memory_limit_percent), 0) / 100;
_update_state_mem_tracker =
std::make_unique<MemTracker>(preload_mem_limit, "lake_rowset_update_state", mem_tracker);
_index_cache_mem_tracker = std::make_unique<MemTracker>(-1, "lake_index_cache", mem_tracker);
_compaction_state_mem_tracker = std::make_unique<MemTracker>(-1, "compaction_state_cache", mem_tracker);
_compaction_state_mem_tracker =
std::make_unique<MemTracker>(preload_mem_limit, "lake_compaction_state", mem_tracker);
_index_cache.set_mem_tracker(_index_cache_mem_tracker.get());
_update_state_cache.set_mem_tracker(_update_state_mem_tracker.get());

int64_t byte_limits = ParseUtil::parse_mem_spec(config::mem_limit, MemInfo::physical_mem());
int32_t update_mem_percent = std::max(std::min(100, config::update_memory_limit_percent), 0);
_index_cache.set_capacity(byte_limits * update_mem_percent / 100);
_index_cache.set_capacity(update_mem_limit);
}

UpdateManager::~UpdateManager() {
Expand Down
4 changes: 3 additions & 1 deletion be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class LakeDelvecLoader : public DelvecLoader {

class UpdateManager {
public:
UpdateManager(LocationProvider* location_provider, MemTracker* mem_tracker = nullptr);
UpdateManager(LocationProvider* location_provider, MemTracker* mem_tracker);
~UpdateManager();
void set_tablet_mgr(TabletManager* tablet_mgr) { _tablet_mgr = tablet_mgr; }
void set_cache_expire_ms(int64_t expire_ms) { _cache_expire_ms = expire_ms; }
Expand Down Expand Up @@ -131,6 +131,8 @@ class UpdateManager {

MemTracker* compaction_state_mem_tracker() const { return _compaction_state_mem_tracker.get(); }

MemTracker* update_state_mem_tracker() const { return _update_state_mem_tracker.get(); }

// get or create primary index, and prepare primary index state
StatusOr<IndexEntry*> prepare_primary_index(const TabletMetadataPtr& metadata, MetaFileBuilder* builder,
int64_t base_version, int64_t new_version,
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void* StorageEngine::_pk_dump_thread_callback(void* arg) {
SLEEP_IN_BG_WORKER(60);
// disable pk dump generation when pk_dump_interval_seconds less than 0
if (config::pk_dump_interval_seconds > 0) {
auto st = StorageEngine::instance()->tablet_manager()->generate_pk_dump_in_error_state();
auto st = StorageEngine::instance()->tablet_manager()->generate_pk_dump();
if (!st.ok()) {
LOG(ERROR) << "generate pk dump failed, st: " << st;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com
return best_tablet;
}

Status TabletManager::generate_pk_dump_in_error_state() {
Status TabletManager::generate_pk_dump() {
std::vector<TabletAndScore> pick_tablets;
// 1. pick primary key tablet
std::vector<TabletSharedPtr> tablet_ptr_list;
Expand All @@ -717,7 +717,7 @@ Status TabletManager::generate_pk_dump_in_error_state() {
}
// 2. generate pk dump if need
for (const auto& tablet_ptr : tablet_ptr_list) {
RETURN_IF_ERROR(tablet_ptr->updates()->generate_pk_dump());
RETURN_IF_ERROR(tablet_ptr->updates()->generate_pk_dump_if_in_error_state());
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class TabletManager {

std::vector<TabletAndScore> pick_tablets_to_do_pk_index_major_compaction();

Status generate_pk_dump_in_error_state();
Status generate_pk_dump();

private:
using TabletMap = std::unordered_map<int64_t, TabletSharedPtr>;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2288,7 +2288,7 @@ bool TabletUpdates::check_rowset_id(const RowsetId& rowset_id) const {
return false;
}

Status TabletUpdates::generate_pk_dump() {
Status TabletUpdates::generate_pk_dump_if_in_error_state() {
if (_error) {
// generate pk dump
static int64_t last_generate_time = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class TabletUpdates {

void set_error(const string& msg) { _set_error(msg); }

Status generate_pk_dump();
Status generate_pk_dump_if_in_error_state();

private:
friend class Tablet;
Expand Down
4 changes: 2 additions & 2 deletions be/test/runtime/lake_tablets_channel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ class LakeTabletsChannelTest : public testing::Test {
public:
LakeTabletsChannelTest() {
_schema_id = next_id();
_mem_tracker = std::make_unique<MemTracker>(-1);
_mem_tracker = std::make_unique<MemTracker>(1024 * 1024);
_location_provider = std::make_unique<lake::FixedLocationProvider>(kTestGroupPath);
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider.get());
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider.get(), _mem_tracker.get());
_tablet_manager =
std::make_unique<lake::TabletManager>(_location_provider.get(), _update_manager.get(), 1024 * 1024);

Expand Down
4 changes: 2 additions & 2 deletions be/test/runtime/load_channel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class LoadChannelTestForLakeTablet : public testing::Test {
public:
LoadChannelTestForLakeTablet() {
_schema_id = next_id();
_mem_tracker = std::make_unique<MemTracker>(-1);
_mem_tracker = std::make_unique<MemTracker>(1024 * 1024);
_location_provider = std::make_unique<lake::FixedLocationProvider>(kTestGroupPath);
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider.get());
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider.get(), _mem_tracker.get());
_tablet_manager =
std::make_unique<lake::TabletManager>(_location_provider.get(), _update_manager.get(), 1024 * 1024);

Expand Down
4 changes: 3 additions & 1 deletion be/test/storage/lake/meta_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class MetaFileTest : public ::testing::Test {
CHECK_OK(fs::create_directories(join_path(kTestDir, kSegmentDirectoryName)));

s_location_provider = std::make_unique<FixedLocationProvider>(kTestDir);
s_update_manager = std::make_unique<lake::UpdateManager>(s_location_provider.get());
s_mem_tracker = std::make_unique<MemTracker>(1024 * 1024);
s_update_manager = std::make_unique<lake::UpdateManager>(s_location_provider.get(), s_mem_tracker.get());
s_tablet_manager =
std::make_unique<lake::TabletManager>(s_location_provider.get(), s_update_manager.get(), 1638400000);
}
Expand All @@ -71,6 +72,7 @@ class MetaFileTest : public ::testing::Test {
inline static std::unique_ptr<lake::LocationProvider> s_location_provider;
inline static std::unique_ptr<TabletManager> s_tablet_manager;
inline static std::unique_ptr<UpdateManager> s_update_manager;
inline static std::unique_ptr<MemTracker> s_mem_tracker;
};

TEST_F(MetaFileTest, test_meta_rw) {
Expand Down
8 changes: 8 additions & 0 deletions be/test/storage/lake/primary_key_publish_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,14 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish) {
EXPECT_EQ(0, read_rows(tablet_id, new_version));
}

TEST_P(LakePrimaryKeyPublishTest, test_mem_tracker) {
EXPECT_EQ(1024 * 1024, _mem_tracker->limit());
EXPECT_EQ(1024 * 1024 * config::lake_pk_preload_memory_limit_percent / 100,
_update_mgr->compaction_state_mem_tracker()->limit());
EXPECT_EQ(1024 * 1024 * config::lake_pk_preload_memory_limit_percent / 100,
_update_mgr->update_state_mem_tracker()->limit());
}

INSTANTIATE_TEST_SUITE_P(LakePrimaryKeyPublishTest, LakePrimaryKeyPublishTest,
::testing::Values(PrimaryKeyParam{true}, PrimaryKeyParam{false}));

Expand Down
4 changes: 3 additions & 1 deletion be/test/storage/lake/replication_txn_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class LakeReplicationTxnManagerTest : public testing::TestWithParam<TKeysType::t
CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->metadata_root_location(1)));
CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->txn_log_root_location(1)));
CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->segment_root_location(1)));
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider.get());
_mem_tracker = std::make_unique<MemTracker>(1024 * 1024);
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider.get(), _mem_tracker.get());
_tablet_manager = std::make_unique<lake::TabletManager>(_location_provider.get(), _update_manager.get(), 16384);
_replication_txn_manager = std::make_unique<lake::ReplicationTxnManager>(_tablet_manager.get());

Expand Down Expand Up @@ -204,6 +205,7 @@ class LakeReplicationTxnManagerTest : public testing::TestWithParam<TKeysType::t
std::string _test_dir;
std::unique_ptr<lake::LocationProvider> _location_provider;
std::unique_ptr<lake::UpdateManager> _update_manager;
std::unique_ptr<MemTracker> _mem_tracker;
std::unique_ptr<lake::ReplicationTxnManager> _replication_txn_manager;

int64_t _transaction_id = 100;
Expand Down
4 changes: 2 additions & 2 deletions be/test/storage/lake/schema_change_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ std::string to_string_param_name(const testing::TestParamInfo<SchemaChangeParam>
class SchemaChangeTest : public testing::Test, public testing::WithParamInterface<SchemaChangeParam> {
public:
SchemaChangeTest(const std::string& test_dir) {
_mem_tracker = std::make_unique<MemTracker>(-1);
_mem_tracker = std::make_unique<MemTracker>(1024 * 1024);
_location_provider = std::make_unique<FixedLocationProvider>(test_dir);
_update_manager = std::make_unique<UpdateManager>(_location_provider.get());
_update_manager = std::make_unique<UpdateManager>(_location_provider.get(), _mem_tracker.get());
_tablet_manager = std::make_unique<TabletManager>(_location_provider.get(), _update_manager.get(), 1024 * 1024);
}

Expand Down
4 changes: 3 additions & 1 deletion be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class LakeTabletManagerTest : public testing::Test {
CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->metadata_root_location(1)));
CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->txn_log_root_location(1)));
CHECK_OK(FileSystem::Default()->create_dir_recursive(_location_provider->segment_root_location(1)));
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider);
_mem_tracker = std::make_unique<MemTracker>(1024 * 1024);
_update_manager = std::make_unique<lake::UpdateManager>(_location_provider, _mem_tracker.get());
_tablet_manager = new starrocks::lake::TabletManager(_location_provider, _update_manager.get(), 16384);
}

Expand All @@ -69,6 +70,7 @@ class LakeTabletManagerTest : public testing::Test {
std::string _test_dir;
lake::LocationProvider* _location_provider{nullptr};
std::unique_ptr<lake::UpdateManager> _update_manager;
std::unique_ptr<MemTracker> _mem_tracker;
};

// NOLINTNEXTLINE
Expand Down
2 changes: 1 addition & 1 deletion be/test/storage/lake/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TestBase : public ::testing::Test {
explicit TestBase(std::string test_dir, int64_t cache_limit = 1024 * 1024)
: _test_dir(std::move(test_dir)),
_parent_tracker(std::make_unique<MemTracker>(-1)),
_mem_tracker(std::make_unique<MemTracker>(-1, "", _parent_tracker.get())),
_mem_tracker(std::make_unique<MemTracker>(1024 * 1024, "", _parent_tracker.get())),
_lp(std::make_unique<FixedLocationProvider>(_test_dir)),
_update_mgr(std::make_unique<UpdateManager>(_lp.get(), _mem_tracker.get())),
_tablet_mgr(std::make_unique<TabletManager>(_lp.get(), _update_mgr.get(), cache_limit)) {}
Expand Down

0 comments on commit 562b5d5

Please sign in to comment.