Skip to content

Commit

Permalink
[Enhancement] Support sync publish version for primary key table (Sta…
Browse files Browse the repository at this point in the history
…rRocks#27055)

This PR refactors the publish version mechanism so that the BE will not return success for a publish version request until every version is queryable. The main improvement is support for synchronously publishing a version for primary key tables: once publish version returns success, the data of the specified version can be queried immediately.

---------

Signed-off-by: zhangqiang <qiangzh95@gmail.com>
  • Loading branch information
sevev authored Aug 16, 2023
1 parent 8c358cb commit 519ef2c
Show file tree
Hide file tree
Showing 38 changed files with 851 additions and 29 deletions.
1 change: 0 additions & 1 deletion be/src/agent/finish_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ namespace starrocks {
class TFinishTaskRequest;

void finish_task(const TFinishTaskRequest& finish_task_request);

} // namespace starrocks
10 changes: 9 additions & 1 deletion be/src/agent/publish_version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
span->SetAttribute("txn_id", transaction_id);
auto scoped = trace::Scope(span);

bool enable_sync_publish = publish_version_req.enable_sync_publish;
size_t num_partition = publish_version_req.partition_version_infos.size();
size_t num_active_tablet = 0;
std::vector<std::map<TabletInfo, RowsetSharedPtr>> partitions(num_partition);
Expand All @@ -72,6 +73,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
span->SetAttribute("num_tablet", num_active_tablet);
std::vector<TabletPublishVersionTask> tablet_tasks(num_active_tablet);
size_t tablet_idx = 0;

for (size_t i = 0; i < publish_version_req.partition_version_infos.size(); i++) {
for (auto& itr : partitions[i]) {
auto& task = tablet_tasks[tablet_idx++];
Expand Down Expand Up @@ -150,13 +152,18 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
finish_task.__isset.tablet_versions = true;
auto& error_tablet_ids = finish_task.error_tablet_ids;
auto& tablet_versions = finish_task.tablet_versions;
auto& tablet_publish_versions = finish_task.tablet_publish_versions;
tablet_versions.reserve(tablet_tasks.size());
for (auto& task : tablet_tasks) {
if (!task.st.ok()) {
error_tablet_ids.push_back(task.tablet_id);
if (st.ok()) {
st = task.st;
}
} else {
auto& pair = tablet_publish_versions.emplace_back();
pair.__set_tablet_id(task.tablet_id);
pair.__set_version(task.version);
}
}
// return tablet and its version which has already finished.
Expand All @@ -173,7 +180,8 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
LOG(WARNING) << fmt::format("publish_version tablet not found tablet_id: {}, version: {} txn_id: {}",
tablet_info.tablet_id, partition_version.version, transaction_id);
} else {
const int64_t max_continuous_version = tablet->max_continuous_version();
const int64_t max_continuous_version =
enable_sync_publish ? tablet->max_continuous_version() : tablet->max_readable_version();
if (max_continuous_version > 0) {
auto& pair = tablet_versions.emplace_back();
pair.__set_tablet_id(tablet_info.tablet_id);
Expand Down
54 changes: 38 additions & 16 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "storage/data_dir.h"
#include "storage/lake/tablet_manager.h"
#include "storage/olap_common.h"
#include "storage/publish_version_manager.h"
#include "storage/snapshot_manager.h"
#include "storage/storage_engine.h"
#include "storage/task/engine_alter_tablet_task.h"
Expand Down Expand Up @@ -515,6 +516,10 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) {
const auto& publish_version_task = *priority_tasks.top();
LOG(INFO) << "get publish version task txn_id: " << publish_version_task.task_req.transaction_id
<< " priority queue size: " << priority_tasks.size();
bool enable_sync_publish = publish_version_task.task_req.enable_sync_publish;
if (enable_sync_publish) {
wait_time = 0;
}
StarRocksMetrics::instance()->publish_task_request_total.increment(1);
auto& finish_task_request = finish_task_requests.emplace_back();
finish_task_request.__set_backend(BackendOptions::get_localBackend());
Expand All @@ -528,23 +533,40 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) {
batch_publish_latency += MonotonicMillis() - start_ts;
priority_tasks.pop();

if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE ||
batch_publish_latency > config::max_batch_publish_latency_ms) {
int64_t t0 = MonotonicMillis();
StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs);
int64_t t1 = MonotonicMillis();
// notify FE when all tasks of group have been finished.
for (auto& finish_task_request : finish_task_requests) {
finish_task(finish_task_request);
remove_task_info(finish_task_request.task_type, finish_task_request.signature);
if (!enable_sync_publish) {
if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE ||
batch_publish_latency > config::max_batch_publish_latency_ms) {
int64_t t0 = MonotonicMillis();
StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs);
int64_t t1 = MonotonicMillis();
// notify FE when all tasks of group have been finished.
for (auto& finish_task_request : finish_task_requests) {
finish_task(finish_task_request);
remove_task_info(finish_task_request.task_type, finish_task_request.signature);
}
int64_t t2 = MonotonicMillis();
LOG(INFO) << "batch flush " << finish_task_requests.size()
<< " txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0
<< "ms finish_task_rpc:" << t2 - t1 << "ms";
finish_task_requests.clear();
affected_dirs.clear();
batch_publish_latency = 0;
}
} else {
if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE ||
batch_publish_latency > config::max_batch_publish_latency_ms) {
int64_t finish_task_size = finish_task_requests.size();
int64_t t0 = MonotonicMillis();
StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs);
int64_t t1 = MonotonicMillis();
StorageEngine::instance()->publish_version_manager()->wait_publish_task_apply_finish(
std::move(finish_task_requests));
StorageEngine::instance()->wake_finish_publish_vesion_thread();
affected_dirs.clear();
batch_publish_latency = 0;
LOG(INFO) << "batch submit " << finish_task_size << " finish publish version task "
<< "txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0 << "ms";
}
int64_t t2 = MonotonicMillis();
LOG(INFO) << "batch flush " << finish_task_requests.size()
<< " txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0
<< "ms finish_task_rpc:" << t2 - t1 << "ms";
finish_task_requests.clear();
affected_dirs.clear();
batch_publish_latency = 0;
}
}
return nullptr;
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,9 @@ CONF_mInt32(primary_key_limit_size, "128");
CONF_mBool(enable_short_key_for_one_column_filter, "false");

CONF_mBool(enable_http_stream_load_limit, "false");
CONF_mInt32(finish_publish_version_internal, "100");

CONF_mInt32(get_txn_status_internal_sec, "30");

CONF_mBool(dump_metrics_with_bvar, "true");

Expand Down
3 changes: 0 additions & 3 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;

return Status::OK();
}

Expand Down Expand Up @@ -324,7 +323,6 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
int64_t begin_txn_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx));
ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;

// process put file
return _process_put(http_req, ctx);
}
Expand Down Expand Up @@ -606,7 +604,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}
ctx->put_result.params.query_options.mem_limit = exec_mem_limit;
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class StreamLoadContext {

TStreamLoadPutRequest request;

int64_t load_deadline_sec = -1;
std::unique_ptr<ConcurrentLimiterGuard> _http_limiter_guard;

public:
Expand Down
31 changes: 31 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
}
ctx->txn_id = result.txnId;
ctx->need_rollback = true;
ctx->load_deadline_sec = UnixSeconds() + result.timeout;

return Status::OK();
}
Expand Down Expand Up @@ -230,6 +231,36 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
LOG(WARNING) << "commit transaction failed, errmsg=" << status.get_error_msg() << ctx->brief();
if (status.code() == TStatusCode::PUBLISH_TIMEOUT) {
ctx->need_rollback = false;
if (ctx->load_deadline_sec > UnixSeconds()) {
//wait for apply finish
TGetLoadTxnStatusRequest v_request;
TGetLoadTxnStatusResult v_result;
set_request_auth(&v_request, ctx->auth);
v_request.db = ctx->db;
v_request.tbl = ctx->table;
v_request.txnId = ctx->txn_id;
while (ctx->load_deadline_sec > UnixSeconds()) {
sleep(std::min((int64_t)config::get_txn_status_internal_sec,
ctx->load_deadline_sec - UnixSeconds()));
auto visiable_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&v_request, &v_result](FrontendServiceConnection& client) {
client->getLoadTxnStatus(v_result, v_request);
},
config::txn_commit_rpc_timeout_ms);
if (!visiable_st.ok()) {
return status;
} else {
if (v_result.status == TTransactionStatus::VISIBLE) {
return Status::OK();
} else if (v_result.status == TTransactionStatus::COMMITTED) {
continue;
} else {
return status;
}
}
}
}
}
return status;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ add_library(Storage STATIC
metadata_util.cpp
kv_store.cpp
local_tablet_reader.cpp
publish_version_manager.cpp
olap_common.cpp
olap_server.cpp
options.cpp
Expand Down
26 changes: 26 additions & 0 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "storage/olap_common.h"
#include "storage/olap_define.h"
#include "storage/persistent_index_compaction_manager.h"
#include "storage/publish_version_manager.h"
#include "storage/storage_engine.h"
#include "storage/tablet_manager.h"
#include "storage/update_manager.h"
Expand Down Expand Up @@ -95,6 +96,10 @@ Status StorageEngine::start_bg_threads() {
_pk_index_major_compaction_thread = std::thread([this] { _pk_index_major_compaction_thread_callback(nullptr); });
Thread::set_thread_name(_pk_index_major_compaction_thread, "pk_index_compaction_scheduler");

// start thread for check finish publish version
_finish_publish_version_thread = std::thread([this] { _finish_publish_version_thread_callback(nullptr); });
Thread::set_thread_name(_finish_publish_version_thread, "finish_publish_version");

// convert store map to vector
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
Expand Down Expand Up @@ -608,6 +613,27 @@ void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) {
return nullptr;
}

// Background worker loop that drives asynchronous "finish publish version"
// notifications. It blocks on a condition variable until the publish version
// manager reports pending tasks (or the engine begins shutting down), asks the
// manager to complete the queued tasks, then sleeps for the configured
// interval before polling again. Returns nullptr when _bg_worker_stopped is
// set. NOTE(review): `arg` is unused — presumably kept for the generic thread
// callback signature; confirm against the other *_thread_callback functions.
void* StorageEngine::_finish_publish_version_thread_callback(void* arg) {
    while (!_bg_worker_stopped.load(std::memory_order_consume)) {
        int32_t interval = config::finish_publish_version_internal;
        {
            std::unique_lock<std::mutex> wl(_finish_publish_version_mutex);
            // Wait until there is work to do or the engine starts shutting
            // down; the wake-up comes from wake_finish_publish_vesion_thread().
            while (!_publish_version_manager->has_pending_task() &&
                   !_bg_worker_stopped.load(std::memory_order_consume)) {
                _finish_publish_version_cv.wait(wl);
            }
            _publish_version_manager->finish_publish_version_task();
            if (interval <= 0) {
                // Guard against a misconfigured (zero or negative) interval so
                // the loop cannot busy-spin. The fallback is 1000 ms; the
                // previous log text incorrectly claimed "force set to 1".
                LOG(WARNING) << "finish_publish_version_internal config is illegal: " << interval
                             << ", force set to 1000 ms";
                interval = 1000;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(interval));
    }

    return nullptr;
}

void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* data_dir,
const std::pair<int32_t, int32_t>& tablet_shards_range) {
#ifdef GOOGLE_PROFILER
Expand Down
Loading

0 comments on commit 519ef2c

Please sign in to comment.