Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: sevev <qiangzh95@gmail.com>
  • Loading branch information
sevev committed Jan 23, 2025
1 parent 733b4f0 commit 6ec701a
Showing 1 changed file with 33 additions and 41 deletions.
74 changes: 33 additions & 41 deletions be/src/exec/write_combined_txn_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,68 +14,60 @@

#include "exec/write_combined_txn_log.h"

#include <future>
#include <vector>

#include "runtime/exec_env.h"
#include "storage/lake/tablet_manager.h"
#include "util/countdown_latch.h"

namespace starrocks {

class TxnLogTask : public Runnable {
public:
TxnLogTask(const CombinedTxnLogPB* logs, lake::TabletManager* tablet_mgr, std::promise<Status> promise)
: _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {}
Status write_combined_txn_log(const CombinedTxnLogPB& logs) {
auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager();
return tablet_mgr->put_combined_txn_log(logs);
}

void mark_failure(const Status& status, std::atomic<bool>* has_error, Status* final_status) {
if (!has_error->load()) {
if (!has_error->exchange(true)) {
if (final_status->ok()) {
*final_status = status;
}
}
}
}

void run() override {
std::function<void()> create_txn_log_task(const CombinedTxnLogPB* logs, lake::TabletManager* tablet_mgr,
std::atomic<bool>* has_error, Status* final_status) {
return [logs, tablet_mgr, has_error, final_status]() {
try {
Status status = _tablet_mgr->put_combined_txn_log(*_logs);
Status status = tablet_mgr->put_combined_txn_log(*logs);
if (!status.ok()) {
throw std::runtime_error("Log write failed");
throw std::runtime_error("Txn log write failed");
}
_promise.set_value(Status::OK());
} catch (const std::exception& e) {
_promise.set_value(Status::IOError(e.what()));
mark_failure(Status::IOError(e.what()), has_error, final_status);
} catch (...) {
mark_failure(Status::Unknown("Unknown exception in write combined txn log task"), has_error, final_status);
}
}

private:
const CombinedTxnLogPB* _logs;
lake::TabletManager* _tablet_mgr;
std::promise<Status> _promise;
};

Status write_combined_txn_log(const CombinedTxnLogPB& logs) {
auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager();
return tablet_mgr->put_combined_txn_log(logs);
};
}

Status write_combined_txn_log(const std::map<int64_t, CombinedTxnLogPB>& txn_log_map) {
std::vector<std::future<Status>> futures;
std::vector<std::shared_ptr<Runnable>> tasks;
size_t task_count = txn_log_map.size();
CountDownLatch latch(task_count);
std::atomic<bool> has_error(false);
Status final_status;

for (const auto& [partition_id, logs] : txn_log_map) {
(void)partition_id;
std::promise<Status> promise;
futures.push_back(promise.get_future());
std::shared_ptr<Runnable> r(
std::make_shared<TxnLogTask>(&logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise)));
tasks.emplace_back(std::move(r));
}

for (auto& task : tasks) {
auto task_logic =
create_txn_log_task(&logs, ExecEnv::GetInstance()->lake_tablet_manager(), &has_error, &final_status);
auto task = std::make_shared<AutoCleanRunnable>(std::move(task_logic), [&latch]() { latch.count_down(); });
RETURN_IF_ERROR(ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task));
}

Status st;
for (auto& future : futures) {
Status status = future.get();
if (st.ok() && !status.ok()) {
st = status;
}
}

return st;
latch.wait();
return has_error.load() ? final_status : Status::OK();
}

} // namespace starrocks

0 comments on commit 6ec701a

Please sign in to comment.