From 519ef2ca132cdf95d1d8c80b33c93046ade7b54d Mon Sep 17 00:00:00 2001 From: zhangqiang Date: Wed, 16 Aug 2023 10:05:31 +0800 Subject: [PATCH] [Enhancement] Support sync publish version for primary key table (#27055) This PR refactors the publish version mechanism so that the BE does not report a publish version request as successful until every published version is queryable. The main improvement is support for synchronously publishing a version for primary key tables, so that the data of the specified version can be queried immediately once publish version reports success. --------- Signed-off-by: zhangqiang --- be/src/agent/finish_task.h | 1 - be/src/agent/publish_version.cpp | 10 +- be/src/agent/task_worker_pool.cpp | 54 ++-- be/src/common/config.h | 3 + be/src/http/action/stream_load.cpp | 3 - .../runtime/stream_load/stream_load_context.h | 1 + .../stream_load/stream_load_executor.cpp | 31 +++ be/src/storage/CMakeLists.txt | 1 + be/src/storage/olap_server.cpp | 26 ++ be/src/storage/publish_version_manager.cpp | 180 ++++++++++++ be/src/storage/publish_version_manager.h | 55 ++++ be/src/storage/storage_engine.cpp | 7 +- be/src/storage/storage_engine.h | 20 ++ be/src/storage/tablet.cpp | 15 + be/src/storage/tablet.h | 1 + be/src/storage/tablet_updates.cpp | 8 + be/src/storage/tablet_updates.h | 2 + be/test/CMakeLists.txt | 1 + .../storage/publish_version_manager_test.cpp | 262 ++++++++++++++++++ .../java/com/starrocks/common/Config.java | 2 + .../com/starrocks/leader/ReportHandler.java | 9 +- .../starrocks/load/loadv2/TaskAttachment.java | 1 + .../java/com/starrocks/qe/StmtExecutor.java | 4 +- .../com/starrocks/server/GlobalStateMgr.java | 5 + .../service/FrontendServiceImpl.java | 46 +++ .../starrocks/task/PublishVersionTask.java | 5 +- .../transaction/DatabaseTransactionMgr.java | 38 +++ .../transaction/GlobalTransactionMgr.java | 6 + .../transaction/TransactionState.java | 3 +- .../pseudocluster/DecommissionTest.java | 1 + .../pseudocluster/PseudoCluster.java | 2 +- .../pseudocluster/PseudoFrontend.java | 1 + 
.../service/FrontendServiceImplTest.java | 37 +++ .../DatabaseTransactionMgrTest.java | 8 + gensrc/thrift/AgentService.thrift | 1 + gensrc/thrift/FrontendService.thrift | 18 ++ gensrc/thrift/MasterService.thrift | 2 + gensrc/thrift/Status.thrift | 10 +- 38 files changed, 851 insertions(+), 29 deletions(-) create mode 100644 be/src/storage/publish_version_manager.cpp create mode 100644 be/src/storage/publish_version_manager.h create mode 100644 be/test/storage/publish_version_manager_test.cpp diff --git a/be/src/agent/finish_task.h b/be/src/agent/finish_task.h index 7392945fbabcd..2bd1a8a2da558 100644 --- a/be/src/agent/finish_task.h +++ b/be/src/agent/finish_task.h @@ -5,5 +5,4 @@ namespace starrocks { class TFinishTaskRequest; void finish_task(const TFinishTaskRequest& finish_task_request); - } // namespace starrocks diff --git a/be/src/agent/publish_version.cpp b/be/src/agent/publish_version.cpp index 2a8b76871d59f..5e9136bfcf64a 100644 --- a/be/src/agent/publish_version.cpp +++ b/be/src/agent/publish_version.cpp @@ -60,6 +60,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque span->SetAttribute("txn_id", transaction_id); auto scoped = trace::Scope(span); + bool enable_sync_publish = publish_version_req.enable_sync_publish; size_t num_partition = publish_version_req.partition_version_infos.size(); size_t num_active_tablet = 0; std::vector> partitions(num_partition); @@ -72,6 +73,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque span->SetAttribute("num_tablet", num_active_tablet); std::vector tablet_tasks(num_active_tablet); size_t tablet_idx = 0; + for (size_t i = 0; i < publish_version_req.partition_version_infos.size(); i++) { for (auto& itr : partitions[i]) { auto& task = tablet_tasks[tablet_idx++]; @@ -150,6 +152,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque finish_task.__isset.tablet_versions = true; auto& error_tablet_ids = 
finish_task.error_tablet_ids; auto& tablet_versions = finish_task.tablet_versions; + auto& tablet_publish_versions = finish_task.tablet_publish_versions; tablet_versions.reserve(tablet_tasks.size()); for (auto& task : tablet_tasks) { if (!task.st.ok()) { @@ -157,6 +160,10 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque if (st.ok()) { st = task.st; } + } else { + auto& pair = tablet_publish_versions.emplace_back(); + pair.__set_tablet_id(task.tablet_id); + pair.__set_version(task.version); } } // return tablet and its version which has already finished. @@ -173,7 +180,8 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque LOG(WARNING) << fmt::format("publish_version tablet not found tablet_id: {}, version: {} txn_id: {}", tablet_info.tablet_id, partition_version.version, transaction_id); } else { - const int64_t max_continuous_version = tablet->max_continuous_version(); + const int64_t max_continuous_version = + enable_sync_publish ? 
tablet->max_continuous_version() : tablet->max_readable_version(); if (max_continuous_version > 0) { auto& pair = tablet_versions.emplace_back(); pair.__set_tablet_id(tablet_info.tablet_id); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b4b72a3f1528b..afa00c895663a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -61,6 +61,7 @@ #include "storage/data_dir.h" #include "storage/lake/tablet_manager.h" #include "storage/olap_common.h" +#include "storage/publish_version_manager.h" #include "storage/snapshot_manager.h" #include "storage/storage_engine.h" #include "storage/task/engine_alter_tablet_task.h" @@ -515,6 +516,10 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) { const auto& publish_version_task = *priority_tasks.top(); LOG(INFO) << "get publish version task txn_id: " << publish_version_task.task_req.transaction_id << " priority queue size: " << priority_tasks.size(); + bool enable_sync_publish = publish_version_task.task_req.enable_sync_publish; + if (enable_sync_publish) { + wait_time = 0; + } StarRocksMetrics::instance()->publish_task_request_total.increment(1); auto& finish_task_request = finish_task_requests.emplace_back(); finish_task_request.__set_backend(BackendOptions::get_localBackend()); @@ -528,23 +533,40 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) { batch_publish_latency += MonotonicMillis() - start_ts; priority_tasks.pop(); - if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE || - batch_publish_latency > config::max_batch_publish_latency_ms) { - int64_t t0 = MonotonicMillis(); - StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs); - int64_t t1 = MonotonicMillis(); - // notify FE when all tasks of group have been finished. 
- for (auto& finish_task_request : finish_task_requests) { - finish_task(finish_task_request); - remove_task_info(finish_task_request.task_type, finish_task_request.signature); + if (!enable_sync_publish) { + if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE || + batch_publish_latency > config::max_batch_publish_latency_ms) { + int64_t t0 = MonotonicMillis(); + StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs); + int64_t t1 = MonotonicMillis(); + // notify FE when all tasks of group have been finished. + for (auto& finish_task_request : finish_task_requests) { + finish_task(finish_task_request); + remove_task_info(finish_task_request.task_type, finish_task_request.signature); + } + int64_t t2 = MonotonicMillis(); + LOG(INFO) << "batch flush " << finish_task_requests.size() + << " txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0 + << "ms finish_task_rpc:" << t2 - t1 << "ms"; + finish_task_requests.clear(); + affected_dirs.clear(); + batch_publish_latency = 0; + } + } else { + if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE || + batch_publish_latency > config::max_batch_publish_latency_ms) { + int64_t finish_task_size = finish_task_requests.size(); + int64_t t0 = MonotonicMillis(); + StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs); + int64_t t1 = MonotonicMillis(); + StorageEngine::instance()->publish_version_manager()->wait_publish_task_apply_finish( + std::move(finish_task_requests)); + StorageEngine::instance()->wake_finish_publish_vesion_thread(); + affected_dirs.clear(); + batch_publish_latency = 0; + LOG(INFO) << "batch submit " << finish_task_size << " finish publish version task " + << "txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0 << "ms"; } - int64_t t2 = MonotonicMillis(); - LOG(INFO) << "batch flush " << finish_task_requests.size() - << " txn publish task(s). 
#dir:" << affected_dirs.size() << " flush:" << t1 - t0 - << "ms finish_task_rpc:" << t2 - t1 << "ms"; - finish_task_requests.clear(); - affected_dirs.clear(); - batch_publish_latency = 0; } } return nullptr; diff --git a/be/src/common/config.h b/be/src/common/config.h index 8432c7f3dc3ba..a3831c3b65c77 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1031,6 +1031,9 @@ CONF_mInt32(primary_key_limit_size, "128"); CONF_mBool(enable_short_key_for_one_column_filter, "false"); CONF_mBool(enable_http_stream_load_limit, "false"); +CONF_mInt32(finish_publish_version_internal, "100"); + +CONF_mInt32(get_txn_status_internal_sec, "30"); CONF_mBool(dump_metrics_with_bvar, "true"); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 31ca3e551aa6f..665e21def081d 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -200,7 +200,6 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { int64_t commit_and_publish_start_time = MonotonicNanos(); RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; - return Status::OK(); } @@ -324,7 +323,6 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct int64_t begin_txn_start_time = MonotonicNanos(); RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx)); ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; - // process put file return _process_put(http_req, ctx); } @@ -606,7 +604,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } ctx->put_result.params.query_options.mem_limit = exec_mem_limit; } - return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 4d84f5a65cc4a..ca8fb6352153a 100644 --- 
a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -273,6 +273,7 @@ class StreamLoadContext { TStreamLoadPutRequest request; + int64_t load_deadline_sec = -1; std::unique_ptr _http_limiter_guard; public: diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index d204f2988d646..bb3b6e0527b72 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -181,6 +181,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } ctx->txn_id = result.txnId; ctx->need_rollback = true; + ctx->load_deadline_sec = UnixSeconds() + result.timeout; return Status::OK(); } @@ -230,6 +231,36 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { LOG(WARNING) << "commit transaction failed, errmsg=" << status.get_error_msg() << ctx->brief(); if (status.code() == TStatusCode::PUBLISH_TIMEOUT) { ctx->need_rollback = false; + if (ctx->load_deadline_sec > UnixSeconds()) { + //wait for apply finish + TGetLoadTxnStatusRequest v_request; + TGetLoadTxnStatusResult v_result; + set_request_auth(&v_request, ctx->auth); + v_request.db = ctx->db; + v_request.tbl = ctx->table; + v_request.txnId = ctx->txn_id; + while (ctx->load_deadline_sec > UnixSeconds()) { + sleep(std::min((int64_t)config::get_txn_status_internal_sec, + ctx->load_deadline_sec - UnixSeconds())); + auto visiable_st = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&v_request, &v_result](FrontendServiceConnection& client) { + client->getLoadTxnStatus(v_result, v_request); + }, + config::txn_commit_rpc_timeout_ms); + if (!visiable_st.ok()) { + return status; + } else { + if (v_result.status == TTransactionStatus::VISIBLE) { + return Status::OK(); + } else if (v_result.status == TTransactionStatus::COMMITTED) { + continue; + } else { + return status; + } + } + } + } } return status; } diff --git 
a/be/src/storage/CMakeLists.txt b/be/src/storage/CMakeLists.txt
index b97c9dc03c532..876b1d3cc4870 100644
--- a/be/src/storage/CMakeLists.txt
+++ b/be/src/storage/CMakeLists.txt
@@ -38,6 +38,7 @@ add_library(Storage STATIC
     metadata_util.cpp
     kv_store.cpp
     local_tablet_reader.cpp
+    publish_version_manager.cpp
     olap_common.cpp
     olap_server.cpp
     options.cpp
diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp
index 685841bac62af..a191d28309f67 100644
--- a/be/src/storage/olap_server.cpp
+++ b/be/src/storage/olap_server.cpp
@@ -49,6 +49,7 @@
 #include "storage/olap_common.h"
 #include "storage/olap_define.h"
 #include "storage/persistent_index_compaction_manager.h"
+#include "storage/publish_version_manager.h"
 #include "storage/storage_engine.h"
 #include "storage/tablet_manager.h"
 #include "storage/update_manager.h"
@@ -95,6 +96,10 @@ Status StorageEngine::start_bg_threads() {
     _pk_index_major_compaction_thread = std::thread([this] { _pk_index_major_compaction_thread_callback(nullptr); });
     Thread::set_thread_name(_pk_index_major_compaction_thread, "pk_index_compaction_scheduler");
 
+    // start thread for check finish publish version
+    _finish_publish_version_thread = std::thread([this] { _finish_publish_version_thread_callback(nullptr); });
+    Thread::set_thread_name(_finish_publish_version_thread, "finish_publish_version");
+
     // convert store map to vector
     std::vector data_dirs;
     for (auto& tmp_store : _store_map) {
@@ -608,6 +613,27 @@ void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) {
     return nullptr;
 }
 
+// Background loop: waits for pending publish tasks, lets the manager report the ready ones, then sleeps.
+void* StorageEngine::_finish_publish_version_thread_callback(void* arg) {
+    while (!_bg_worker_stopped.load(std::memory_order_consume)) {
+        int32_t interval = config::finish_publish_version_internal; // sleep between rounds, in milliseconds
+        {
+            std::unique_lock wl(_finish_publish_version_mutex);
+            while (!_publish_version_manager->has_pending_task() &&
+                   !_bg_worker_stopped.load(std::memory_order_consume)) {
+                _finish_publish_version_cv.wait(wl); // woken by wake_finish_publish_vesion_thread()
+            }
+            _publish_version_manager->finish_publish_version_task();
+            if (interval <= 0) {
+                LOG(WARNING) << "finish_publish_version_internal config is illegal: " << interval << ", using 1000 ms";
+                interval = 1000; // the old message claimed "1" although this fallback is 1000 ms
+            }
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+    }
+    return nullptr;
+}
+
 void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* data_dir,
                                                             const std::pair& tablet_shards_range) {
 #ifdef GOOGLE_PROFILER
diff --git a/be/src/storage/publish_version_manager.cpp b/be/src/storage/publish_version_manager.cpp
new file mode 100644
index 0000000000000..c511096a10484
--- /dev/null
+++ b/be/src/storage/publish_version_manager.cpp
@@ -0,0 +1,180 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "publish_version_manager.h"
+
+#include "agent/finish_task.h"
+#include "agent/task_signatures_manager.h"
+#include "common/config.h"
+#include "storage/storage_engine.h"
+#include "storage/tablet.h"
+#include "storage/tablet_manager.h"
+#include "util/cpu_info.h"
+
+namespace starrocks {
+const int MIN_FINISH_PUBLISH_WORKER_COUNT = 8;
+
+Status PublishVersionManager::init() {
+    int max_thread_count = config::transaction_publish_version_worker_count;
+    if (max_thread_count <= 0) {
+        max_thread_count = CpuInfo::num_cores(); // non-positive config falls back to the core count
+    }
+    max_thread_count = std::max(max_thread_count, MIN_FINISH_PUBLISH_WORKER_COUNT);
+    RETURN_IF_ERROR(ThreadPoolBuilder("finish_publish_version")
+                            .set_min_threads(MIN_FINISH_PUBLISH_WORKER_COUNT)
+                            .set_max_threads(max_thread_count)
+                            .build(&_finish_publish_version_thread_pool));
+    return Status::OK();
+}
+
+PublishVersionManager::~PublishVersionManager() {
+    if (_finish_publish_version_thread_pool) {
+        _finish_publish_version_thread_pool->shutdown(); // stop the pool before the maps are cleared
+    }
+    _finish_task_requests.clear();
+    _waitting_finish_task_requests.clear();
+    _unapplied_tablet_by_txn.clear();
+}
+
+// Caller must hold _lock. Returns true if the request needs no waiting; else records its unapplied tablets.
+bool PublishVersionManager::_all_task_applied(const TFinishTaskRequest& finish_task_request) {
+    if (finish_task_request.task_status.status_code != TStatusCode::OK) {
+        return true; // a failed task is reported right away
+    }
+    auto& tablet_versions = finish_task_request.tablet_publish_versions;
+    bool all_task_applied = true;
+    std::set<std::pair<int64_t, int64_t>> unapplied_tablet;
+    for (auto& tablet_version : tablet_versions) {
+        int64_t tablet_id = tablet_version.tablet_id;
+        int64_t request_version = tablet_version.version;
+
+        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+        if (tablet != nullptr) {
+            if (tablet->keys_type() != KeysType::PRIMARY_KEYS) { // only primary key tablets are waited for
+                return true;
+            }
+            if (tablet->max_readable_version() < request_version) {
+                all_task_applied = false;
+                unapplied_tablet.insert(std::make_pair(tablet_id, request_version));
+            }
+            VLOG(1) << "tablet: " << tablet->tablet_id() << " max_readable_version is "
+                    << tablet->max_readable_version() << ", request_version is " << request_version;
+        }
+    }
+
+    if (!all_task_applied) {
+        _unapplied_tablet_by_txn[finish_task_request.signature] = std::move(unapplied_tablet);
+    }
+    return all_task_applied;
+}
+
+bool PublishVersionManager::_left_task_applied(const TFinishTaskRequest& finish_task_request) {
+    bool applied = true;
+    int64_t signature = finish_task_request.signature;
+    std::set<std::pair<int64_t, int64_t>> unapplied_tablet;
+    auto iter = _unapplied_tablet_by_txn.find(signature); // tablets recorded by _all_task_applied
+    if (iter == _unapplied_tablet_by_txn.end()) {
+        return true;
+    }
+    for (auto& tablet_pair : iter->second) {
+        int64_t tablet_id = tablet_pair.first;
+        int64_t request_version = tablet_pair.second;
+        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+        if (tablet != nullptr) {
+            DCHECK(tablet->keys_type() == KeysType::PRIMARY_KEYS);
+            if (tablet->max_readable_version() < request_version) {
+                applied = false;
+                unapplied_tablet.insert(std::make_pair(tablet_id, request_version));
+            }
+            VLOG(1) << "tablet: " << tablet->tablet_id() << " max_readable_version is "
+                    << tablet->max_readable_version() << ", request_version is " << request_version;
+        }
+    }
+    if (!applied) {
+        iter->second.swap(unapplied_tablet); // keep only the tablets that are still behind
+    } else {
+        _unapplied_tablet_by_txn.erase(signature);
+    }
+    return applied;
+}
+
+Status PublishVersionManager::wait_publish_task_apply_finish(std::vector<TFinishTaskRequest> finish_task_requests) {
+    std::lock_guard wl(_lock);
+    for (size_t i = 0; i < finish_task_requests.size(); i++) {
+        if (_all_task_applied(finish_task_requests[i])) {
+            _finish_task_requests[finish_task_requests[i].signature] = std::move(finish_task_requests[i]);
+        } else {
+            _waitting_finish_task_requests[finish_task_requests[i].signature] = std::move(finish_task_requests[i]);
+        }
+    }
+    CHECK(finish_task_requests.empty() || has_pending_task()); // an empty batch must not abort the process
+    return Status::OK();
+}
+
+void PublishVersionManager::update_tablet_version(TFinishTaskRequest& finish_task_request) {
+    auto& tablet_versions = finish_task_request.tablet_versions;
+    for (size_t i = 0; i < tablet_versions.size(); i++) { // size_t avoids a signed/unsigned comparison
+        int64_t tablet_id = tablet_versions[i].tablet_id;
+        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+        if (tablet != nullptr) {
+            tablet_versions[i].__set_version(tablet->max_readable_version());
+        }
+    }
+}
+
+// Submits every ready request, plus each waiting request whose tablets have caught up, to the thread pool.
+Status PublishVersionManager::finish_publish_version_task() {
+    std::vector<int64_t> erase_finish_task_signature;
+    std::vector<int64_t> erase_waitting_finish_task_signature;
+    // Moves `request` into a pool task that refreshes tablet versions, calls finish_task and drops the task info.
+    auto submit = [this](int64_t signature, TFinishTaskRequest& request) {
+        Status st = _finish_publish_version_thread_pool->submit_func(
+                [this, finish_request = std::move(request)]() mutable {
+                    update_tablet_version(finish_request);
+#ifndef BE_TEST
+                    finish_task(finish_request);
+#endif
+                    remove_task_info(finish_request.task_type, finish_request.signature);
+                });
+        if (!st.ok()) {
+            // The request was already moved into the rejected task; surface the failure instead of hiding it.
+            LOG(WARNING) << "submit finish publish version task failed, signature: " << signature
+                         << ", errmsg=" << st.get_error_msg();
+        }
+    };
+    {
+        std::lock_guard wl(_lock);
+        for (auto& [signature, finish_task_request] : _finish_task_requests) {
+            submit(signature, finish_task_request);
+            erase_finish_task_signature.emplace_back(signature);
+        }
+        // A waiting request is submitted only once every tablet recorded for it has applied its version.
+        for (auto& [signature, finish_task_request] : _waitting_finish_task_requests) {
+            if (_left_task_applied(finish_task_request)) {
+                submit(signature, finish_task_request);
+                erase_waitting_finish_task_signature.emplace_back(signature);
+            }
+        }
+        for (auto& signature : erase_finish_task_signature) {
+            _finish_task_requests.erase(signature);
+        }
+        for (auto& signature : erase_waitting_finish_task_signature) {
+            _waitting_finish_task_requests.erase(signature);
+            _unapplied_tablet_by_txn.erase(signature);
+        }
+    }
+    return Status::OK();
+}
+
+} // namespace starrocks
\ No newline at end of file
diff --git a/be/src/storage/publish_version_manager.h
b/be/src/storage/publish_version_manager.h
new file mode 100644
index 0000000000000..ea9692eaa8f0a
--- /dev/null
+++ b/be/src/storage/publish_version_manager.h
@@ -0,0 +1,55 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/MasterService_types.h"
+#include "util/threadpool.h"
+
+namespace starrocks {
+
+using FinishTaskRequestPtr = std::shared_ptr<TFinishTaskRequest>;
+// Holds publish-version finish requests until their tablets report the version readable, then reports them.
+class PublishVersionManager {
+public:
+    Status init();
+    ~PublishVersionManager();
+    Status wait_publish_task_apply_finish(std::vector<TFinishTaskRequest> finish_task_requests);
+    bool has_pending_task() { return !_finish_task_requests.empty() || !_waitting_finish_task_requests.empty(); }
+    Status finish_publish_version_task();
+    void update_tablet_version(TFinishTaskRequest& finish_task_request);
+    // NOTE(review): has_pending_task() and the size getters below read the maps without taking _lock.
+    size_t finish_task_requests_size() { return _finish_task_requests.size(); }
+    size_t waitting_finish_task_requests_size() { return _waitting_finish_task_requests.size(); }
+
+private:
+    bool _all_task_applied(const TFinishTaskRequest& finish_task_request);
+    bool _left_task_applied(const TFinishTaskRequest& finish_task_request);
+
+    mutable std::mutex _lock;
+    // All three maps are keyed by task signature; _unapplied_tablet_by_txn stores (tablet_id, version) pairs.
+    std::map<int64_t, TFinishTaskRequest> _finish_task_requests;
+    std::map<int64_t, TFinishTaskRequest> _waitting_finish_task_requests;
+    std::map<int64_t, std::set<std::pair<int64_t, int64_t>>> _unapplied_tablet_by_txn;
+    std::unique_ptr<ThreadPool> _finish_publish_version_thread_pool;
+};
+
+} // namespace
starrocks \ No newline at end of file diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index f0d2b97b6992f..06527a76b0db0 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -59,6 +59,7 @@ #include "storage/compaction_manager.h" #include "storage/data_dir.h" #include "storage/memtable_flush_executor.h" +#include "storage/publish_version_manager.h" #include "storage/rowset/rowset_meta.h" #include "storage/rowset/rowset_meta_manager.h" #include "storage/rowset/unique_rowset_id_generator.h" @@ -111,7 +112,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), _update_manager(new UpdateManager(options.update_mem_tracker)), - _compaction_manager(new CompactionManager()) { + _compaction_manager(new CompactionManager()), + _publish_version_manager(new PublishVersionManager()) { #ifdef BE_TEST _p_instance = _s_instance; _s_instance = this; @@ -192,6 +194,8 @@ Status StorageEngine::_open(const EngineOptions& options) { RETURN_IF_ERROR_WITH_WARN(_update_manager->init(), "init update_manager failed"); + RETURN_IF_ERROR_WITH_WARN(_publish_version_manager->init(), "init publish_version_manager failed"); + auto dirs = get_stores(); // `load_data_dirs` depend on |_update_manager|. 
@@ -602,6 +606,7 @@ void StorageEngine::stop() { JOIN_THREAD(_unused_rowset_monitor_thread) JOIN_THREAD(_garbage_sweeper_thread) JOIN_THREAD(_disk_stat_monitor_thread) + JOIN_THREAD(_finish_publish_version_thread) JOIN_THREADS(_base_compaction_threads) JOIN_THREADS(_cumulative_compaction_threads) diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index 32f1e64177cdf..3da64bc90a644 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -75,6 +75,7 @@ class MemTableFlushExecutor; class Tablet; class UpdateManager; class CompactionManager; +class PublishVersionManager; class SegmentFlushExecutor; class SegmentReplicateExecutor; @@ -219,6 +220,8 @@ class StorageEngine { CompactionManager* compaction_manager() { return _compaction_manager.get(); } + PublishVersionManager* publish_version_manager() { return _publish_version_manager.get(); } + bthread::Executor* async_delta_writer_executor() { return _async_delta_writer_executor.get(); } MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); } @@ -276,6 +279,11 @@ class StorageEngine { void clear_rowset_delta_column_group_cache(const Rowset& rowset); + void wake_finish_publish_vesion_thread() { + std::unique_lock wl(_finish_publish_version_mutex); + _finish_publish_version_cv.notify_one(); + } + protected: static StorageEngine* _s_instance; @@ -338,6 +346,9 @@ class StorageEngine { // delete tablet with io error process function void* _disk_stat_monitor_thread_callback(void* arg); + // finish publish version process function + void* _finish_publish_version_thread_callback(void* arg); + // clean file descriptors cache void* _fd_cache_clean_callback(void* arg); @@ -383,6 +394,8 @@ class StorageEngine { std::thread _garbage_sweeper_thread; // thread to monitor disk stat std::thread _disk_stat_monitor_thread; + // thread to check finish publish version task + std::thread _finish_publish_version_thread; // threads to run base 
compaction std::vector _base_compaction_threads; // threads to check cumulative @@ -413,6 +426,9 @@ class StorageEngine { std::mutex _trash_sweeper_mutex; std::condition_variable _trash_sweeper_cv; + std::mutex _finish_publish_version_mutex; + std::condition_variable _finish_publish_version_cv; + // For tablet and disk-stat report std::mutex _report_mtx; std::condition_variable _report_cv; @@ -436,6 +452,10 @@ class StorageEngine { std::unique_ptr _compaction_manager; + std::unique_ptr _publish_version_manager; + + HeartbeatFlags* _heartbeat_flags = nullptr; + std::unordered_map> _auto_increment_meta_map; std::mutex _auto_increment_mutex; diff --git a/be/src/storage/tablet.cpp b/be/src/storage/tablet.cpp index 826e82f012ab6..ca5ed96039a8b 100644 --- a/be/src/storage/tablet.cpp +++ b/be/src/storage/tablet.cpp @@ -970,6 +970,20 @@ int64_t Tablet::max_continuous_version() const { } } +int64_t Tablet::max_readable_version() const { + if (_updates != nullptr) { + return _updates->max_readable_version(); + } else { + std::shared_lock rdlock(_meta_lock); + int64_t v = _timestamped_version_tracker.get_max_continuous_version(); + if (tablet_state() == TABLET_RUNNING) { + // only check when tablet in running state + DCHECK_EQ(v, _max_continuous_version_from_beginning_unlocked().second); + } + return v; + } +} + void Tablet::calculate_cumulative_point() { std::unique_lock wrlock(_meta_lock); if (_cumulative_point != kInvalidCumulativePoint) { @@ -1428,6 +1442,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { // and perform state modification operations. 
} tablet_info->__set_version(max_version); + tablet_info->__set_max_readable_version(max_version); // TODO: support getting minReadableVersion tablet_info->__set_min_readable_version(_timestamped_version_tracker.get_min_readable_version()); tablet_info->__set_version_count(_tablet_meta->version_count()); diff --git a/be/src/storage/tablet.h b/be/src/storage/tablet.h index 35a3f8c22888e..cf6417cdde700 100644 --- a/be/src/storage/tablet.h +++ b/be/src/storage/tablet.h @@ -202,6 +202,7 @@ class Tablet : public BaseTablet { // Same as max_continuous_version_from_beginning, only return end version, using a more efficient implementation int64_t max_continuous_version() const; + int64_t max_readable_version() const; int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; } void set_last_cumu_compaction_failure_time(int64_t millis) { _last_cumu_compaction_failure_millis = millis; } diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index 5ed3cdef7b3e7..90eafb2ac9e03 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -451,6 +451,11 @@ int64_t TabletUpdates::max_version() const { return _edit_version_infos.empty() ? 0 : _edit_version_infos.back()->version.major(); } +int64_t TabletUpdates::max_readable_version() const { + std::lock_guard rl(_lock); + return _edit_version_infos.empty() ? 
0 : _edit_version_infos[_apply_version_idx]->version.major(); +} + Status TabletUpdates::get_rowsets_total_stats(const std::vector& rowsets, size_t* total_rows, size_t* total_dels) { string err_rowsets; @@ -2671,6 +2676,7 @@ size_t TabletUpdates::_get_rowset_num_deletes(const Rowset& rowset) { void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) { int64_t min_readable_version = 0; + int64_t max_readable_version = 0; int64_t version = 0; bool has_pending = false; int64_t version_count = 0; @@ -2681,6 +2687,7 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) { LOG(WARNING) << "tablet delete when get_tablet_info_extra tablet:" << _tablet.tablet_id(); } else { min_readable_version = _edit_version_infos[0]->version.major(); + max_readable_version = _edit_version_infos[_apply_version_idx]->version.major(); auto& last = _edit_version_infos.back(); version = last->version.major(); rowsets = last->rowsets; @@ -2712,6 +2719,7 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) { } info->__set_version(version); info->__set_min_readable_version(min_readable_version); + info->__set_max_readable_version(max_readable_version); info->__set_version_miss(has_pending); info->__set_version_count(version_count); info->__set_row_count(total_row); diff --git a/be/src/storage/tablet_updates.h b/be/src/storage/tablet_updates.h index 1b6c4a5f8d2ac..4110409a5f5f0 100644 --- a/be/src/storage/tablet_updates.h +++ b/be/src/storage/tablet_updates.h @@ -130,6 +130,8 @@ class TabletUpdates { // get latest version's version int64_t max_version() const; + int64_t max_readable_version() const; + // get total number of committed and pending rowsets size_t version_count() const; diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 786afa19b60fe..de76a5e44edd4 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -283,6 +283,7 @@ set(EXEC_FILES ./storage/binlog_reader_test.cpp ./storage/tablet_binlog_test.cpp 
./storage/publish_version_task_test.cpp + ./storage/publish_version_manager_test.cpp ./storage/get_use_pk_index_test.cpp ./runtime/buffer_control_block_test.cpp ./runtime/datetime_value_test.cpp diff --git a/be/test/storage/publish_version_manager_test.cpp b/be/test/storage/publish_version_manager_test.cpp new file mode 100644 index 0000000000000..d38600982dc9d --- /dev/null +++ b/be/test/storage/publish_version_manager_test.cpp @@ -0,0 +1,262 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "storage/publish_version_manager.h" + +#include + +#include "agent/agent_common.h" +#include "agent/agent_server.h" +#include "agent/publish_version.h" +#include "butil/file_util.h" +#include "column/column_helper.h" +#include "column/column_pool.h" +#include "common/config.h" +#include "exec/pipeline/query_context.h" +#include "fs/fs_util.h" +#include "gtest/gtest.h" +#include "storage/chunk_helper.h" +#include "storage/delta_writer.h" +#include "storage/empty_iterator.h" +#include "storage/options.h" +#include "storage/rowset/rowset_factory.h" +#include "storage/rowset/rowset_writer.h" +#include "storage/rowset/rowset_writer_context.h" +#include "storage/storage_engine.h" +#include "storage/tablet_manager.h" +#include "storage/tablet_meta.h" +#include "storage/tablet_reader.h" +#include "storage/txn_manager.h" +#include "storage/union_iterator.h" +#include "storage/update_manager.h" +#include "testutil/assert.h" + +namespace starrocks { + +class PublishVersionManagerTest : public testing::Test { +public: + void SetUp() override { + _publish_version_manager = starrocks::StorageEngine::instance()->publish_version_manager(); + _finish_publish_version_thread = std::thread([this] { _finish_publish_version_thread_callback(nullptr); }); + Thread::set_thread_name(_finish_publish_version_thread, "finish_publish_version"); + } + + void TearDown() override { + if (_tablet) { + StorageEngine::instance()->tablet_manager()->drop_tablet(_tablet->tablet_id()); + _tablet.reset(); + } + _stopped.store(true, std::memory_order_release); + _finish_publish_version_cv.notify_all(); + if (_finish_publish_version_thread.joinable()) { + _finish_publish_version_thread.join(); + } + } + + TabletSharedPtr create_tablet(int64_t tablet_id, int32_t schema_hash) { + TCreateTabletReq request; + request.tablet_id = tablet_id; + request.__set_version(1); + request.__set_version_hash(0); + request.tablet_schema.schema_hash = schema_hash; + request.tablet_schema.short_key_column_count = 
1; + request.tablet_schema.keys_type = TKeysType::PRIMARY_KEYS; + request.tablet_schema.storage_type = TStorageType::COLUMN; + + TColumn k1; + k1.column_name = "pk"; + k1.__set_is_key(true); + k1.column_type.type = TPrimitiveType::BIGINT; + request.tablet_schema.columns.push_back(k1); + + TColumn k2; + k2.column_name = "v1"; + k2.__set_is_key(false); + k2.column_type.type = TPrimitiveType::SMALLINT; + request.tablet_schema.columns.push_back(k2); + + TColumn k3; + k3.column_name = "v2"; + k3.__set_is_key(false); + k3.column_type.type = TPrimitiveType::INT; + request.tablet_schema.columns.push_back(k3); + auto st = StorageEngine::instance()->create_tablet(request); + CHECK(st.ok()) << st.to_string(); + return StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false); + } + + RowsetSharedPtr create_rowset(const TabletSharedPtr& tablet, const vector& keys, + Column* one_delete = nullptr, bool empty = false, bool has_merge_condition = false) { + RowsetWriterContext writer_context; + RowsetId rowset_id = StorageEngine::instance()->next_rowset_id(); + writer_context.rowset_id = rowset_id; + writer_context.tablet_id = tablet->tablet_id(); + writer_context.tablet_schema_hash = tablet->schema_hash(); + writer_context.partition_id = 0; + writer_context.rowset_path_prefix = tablet->schema_hash_path(); + writer_context.rowset_state = COMMITTED; + writer_context.tablet_schema = &tablet->tablet_schema(); + writer_context.version.first = 0; + writer_context.version.second = 0; + writer_context.segments_overlap = NONOVERLAPPING; + if (has_merge_condition) { + writer_context.merge_condition = "v2"; + } + std::unique_ptr writer; + EXPECT_TRUE(RowsetFactory::create_rowset_writer(writer_context, &writer).ok()); + if (empty) { + return *writer->build(); + } + auto schema = ChunkHelper::convert_schema(tablet->tablet_schema()); + auto chunk = ChunkHelper::new_chunk(schema, keys.size()); + auto& cols = chunk->columns(); + for (int64_t key : keys) { + if 
(schema.num_key_fields() == 1) { + cols[0]->append_datum(Datum(key)); + } else { + cols[0]->append_datum(Datum(key)); + string v = fmt::to_string(key * 234234342345); + cols[1]->append_datum(Datum(Slice(v))); + cols[2]->append_datum(Datum((int32_t)key)); + } + int vcol_start = schema.num_key_fields(); + cols[vcol_start]->append_datum(Datum((int16_t)(key % 100 + 1))); + if (cols[vcol_start + 1]->is_binary()) { + string v = fmt::to_string(key % 1000 + 2); + cols[vcol_start + 1]->append_datum(Datum(Slice(v))); + } else { + cols[vcol_start + 1]->append_datum(Datum((int32_t)(key % 1000 + 2))); + } + } + if (one_delete == nullptr && !keys.empty()) { + CHECK_OK(writer->flush_chunk(*chunk)); + } else if (one_delete == nullptr) { + CHECK_OK(writer->flush()); + } else if (one_delete != nullptr) { + CHECK_OK(writer->flush_chunk_with_deletes(*chunk, *one_delete)); + } + return *writer->build(); + } + + void* _finish_publish_version_thread_callback(void* arg) { + while (!_stopped.load(std::memory_order_consume)) { + int32_t interval = config::finish_publish_version_internal; + { + std::unique_lock wl(_finish_publish_version_mutex); + CHECK(_publish_version_manager != nullptr); + while (!_publish_version_manager->has_pending_task() && !_stopped.load(std::memory_order_consume)) { + _finish_publish_version_cv.wait(wl); + } + _publish_version_manager->finish_publish_version_task(); + if (interval <= 0) { + LOG(WARNING) << "finish_publish_version_internal config is illegal: " << interval + << ", force set to 1"; + interval = 1000; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + + return nullptr; + } + +public: + TabletSharedPtr _tablet; + std::thread _finish_publish_version_thread; + std::mutex _finish_publish_version_mutex; + std::condition_variable _finish_publish_version_cv; + std::atomic _stopped{false}; + PublishVersionManager* _publish_version_manager; +}; + +static ChunkIteratorPtr create_tablet_iterator(TabletReader& reader, Schema& schema) 
{ + TabletReaderParams params; + if (!reader.prepare().ok()) { + LOG(ERROR) << "reader prepare failed"; + return nullptr; + } + std::vector seg_iters; + if (!reader.get_segment_iterators(params, &seg_iters).ok()) { + LOG(ERROR) << "reader get segment iterators fail"; + return nullptr; + } + if (seg_iters.empty()) { + return new_empty_iterator(schema, DEFAULT_CHUNK_SIZE); + } + return new_union_iterator(seg_iters); +} + +static ssize_t read_until_eof(const ChunkIteratorPtr& iter) { + auto chunk = ChunkHelper::new_chunk(iter->schema(), 100); + size_t count = 0; + while (true) { + auto st = iter->get_next(chunk.get()); + if (st.is_end_of_file()) { + break; + } else if (st.ok()) { + count += chunk->num_rows(); + chunk->reset(); + } else { + LOG(WARNING) << "read error: " << st.to_string(); + return -1; + } + } + return count; +} + +static ssize_t read_tablet(const TabletSharedPtr& tablet, int64_t version) { + Schema schema = ChunkHelper::convert_schema(tablet->tablet_schema()); + TabletReader reader(tablet, Version(0, version), schema); + auto iter = create_tablet_iterator(reader, schema); + if (iter == nullptr) { + return -1; + } + return read_until_eof(iter); +} + +TEST_F(PublishVersionManagerTest, test_publish_task) { + _tablet = create_tablet(rand(), rand()); + // write + const int N = 1000; + std::vector keys; + for (int i = 0; i < N; i++) { + keys.push_back(i); + } + auto rs0 = create_rowset(_tablet, keys); + ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok()); + _tablet->updates()->stop_apply(true); + auto rs1 = create_rowset(_tablet, keys); + ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok()); + std::vector finish_task_requests; + auto& finish_task_request = finish_task_requests.emplace_back(); + finish_task_request.signature = 2222; + auto& tablet_publish_versions = finish_task_request.tablet_publish_versions; + auto& pair = tablet_publish_versions.emplace_back(); + pair.__set_tablet_id(_tablet->tablet_id()); + pair.__set_version(3); + 
_publish_version_manager->wait_publish_task_apply_finish(std::move(finish_task_requests)); + _finish_publish_version_cv.notify_one(); + + ASSERT_EQ(0, _publish_version_manager->finish_task_requests_size()); + ASSERT_EQ(1, _publish_version_manager->waitting_finish_task_requests_size()); + _tablet->updates()->stop_apply(false); + _tablet->updates()->check_for_apply(); + ASSERT_EQ(N, read_tablet(_tablet, 3)); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + ASSERT_EQ(0, _publish_version_manager->finish_task_requests_size()); + ASSERT_EQ(0, _publish_version_manager->waitting_finish_task_requests_size()); +} + +} // namespace starrocks \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 5f7d5f0bc31b6..eeed3f02b1cce 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -2274,6 +2274,8 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_new_publish_mechanism = false; + @ConfField(mutable = true) + public static boolean enable_sync_publish = true; /** * Normally FE will quit when replaying a bad journal. This configuration provides a bypass mechanism. * If this was set to a positive value, FE will skip the corresponding bad journals before it quits. 
diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java b/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java index 5f305e6706241..ea4939d7651f0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java @@ -616,7 +616,11 @@ private static void sync(Map backendTablets, ListMultimap execPlan); + long jobDeadLineMs = System.currentTimeMillis() + context.getSessionVariable().getQueryTimeoutS() * 1000; coord.join(context.getSessionVariable().getQueryTimeoutS()); if (!coord.isDone()) { /* @@ -1836,7 +1837,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { transactionId, TabletCommitInfo.fromThrift(coord.getCommitInfos()), TabletFailInfo.fromThrift(coord.getFailInfos()), - context.getSessionVariable().getTransactionVisibleWaitTimeout() * 1000, + Config.enable_sync_publish ? jobDeadLineMs - System.currentTimeMillis() : + context.getSessionVariable().getTransactionVisibleWaitTimeout() * 1000, new InsertTxnCommitAttachment(loadedRows))) { txnStatus = TransactionStatus.VISIBLE; MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index b1efab5ce3844..3f30de522cbcf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -1317,6 +1317,11 @@ private void transferToLeader() { createBuiltinStorageVolume(); } + public void setFrontendNodeType(FrontendNodeType newType) { + // just for test, don't call it directly + feType = newType; + } + // start all daemon threads only running on Master private void startLeaderOnlyDaemonThreads() { if (RunMode.allowCreateLakeTable()) { diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 3e683c70cee9f..ce1b9d34083c9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -175,6 +175,8 @@ import com.starrocks.thrift.TGetDbsResult; import com.starrocks.thrift.TGetGrantsToRolesOrUserRequest; import com.starrocks.thrift.TGetGrantsToRolesOrUserResponse; +import com.starrocks.thrift.TGetLoadTxnStatusRequest; +import com.starrocks.thrift.TGetLoadTxnStatusResult; import com.starrocks.thrift.TGetLoadsParams; import com.starrocks.thrift.TGetLoadsResult; import com.starrocks.thrift.TGetProfileRequest; @@ -258,6 +260,7 @@ import com.starrocks.thrift.TTaskInfo; import com.starrocks.thrift.TTaskRunInfo; import com.starrocks.thrift.TTrackingLoadInfo; +import com.starrocks.thrift.TTransactionStatus; import com.starrocks.thrift.TUpdateExportTaskStatusRequest; import com.starrocks.thrift.TUpdateResourceUsageRequest; import com.starrocks.thrift.TUpdateResourceUsageResponse; @@ -1221,6 +1224,13 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); + long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + if (Config.enable_sync_publish) { + result.setTimeout(timeoutSecond); + } else { + result.setTimeout(0); + } + try { result.setTxnId(loadTxnBeginImpl(request, clientAddr)); } catch (DuplicatedRequestException e) { @@ -1425,6 +1435,42 @@ private void loadTxnCommitImpl(TLoadTxnCommitRequest request, TStatus status) th } } + @Override + public TGetLoadTxnStatusResult getLoadTxnStatus(TGetLoadTxnStatusRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.info("receive get txn status request. 
db: {}, tbl: {}, txn_id: {}, backend: {}", + request.getDb(), request.getTbl(), request.getTxnId(), clientAddr); + LOG.debug("get txn status request: {}", request); + + TGetLoadTxnStatusResult result = new TGetLoadTxnStatusResult(); + // if current node is not master, reject the request + if (!GlobalStateMgr.getCurrentState().isLeader()) { + LOG.warn("current fe is not leader"); + result.setStatus(TTransactionStatus.UNKNOWN); + return result; + } + + // get database + GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState(); + String dbName = request.getDb(); + Database db = globalStateMgr.getDb(dbName); + if (db == null) { + LOG.warn("unknown database, database=" + dbName); + result.setStatus(TTransactionStatus.UNKNOWN); + return result; + } + + try { + TTransactionStatus status = GlobalStateMgr.getCurrentGlobalTransactionMgr().getTxnStatus(db, request.getTxnId()); + LOG.debug("txn {} status is {}", request.getTxnId(), status); + result.setStatus(status); + } catch (Throwable e) { + result.setStatus(TTransactionStatus.UNKNOWN); + LOG.warn("catch unknown result.", e); + } + return result; + } + @Override public TLoadTxnCommitResult loadTxnPrepare(TLoadTxnCommitRequest request) throws TException { String clientAddr = getClientAddrAsString(); diff --git a/fe/fe-core/src/main/java/com/starrocks/task/PublishVersionTask.java b/fe/fe-core/src/main/java/com/starrocks/task/PublishVersionTask.java index 729651f9c35d8..857fb0d8536a1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/PublishVersionTask.java @@ -64,10 +64,11 @@ public class PublishVersionTask extends AgentTask { private final long commitTimestamp; private final TransactionState txnState; private Span span; + private boolean enableSyncPublish; public PublishVersionTask(long backendId, long transactionId, long dbId, long commitTimestamp, List partitionVersionInfos, String traceParent, Span txnSpan, - long createTime, 
TransactionState state) { + long createTime, TransactionState state, boolean enableSyncPublish) { super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime, traceParent); this.transactionId = transactionId; this.partitionVersionInfos = partitionVersionInfos; @@ -75,6 +76,7 @@ public PublishVersionTask(long backendId, long transactionId, long dbId, long co this.isFinished = false; this.commitTimestamp = commitTimestamp; this.txnState = state; + this.enableSyncPublish = enableSyncPublish; if (txnSpan != null) { span = TraceManager.startSpan("publish_version_task", txnSpan); span.setAttribute("backend_id", backendId); @@ -89,6 +91,7 @@ public TPublishVersionRequest toThrift() { TPublishVersionRequest publishVersionRequest = new TPublishVersionRequest(transactionId, partitionVersionInfos); publishVersionRequest.setCommit_timestamp(commitTimestamp); publishVersionRequest.setTxn_trace_parent(traceParent); + publishVersionRequest.setEnable_sync_publish(enableSyncPublish); return publishVersionRequest; } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index b520f1675d753..c816978726f4a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -68,6 +68,7 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.analyzer.FeNameFormat; import com.starrocks.statistic.StatisticUtils; +import com.starrocks.thrift.TTransactionStatus; import com.starrocks.thrift.TUniqueId; import io.opentelemetry.api.trace.Span; import org.apache.commons.collections4.CollectionUtils; @@ -1701,4 +1702,41 @@ private List populateTransactionStateListeners(@NotNul } return stateListeners; } + + public TTransactionStatus getTxnStatus(long txnId) { + TransactionState transactionState; + readLock(); + 
try { + transactionState = unprotectedGetTransactionState(txnId); + } finally { + readUnlock(); + } + if (transactionState == null) { + return TTransactionStatus.UNKNOWN; + } + TransactionStatus status = transactionState.getTransactionStatus(); + + switch (status.value()) { + //UNKNOWN + case 0: + return TTransactionStatus.UNKNOWN; + //PREPARE + case 1: + return TTransactionStatus.PREPARE; + //COMMITTED + case 2: + return TTransactionStatus.COMMITTED; + //VISIBLE + case 3: + return TTransactionStatus.VISIBLE; + //ABORTED + case 4: + return TTransactionStatus.ABORTED; + //PREPARED + case 5: + return TTransactionStatus.PREPARED; + default: + return TTransactionStatus.UNKNOWN; + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java index 97d18913e99bd..f796857aad05e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java @@ -61,6 +61,7 @@ import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TStatusCode; import com.starrocks.thrift.TTabletCommitInfo; +import com.starrocks.thrift.TTransactionStatus; import com.starrocks.thrift.TUniqueId; import com.starrocks.transaction.TransactionState.LoadJobSourceType; import com.starrocks.transaction.TransactionState.TxnCoordinator; @@ -531,6 +532,11 @@ public void abortTransaction(Long dbId, String label, String reason) throws User dbTransactionMgr.abortTransaction(label, reason); } + public TTransactionStatus getTxnStatus(Database db, long transactionId) throws UserException { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(db.getId()); + return dbTransactionMgr.getTxnStatus(transactionId); + } + /** * get all txns which is ready to publish * a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version. 
diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java index baf09d7f74f93..ec9f15da0cbee 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java @@ -935,7 +935,8 @@ public List createPublishVersionTask() { traceParent, txnSpan, createTime, - this); + this, + Config.enable_sync_publish); this.addPublishVersionTask(backendId, task); tasks.add(task); } diff --git a/fe/fe-core/src/test/java/com/starrocks/pseudocluster/DecommissionTest.java b/fe/fe-core/src/test/java/com/starrocks/pseudocluster/DecommissionTest.java index decfffabf6591..24bebf66e387c 100644 --- a/fe/fe-core/src/test/java/com/starrocks/pseudocluster/DecommissionTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/pseudocluster/DecommissionTest.java @@ -51,6 +51,7 @@ public static void tearDown() throws Exception { @Test public void testDecommission() throws Exception { PseudoCluster cluster = PseudoCluster.getInstance(); + Config.statistic_collect_query_timeout = 3600; int numTable = 10; final String[] tableNames = new String[numTable]; final String[] createTableSqls = new String[numTable]; diff --git a/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoCluster.java b/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoCluster.java index 27b2b08dacc55..482d3a627e3f5 100644 --- a/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoCluster.java +++ b/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoCluster.java @@ -260,7 +260,7 @@ public Set getBackendIdsByShard(long shardId, long workerGroupId) throws U public ClusterConfig getConfig() { return config; } - + public PseudoBackend getBackend(long beId) { String host = backendIdToHost.get(beId); if (host == null) { diff --git a/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoFrontend.java 
b/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoFrontend.java index fa797bdc0ab83..fbce2cec56802 100644 --- a/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoFrontend.java +++ b/fe/fe-core/src/test/java/com/starrocks/pseudocluster/PseudoFrontend.java @@ -176,6 +176,7 @@ public void run() { try { // init config new Config().init(frontend.getRunningDir() + "/conf/fe.conf"); + Config.statistic_collect_query_timeout = 60; // check it after Config is initialized, otherwise the config 'check_java_version' won't work. if (!JdkUtils.checkJavaVersion()) { diff --git a/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java b/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java index f1e5112386e8c..56470084e2e5d 100644 --- a/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java @@ -22,6 +22,7 @@ import com.starrocks.common.AnalysisException; import com.starrocks.common.Config; import com.starrocks.common.FeConstants; +import com.starrocks.ha.FrontendNodeType; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.DDLStmtExecutor; import com.starrocks.qe.scheduler.slot.ResourceUsageMonitor; @@ -36,6 +37,8 @@ import com.starrocks.thrift.TCreatePartitionResult; import com.starrocks.thrift.TDescribeTableParams; import com.starrocks.thrift.TDescribeTableResult; +import com.starrocks.thrift.TGetLoadTxnStatusRequest; +import com.starrocks.thrift.TGetLoadTxnStatusResult; import com.starrocks.thrift.TGetTablesInfoRequest; import com.starrocks.thrift.TGetTablesInfoResponse; import com.starrocks.thrift.TGetTablesParams; @@ -46,8 +49,13 @@ import com.starrocks.thrift.TTableInfo; import com.starrocks.thrift.TTableStatus; import com.starrocks.thrift.TTableType; +import com.starrocks.thrift.TTransactionStatus; +import com.starrocks.thrift.TUniqueId; import com.starrocks.thrift.TUpdateResourceUsageRequest; import 
com.starrocks.thrift.TUserIdentity; +import com.starrocks.transaction.TransactionState; +import com.starrocks.transaction.TransactionState.TxnCoordinator; +import com.starrocks.transaction.TransactionState.TxnSourceType; import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; import mockit.Expectations; @@ -62,6 +70,7 @@ import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; public class FrontendServiceImplTest { @@ -733,4 +742,32 @@ public void testGetSpecialColumn() throws Exception { Assert.assertEquals(1, response.tables.size()); } + @Test + public void testGetLoadTxnStatus() throws Exception { + Database db = GlobalStateMgr.getCurrentState().getDb("test"); + Table table = db.getTable("site_access_day"); + UUID uuid = UUID.randomUUID(); + TUniqueId requestId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + long transactionId = GlobalStateMgr.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), + Lists.newArrayList(table.getId()), "1jdc689-xd232", requestId, + new TxnCoordinator(TxnSourceType.BE, "1.1.1.1"), + TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, 600); + FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv); + TGetLoadTxnStatusRequest request = new TGetLoadTxnStatusRequest(); + request.setDb("non-exist-db"); + request.setTbl("non-site_access_day-tbl"); + request.setTxnId(100); + TGetLoadTxnStatusResult result1 = impl.getLoadTxnStatus(request); + Assert.assertEquals(TTransactionStatus.UNKNOWN, result1.getStatus()); + request.setDb("test"); + TGetLoadTxnStatusResult result2 = impl.getLoadTxnStatus(request); + Assert.assertEquals(TTransactionStatus.UNKNOWN, result2.getStatus()); + request.setTxnId(transactionId); + GlobalStateMgr.getCurrentState().setFrontendNodeType(FrontendNodeType.FOLLOWER); + TGetLoadTxnStatusResult result3 = impl.getLoadTxnStatus(request); + Assert.assertEquals(TTransactionStatus.UNKNOWN, 
result3.getStatus()); + GlobalStateMgr.getCurrentState().setFrontendNodeType(FrontendNodeType.LEADER); + TGetLoadTxnStatusResult result4 = impl.getLoadTxnStatus(request); + Assert.assertEquals(TTransactionStatus.PREPARE, result4.getStatus()); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java index 4217d36d9086a..8f7f65e893a49 100644 --- a/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java @@ -45,6 +45,7 @@ import com.starrocks.common.UserException; import com.starrocks.common.util.TimeUtils; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.thrift.TTransactionStatus; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -112,6 +113,8 @@ public Map addTransactionToTransactionMgr() throws UserException { transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); masterTransMgr.commitTransaction(GlobalStateMgrTestUtil.testDbId1, transactionId1, transTablets); + DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(GlobalStateMgrTestUtil.testDbId1); + assertEquals(TTransactionStatus.COMMITTED, masterDbTransMgr.getTxnStatus(transactionId1)); masterTransMgr.finishTransaction(GlobalStateMgrTestUtil.testDbId1, transactionId1, null); lableToTxnId.put(GlobalStateMgrTestUtil.testTxnLable1, transactionId1); @@ -168,13 +171,16 @@ public void testNormal() throws UserException { masterDbTransMgr.getTransactionState(lableToTxnId.get(GlobalStateMgrTestUtil.testTxnLable1)); assertEquals(txnId1.longValue(), transactionState1.getTransactionId()); assertEquals(TransactionStatus.VISIBLE, transactionState1.getTransactionStatus()); + assertEquals(TTransactionStatus.VISIBLE, masterDbTransMgr.getTxnStatus(txnId1.longValue())); Long txnId2 = 
masterDbTransMgr.unprotectedGetTxnIdsByLabel(GlobalStateMgrTestUtil.testTxnLable2).iterator().next(); assertEquals(txnId2, lableToTxnId.get(GlobalStateMgrTestUtil.testTxnLable2)); TransactionState transactionState2 = masterDbTransMgr.getTransactionState(txnId2); assertEquals(txnId2.longValue(), transactionState2.getTransactionId()); assertEquals(TransactionStatus.PREPARE, transactionState2.getTransactionStatus()); + assertEquals(TTransactionStatus.PREPARE, masterDbTransMgr.getTxnStatus(txnId2.longValue())); + assertEquals(TTransactionStatus.UNKNOWN, masterDbTransMgr.getTxnStatus(12134)); } @Test @@ -187,6 +193,7 @@ public void testAbortTransaction() throws UserException { assertEquals(0, masterDbTransMgr.getRunningRoutineLoadTxnNums()); assertEquals(2, masterDbTransMgr.getFinishedTxnNums()); assertEquals(4, masterDbTransMgr.getTransactionNum()); + assertEquals(TTransactionStatus.ABORTED, masterDbTransMgr.getTxnStatus(txnId2)); long txnId3 = lableToTxnId.get(GlobalStateMgrTestUtil.testTxnLable3); masterDbTransMgr.abortTransaction(txnId3, "test abort transaction", null); @@ -194,6 +201,7 @@ public void testAbortTransaction() throws UserException { assertEquals(0, masterDbTransMgr.getRunningRoutineLoadTxnNums()); assertEquals(3, masterDbTransMgr.getFinishedTxnNums()); assertEquals(4, masterDbTransMgr.getTransactionNum()); + assertEquals(TTransactionStatus.ABORTED, masterDbTransMgr.getTxnStatus(txnId3)); } @Test diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 4b5c7e6e69a9d..dff59bd563fa0 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -313,6 +313,7 @@ struct TPublishVersionRequest { 3: optional bool strict_mode = false // Deprecated 4: optional i64 commit_timestamp 5: optional string txn_trace_parent + 6: optional bool enable_sync_publish = false } struct TClearAlterTaskRequest { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 
dce4cb9e14615..4f62b57982feb 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -792,6 +792,7 @@ struct TLoadTxnBeginResult { 1: required Status.TStatus status 2: optional i64 txnId 3: optional string job_status // if label already used, set status of existing job + 4: optional i64 timeout } // StreamLoad request, used to load a streaming to engine @@ -937,6 +938,21 @@ struct TLoadTxnRollbackRequest { 11: optional list failInfos } +struct TGetLoadTxnStatusResult { + 1: required Status.TTransactionStatus status +} + +struct TGetLoadTxnStatusRequest { + 1: optional string cluster + 2: required string user + 3: required string passwd + 4: required string db + 5: required string tbl + 6: optional string user_ip + 7: optional i64 auth_code + 8: required i64 txnId +} + struct TLoadTxnRollbackResult { 1: required Status.TStatus status } @@ -1555,5 +1571,7 @@ service FrontendService { TRequireSlotResponse requireSlotAsync(1: TRequireSlotRequest request) TFinishSlotRequirementResponse finishSlotRequirement(1: TFinishSlotRequirementRequest request) TReleaseSlotResponse releaseSlot(1: TReleaseSlotRequest request) + + TGetLoadTxnStatusResult getLoadTxnStatus(1: TGetLoadTxnStatusRequest request) } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 9b985f3e9ac60..0ce95a73978e3 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -61,6 +61,7 @@ struct TTabletInfo { 16: optional Types.TVersion min_readable_version 17: optional i64 binlog_config_version 18: optional bool is_error_state + 19: optional Types.TVersion max_readable_version } struct TTabletVersionPair { @@ -86,6 +87,7 @@ struct TFinishTaskRequest { 15: optional i64 copy_size 16: optional i64 copy_time_ms 17: optional list tablet_versions; + 18: optional list tablet_publish_versions; } struct TTablet { diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index 
31381d4143c4b..b096e1fd3496a 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -32,7 +32,7 @@ // specific language governing permissions and limitations // under the License. -namespace cpp starrocks +namespace cpp starrocks namespace java com.starrocks.thrift include "StatusCode.thrift" @@ -41,3 +41,11 @@ struct TStatus { 2: optional list error_msgs } +enum TTransactionStatus { + UNKNOWN = 0, + PREPARE = 1, + COMMITTED = 2, + VISIBLE = 3, + ABORTED = 4, + PREPARED = 5 +}