Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] only use compute node to create cloudnative table #21092

Merged
merged 12 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update codes
Signed-off-by: abc982627271 <liuxuefen@starrocks.com>
  • Loading branch information
abc982627271 committed Apr 12, 2023
commit f92162d6cdb948ccf982de398d47f06a241e66fc
178 changes: 90 additions & 88 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ void AgentServer::Impl::init_or_die() {
LOG(WARNING) << "std exception when remove dpp download path. path=" << path.path;
}
}
}

#define BUILD_DYNAMIC_TASK_THREAD_POOL(name, min_threads, max_threads, queue_size, pool) \
do { \
Expand All @@ -151,75 +152,74 @@ void AgentServer::Impl::init_or_die() {
CHECK(st.ok()) << st; \
} while (false)

// The ideal queue size of the thread pool should be larger than the maximum number of tablets in a partition.
// But it seems that there's no limit on the number of tablets in a partition.
// Since a large queue size brings only a little overhead, a big one is chosen here.
BUILD_DYNAMIC_TASK_THREAD_POOL("publish_version", config::transaction_publish_version_worker_count,
config::transaction_publish_version_worker_count,
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_publish_version);
// The ideal queue size of the thread pool should be larger than the maximum number of tablets in a partition.
// But it seems that there's no limit on the number of tablets in a partition.
// Since a large queue size brings only a little overhead, a big one is chosen here.
BUILD_DYNAMIC_TASK_THREAD_POOL("publish_version", config::transaction_publish_version_worker_count,
config::transaction_publish_version_worker_count,
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_publish_version);

BUILD_DYNAMIC_TASK_THREAD_POOL("drop", config::drop_tablet_worker_count, config::drop_tablet_worker_count,
std::numeric_limits<int>::max(), _thread_pool_drop);
BUILD_DYNAMIC_TASK_THREAD_POOL("drop", config::drop_tablet_worker_count, config::drop_tablet_worker_count,
std::numeric_limits<int>::max(), _thread_pool_drop);

BUILD_DYNAMIC_TASK_THREAD_POOL("create_tablet", config::create_tablet_worker_count,
config::create_tablet_worker_count, std::numeric_limits<int>::max(),
_thread_pool_create_tablet);
BUILD_DYNAMIC_TASK_THREAD_POOL("create_tablet", config::create_tablet_worker_count,
config::create_tablet_worker_count, std::numeric_limits<int>::max(),
_thread_pool_create_tablet);

BUILD_DYNAMIC_TASK_THREAD_POOL("alter_tablet", config::alter_tablet_worker_count,
config::alter_tablet_worker_count, std::numeric_limits<int>::max(),
_thread_pool_alter_tablet);
BUILD_DYNAMIC_TASK_THREAD_POOL("alter_tablet", config::alter_tablet_worker_count, config::alter_tablet_worker_count,
std::numeric_limits<int>::max(), _thread_pool_alter_tablet);

BUILD_DYNAMIC_TASK_THREAD_POOL("clear_transaction", config::clear_transaction_task_worker_count,
config::clear_transaction_task_worker_count, std::numeric_limits<int>::max(),
_thread_pool_clear_transaction);
BUILD_DYNAMIC_TASK_THREAD_POOL("clear_transaction", config::clear_transaction_task_worker_count,
config::clear_transaction_task_worker_count, std::numeric_limits<int>::max(),
_thread_pool_clear_transaction);

BUILD_DYNAMIC_TASK_THREAD_POOL("storage_medium_migrate", config::storage_medium_migrate_count,
config::storage_medium_migrate_count, std::numeric_limits<int>::max(),
_thread_pool_storage_medium_migrate);
BUILD_DYNAMIC_TASK_THREAD_POOL("storage_medium_migrate", config::storage_medium_migrate_count,
config::storage_medium_migrate_count, std::numeric_limits<int>::max(),
_thread_pool_storage_medium_migrate);

BUILD_DYNAMIC_TASK_THREAD_POOL("check_consistency", config::check_consistency_worker_count,
config::check_consistency_worker_count, std::numeric_limits<int>::max(),
_thread_pool_check_consistency);
BUILD_DYNAMIC_TASK_THREAD_POOL("check_consistency", config::check_consistency_worker_count,
config::check_consistency_worker_count, std::numeric_limits<int>::max(),
_thread_pool_check_consistency);

BUILD_DYNAMIC_TASK_THREAD_POOL("upload", config::upload_worker_count, config::upload_worker_count,
std::numeric_limits<int>::max(), _thread_pool_upload);
BUILD_DYNAMIC_TASK_THREAD_POOL("upload", config::upload_worker_count, config::upload_worker_count,
std::numeric_limits<int>::max(), _thread_pool_upload);

BUILD_DYNAMIC_TASK_THREAD_POOL("download", config::download_worker_count, config::download_worker_count,
std::numeric_limits<int>::max(), _thread_pool_download);
BUILD_DYNAMIC_TASK_THREAD_POOL("download", config::download_worker_count, config::download_worker_count,
std::numeric_limits<int>::max(), _thread_pool_download);

BUILD_DYNAMIC_TASK_THREAD_POOL("make_snapshot", config::make_snapshot_worker_count,
config::make_snapshot_worker_count, std::numeric_limits<int>::max(),
_thread_pool_make_snapshot);
BUILD_DYNAMIC_TASK_THREAD_POOL("make_snapshot", config::make_snapshot_worker_count,
config::make_snapshot_worker_count, std::numeric_limits<int>::max(),
_thread_pool_make_snapshot);

BUILD_DYNAMIC_TASK_THREAD_POOL("release_snapshot", config::release_snapshot_worker_count,
config::release_snapshot_worker_count, std::numeric_limits<int>::max(),
_thread_pool_release_snapshot);
BUILD_DYNAMIC_TASK_THREAD_POOL("release_snapshot", config::release_snapshot_worker_count,
config::release_snapshot_worker_count, std::numeric_limits<int>::max(),
_thread_pool_release_snapshot);

BUILD_DYNAMIC_TASK_THREAD_POOL("move_dir", 1, 1, std::numeric_limits<int>::max(), _thread_pool_move_dir);
BUILD_DYNAMIC_TASK_THREAD_POOL("move_dir", 1, 1, std::numeric_limits<int>::max(), _thread_pool_move_dir);

BUILD_DYNAMIC_TASK_THREAD_POOL("update_tablet_meta_info", 1, 1, std::numeric_limits<int>::max(),
_thread_pool_update_tablet_meta_info);
BUILD_DYNAMIC_TASK_THREAD_POOL("update_tablet_meta_info", 1, 1, std::numeric_limits<int>::max(),
_thread_pool_update_tablet_meta_info);

BUILD_DYNAMIC_TASK_THREAD_POOL("drop_auto_increment_map_dir", 1, 1, std::numeric_limits<int>::max(),
_thread_pool_drop_auto_increment_map);
BUILD_DYNAMIC_TASK_THREAD_POOL("drop_auto_increment_map_dir", 1, 1, std::numeric_limits<int>::max(),
_thread_pool_drop_auto_increment_map);

#ifndef BE_TEST
// Currently FE can have at most num_of_storage_path * schedule_slot_num_per_path (default 2) clone tasks
// scheduled simultaneously, but previously we had only 3 clone worker threads by default,
// so this is to keep the dop of clone task handling in sync with FE.
//
// TODO(shangyiming): using dynamic thread pool to handle task directly instead of using TaskThreadPool
// Currently, the task submission and processing logic is deeply coupled with TaskThreadPool, change that will
// need to modify many interfaces. So for now we still use TaskThreadPool to submit clone tasks, but with
// only a single worker thread, then we use dynamic thread pool to handle the task concurrently in clone task
// callback, so that we can match the dop of FE clone task scheduling.
BUILD_DYNAMIC_TASK_THREAD_POOL("clone", MIN_CLONE_TASK_THREADS_IN_POOL,
_exec_env->store_paths().size() * config::parallel_clone_task_per_path,
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_clone);
// Currently FE can have at most num_of_storage_path * schedule_slot_num_per_path (default 2) clone tasks
// scheduled simultaneously, but previously we had only 3 clone worker threads by default,
// so this is to keep the dop of clone task handling in sync with FE.
//
// TODO(shangyiming): using dynamic thread pool to handle task directly instead of using TaskThreadPool
// Currently, the task submission and processing logic is deeply coupled with TaskThreadPool, change that will
// need to modify many interfaces. So for now we still use TaskThreadPool to submit clone tasks, but with
// only a single worker thread, then we use dynamic thread pool to handle the task concurrently in clone task
// callback, so that we can match the dop of FE clone task scheduling.
BUILD_DYNAMIC_TASK_THREAD_POOL("clone", MIN_CLONE_TASK_THREADS_IN_POOL,
_exec_env->store_paths().size() * config::parallel_clone_task_per_path,
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_clone);
#endif

// It is the same code to create workers of each type, so we use a macro
// to make code to be more readable.
// It is the same code to create workers of each type, so we use a macro
// to make code to be more readable.
#ifndef BE_TEST
#define CREATE_AND_START_POOL(pool_name, CLASS_NAME, worker_num) \
pool_name.reset(new CLASS_NAME(_exec_env, worker_num)); \
Expand All @@ -228,54 +228,53 @@ void AgentServer::Impl::init_or_die() {
#define CREATE_AND_START_POOL(pool_name, CLASS_NAME, worker_num)
#endif // BE_TEST

CREATE_AND_START_POOL(_publish_version_workers, PublishVersionTaskWorkerPool, base::NumCPUs())
// Both PUSH and REALTIME_PUSH type use _push_workers
CREATE_AND_START_POOL(_push_workers, PushTaskWorkerPool,
config::push_worker_count_high_priority + config::push_worker_count_normal_priority)
CREATE_AND_START_POOL(_delete_workers, DeleteTaskWorkerPool,
config::delete_worker_count_normal_priority + config::delete_worker_count_high_priority)
CREATE_AND_START_POOL(_report_task_workers, ReportTaskWorkerPool, REPORT_TASK_WORKER_COUNT)
CREATE_AND_START_POOL(_report_disk_state_workers, ReportDiskStateTaskWorkerPool, REPORT_DISK_STATE_WORKER_COUNT)
CREATE_AND_START_POOL(_report_tablet_workers, ReportOlapTableTaskWorkerPool, REPORT_OLAP_TABLE_WORKER_COUNT)
CREATE_AND_START_POOL(_report_workgroup_workers, ReportWorkgroupTaskWorkerPool, REPORT_WORKGROUP_WORKER_COUNT)
}
CREATE_AND_START_POOL(_publish_version_workers, PublishVersionTaskWorkerPool, base::NumCPUs())
// Both PUSH and REALTIME_PUSH type use _push_workers
CREATE_AND_START_POOL(_push_workers, PushTaskWorkerPool,
config::push_worker_count_high_priority + config::push_worker_count_normal_priority)
CREATE_AND_START_POOL(_delete_workers, DeleteTaskWorkerPool,
config::delete_worker_count_normal_priority + config::delete_worker_count_high_priority)
CREATE_AND_START_POOL(_report_task_workers, ReportTaskWorkerPool, REPORT_TASK_WORKER_COUNT)
CREATE_AND_START_POOL(_report_disk_state_workers, ReportDiskStateTaskWorkerPool, REPORT_DISK_STATE_WORKER_COUNT)
CREATE_AND_START_POOL(_report_tablet_workers, ReportOlapTableTaskWorkerPool, REPORT_OLAP_TABLE_WORKER_COUNT)
CREATE_AND_START_POOL(_report_workgroup_workers, ReportWorkgroupTaskWorkerPool, REPORT_WORKGROUP_WORKER_COUNT)

CREATE_AND_START_POOL(_report_resource_usage_workers, ReportResourceUsageTaskWorkerPool,
REPORT_RESOURCE_USAGE_WORKER_COUNT)
#undef CREATE_AND_START_POOL
}

void AgentServer::Impl::stop() {
if (!_is_compute_node) {
_thread_pool_publish_version->shutdown();
_thread_pool_drop->shutdown();
_thread_pool_create_tablet->shutdown();
_thread_pool_alter_tablet->shutdown();
_thread_pool_clear_transaction->shutdown();
_thread_pool_storage_medium_migrate->shutdown();
_thread_pool_check_consistency->shutdown();
_thread_pool_upload->shutdown();
_thread_pool_download->shutdown();
_thread_pool_make_snapshot->shutdown();
_thread_pool_release_snapshot->shutdown();
_thread_pool_move_dir->shutdown();
_thread_pool_update_tablet_meta_info->shutdown();
_thread_pool_drop_auto_increment_map->shutdown();
_thread_pool_publish_version->shutdown();
_thread_pool_drop->shutdown();
_thread_pool_create_tablet->shutdown();
_thread_pool_alter_tablet->shutdown();
_thread_pool_clear_transaction->shutdown();
_thread_pool_storage_medium_migrate->shutdown();
_thread_pool_check_consistency->shutdown();
_thread_pool_upload->shutdown();
_thread_pool_download->shutdown();
_thread_pool_make_snapshot->shutdown();
_thread_pool_release_snapshot->shutdown();
_thread_pool_move_dir->shutdown();
_thread_pool_update_tablet_meta_info->shutdown();
_thread_pool_drop_auto_increment_map->shutdown();

#ifndef BE_TEST
_thread_pool_clone->shutdown();
_thread_pool_clone->shutdown();
#define STOP_POOL(type, pool_name) pool_name->stop();
#else
#define STOP_POOL(type, pool_name)
#endif // BE_TEST
STOP_POOL(PUBLISH_VERSION, _publish_version_workers);
// Both PUSH and REALTIME_PUSH type use _push_workers
STOP_POOL(PUSH, _push_workers);
STOP_POOL(DELETE, _delete_workers);
STOP_POOL(REPORT_TASK, _report_task_workers);
STOP_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
STOP_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
STOP_POOL(REPORT_WORKGROUP, _report_workgroup_workers);
}
STOP_POOL(PUBLISH_VERSION, _publish_version_workers);
// Both PUSH and REALTIME_PUSH type use _push_workers
STOP_POOL(PUSH, _push_workers);
STOP_POOL(DELETE, _delete_workers);
STOP_POOL(REPORT_TASK, _report_task_workers);
STOP_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
STOP_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
STOP_POOL(REPORT_WORKGROUP, _report_workgroup_workers);

STOP_POOL(REPORT_WORKGROUP, _report_resource_usage_workers);
#undef STOP_POOL
}
Expand All @@ -285,6 +284,9 @@ AgentServer::Impl::~Impl() {}
// TODO(lingbin): each task in the batch may have its own status, or FE must check and
// resend the request when something is wrong (BE may need some logic to guarantee idempotence).
void AgentServer::Impl::submit_tasks(TAgentResult& agent_result, const std::vector<TAgentTaskRequest>& tasks) {
// for debug
LOG(INFO) << "enter submit_tasks";
abc982627271 marked this conversation as resolved.
Show resolved Hide resolved

Status ret_st;
auto master_address = get_master_address();
if (master_address.hostname.empty() || master_address.port == 0) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ void run_drop_tablet_task(const std::shared_ptr<DropTabletAgentTaskRequest>& age
}

void run_create_tablet_task(const std::shared_ptr<CreateTabletAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
// for debug
LOG(INFO) << "enter run_create_tablet_task";

const auto& create_tablet_req = agent_task_req->task_req;
TFinishTaskRequest finish_task_request;
TStatusCode::type status_code = TStatusCode::OK;
Expand All @@ -185,6 +188,7 @@ void run_create_tablet_task(const std::shared_ptr<CreateTabletAgentTaskRequest>&
} else {
create_status = StorageEngine::instance()->create_tablet(create_tablet_req);
}

if (!create_status.ok()) {
LOG(WARNING) << "create table failed. status: " << create_status.to_string()
<< ", signature: " << agent_task_req->signature;
Expand All @@ -209,6 +213,9 @@ void run_create_tablet_task(const std::shared_ptr<CreateTabletAgentTaskRequest>&
tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
}

// for debug
LOG(INFO) << "ready to finish task to fe";
abc982627271 marked this conversation as resolved.
Show resolved Hide resolved

unify_finish_agent_task(status_code, error_msgs, agent_task_req->task_type, agent_task_req->signature, true);
}

Expand Down
46 changes: 22 additions & 24 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,36 +313,34 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {

// It means the BE is acting as a compute node when store_paths is empty; some threads are not needed in that case.
bool is_compute_node = store_paths.empty();
if (!is_compute_node) {
Status status = _load_path_mgr->init();
if (!status.ok()) {
LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
exit(-1);
}
Status status = _load_path_mgr->init();
if (!status.ok()) {
LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
exit(-1);
}
#if defined(USE_STAROS) && !defined(BE_TEST)
_lake_location_provider = new lake::StarletLocationProvider();
_lake_update_manager = new lake::UpdateManager(_lake_location_provider, update_mem_tracker());
_lake_tablet_manager = new lake::TabletManager(_lake_location_provider, _lake_update_manager,
config::lake_metadata_cache_limit);
if (config::starlet_cache_dir.empty()) {
std::vector<std::string> starlet_cache_paths;
std::for_each(store_paths.begin(), store_paths.end(), [&](StorePath root_path) {
std::string starlet_cache_path = root_path.path + "/starlet_cache";
starlet_cache_paths.emplace_back(starlet_cache_path);
});
config::starlet_cache_dir = JoinStrings(starlet_cache_paths, ":");
}
_lake_location_provider = new lake::StarletLocationProvider();
_lake_update_manager = new lake::UpdateManager(_lake_location_provider, update_mem_tracker());
_lake_tablet_manager =
new lake::TabletManager(_lake_location_provider, _lake_update_manager, config::lake_metadata_cache_limit);
if (config::starlet_cache_dir.empty()) {
std::vector<std::string> starlet_cache_paths;
std::for_each(store_paths.begin(), store_paths.end(), [&](StorePath root_path) {
std::string starlet_cache_path = root_path.path + "/starlet_cache";
starlet_cache_paths.emplace_back(starlet_cache_path);
});
config::starlet_cache_dir = JoinStrings(starlet_cache_paths, ":");
}
#elif defined(BE_TEST)
_lake_location_provider = new lake::FixedLocationProvider(_store_paths.front().path);
_lake_update_manager = new lake::UpdateManager(_lake_location_provider, update_mem_tracker());
_lake_tablet_manager = new lake::TabletManager(_lake_location_provider, _lake_update_manager,
config::lake_metadata_cache_limit);
_lake_location_provider = new lake::FixedLocationProvider(_store_paths.front().path);
_lake_update_manager = new lake::UpdateManager(_lake_location_provider, update_mem_tracker());
_lake_tablet_manager =
new lake::TabletManager(_lake_location_provider, _lake_update_manager, config::lake_metadata_cache_limit);
#endif

#if defined(USE_STAROS) && !defined(BE_TEST)
_lake_tablet_manager->start_gc();
_lake_tablet_manager->start_gc();
#endif
}

_agent_server = new AgentServer(this, is_compute_node);
_agent_server->init_or_die();
Expand Down
Loading