Skip to content

Commit 946b9db

Browse files
rickifchaoyli
authored andcommitted
BugFix: fatal error when the delta writer is busy (#3878)
This PR makes retry when the AsyncDeltaWriter is busy to avoid the bthread's fatal error.
1 parent cbc44a7 commit 946b9db

File tree

3 files changed

+42
-3
lines changed

3 files changed

+42
-3
lines changed

be/src/storage/async_delta_writer_executor.h

+11-3
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,40 @@
33
#include "common/compiler_util.h"
44
DIAGNOSTIC_PUSH
55
DIAGNOSTIC_IGNORE("-Wclass-memaccess")
6+
#include <bthread/bthread.h>
67
#include <bthread/execution_queue.h>
78
DIAGNOSTIC_POP
89

910
#include "common/config.h"
1011
#include "util/threadpool.h"
1112

1213
namespace starrocks {
14+
const int64_t kRetryIntervalMs = 50;
1315

1416
// Used to run bthread::ExecutionQueue task in pthread instead of bthread.
1517
// Reference: https://github.com/apache/incubator-brpc/blob/master/docs/cn/execution_queue.md
1618
class AsyncDeltaWriterExecutor : public bthread::Executor {
1719
public:
18-
Status init() {
20+
Status init(int max_queue_size = 40960) {
1921
if (_thread_pool != nullptr) {
2022
return Status::InternalError("already initialized");
2123
}
2224
return ThreadPoolBuilder("delta_writer")
2325
.set_min_threads(config::number_tablet_writer_threads / 2)
2426
.set_max_threads(std::max<int>(1, config::number_tablet_writer_threads))
25-
.set_max_queue_size(40960)
27+
.set_max_queue_size(max_queue_size)
2628
.set_idle_timeout(MonoDelta::FromMilliseconds(5 * 60 * 1000))
2729
.build(&_thread_pool);
2830
}
2931

3032
int submit(void* (*fn)(void*), void* args) override {
31-
auto st = _thread_pool->submit_func([=]() { fn(args); });
33+
Status st;
34+
while (true) {
35+
st = _thread_pool->submit_func([=]() { fn(args); });
36+
if (!st.is_service_unavailable()) break;
37+
LOG(INFO) << "async_delta_writer is busy, retry after " << kRetryIntervalMs << "ms";
38+
bthread_usleep(kRetryIntervalMs * 1000);
39+
}
3240
LOG_IF(WARNING, !st.ok()) << st;
3341
return st.ok() ? 0 : -1;
3442
}

be/test/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ set(EXEC_FILES
152152
./storage/rowset/zone_map_index_test.cpp
153153
./storage/rowset/unique_rowset_id_generator_test.cpp
154154
./storage/rowset/index_page_test.cpp
155+
./storage/async_delta_writer_executor_test.cpp
155156
./storage/selection_vector_test.cpp
156157
./storage/snapshot_meta_test.cpp
157158
./storage/short_key_index_test.cpp
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
2+
3+
#include "storage/async_delta_writer_executor.h"
4+
5+
#include <gtest/gtest.h>
6+
7+
#include <vector>
8+
9+
#include "common/config.h"
10+
#include "testutil/parallel_test.h"
11+
#include "util/monotime.h"
12+
13+
namespace starrocks {
14+
15+
PARALLEL_TEST(AsyncDeltaWriterExecutorTest, test_overload) {
16+
AsyncDeltaWriterExecutor executor;
17+
ASSERT_TRUE(executor.init(1).ok());
18+
19+
// submit overloading tasks.
20+
for (size_t i = 0; i < 4 * config::number_tablet_writer_threads; ++i) {
21+
ASSERT_EQ(0, executor.submit(
22+
[](void*) -> void* {
23+
SleepFor(MonoDelta::FromSeconds(1));
24+
return nullptr;
25+
},
26+
nullptr));
27+
}
28+
}
29+
30+
} // namespace starrocks

0 commit comments

Comments
 (0)