Skip to content

Commit

Permalink
[Enhancement] Write combined txn log parallel (StarRocks#55143)
Browse files Browse the repository at this point in the history
Signed-off-by: sevev <qiangzh95@gmail.com>
  • Loading branch information
sevev authored Jan 26, 2025
1 parent ab228aa commit af3d78f
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 0 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1565,4 +1565,6 @@ CONF_mBool(avro_ignore_union_type_tag, "false");
// default batch size for simdjson lib
CONF_mInt32(json_parse_many_batch_size, "1000000");
CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true");
CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64");
CONF_mBool(enable_put_combinded_txn_log_parallel, "false");
} // namespace starrocks::config
4 changes: 4 additions & 0 deletions be/src/exec/tablet_sink_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ bool TabletSinkSender::get_immutable_partition_ids(std::set<int64_t>* partition_
}

Status TabletSinkSender::_write_combined_txn_log() {
if (config::enable_put_combinded_txn_log_parallel) {
return write_combined_txn_log_parallel(_txn_log_map);
}

for (const auto& [partition_id, logs] : _txn_log_map) {
(void)partition_id;
RETURN_IF_ERROR(write_combined_txn_log(logs));
Expand Down
54 changes: 54 additions & 0 deletions be/src/exec/write_combined_txn_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

#include "exec/write_combined_txn_log.h"

#include <vector>

#include "runtime/exec_env.h"
#include "storage/lake/tablet_manager.h"
#include "util/countdown_latch.h"

namespace starrocks {

Expand All @@ -24,4 +27,55 @@ Status write_combined_txn_log(const CombinedTxnLogPB& logs) {
return tablet_mgr->put_combined_txn_log(logs);
}

void mark_failure(const Status& status, std::atomic<bool>* has_error, Status* final_status) {
if (!has_error->load()) {
if (!has_error->exchange(true)) {
if (final_status->ok()) {
*final_status = status;
}
}
}
}

std::function<void()> create_txn_log_task(const CombinedTxnLogPB* logs, lake::TabletManager* tablet_mgr,
std::atomic<bool>* has_error, Status* final_status) {
return [logs, tablet_mgr, has_error, final_status]() {
try {
Status status = tablet_mgr->put_combined_txn_log(*logs);
if (!status.ok()) {
throw std::runtime_error("Txn log write failed");
}
} catch (const std::exception& e) {
mark_failure(Status::IOError(e.what()), has_error, final_status);
} catch (...) {
mark_failure(Status::Unknown("Unknown exception in write combined txn log task"), has_error, final_status);
}
};
}

Status write_combined_txn_log_parallel(const std::map<int64_t, CombinedTxnLogPB>& txn_log_map) {
CountDownLatch latch(txn_log_map.size());
std::atomic<bool> has_error(false);
Status final_status;
{
std::vector<std::shared_ptr<AutoCleanRunnable>> tasks;
for (const auto& [partition_id, logs] : txn_log_map) {
auto task_logic = create_txn_log_task(&logs, ExecEnv::GetInstance()->lake_tablet_manager(), &has_error,
&final_status);
auto task = std::make_shared<AutoCleanRunnable>(std::move(task_logic), [&latch]() { latch.count_down(); });
tasks.emplace_back(std::move(task));
}
for (const auto& task : tasks) {
Status submit_status = ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task);
if (!submit_status.ok()) {
mark_failure(submit_status, &has_error, &final_status);
break;
}
}
}

latch.wait();
return has_error.load() ? final_status : Status::OK();
}

} // namespace starrocks
1 change: 1 addition & 0 deletions be/src/exec/write_combined_txn_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ namespace starrocks {
class CombinedTxnLogPB;

Status write_combined_txn_log(const CombinedTxnLogPB& logs);
Status write_combined_txn_log_parallel(const std::map<int64_t, CombinedTxnLogPB>& txn_log_map);

} // namespace starrocks
10 changes: 10 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {

std::unique_ptr<ThreadPool> load_rowset_pool;
std::unique_ptr<ThreadPool> load_segment_pool;
std::unique_ptr<ThreadPool> put_combined_txn_log_thread_pool;
RETURN_IF_ERROR(
ThreadPoolBuilder("load_rowset_pool")
.set_min_threads(0)
Expand All @@ -501,6 +502,14 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
_load_segment_thread_pool = load_segment_pool.release();

_broker_mgr = new BrokerMgr(this);

RETURN_IF_ERROR(ThreadPoolBuilder("put_combined_txn_log_thread_pool")
.set_min_threads(0)
.set_max_threads(config::put_combined_txn_log_thread_pool_num_max)
.set_idle_timeout(MonoDelta::FromMilliseconds(500))
.build(&put_combined_txn_log_thread_pool));
_put_combined_txn_log_thread_pool = put_combined_txn_log_thread_pool.release();

#ifndef BE_TEST
_bfd_parser = BfdParser::create();
#endif
Expand Down Expand Up @@ -752,6 +761,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_lake_update_manager);
SAFE_DELETE(_lake_replication_txn_manager);
SAFE_DELETE(_cache_mgr);
SAFE_DELETE(_put_combined_txn_log_thread_pool);
_dictionary_cache_pool.reset();
_automatic_partition_pool.reset();
_metrics = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class ExecEnv {
ThreadPool* streaming_load_thread_pool() { return _streaming_load_thread_pool; }
ThreadPool* load_rowset_thread_pool() { return _load_rowset_thread_pool; }
ThreadPool* load_segment_thread_pool() { return _load_segment_thread_pool; }
ThreadPool* put_combined_txn_log_thread_pool() { return _put_combined_txn_log_thread_pool; }

pipeline::DriverExecutor* wg_driver_executor();
workgroup::ScanExecutor* scan_executor();
Expand Down Expand Up @@ -369,6 +370,7 @@ class ExecEnv {

ThreadPool* _load_segment_thread_pool = nullptr;
ThreadPool* _load_rowset_thread_pool = nullptr;
ThreadPool* _put_combined_txn_log_thread_pool = nullptr;

PriorityThreadPool* _udf_call_pool = nullptr;
PriorityThreadPool* _pipeline_prepare_pool = nullptr;
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "storage/rowset/segment.h"
#include "storage/tablet_schema_map.h"
#include "testutil/sync_point.h"
#include "util/failpoint/fail_point.h"
#include "util/raw_container.h"
#include "util/trace.h"

Expand Down Expand Up @@ -458,7 +459,11 @@ Status TabletManager::put_txn_vlog(const TxnLogPtr& log, int64_t version) {
return put_txn_log(log, txn_vlog_location(log->tablet_id(), version));
}

DEFINE_FAIL_POINT(put_combined_txn_log_success);
DEFINE_FAIL_POINT(put_combined_txn_log_fail);
Status TabletManager::put_combined_txn_log(const starrocks::CombinedTxnLogPB& logs) {
FAIL_POINT_TRIGGER_RETURN(put_combined_txn_log_success, Status::OK());
FAIL_POINT_TRIGGER_RETURN(put_combined_txn_log_fail, Status::InternalError("write combined_txn_log_fail"));
if (UNLIKELY(logs.txn_logs_size() == 0)) {
return Status::InvalidArgument("empty CombinedTxnLogPB");
}
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ set(EXEC_FILES
./storage/lake/lake_persistent_index_test.cpp
./storage/lake/replication_txn_manager_test.cpp
./storage/lake/persistent_index_sstable_test.cpp
./storage/lake/write_combined_txn_log_test.cpp
./block_cache/datacache_utils_test.cpp
./block_cache/block_cache_hit_rate_counter_test.cpp
./util/thrift_rpc_helper_test.cpp
Expand Down
54 changes: 54 additions & 0 deletions be/test/storage/lake/write_combined_txn_log_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/write_combined_txn_log.h"

#include <gtest/gtest.h>

#include <map>

#include "testutil/assert.h"
#include "util/failpoint/fail_point.h"

namespace starrocks::lake {

class WriteCombinedTxnLogTest : public testing::Test {
public:
WriteCombinedTxnLogTest() {}
};

TEST_F(WriteCombinedTxnLogTest, test_write_combined_txn_log_parallel) {
std::map<int64_t, CombinedTxnLogPB> txn_log_map;
size_t N = 2;
for (int64_t i = 0; i < N; i++) {
CombinedTxnLogPB combinde_txn_log_pb;
txn_log_map.insert(std::make_pair(i, std::move(combinde_txn_log_pb)));
}
PFailPointTriggerMode trigger_mode;
trigger_mode.set_mode(FailPointTriggerModeType::ENABLE);
auto fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("put_combined_txn_log_success");
fp->setMode(trigger_mode);
ASSERT_TRUE(write_combined_txn_log_parallel(txn_log_map).ok());
trigger_mode.set_mode(FailPointTriggerModeType::DISABLE);
fp->setMode(trigger_mode);

trigger_mode.set_mode(FailPointTriggerModeType::ENABLE);
fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("put_combined_txn_log_fail");
fp->setMode(trigger_mode);
ASSERT_FALSE(write_combined_txn_log_parallel(txn_log_map).ok());
trigger_mode.set_mode(FailPointTriggerModeType::DISABLE);
fp->setMode(trigger_mode);
}

} // namespace starrocks::lake

0 comments on commit af3d78f

Please sign in to comment.