[Enhancement] Optimize publish version through batch in SHARED_DATA (#28789)

Signed-off-by: lixianhai <lixianhai.lxh@alibaba-inc.com>
smartlxh authored Nov 10, 2023
1 parent 19dc819 commit 9c0d1bc
Showing 28 changed files with 1,934 additions and 107 deletions.
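
For context, here is a minimal client-side sketch of the new batch RPC added by this commit. This is hypothetical caller code, not part of the change: the repeated request fields (tablet_ids, txn_ids, versions) and the failed_tablets response field are taken from the diff below, while the LakeService_Stub and brpc::Channel wiring follow the standard protobuf/brpc pattern and are assumed.

#include <brpc/channel.h>
#include <brpc/controller.h>
#include "gen_cpp/lake_service.pb.h"  // generated request/response types and LakeService_Stub

// Hypothetical example: publish log versions 4 and 5 for txns 1004 and 1005 on
// tablets 10001 and 10002 with a single RPC. Assumes "channel" is already
// initialized and connected to the target BE.
void example_publish_log_version_batch(brpc::Channel& channel) {
    starrocks::lake::PublishLogVersionBatchRequest request;
    starrocks::lake::PublishLogVersionResponse response;
    request.add_tablet_ids(10001);
    request.add_tablet_ids(10002);
    request.add_txn_ids(1004);
    request.add_txn_ids(1005);
    request.add_versions(4);  // version assigned to txn 1004
    request.add_versions(5);  // version assigned to txn 1005

    brpc::Controller cntl;
    starrocks::lake::LakeService_Stub stub(&channel);
    stub.publish_log_version_batch(&cntl, &request, &response, nullptr);  // synchronous call
    if (cntl.Failed() || response.failed_tablets_size() > 0) {
        // Retry, or fall back to the single-txn publish_log_version RPC.
    }
}

On the BE side the request is fanned out per tablet by the new _submit_publish_log_version_task helper shown below.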
94 changes: 68 additions & 26 deletions be/src/service/service_be/lake_service.cpp
@@ -216,6 +216,44 @@ void LakeServiceImpl::publish_version(::google::protobuf::RpcController* control
TEST_SYNC_POINT("LakeServiceImpl::publish_version:return");
}

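// Fans out one publish-log-version task per tablet to the publish-version thread
// pool and blocks until all of them have completed (or failed to be submitted).
// txn_ids and log_versions are parallel arrays of length txn_size; tablets whose
// task fails or cannot be submitted are recorded in response under response_mtx.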
void LakeServiceImpl::_submit_publish_log_version_task(const int64_t* tablet_ids, size_t tablet_size,
const int64_t* txn_ids, const int64_t* log_versions,
size_t txn_size,
::starrocks::lake::PublishLogVersionResponse* response) {
auto thread_pool = publish_version_thread_pool(_env);
auto latch = BThreadCountDownLatch(tablet_size);
bthread::Mutex response_mtx;

for (int i = 0; i < tablet_size; i++) {
auto tablet_id = tablet_ids[i];
auto task = [&, tablet_id]() {
DeferOp defer([&] { latch.count_down(); });
auto st = lake::publish_log_version(_tablet_mgr, tablet_id, txn_ids, log_versions, txn_size);
if (!st.ok()) {
g_publish_version_failed_tasks << 1;
LOG(WARNING) << "Fail to publish log version: " << st << " tablet_id=" << tablet_id
<< " txn_ids=" << JoinElementsIterator(txn_ids, txn_ids + txn_size, ",")
<< " versions=" << JoinElementsIterator(log_versions, log_versions + txn_size, ",");
std::lock_guard l(response_mtx);
response->add_failed_tablets(tablet_id);
}
};

auto st = thread_pool->submit_func(task);
if (!st.ok()) {
g_publish_version_failed_tasks << 1;
LOG(WARNING) << "Fail to submit publish log version task, tablet_id: " << tablet_id
<< ", txn_ids: " << JoinElementsIterator(txn_ids, txn_ids + txn_size, ",")
<< ", versions: " << JoinElementsIterator(log_versions, log_versions + txn_size, ",")
<< ", error: " << st;
std::lock_guard l(response_mtx);
response->add_failed_tablets(tablet_id);
latch.count_down();
}
}

latch.wait();
}
void LakeServiceImpl::publish_log_version(::google::protobuf::RpcController* controller,
const ::starrocks::lake::PublishLogVersionRequest* request,
::starrocks::lake::PublishLogVersionResponse* response,
@@ -236,36 +274,40 @@ void LakeServiceImpl::publish_log_version(::google::protobuf::RpcController* con
return;
}

auto thread_pool = publish_version_thread_pool(_env);
auto latch = BThreadCountDownLatch(request->tablet_ids_size());
bthread::Mutex response_mtx;
auto tablet_ids = request->tablet_ids().data();
int64_t txn_id = request->txn_id();
int64_t version = request->version();

for (auto tablet_id : request->tablet_ids()) {
auto task = [&, tablet_id]() {
DeferOp defer([&] { latch.count_down(); });
auto txn_id = request->txn_id();
auto version = request->version();
auto st = lake::publish_log_version(_tablet_mgr, tablet_id, txn_id, version);
if (!st.ok()) {
g_publish_version_failed_tasks << 1;
LOG(WARNING) << "Fail to publish log version: " << st << " tablet_id=" << tablet_id
<< " txn_id=" << txn_id << " version=" << version;
std::lock_guard l(response_mtx);
response->add_failed_tablets(tablet_id);
}
};
_submit_publish_log_version_task(tablet_ids, request->tablet_ids_size(), &txn_id, &version, 1, response);
}

auto st = thread_pool->submit_func(task);
if (!st.ok()) {
g_publish_version_failed_tasks << 1;
LOG(WARNING) << "Fail to submit publish log version task: " << st;
std::lock_guard l(response_mtx);
response->add_failed_tablets(tablet_id);
latch.count_down();
}
void LakeServiceImpl::publish_log_version_batch(::google::protobuf::RpcController* controller,
const ::starrocks::lake::PublishLogVersionBatchRequest* request,
::starrocks::lake::PublishLogVersionResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard guard(done);
auto cntl = static_cast<brpc::Controller*>(controller);

if (request->tablet_ids_size() == 0) {
cntl->SetFailed("missing tablet_ids");
return;
}
if (request->txn_ids_size() == 0) {
cntl->SetFailed("missing txn_ids");
return;
}
if (request->versions_size() == 0) {
cntl->SetFailed("missing versions");
return;
}

latch.wait();
auto tablet_ids = request->tablet_ids().data();
auto txn_ids = request->txn_ids().data();
auto versions = request->versions().data();
DCHECK_EQ(request->txn_ids_size(), request->versions_size());

_submit_publish_log_version_task(tablet_ids, request->tablet_ids_size(), txn_ids, versions, request->txn_ids_size(),
response);
}

void LakeServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
11 changes: 11 additions & 0 deletions be/src/service/service_be/lake_service.h
@@ -13,6 +13,7 @@
// limitations under the License.

#pragma once
#include <span>

#include "gen_cpp/lake_service.pb.h"

@@ -60,6 +61,11 @@ class LakeServiceImpl : public ::starrocks::lake::LakeService {
::starrocks::lake::PublishLogVersionResponse* response,
::google::protobuf::Closure* done) override;

void publish_log_version_batch(::google::protobuf::RpcController* controller,
const ::starrocks::lake::PublishLogVersionBatchRequest* request,
::starrocks::lake::PublishLogVersionResponse* response,
::google::protobuf::Closure* done) override;

void lock_tablet_metadata(::google::protobuf::RpcController* controller,
const ::starrocks::lake::LockTabletMetadataRequest* request,
::starrocks::lake::LockTabletMetadataResponse* response,
@@ -91,6 +97,11 @@ class LakeServiceImpl : public ::starrocks::lake::LakeService {
void vacuum_full(::google::protobuf::RpcController* controller, const ::starrocks::lake::VacuumFullRequest* request,
::starrocks::lake::VacuumFullResponse* response, ::google::protobuf::Closure* done) override;

private:
void _submit_publish_log_version_task(const int64_t* tablet_ids, size_t tablet_size, const int64_t* txn_ids,
const int64_t* log_versions, size_t txn_size,
::starrocks::lake::PublishLogVersionResponse* response);

private:
ExecEnv* _env;
lake::TabletManager* _tablet_mgr;
7 changes: 4 additions & 3 deletions be/src/service/service_be/starrocks_be.cpp
@@ -214,9 +214,10 @@ void start_be(const std::vector<StorePath>& paths, bool as_cn) {
}
const auto lake_service_max_concurrency = config::lake_service_max_concurrency;
const auto service_name = "starrocks.lake.LakeService";
const auto methods = {"abort_txn", "abort_compaction", "compact", "drop_table",
"delete_data", "delete_tablet", "get_tablet_stats", "publish_version",
"publish_log_version", "vacuum", "vacuum_full"};
const auto methods = {
"abort_txn", "abort_compaction", "compact", "drop_table", "delete_data",
"delete_tablet", "get_tablet_stats", "publish_version", "publish_log_version", "publish_log_version_batch",
"vacuum", "vacuum_full"};
for (auto method : methods) {
brpc_server->MaxConcurrencyOf(service_name, method) = lake_service_max_concurrency;
}
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet_metadata.h
@@ -24,4 +24,4 @@ using TabletMetadata = TabletMetadataPB;
using TabletMetadataPtr = std::shared_ptr<const TabletMetadata>;
using MutableTabletMetadataPtr = std::shared_ptr<TabletMetadata>;

} // namespace starrocks::lake
} // namespace starrocks::lake
160 changes: 110 additions & 50 deletions be/src/storage/lake/transactions.cpp
@@ -15,6 +15,7 @@
#include "storage/lake/transactions.h"

#include "fs/fs_util.h"
#include "gutil/strings/join.h"
#include "storage/lake/metacache.h"
#include "storage/lake/tablet.h"
#include "storage/lake/tablet_manager.h"
@@ -28,10 +29,13 @@ namespace starrocks::lake {
StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t tablet_id, int64_t base_version,
int64_t new_version, std::span<const int64_t> txn_ids,
int64_t commit_time) {
if (txn_ids.size() != 1) {
return Status::NotSupported("does not support publish multiple txns yet");
if (txn_ids.size() > 1) {
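// Batch publish assumes consecutive versions: applying txn_ids.size() txns on top
// of base_version must land exactly on new_version (e.g. base_version=2 plus
// 3 txns gives new_version=5).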
CHECK_EQ(new_version, base_version + txn_ids.size());
}

VLOG(1) << "publish version tablet_id: " << tablet_id << ", txns: " << JoinInts(txn_ids, ",")
<< ", base_version: " << base_version << ", new_version: " << new_version;

auto new_version_metadata_or_error = [=](Status error) -> StatusOr<TabletMetadataPtr> {
auto res = tablet_mgr->get_tablet_metadata(tablet_id, new_version);
if (res.ok()) return res;
@@ -51,42 +55,60 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
}

auto base_metadata = std::move(base_metadata_or).value();
auto new_metadata = std::make_shared<TabletMetadataPB>(*base_metadata);
auto log_applier = new_txn_log_applier(Tablet(tablet_mgr, tablet_id), new_metadata, new_version);

if (new_metadata->compaction_inputs_size() > 0) {
new_metadata->mutable_compaction_inputs()->Clear();
}

if (new_metadata->orphan_files_size() > 0) {
new_metadata->mutable_orphan_files()->Clear();
}

if (base_metadata->compaction_inputs_size() > 0 || base_metadata->orphan_files_size() > 0) {
new_metadata->set_prev_garbage_version(base_metadata->version());
}

new_metadata->set_commit_time(commit_time);

auto init_st = log_applier->init();
if (!init_st.ok()) {
if (init_st.is_already_exist()) {
return new_version_metadata_or_error(init_st);
} else {
return init_st;
}
}

std::unique_ptr<TxnLogApplier> log_applier;
std::shared_ptr<TabletMetadataPB> new_metadata;
std::vector<std::string> files_to_delete;

// Apply txn logs
int64_t alter_version = -1;
for (auto txn_id : txn_ids) {
// Do not delete txn logs when publishing more than one txn; let vacuum do the cleanup.
// If the txn logs were deleted here, it would be tricky to handle switching from batch publish back to single publish.

// For example:
// 1. the publish mode is batch,
// 2. txn2 and txn3 have been published successfully and the visible version in FE is updated to 3,
// 3. then txn4 and txn5 are published successfully on the BE and their txn_logs are deleted,
// but FE does not get the response for some reason,
// 4. the publish mode is switched back to single,
// 5. txn4 will be published again by a later publish task, but the BE cannot determine the latest version
// and cannot reapply the txn_log once it has been deleted.
bool delete_txn_log = (txn_ids.size() == 1);
for (int i = 0; i < txn_ids.size(); i++) {
auto txn_id = txn_ids[i];
auto log_path = tablet_mgr->txn_log_location(tablet_id, txn_id);
auto txn_log_st = tablet_mgr->get_txn_log(log_path, false);

if (txn_log_st.status().is_not_found()) {
return new_version_metadata_or_error(txn_log_st.status());
if (i == 0) {
// This can happen in two situations:
// 1. a duplicate publish in single mode
if (txn_ids.size() == 1) {
return new_version_metadata_or_error(txn_log_st.status());
}

// 2. switching from single publish to batch publish after the txn log has already been deleted,
// for example:
// the current publish mode is single,
// txn2 has been published successfully and the visible version in FE is updated to 2,
// then txn3 is published successfully on the BE and its txn_log is deleted, but FE does not get the response for some reason,
// the publish mode is then switched to batch,
// txn3, txn4 and txn5 are published in one batch publish task, so txn3 should be skipped and only the txn_logs of txn4 and txn5 applied.
auto missing_txn_log_meta = tablet_mgr->get_tablet_metadata(tablet_id, base_version + 1);
if (missing_txn_log_meta.status().is_not_found()) {
// this shouldn't happen
LOG(WARNING) << "txn_log of txn: " << txn_id << " not found, and cannot find the tablet_meta";
return Status::InternalError("Both txn_log and corresponding tablet_meta missing");
} else if (!missing_txn_log_meta.status().ok()) {
LOG(WARNING) << "txn_log of txn: " << txn_id << " not found, failed to get the tablet_meta: "
<< missing_txn_log_meta.status().to_string();
return new_version_metadata_or_error(missing_txn_log_meta.status());
} else {
base_metadata = std::move(missing_txn_log_meta).value();
continue;
}
} else {
return new_version_metadata_or_error(txn_log_st.status());
}
}

if (!txn_log_st.ok()) {
@@ -99,13 +121,44 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
alter_version = txn_log->op_schema_change().alter_version();
}

if (log_applier == nullptr) {
// init log_applier
new_metadata = std::make_shared<TabletMetadataPB>(*base_metadata);
log_applier = new_txn_log_applier(Tablet(tablet_mgr, tablet_id), new_metadata, new_version);

if (new_metadata->compaction_inputs_size() > 0) {
new_metadata->mutable_compaction_inputs()->Clear();
}

if (new_metadata->orphan_files_size() > 0) {
new_metadata->mutable_orphan_files()->Clear();
}

if (base_metadata->compaction_inputs_size() > 0 || base_metadata->orphan_files_size() > 0) {
new_metadata->set_prev_garbage_version(base_metadata->version());
}

new_metadata->set_commit_time(commit_time);

auto init_st = log_applier->init();
if (!init_st.ok()) {
if (init_st.is_already_exist()) {
return new_version_metadata_or_error(init_st);
} else {
return init_st;
}
}
}

auto st = log_applier->apply(*txn_log);
if (!st.ok()) {
LOG(WARNING) << "Fail to apply " << log_path << ": " << st;
return st;
}

files_to_delete.emplace_back(log_path);
if (delete_txn_log) {
files_to_delete.emplace_back(log_path);
}

tablet_mgr->metacache()->erase(log_path);
}
@@ -153,28 +206,35 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
return new_metadata;
}

Status publish_log_version(TabletManager* tablet_mgr, int64_t tablet_id, int64_t txn_id, int64_t log_version) {
auto txn_log_path = tablet_mgr->txn_log_location(tablet_id, txn_id);
auto txn_vlog_path = tablet_mgr->txn_vlog_location(tablet_id, log_version);
// TODO: use rename() API if supported by the underlying filesystem.
auto st = fs::copy_file(txn_log_path, txn_vlog_path);
if (st.is_not_found()) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(txn_vlog_path));
auto check_st = fs->path_exists(txn_vlog_path);
if (check_st.ok()) {
return Status::OK();
} else {
LOG_IF(WARNING, !check_st.is_not_found())
<< "Fail to check the existence of " << txn_vlog_path << ": " << check_st;
Status publish_log_version(TabletManager* tablet_mgr, int64_t tablet_id, const int64_t* txn_ids,
const int64_t* log_versions, int txns_size) {
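// Convert each txn log into a versioned txn log (txn_vlog) for its target version.
// Logs that were already converted (copy source missing but txn_vlog present) are
// skipped; successfully converted logs are deleted asynchronously at the end.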
std::vector<std::string> files_to_delete;
for (int i = 0; i < txns_size; i++) {
auto txn_id = txn_ids[i];
auto log_version = log_versions[i];
auto txn_log_path = tablet_mgr->txn_log_location(tablet_id, txn_id);
auto txn_vlog_path = tablet_mgr->txn_vlog_location(tablet_id, log_version);
// TODO: use rename() API if supported by the underlying filesystem.
auto st = fs::copy_file(txn_log_path, txn_vlog_path);
if (st.is_not_found()) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(txn_vlog_path));
auto check_st = fs->path_exists(txn_vlog_path);
if (check_st.ok()) {
continue;
} else {
LOG_IF(WARNING, !check_st.is_not_found())
<< "Fail to check the existence of " << txn_vlog_path << ": " << check_st;
return st;
}
} else if (!st.ok()) {
return st;
} else {
files_to_delete.emplace_back(txn_log_path);
tablet_mgr->metacache()->erase(txn_log_path);
}
} else if (!st.ok()) {
return st;
} else {
delete_files_async({txn_log_path});
tablet_mgr->metacache()->erase(txn_log_path);
return Status::OK();
}
delete_files_async(std::move(files_to_delete));
return Status::OK();
}

void abort_txn(TabletManager* tablet_mgr, int64_t tablet_id, std::span<const int64_t> txn_ids) {