Skip to content

Commit

Permalink
[Enhancement] limit persistent index compaction by disk (StarRocks#36681
Browse files Browse the repository at this point in the history
)

Why I'm doing:
In the current implementation, we don't control the compaction concurrency on each disk, which can leave compaction unbalanced across disks and make the I/O cost of some disks too high.

What I'm doing:

Limit the per-disk compaction concurrency with the config pindex_major_compaction_limit_per_disk.
Refactor the persistent index compaction scheduler code.

Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Dec 25, 2023
1 parent 832d7b0 commit a44147f
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 37 deletions.
6 changes: 4 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1042,8 +1042,10 @@ CONF_Bool(enable_pindex_minor_compaction, "true");
// if l2 num is larger than this, stop doing async compaction,
// add this config to prevent l2 grow too large.
CONF_mInt64(max_allow_pindex_l2_num, "5");
// control the background compaction threads
CONF_mInt64(pindex_major_compaction_num_threads, "0");
// Number of max major compaction threads
CONF_mInt32(pindex_major_compaction_num_threads, "0");
// Limit of major compaction per disk.
CONF_mInt32(pindex_major_compaction_limit_per_disk, "2");
// control the persistent index schedule compaction interval
CONF_mInt64(pindex_major_compaction_schedule_interval_seconds, "15");
// control the local persistent index in shared_data gc/evict interval
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
PersistentIndexCompactionManager* mgr =
StorageEngine::instance()->update_manager()->get_pindex_compaction_mgr();
if (mgr != nullptr) {
(void)mgr->update_max_threads(config::pindex_major_compaction_num_threads);
const int max_pk_index_compaction_thread_cnt = std::max(1, config::pindex_major_compaction_num_threads);
(void)mgr->update_max_threads(max_pk_index_compaction_thread_cnt);
}
});
_config_callback.emplace("update_memory_limit_percent", [&]() {
Expand Down
6 changes: 4 additions & 2 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,11 @@ void* StorageEngine::_pk_index_major_compaction_thread_callback(void* arg) {
ProfilerRegisterThread();
#endif
while (!_bg_worker_stopped.load(std::memory_order_consume)) {
SLEEP_IN_BG_WORKER(config::pindex_major_compaction_schedule_interval_seconds);
SLEEP_IN_BG_WORKER(1);
// schedule persistent index compaction
_update_manager->get_pindex_compaction_mgr()->schedule();
_update_manager->get_pindex_compaction_mgr()->schedule([&]() {
return StorageEngine::instance()->tablet_manager()->pick_tablets_to_do_pk_index_major_compaction();
});
}

return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4842,8 +4842,8 @@ StatusOr<EditVersion> PersistentIndex::_major_compaction_impl(
return new_l2_version;
}

static void modify_l2_versions(const std::vector<EditVersion>& input_l2_versions, const EditVersion& output_l2_version,
PersistentIndexMetaPB& index_meta) {
void PersistentIndex::modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta) {
// delete input l2 versions, and add output l2 version
std::vector<EditVersion> new_l2_versions;
std::vector<bool> new_l2_version_merged;
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ class PersistentIndex {

void reset_cancel_major_compaction();

static void modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta);

protected:
Status _delete_expired_index_file(const EditVersion& l0_version, const EditVersion& l1_version,
const EditVersionWithMerge& min_l2_version);
Expand Down
61 changes: 38 additions & 23 deletions be/src/storage/persistent_index_compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ PersistentIndexCompactionManager::~PersistentIndexCompactionManager() {
}

Status PersistentIndexCompactionManager::init() {
int max_pk_index_compaction_thread_cnt =
const int max_pk_index_compaction_thread_cnt =
config::pindex_major_compaction_num_threads > 0
? config::pindex_major_compaction_num_threads
: std::max((size_t)1, StorageEngine::instance()->get_store_num() * 2);
Expand All @@ -50,56 +50,71 @@ class PkIndexMajorCompactionTask : public Runnable {

void run() override {
WARN_IF_ERROR(_tablet->updates()->pk_index_major_compaction(), "Failed to run PkIndexMajorCompactionTask");
_mgr->unmark_running(_tablet->tablet_id());
_mgr->unmark_running(_tablet.get());
}

private:
TabletSharedPtr _tablet;
PersistentIndexCompactionManager* _mgr;
};

void PersistentIndexCompactionManager::schedule() {
if (_too_many_tasks()) {
// too many running tasks, stop schedule
LOG(WARNING) << "PersistentIndex compaction schedule failed, too many running tasks";
return;
}
std::vector<TabletAndScore> pick_tablets =
StorageEngine::instance()->tablet_manager()->pick_tablets_to_do_pk_index_major_compaction();
for (auto& tablet_score : pick_tablets) {
const int64_t tablet_id = tablet_score.first->tablet_id();
if (_need_skip(tablet_id)) {
void PersistentIndexCompactionManager::schedule(const std::function<std::vector<TabletAndScore>()>& pick_algo) {
update_ready_tablet_queue(pick_algo);
for (auto it = _ready_tablets_queue.begin(); it != _ready_tablets_queue.end();) {
auto& tablet_score = *it;
if (is_running(tablet_score.first.get())) {
// remove this tablet because it is already running
it = _ready_tablets_queue.erase(it);
continue;
}
if (disk_limit(tablet_score.first.get())) {
// skip it, may re-run it next round.
++it;
continue;
}
mark_running(tablet_id);
mark_running(tablet_score.first.get());
std::shared_ptr<Runnable> r = std::make_shared<PkIndexMajorCompactionTask>(tablet_score.first, this);
auto st = _worker_thread_pool->submit(std::move(r));
if (!st.ok()) {
unmark_running(tablet_id);
// Resource busy, break and quit
unmark_running(tablet_score.first.get());
LOG(ERROR) << strings::Substitute("submit pk index compaction task failed: $0", st.to_string());
break;
}
it = _ready_tablets_queue.erase(it);
}
}

// Refresh the ready-tablet queue from `pick_algo` when more than
// config::pindex_major_compaction_schedule_interval_seconds seconds have
// passed since the last refresh; otherwise leave the queue untouched.
void PersistentIndexCompactionManager::update_ready_tablet_queue(
        const std::function<std::vector<TabletAndScore>()>& pick_algo) {
    const size_t now = time(nullptr);
    const bool interval_elapsed =
            now - _last_schedule_time > config::pindex_major_compaction_schedule_interval_seconds;
    if (!interval_elapsed) {
        // Still inside the current scheduling window: keep the existing queue.
        return;
    }
    // Replace the queue with a freshly picked candidate list and remember when we did so.
    _ready_tablets_queue = pick_algo();
    _last_schedule_time = now;
}

void PersistentIndexCompactionManager::mark_running(int64_t tablet_id) {
void PersistentIndexCompactionManager::mark_running(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
_running_tablets.insert(tablet_id);
_running_tablets.insert(tablet->tablet_id());
_data_dir_to_task_num_map[tablet->data_dir()]++;
}

void PersistentIndexCompactionManager::unmark_running(int64_t tablet_id) {
void PersistentIndexCompactionManager::unmark_running(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
_running_tablets.erase(tablet_id);
_running_tablets.erase(tablet->tablet_id());
_data_dir_to_task_num_map[tablet->data_dir()]--;
}

bool PersistentIndexCompactionManager::_too_many_tasks() {
bool PersistentIndexCompactionManager::is_running(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
return _running_tablets.size() > MAX_RUNNING_TABLETS;
return _running_tablets.count(tablet->tablet_id()) > 0;
}

bool PersistentIndexCompactionManager::_need_skip(int64_t tablet_id) {
bool PersistentIndexCompactionManager::disk_limit(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
return _running_tablets.count(tablet_id) > 0;
return _data_dir_to_task_num_map[tablet->data_dir()] >= std::max(1, config::pindex_major_compaction_limit_per_disk);
}

Status PersistentIndexCompactionManager::update_max_threads(int max_threads) {
Expand Down
24 changes: 17 additions & 7 deletions be/src/storage/persistent_index_compaction_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "storage/olap_common.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"

namespace starrocks {

Expand All @@ -31,19 +32,28 @@ class PersistentIndexCompactionManager {
PersistentIndexCompactionManager() {}
~PersistentIndexCompactionManager();
Status init();
void schedule();
void mark_running(int64_t tablet_id);
void unmark_running(int64_t tablet_id);
void schedule(const std::function<std::vector<TabletAndScore>()>& pick_algo);
// Mark tablet is running and increase disk concurrency
void mark_running(Tablet* tablet);
// Mark tablet as not running and decrease disk concurrency
void unmark_running(Tablet* tablet);
// change the thread pool thread count
Status update_max_threads(int max_threads);
// Call pick algo function, and refresh ready tablet queue
void update_ready_tablet_queue(const std::function<std::vector<TabletAndScore>()>& pick_algo);
// Is tablet in running state
bool is_running(Tablet* tablet);
// Is tablet's disk out of concurrency limit
bool disk_limit(Tablet* tablet);

private:
bool _too_many_tasks();
bool _need_skip(int64_t tablet_id);
static const uint32_t MAX_RUNNING_TABLETS = 1000;

std::mutex _mutex;
// Sorted by priority
std::vector<TabletAndScore> _ready_tablets_queue;
std::unordered_set<int64_t> _running_tablets;
std::unique_ptr<ThreadPool> _worker_thread_pool;
std::unordered_map<DataDir*, uint64_t> _data_dir_to_task_num_map;
size_t _last_schedule_time = 0;
};

} // namespace starrocks
Loading

0 comments on commit a44147f

Please sign in to comment.