Skip to content

[enhancement](cloud) improve the retry policy of cloud mode #49067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <brpc/channel.h>
#include <brpc/controller.h>
#include <brpc/errno.pb.h>
#include <bthread/bthread.h>
#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
Expand Down Expand Up @@ -385,9 +386,11 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
}
cntl.set_max_retry(kBrpcRetryTimes);
res->Clear();
int error_code = 0;
(stub.get()->*method)(&cntl, &req, res, nullptr);
if (cntl.Failed()) [[unlikely]] {
error_msg = cntl.ErrorText();
error_code = cntl.ErrorCode();
proxy->set_unhealthy();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
Expand All @@ -401,7 +404,12 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
error_msg = res->status().msg();
}

if (++retry_times > config::meta_service_rpc_retry_times) {
++retry_times;
if (retry_times > config::meta_service_rpc_retry_times ||
(retry_times > config::meta_service_rpc_timeout_retry_times &&
error_code == brpc::ERPCTIMEDOUT) ||
(retry_times > config::meta_service_conflict_error_retry_times &&
res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
break;
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ DEFINE_Bool(meta_service_connection_pooled, "true");
DEFINE_mInt64(meta_service_connection_pool_size, "20");
DEFINE_mInt32(meta_service_connection_age_base_seconds, "30");
DEFINE_mInt32(meta_service_idle_connection_timeout_ms, "0");
DEFINE_mInt32(meta_service_rpc_retry_times, "200");
DEFINE_mInt32(meta_service_rpc_retry_times, "20");
DEFINE_mInt32(meta_service_brpc_timeout_ms, "10000");
DEFINE_mInt32(meta_service_rpc_timeout_retry_times, "1");

DEFINE_Int64(tablet_cache_capacity, "100000");
DEFINE_Int64(tablet_cache_shards, "16");
Expand Down Expand Up @@ -81,5 +82,7 @@ DEFINE_mBool(enable_cloud_tablet_report, "true");
DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");

DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");

DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");
#include "common/compile_check_end.h"
} // namespace doris::config
3 changes: 3 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ DECLARE_mInt32(meta_service_rpc_timeout_ms);
DECLARE_mInt32(meta_service_rpc_retry_times);
// default brpc timeout
DECLARE_mInt32(meta_service_brpc_timeout_ms);
DECLARE_mInt32(meta_service_rpc_timeout_retry_times);

// CloudTabletMgr config
DECLARE_Int64(tablet_cache_capacity);
Expand Down Expand Up @@ -115,5 +116,7 @@ DECLARE_mInt32(delete_bitmap_rpc_retry_times);

DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms);

DECLARE_mInt32(meta_service_conflict_error_retry_times);

#include "common/compile_check_end.h"
} // namespace doris::config
6 changes: 6 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "common/bvars.h"

#include <bvar/latency_recorder.h>

#include <cstdint>
#include <stdexcept>

Expand Down Expand Up @@ -86,6 +88,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_statu
BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status");
BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict("ms", "get_schema_dict");

bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
bvar::Window<bvar::Adder<int64_t> > g_bvar_update_delete_bitmap_fail_counter_minute("ms", "update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60);
bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
Expand Down Expand Up @@ -118,6 +121,9 @@ bvar::Window<bvar::Adder<int64_t> > g_bvar_txn_kv_commit_error_counter_minute("t
bvar::Adder<int64_t> g_bvar_txn_kv_commit_conflict_counter;
bvar::Window<bvar::Adder<int64_t> > g_bvar_txn_kv_commit_conflict_counter_minute("txn_kv", "commit_conflict", &g_bvar_txn_kv_commit_conflict_counter, 60);

bvar::LatencyRecorder g_bvar_busynesss_reduced_counter("ms", "busynesss_reduced");
bvar::LatencyRecorder g_bvar_busynesss_disable_counter("ms", "busynesss_disable");

// fdb's bvars
const int64_t BVAR_FDB_INVALID_VALUE = -99999999L;
bvar::Status<int64_t> g_bvar_fdb_client_count("fdb_client_count", BVAR_FDB_INVALID_VALUE);
Expand Down
4 changes: 4 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict;

extern bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
extern bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;

Expand All @@ -185,6 +186,9 @@ extern bvar::LatencyRecorder g_bvar_txn_kv_get_read_version;
extern bvar::LatencyRecorder g_bvar_txn_kv_get_committed_version;
extern bvar::LatencyRecorder g_bvar_txn_kv_batch_get;

extern bvar::LatencyRecorder g_bvar_busynesss_reduced_counter;
extern bvar::LatencyRecorder g_bvar_busynesss_disable_counter;

extern bvar::Adder<int64_t> g_bvar_txn_kv_commit_error_counter;
extern bvar::Adder<int64_t> g_bvar_txn_kv_commit_conflict_counter;
extern bvar::Adder<int64_t> g_bvar_txn_kv_get_count_normalized;
Expand Down
4 changes: 4 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,8 @@ CONF_Bool(enable_loopback_address_for_ms, "false");
// Which vaults should be recycled. If empty, recycle all vaults.
// Comma seprated list: recycler_storage_vault_white_list="aaa,bbb,ccc"
CONF_Strings(recycler_storage_vault_white_list, "");

CONF_mInt32(retry_reduce_busyness_threshold, "80");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments

CONF_mInt32(retry_disable_busyness_threshold, "90");
CONF_mInt32(busyness_reduced_retry_times, "1");
} // namespace doris::cloud::config
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <google/protobuf/util/json_util.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/schema.h>
#include <unistd.h>

#include <algorithm>
#include <chrono>
Expand Down
39 changes: 36 additions & 3 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <random>
#include <type_traits>

#include "common/bvars.h"
#include "common/config.h"
#include "cpp/sync_point.h"
#include "meta-service/txn_kv.h"
Expand Down Expand Up @@ -711,6 +712,30 @@ class MetaServiceProxy final : public MetaService {
using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*,
const Request*, Response*,
::google::protobuf::Closure*);
int64_t get_fdb_client_thread_busyness_percent() {
//auto now = steady_clock::now();
auto now = std::chrono::steady_clock::now();
auto duration_s =
duration_cast<std::chrono::seconds>(now - buyness_last_update_time_).count();
if (duration_s > config::bvar_qps_update_second) {
cache_buyness_percent_ = g_bvar_fdb_client_thread_busyness_percent.get_value();
buyness_last_update_time_ = now;
}
return cache_buyness_percent_;
}

int get_dynamic_retry_count() {
int64_t busyness_percent = get_fdb_client_thread_busyness_percent();
if (busyness_percent > config::retry_disable_busyness_threshold) {
g_bvar_busynesss_disable_counter << 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bvar adder should do
promethues can process qps

return 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add bvar to record reduce and disable count
consider latency recorder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

} else if (busyness_percent > config::retry_reduce_busyness_threshold) {
g_bvar_busynesss_reduced_counter << 1;
return config::busyness_reduced_retry_times;
} else {
return config::txn_store_retry_times;
}
}

template <typename Request, typename Response>
void call_impl(MetaServiceMethod<Request, Response> method,
Expand Down Expand Up @@ -761,7 +786,8 @@ class MetaServiceProxy final : public MetaService {
0, config::txn_store_retry_base_intervals_ms)(rng);
}

if (retry_times >= config::txn_store_retry_times ||
int dynamic_max_retry_cnt = get_dynamic_retry_count();
if (retry_times >= dynamic_max_retry_cnt ||
// Retrying KV_TXN_TOO_OLD is very expensive, so we only retry once.
(retry_times > 1 && code == MetaServiceCode::KV_TXN_TOO_OLD)) {
// For KV_TXN_CONFLICT, we should return KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES,
Expand All @@ -771,7 +797,10 @@ class MetaServiceProxy final : public MetaService {
: code == MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE ? KV_TXN_GET_ERR
: code == MetaServiceCode::KV_TXN_STORE_CREATE_RETRYABLE ? KV_TXN_CREATE_ERR
: code == MetaServiceCode::KV_TXN_CONFLICT
? KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES
? get_fdb_client_thread_busyness_percent() >
config::retry_disable_busyness_threshold
? MetaServiceCode::KV_TXN_CONFLICT_BUSY
: KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES
: MetaServiceCode::KV_TXN_TOO_OLD);
return;
}
Expand All @@ -784,14 +813,18 @@ class MetaServiceProxy final : public MetaService {
retry_times += 1;
LOG(WARNING) << __PRETTY_FUNCTION__ << " sleep " << duration_ms
<< " ms before next round, retry times left: "
<< (config::txn_store_retry_times - retry_times)
<< (dynamic_max_retry_cnt - retry_times)
<< ", max retry count: " << dynamic_max_retry_cnt
<< ", code: " << MetaServiceCode_Name(code)
<< ", msg: " << resp->status().msg();
bthread_usleep(duration_ms * 1000);
}
}

std::unique_ptr<MetaServiceImpl> impl_;
std::chrono::steady_clock::time_point buyness_last_update_time_ =
std::chrono::steady_clock::now() - std::chrono::seconds(100);
int32_t cache_buyness_percent_ = 0;
};

} // namespace doris::cloud
14 changes: 13 additions & 1 deletion fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3109,7 +3109,7 @@ public static boolean isNotCloudMode() {
public static int meta_service_connection_pool_size = 20;

@ConfField(mutable = true)
public static int meta_service_rpc_retry_times = 200;
public static int meta_service_rpc_retry_times = 20;

public static int metaServiceRpcRetryTimes() {
if (isCloudMode() && enable_check_compatibility_mode) {
Expand Down Expand Up @@ -3372,6 +3372,18 @@ public static int metaServiceRpcRetryTimes() {
"Whether to enable the use of ShowCacheHotSpotStmt, default is false."})
public static boolean enable_show_file_cache_hotspot_stmt = false;

@ConfField(mutable = true, description = {"存算分离模式下FE连接meta service的请求超时, 默认10000ms",
"Request timeout for FE connecting to meta service in cloud mode, default is 10000ms."})
public static int meta_service_brpc_timeout_ms = 10000;

@ConfField(mutable = true, description = {"存算分离模式下FE连接meta service的连接超时,默认200ms",
"Connection timeout for FE connecting to meta service in cloud mode., default is 200ms."})
public static int meta_service_brpc_connect_timeout_ms = 200;

@ConfField(mutable = true, description = {"存算分离模式下FE请求meta service超时的重试次数,默认1次",
"In cloud mode, the retry number when the FE requests the meta service times out is 1 by default"})
public static int meta_service_rpc_timeout_retry_times = 1;

// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
Expand Down
Loading