Skip to content

Commit

Permalink
[Refactor](Status) Refactor the scanner scheduler code make return er…
Browse files Browse the repository at this point in the history
…ror msg means (apache#35286)

## Proposed changes

Before error msg:
```
Failed to submit scanner to scanner pool
```

After error msg:
```
Failed to submit scanner to scanner pool reason:Scan thread pool had shutdown|type 1

```
  • Loading branch information
HappenLee authored May 28, 2024
1 parent 0632309 commit 852a396
Showing 1 changed file with 22 additions and 30 deletions.
52 changes: 22 additions & 30 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,41 +151,33 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,

scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
if (auto* scan_sched = ctx->get_simple_scan_scheduler()) {
auto sumbit_task = [&]() {
bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
auto* scan_sched =
is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
if (scan_sched) {
auto work_func = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret = scan_sched->submit_scan_task(simple_scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
task.priority = nice;
ret = _local_scan_thread_pool->offer(task);
}
} else {
if (auto* remote_scan_sched = ctx->get_remote_scan_scheduler()) {
auto work_func = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret = remote_scan_sched->submit_scan_task(simple_scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
return scan_sched->submit_scan_task(simple_scan_task);
}
}
if (!ret) {
scan_task->set_status(
Status::InternalError("Failed to submit scanner to scanner pool"));

PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
};
task.priority = nice;
return thread_pool->offer(task)
? Status::OK()
: Status::InternalError("Scan thread pool had shutdown");
};

if (auto ret = sumbit_task(); !ret) {
scan_task->set_status(Status::InternalError(
"Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) +
"|type:" + std::to_string(type)));
ctx->append_block_to_queue(scan_task);
return;
}
Expand Down

0 comments on commit 852a396

Please sign in to comment.