Skip to content

Commit

Permalink
[#10519] Reset master leader on meta cache timeouts
Browse files Browse the repository at this point in the history
Summary:
When GetTableLocation times out, MetaCache checks whether whole operation timed out or not.
If it is already timed out, then it just fail, w/o resetting master leader.
It results in situation where we always try to connect to non responding master leader.

Fixed by resetting master leader in scenario above.

Test Plan: CqlTest.NonRespondingMaster

Reviewers: bogdan, timur

Reviewed By: timur

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D13819
  • Loading branch information
spolitov committed Nov 9, 2021
1 parent 061493a commit b472191
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 41 deletions.
56 changes: 31 additions & 25 deletions src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ DEFINE_test_flag(string, assert_tablet_server_select_is_in_zone, "",
"Verify that SelectTServer selected a talet server in the AZ specified by this "
"flag.");

DECLARE_int64(reset_master_leader_timeout_ms);

DECLARE_string(flagfile);

namespace yb {
Expand Down Expand Up @@ -1218,7 +1220,7 @@ class ClientMasterRpc : public Rpc {

virtual void ProcessResponse(const Status& status) = 0;

void ResetLeaderMasterAndRetry();
void ResetMasterLeader(Retry retry);

void NewLeaderMasterDeterminedCb(const Status& status);

Expand Down Expand Up @@ -1364,13 +1366,14 @@ void ClientMasterRpc<Req, Resp>::SendRpc() {
}

template <class Req, class Resp>
void ClientMasterRpc<Req, Resp>::ResetLeaderMasterAndRetry() {
void ClientMasterRpc<Req, Resp>::ResetMasterLeader(Retry retry) {
client_->data_->SetMasterServerProxyAsync(
retrier().deadline(),
retry ? retrier().deadline()
: CoarseMonoClock::now() + FLAGS_reset_master_leader_timeout_ms * 1ms,
false /* skip_resolution */,
true, /* wait for leader election */
Bind(&ClientMasterRpc::NewLeaderMasterDeterminedCb,
Unretained(this)));
retry ? std::bind(&ClientMasterRpc::NewLeaderMasterDeterminedCb, this, _1)
: StdStatusCallback([](auto){}));
}

template <class Req, class Resp>
Expand All @@ -1394,44 +1397,47 @@ void ClientMasterRpc<Req, Resp>::Finished(const Status& status) {
if (new_status.ok() && resp_.has_error()) {
if (resp_.error().code() == MasterErrorPB::NOT_THE_LEADER ||
resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
LOG(WARNING) << "Leader Master has changed ("
LOG(WARNING) << resp_.GetTypeName() << ": Leader Master has changed ("
<< client_->data_->leader_master_hostport().ToString()
<< " is no longer the leader), re-trying...";
ResetLeaderMasterAndRetry();
ResetMasterLeader(Retry::kTrue);
return;
}

if (resp_.error().status().code() == AppStatusPB::LEADER_NOT_READY_TO_SERVE ||
resp_.error().status().code() == AppStatusPB::LEADER_HAS_NO_LEASE) {
LOG(WARNING) << "Leader Master " << client_->data_->leader_master_hostport().ToString()
LOG(WARNING) << resp_.GetTypeName() << ": Leader Master "
<< client_->data_->leader_master_hostport().ToString()
<< " does not have a valid exclusive lease: "
<< resp_.error().status().ShortDebugString() << ", re-trying...";
ResetLeaderMasterAndRetry();
ResetMasterLeader(Retry::kTrue);
return;
}
VLOG(2) << "resp.error().status()=" << resp_.error().status().DebugString();
new_status = StatusFromPB(resp_.error().status());
}

if (new_status.IsTimedOut()) {
if (CoarseMonoClock::Now() < retrier().deadline()) {
LOG(WARNING) << "Leader Master ("
auto now = CoarseMonoClock::Now();
if (now < retrier().deadline()) {
LOG(WARNING) << resp_.GetTypeName() << ": Leader Master ("
<< client_->data_->leader_master_hostport().ToString()
<< ") timed out, re-trying...";
ResetLeaderMasterAndRetry();
<< ") timed out, " << MonoDelta(retrier().deadline() - now) << " left, re-trying...";
ResetMasterLeader(Retry::kTrue);
return;
} else {
// Operation deadline expired during this latest RPC.
new_status = new_status.CloneAndPrepend(
"RPC timed out after deadline expired");
ResetMasterLeader(Retry::kFalse);
}
}

if (new_status.IsNetworkError()) {
LOG(WARNING) << "Encountered a network error from the Master("
LOG(WARNING) << resp_.GetTypeName() << ": Encountered a network error from the Master("
<< client_->data_->leader_master_hostport().ToString() << "): "
<< new_status.ToString() << ", retrying...";
ResetLeaderMasterAndRetry();
ResetMasterLeader(Retry::kTrue);
return;
}

Expand Down Expand Up @@ -2162,10 +2168,10 @@ void YBClient::Data::LeaderMasterDetermined(const Status& status,
VLOG(4) << "YBClient: Leader master determined: status="
<< status.ToString() << ", host port ="
<< host_port.ToString();
std::vector<StatusCallback> cbs;
std::vector<StdStatusCallback> callbacks;
{
std::lock_guard<simple_spinlock> l(leader_master_lock_);
cbs.swap(leader_master_callbacks_);
callbacks.swap(leader_master_callbacks_);

if (new_status.ok()) {
leader_master_hostport_ = host_port;
Expand All @@ -2175,8 +2181,8 @@ void YBClient::Data::LeaderMasterDetermined(const Status& status,
rpcs_.Unregister(&leader_master_rpc_);
}

for (const StatusCallback& cb : cbs) {
cb.Run(new_status);
for (const auto& callback : callbacks) {
callback(new_status);
}
}

Expand All @@ -2186,14 +2192,14 @@ Status YBClient::Data::SetMasterServerProxy(CoarseTimePoint deadline,

Synchronizer sync;
SetMasterServerProxyAsync(deadline, skip_resolution,
wait_for_leader_election, sync.AsStatusCallback());
wait_for_leader_election, sync.AsStdStatusCallback());
return sync.Wait();
}

void YBClient::Data::SetMasterServerProxyAsync(CoarseTimePoint deadline,
bool skip_resolution,
bool wait_for_leader_election,
const StatusCallback& cb) {
const StdStatusCallback& callback) {
DCHECK(deadline != CoarseTimePoint::max());

server::MasterAddresses master_addrs;
Expand All @@ -2202,19 +2208,19 @@ void YBClient::Data::SetMasterServerProxyAsync(CoarseTimePoint deadline,
{
std::lock_guard<simple_spinlock> l(master_server_addrs_lock_);
if (!s.ok() && full_master_server_addrs_.empty()) {
cb.Run(s);
callback(s);
return;
}
for (const string &master_server_addr : full_master_server_addrs_) {
std::vector<HostPort> addrs;
// TODO: Do address resolution asynchronously as well.
s = HostPort::ParseStrings(master_server_addr, master::kMasterDefaultPort, &addrs);
if (!s.ok()) {
cb.Run(s);
callback(s);
return;
}
if (addrs.empty()) {
cb.Run(STATUS_FORMAT(
callback(STATUS_FORMAT(
InvalidArgument,
"No master address specified by '$0' (all master server addresses: $1)",
master_server_addr, full_master_server_addrs_));
Expand All @@ -2237,7 +2243,7 @@ void YBClient::Data::SetMasterServerProxyAsync(CoarseTimePoint deadline,
// Instead, we simply piggy-back onto the existing request by adding our own
// callback to leader_master_callbacks_.
std::unique_lock<simple_spinlock> l(leader_master_lock_);
leader_master_callbacks_.push_back(cb);
leader_master_callbacks_.push_back(callback);
if (skip_resolution && !master_addrs.empty() && !master_addrs.front().empty()) {
l.unlock();
LeaderMasterDetermined(Status::OK(), master_addrs.front().front());
Expand Down
6 changes: 4 additions & 2 deletions src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class MasterServiceProxy;

namespace client {

YB_STRONGLY_TYPED_BOOL(Retry);

class YBClient::Data {
public:
Data();
Expand Down Expand Up @@ -328,7 +330,7 @@ class YBClient::Data {
void SetMasterServerProxyAsync(CoarseTimePoint deadline,
bool skip_resolution,
bool wait_for_leader_election,
const StatusCallback& cb);
const StdStatusCallback& cb);

// Synchronous version of SetMasterServerProxyAsync method above.
//
Expand Down Expand Up @@ -435,7 +437,7 @@ class YBClient::Data {
// itself, as to avoid a "use-after-free" scenario.
rpc::Rpcs rpcs_;
rpc::Rpcs::Handle leader_master_rpc_;
std::vector<StatusCallback> leader_master_callbacks_;
std::vector<StdStatusCallback> leader_master_callbacks_;

// Protects 'leader_master_rpc_', 'leader_master_hostport_',
// and master_proxy_
Expand Down
32 changes: 19 additions & 13 deletions src/yb/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
using std::map;
using std::shared_ptr;
using strings::Substitute;
using namespace std::literals; // NOLINT
using namespace std::literals;
using namespace std::placeholders;

DEFINE_int32(max_concurrent_master_lookups, 500,
"Maximum number of concurrent tablet location lookups from YB client to master");
Expand All @@ -87,6 +88,9 @@ DEFINE_int64(meta_cache_lookup_throttling_step_ms, 5,
DEFINE_int64(meta_cache_lookup_throttling_max_delay_ms, 1000,
"Max delay between calls during lookup throttling.");

DEFINE_int64(reset_master_leader_timeout_ms, 15000,
"Timeout to reset master leader in milliseconds.");

DEFINE_test_flag(bool, force_master_lookup_all_tablets, false,
"If set, force the client to go to the master for all tablet lookup "
"instead of reading from cache.");
Expand Down Expand Up @@ -659,7 +663,7 @@ class LookupRpc : public Rpc, public RequestCleanup {
MetaCache* meta_cache() { return meta_cache_.get(); }
YBClient* client() const { return meta_cache_->client_; }

void ResetMasterLeaderAndRetry();
void ResetMasterLeader(Retry retry);

virtual void NotifyFailure(const Status& status) = 0;

Expand Down Expand Up @@ -770,13 +774,14 @@ void LookupRpc::SendRpc() {
DoSendRpc();
}

void LookupRpc::ResetMasterLeaderAndRetry() {
void LookupRpc::ResetMasterLeader(Retry retry) {
client()->data_->SetMasterServerProxyAsync(
retrier().deadline(),
retry ? retrier().deadline()
: CoarseMonoClock::now() + FLAGS_reset_master_leader_timeout_ms * 1ms,
false /* skip_resolution */,
true /* wait for leader election */,
Bind(&LookupRpc::NewLeaderMasterDeterminedCb,
Unretained(this)));
retry ? std::bind(&LookupRpc::NewLeaderMasterDeterminedCb, this, _1)
: StdStatusCallback([](auto){}));
}

void LookupRpc::NewLeaderMasterDeterminedCb(const Status& status) {
Expand Down Expand Up @@ -831,8 +836,8 @@ void LookupRpc::DoFinished(const Status& status, const Response& resp) {
if (resp.error().code() == master::MasterErrorPB::NOT_THE_LEADER ||
resp.error().code() == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
if (client()->IsMultiMaster()) {
YB_LOG_EVERY_N_SECS(WARNING, 1) << "Leader Master has changed, re-trying...";
ResetMasterLeaderAndRetry();
YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1) << "Leader Master has changed, re-trying...";
ResetMasterLeader(Retry::kTrue);
} else {
ScheduleRetry(new_status);
}
Expand All @@ -842,19 +847,20 @@ void LookupRpc::DoFinished(const Status& status, const Response& resp) {

if (new_status.IsTimedOut()) {
if (CoarseMonoClock::Now() < retrier().deadline()) {
YB_LOG_EVERY_N_SECS(WARNING, 1) << "Leader Master timed out, re-trying...";
ResetMasterLeaderAndRetry();
YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1) << "Leader Master timed out, re-trying...";
ResetMasterLeader(Retry::kTrue);
return;
} else {
// Operation deadline expired during this latest RPC.
new_status = new_status.CloneAndPrepend("timed out after deadline expired");
ResetMasterLeader(Retry::kFalse);
}
}

if (new_status.IsNetworkError() || new_status.IsRemoteError()) {
YB_LOG_EVERY_N_SECS(WARNING, 1) << "Encountered a error from the Master: "
<< new_status << ", retrying...";
ResetMasterLeaderAndRetry();
YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1)
<< "Encountered a error from the Master: " << new_status << ", retrying...";
ResetMasterLeader(Retry::kTrue);
return;
}

Expand Down
34 changes: 33 additions & 1 deletion src/yb/integration-tests/cql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "yb/consensus/raft_consensus.h"

#include "yb/master/catalog_manager.h"
#include "yb/master/mini_master.h"

#include "yb/util/random_util.h"
Expand All @@ -23,11 +24,13 @@

using namespace std::literals;

DECLARE_bool(TEST_timeout_non_leader_get_table_locations);
DECLARE_int64(cql_processors_limit);
DECLARE_int32(client_read_write_timeout_ms);

DECLARE_string(TEST_fail_to_fast_resolve_address);
DECLARE_int32(partitions_vtable_cache_refresh_secs);
DECLARE_int32(client_read_write_timeout_ms);

namespace yb {

Expand Down Expand Up @@ -250,7 +253,7 @@ Status CheckNumAddressesInYqlPartitionsTable(CassandraSession* session, int expe
return Status::OK();
}

TEST_F(CqlThreeMastersTest, HostnameResolutionFailureInYqlPartitionsTable) {
TEST_F_EX(CqlTest, HostnameResolutionFailureInYqlPartitionsTable, CqlThreeMastersTest) {
google::FlagSaver flag_saver;
auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
ASSERT_OK(CheckNumAddressesInYqlPartitionsTable(&session, 3));
Expand All @@ -272,5 +275,34 @@ TEST_F(CqlThreeMastersTest, HostnameResolutionFailureInYqlPartitionsTable) {
ASSERT_OK(CheckNumAddressesInYqlPartitionsTable(&session, 2));
}

TEST_F_EX(CqlTest, NonRespondingMaster, CqlThreeMastersTest) {
FLAGS_TEST_timeout_non_leader_get_table_locations = true;
auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
ASSERT_OK(session.ExecuteQuery("CREATE TABLE t1 (i INT PRIMARY KEY, j INT)"));
ASSERT_OK(session.ExecuteQuery("INSERT INTO t1 (i, j) VALUES (1, 1)"));
ASSERT_OK(session.ExecuteQuery("CREATE TABLE t2 (i INT PRIMARY KEY, j INT)"));

LOG(INFO) << "Prepare";
auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t2 (i, j) VALUES (?, ?)"));
LOG(INFO) << "Step down";
auto peer = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->catalog_manager()
->tablet_peer();
ASSERT_OK(StepDown(peer, std::string(), ForceStepDown::kTrue));
LOG(INFO) << "Insert";
FLAGS_client_read_write_timeout_ms = 5000;
bool has_ok = false;
for (int i = 0; i != 3; ++i) {
auto stmt = prepared.Bind();
stmt.Bind(0, i);
stmt.Bind(1, 1);
auto status = session.Execute(stmt);
if (status.ok()) {
has_ok = true;
break;
}
ASSERT_NE(status.message().ToBuffer().find("timed out"), std::string::npos) << status;
}
ASSERT_TRUE(has_ok);
}

} // namespace yb
9 changes: 9 additions & 0 deletions src/yb/master/master_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ DEFINE_int32(tablet_report_limit, 1000,
"If this is set to INT32_MAX, then heartbeat will report all dirty tablets.");
TAG_FLAG(tablet_report_limit, advanced);

DEFINE_test_flag(bool, timeout_non_leader_get_table_locations, false,
"Timeout all GetTableLocations requests to non leader.");

DECLARE_CAPABILITY(TabletReportLimit);
DECLARE_int32(heartbeat_rpc_timeout_ms);

Expand Down Expand Up @@ -393,6 +396,12 @@ void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
RpcContext rpc) {
// We can't use the HANDLE_ON_LEADER_WITH_LOCK macro here because we have to inject latency
// before acquiring the leader lock.
if (FLAGS_TEST_timeout_non_leader_get_table_locations) {
ScopedLeaderSharedLock lock(server_->catalog_manager(), __FILE__, __LINE__, __func__);
if (!lock.leader_status().ok()) {
std::this_thread::sleep_until(rpc.GetClientDeadline());
}
}
HandleOnLeader(req, resp, &rpc, [&]() -> Status {
if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
Expand Down

0 comments on commit b472191

Please sign in to comment.