Skip to content

Commit

Permalink
ENG-2200: A race condition in TabletToFlush; also a concurrency fix i…
Browse files Browse the repository at this point in the history
…n permanent_uuid()

Summary:
- In TSTabletManager::TabletToFlush() we need to check if the tablet pointer within a TabletPeer
  is actually initialized. This might not be the case if the table is still being opened.
  Also we probably need to increment the shared pointer refcount while doing so.
- The TabletPeer::permanent_uuid() function did not have any synchronization.
- Also renaming TabletPeer::Init() -> InitTabletPeer() for better grep-ability.

Test Plan: Jenkins

Reviewers: karthik, bogdan, pritam.damania, sergei

Reviewed By: pritam.damania, sergei

Subscribers: bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D3154
  • Loading branch information
mbautin committed Oct 13, 2017
1 parent 282cc6e commit 6a78735
Show file tree
Hide file tree
Showing 22 changed files with 244 additions and 182 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ java/yb-loadtester/src/main/csharp/*/packages/
# These files might appear when applying patches.
*.orig
*.rej

.tags
.tags_sorted_by_file
cscope.out
6 changes: 2 additions & 4 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,8 @@ MAKE_ENUM_LIMITS(yb::client::YBScanner::OrderMode,
DEFINE_int32(yb_num_shards_per_tserver, yb::NonTsanVsTsan(8, 2),
"The default number of shards per table per tablet server when a table is created.");

DEFINE_int32(yb_num_total_tablets, 0,
"The total number of tablets per table when a table is created. (For testing only!)");
TAG_FLAG(yb_num_total_tablets, unsafe);
TAG_FLAG(yb_num_total_tablets, hidden);
DEFINE_test_flag(int32, yb_num_total_tablets, 0,
"The total number of tablets per table when a table is created.");

namespace yb {
namespace client {
Expand Down
16 changes: 7 additions & 9 deletions src/yb/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,16 @@ TAG_FLAG(consensus_rpc_timeout_ms, advanced);

DECLARE_int32(raft_heartbeat_interval_ms);

DEFINE_double(fault_crash_on_leader_request_fraction, 0.0,
"Fraction of the time when the leader will crash just before sending an "
"UpdateConsensus RPC. (For testing only!)");
TAG_FLAG(fault_crash_on_leader_request_fraction, unsafe);
DEFINE_test_flag(double, fault_crash_on_leader_request_fraction, 0.0,
"Fraction of the time when the leader will crash just before sending an "
"UpdateConsensus RPC.");

// Allow for disabling remote bootstrap in unit tests where we want to test
// certain scenarios without triggering bootstrap of a remote peer.
DEFINE_bool(enable_remote_bootstrap, true,
"Whether remote bootstrap will be initiated by the leader when it "
"detects that a follower is out of date or does not have a tablet "
"replica. For testing purposes only.");
TAG_FLAG(enable_remote_bootstrap, unsafe);
DEFINE_test_flag(bool, enable_remote_bootstrap, true,
"Whether remote bootstrap will be initiated by the leader when it "
"detects that a follower is out of date or does not have a tablet "
"replica.");

namespace yb {
namespace consensus {
Expand Down
7 changes: 3 additions & 4 deletions src/yb/consensus/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,12 @@ DEFINE_int32(log_inject_latency_ms_mean, 100,
DEFINE_int32(log_inject_latency_ms_stddev, 100,
"The standard deviation of latency to inject in the log. "
"Only takes effect if --log_inject_latency is true");
DEFINE_double(fault_crash_before_append_commit, 0.0,
"Fraction of the time when the server will crash just before appending a "
"COMMIT message to the log. (For testing only!)");
DEFINE_test_flag(double, fault_crash_before_append_commit, 0.0,
"Fraction of the time when the server will crash just before appending a "
"COMMIT message to the log.");
TAG_FLAG(log_inject_latency, unsafe);
TAG_FLAG(log_inject_latency_ms_mean, unsafe);
TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
TAG_FLAG(fault_crash_before_append_commit, unsafe);

// Validate that log_min_segments_to_retain >= 1
static bool ValidateLogsToRetain(const char* flagname, int value) {
Expand Down
6 changes: 2 additions & 4 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,8 @@ METRIC_DEFINE_gauge_uint32(cluster, num_tablet_servers_live,
"in the time interval defined by the gflag "
"FLAGS_tserver_unresponsive_timeout_ms.");

DEFINE_uint64(inject_latency_during_remote_bootstrap_secs, 0,
"Number of seconds to sleep during a remote bootstrap. (For testing only!)");
TAG_FLAG(inject_latency_during_remote_bootstrap_secs, unsafe);
TAG_FLAG(inject_latency_during_remote_bootstrap_secs, hidden);
DEFINE_test_flag(uint64, inject_latency_during_remote_bootstrap_secs, 0,
"Number of seconds to sleep during a remote bootstrap.");

DEFINE_string(cluster_uuid, "", "Cluster UUID to be used by this cluster");
TAG_FLAG(cluster_uuid, hidden);
Expand Down
12 changes: 6 additions & 6 deletions src/yb/master/sys_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,12 @@ Status SysCatalogTable::OpenTablet(const scoped_refptr<tablet::TabletMetadata>&
// TODO: Do we have a setSplittable(false) or something from the outside is
// handling split in the TS?

RETURN_NOT_OK_PREPEND(tablet_peer_->Init(tablet,
nullptr,
scoped_refptr<server::Clock>(master_->clock()),
master_->messenger(),
log,
tablet->GetMetricEntity()),
RETURN_NOT_OK_PREPEND(tablet_peer_->InitTabletPeer(tablet,
nullptr,
scoped_refptr<server::Clock>(master_->clock()),
master_->messenger(),
log,
tablet->GetMetricEntity()),
"Failed to Init() TabletPeer");

RETURN_NOT_OK_PREPEND(tablet_peer_->Start(consensus_info),
Expand Down
6 changes: 2 additions & 4 deletions src/yb/tablet/operations/write_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,8 @@
#include "yb/util/locks.h"
#include "yb/util/trace.h"

DEFINE_int32(tablet_inject_latency_on_apply_write_txn_ms, 0,
"How much latency to inject when a write operation is applied. "
"For testing only!");
TAG_FLAG(tablet_inject_latency_on_apply_write_txn_ms, unsafe);
DEFINE_test_flag(int32, tablet_inject_latency_on_apply_write_txn_ms, 0,
"How much latency to inject when a write operation is applied.");
TAG_FLAG(tablet_inject_latency_on_apply_write_txn_ms, runtime);

namespace yb {
Expand Down
8 changes: 3 additions & 5 deletions src/yb/tablet/tablet_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ DEFINE_bool(skip_remove_old_recovery_dir, false,
"Skip removing WAL recovery dir after startup. (useful for debugging)");
TAG_FLAG(skip_remove_old_recovery_dir, hidden);

DEFINE_double(fault_crash_during_log_replay, 0.0,
"Fraction of the time when the tablet will crash immediately "
"after processing a log entry during log replay. "
"(For testing only!)");
TAG_FLAG(fault_crash_during_log_replay, unsafe);
DEFINE_test_flag(double, fault_crash_during_log_replay, 0.0,
"Fraction of the time when the tablet will crash immediately "
"after processing a log entry during log replay.");

DECLARE_uint64(max_clock_sync_error_usec);

Expand Down
12 changes: 6 additions & 6 deletions src/yb/tablet/tablet_peer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ class TabletPeerTest : public YBTabletTest,
tablet()->metadata()->schema_version(), metric_entity_.get(), &log));

tablet_peer_->SetBootstrapping();
ASSERT_OK(tablet_peer_->Init(tablet(),
nullptr /* client */,
clock(),
messenger_,
log,
metric_entity_));
ASSERT_OK(tablet_peer_->InitTabletPeer(tablet(),
nullptr /* client */,
clock(),
messenger_,
log,
metric_entity_));
}

Status StartPeer(const ConsensusBootstrapInfo& info) {
Expand Down
42 changes: 30 additions & 12 deletions src/yb/tablet/tablet_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
#include "yb/util/trace.h"

using std::shared_ptr;
using std::string;

namespace yb {
namespace tablet {
Expand Down Expand Up @@ -143,12 +144,12 @@ TabletPeer::~TabletPeer() {
<< TabletStatePB_Name(state_);
}

Status TabletPeer::Init(const shared_ptr<TabletClass>& tablet,
const client::YBClientPtr& client,
const scoped_refptr<server::Clock>& clock,
const shared_ptr<Messenger>& messenger,
const scoped_refptr<Log>& log,
const scoped_refptr<MetricEntity>& metric_entity) {
Status TabletPeer::InitTabletPeer(const shared_ptr<TabletClass> &tablet,
const client::YBClientPtr &client,
const scoped_refptr<server::Clock> &clock,
const shared_ptr<Messenger> &messenger,
const scoped_refptr<Log> &log,
const scoped_refptr<MetricEntity> &metric_entity) {

DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
DCHECK(log) << "A TabletPeer must be provided with a Log";
Expand Down Expand Up @@ -626,6 +627,21 @@ Status TabletPeer::StartReplicaOperation(const scoped_refptr<ConsensusRound>& ro
return Status::OK();
}

string TabletPeer::permanent_uuid() const {
if (cached_permanent_uuid_initialized_.load(std::memory_order_acquire)) {
return cached_permanent_uuid_;
}
std::lock_guard<simple_spinlock> lock(lock_);
if (tablet_ == nullptr)
return "";

if (!cached_permanent_uuid_initialized_.load(std::memory_order_acquire)) {
cached_permanent_uuid_ = tablet_->metadata()->fs_manager()->uuid();
cached_permanent_uuid_initialized_.store(std::memory_order_release);
}
return cached_permanent_uuid_;
}

Status TabletPeer::NewOperationDriver(std::unique_ptr<Operation> operation,
consensus::DriverType type,
scoped_refptr<OperationDriver>* driver) {
Expand All @@ -649,13 +665,15 @@ void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {

DCHECK(maintenance_ops_.empty());

gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
maint_mgr->RegisterOp(mrs_flush_op.get());
maintenance_ops_.push_back(mrs_flush_op.release());
if (table_type() == TableType::KUDU_COLUMNAR_TABLE_TYPE) {
gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
maint_mgr->RegisterOp(mrs_flush_op.get());
maintenance_ops_.push_back(mrs_flush_op.release());

gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
maint_mgr->RegisterOp(dms_flush_op.get());
maintenance_ops_.push_back(dms_flush_op.release());
gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
maint_mgr->RegisterOp(dms_flush_op.get());
maintenance_ops_.push_back(dms_flush_op.release());
}

gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
maint_mgr->RegisterOp(log_gc.get());
Expand Down
25 changes: 11 additions & 14 deletions src/yb/tablet/tablet_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,

// Initializes the TabletPeer, namely creating the Log and initializing
// Consensus.
CHECKED_STATUS Init(const std::shared_ptr<TabletClass>& tablet,
const client::YBClientPtr& client,
const scoped_refptr<server::Clock>& clock,
const std::shared_ptr<rpc::Messenger>& messenger,
const scoped_refptr<log::Log>& log,
const scoped_refptr<MetricEntity>& metric_entity);
CHECKED_STATUS InitTabletPeer(const std::shared_ptr<TabletClass> &tablet,
const client::YBClientPtr &client,
const scoped_refptr<server::Clock> &clock,
const std::shared_ptr<rpc::Messenger> &messenger,
const scoped_refptr<log::Log> &log,
const scoped_refptr<MetricEntity> &metric_entity);

// Starts the TabletPeer, making it available for Write()s. If this
// TabletPeer is part of a consensus configuration this will connect it to other peers
Expand Down Expand Up @@ -171,11 +171,6 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
// Returns the current Raft configuration.
const consensus::RaftConfigPB RaftConfig() const;

// If any peers in the consensus configuration lack permanent uuids, get them via an
// RPC call and update.
// TODO: move this to raft_consensus.h.
CHECKED_STATUS UpdatePermanentUuids();

TabletStatusListener* status_listener() const {
return status_listener_.get();
}
Expand Down Expand Up @@ -255,9 +250,7 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
const std::string& tablet_id() const override { return tablet_id_; }

// Convenience method to return the permanent_uuid of this peer.
std::string permanent_uuid() const {
return tablet_ != nullptr ? tablet_->metadata()->fs_manager()->uuid() : "";
}
std::string permanent_uuid() const;

CHECKED_STATUS NewOperationDriver(std::unique_ptr<Operation> operation,
consensus::DriverType type,
Expand Down Expand Up @@ -371,6 +364,10 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
// can provide.
std::vector<MaintenanceOp*> maintenance_ops_;

// Cache the permanent of the tablet UUID to retrieve it without a lock in the common case.
mutable std::atomic<bool> cached_permanent_uuid_initialized_ { false };
mutable std::string cached_permanent_uuid_;

private:
DISALLOW_COPY_AND_ASSIGN(TabletPeer);
};
Expand Down
13 changes: 5 additions & 8 deletions src/yb/tserver/remote_bootstrap_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,11 @@ TAG_FLAG(committed_config_change_role_timeout_sec, hidden);

DECLARE_int32(rpc_max_message_size);

DEFINE_double(fault_crash_bootstrap_client_before_changing_role, 0.0,
"The remote bootstrap client will crash before closing the session with the leader. "
"Because the session won't be closed successfully, the leader won't issue a "
"ChangeConfig request to change this tserver role *(from PRE_VOTER or PRE_OBSERVER "
"to VOTER or OBSERVER respectively). "
"For testing only!)");
TAG_FLAG(fault_crash_bootstrap_client_before_changing_role, unsafe);
TAG_FLAG(fault_crash_bootstrap_client_before_changing_role, hidden);
DEFINE_test_flag(double, fault_crash_bootstrap_client_before_changing_role, 0.0,
"The remote bootstrap client will crash before closing the session with the "
"leader. Because the session won't be closed successfully, the leader won't issue "
"a ChangeConfig request to change this tserver role *(from PRE_VOTER or "
"PRE_OBSERVER to VOTER or OBSERVER respectively).");

// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step.
#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
Expand Down
50 changes: 20 additions & 30 deletions src/yb/tserver/remote_bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,36 +78,26 @@ DEFINE_uint64(remote_bootstrap_timeout_poll_period_ms, 10000,
"remote bootstrap sessions, in millis");
TAG_FLAG(remote_bootstrap_timeout_poll_period_ms, hidden);

DEFINE_double(fault_crash_on_handle_rb_fetch_data, 0.0,
"Fraction of the time when the tablet will crash while "
"servicing a RemoteBootstrapService FetchData() RPC call. "
"(For testing only!)");
TAG_FLAG(fault_crash_on_handle_rb_fetch_data, unsafe);

DEFINE_uint64(inject_latency_before_change_role_secs, 0,
"Number of seconds to sleep before we call ChangeRole. "
"(For testing only!)");
TAG_FLAG(inject_latency_before_change_role_secs, unsafe);

DEFINE_bool(skip_change_role, false,
"When set, we don't call ChangeRole after successfully finishing a remote bootstrap. "
"(For testing only!)");
TAG_FLAG(skip_change_role, unsafe);

DEFINE_double(fault_crash_leader_before_changing_role, 0.0,
"The leader will crash before changing the role (from PRE_VOTER or PRE_OBSERVER to "
"VOTER or OBSERVER respectively) of the tablet server it is remote bootstrapping. "
"For testing only!)");
TAG_FLAG(fault_crash_leader_before_changing_role, unsafe);
TAG_FLAG(fault_crash_leader_before_changing_role, hidden);

DEFINE_double(fault_crash_leader_after_changing_role, 0.0,
"The leader will crash after successfully sending a ChangeConfig (CHANGE_ROLE from "
"PRE_VOTER or PRE_OBSERVER to VOTER or OBSERVER respectively) for the tablet server "
"it is remote bootstrapping, but before it sends a success response."
"For testing only!)");
TAG_FLAG(fault_crash_leader_after_changing_role, unsafe);
TAG_FLAG(fault_crash_leader_after_changing_role, hidden);
DEFINE_test_flag(double, fault_crash_on_handle_rb_fetch_data, 0.0,
"Fraction of the time when the tablet will crash while "
"servicing a RemoteBootstrapService FetchData() RPC call.");

DEFINE_test_flag(uint64, inject_latency_before_change_role_secs, 0,
"Number of seconds to sleep before we call ChangeRole.");

DEFINE_test_flag(bool, skip_change_role, false,
"When set, we don't call ChangeRole after successfully finishing a remote "
"bootstrap.");

DEFINE_test_flag(double, fault_crash_leader_before_changing_role, 0.0,
"The leader will crash before changing the role (from PRE_VOTER or PRE_OBSERVER "
"to VOTER or OBSERVER respectively) of the tablet server it is remote "
"bootstrapping.");

DEFINE_test_flag(double, fault_crash_leader_after_changing_role, 0.0,
"The leader will crash after successfully sending a ChangeConfig (CHANGE_ROLE "
"from PRE_VOTER or PRE_OBSERVER to VOTER or OBSERVER respectively) for the tablet "
"server it is remote bootstrapping, but before it sends a success response.");

namespace yb {
namespace tserver {
Expand Down
12 changes: 6 additions & 6 deletions src/yb/tserver/remote_bootstrap_session-test.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ class RemoteBootstrapTest : public YBTabletTest {

log_anchor_registry_.reset(new LogAnchorRegistry());
tablet_peer_->SetBootstrapping();
CHECK_OK(tablet_peer_->Init(tablet(),
nullptr /* client */,
clock(),
messenger,
log,
metric_entity));
CHECK_OK(tablet_peer_->InitTabletPeer(tablet(),
nullptr /* client */,
clock(),
messenger,
log,
metric_entity));
consensus::ConsensusBootstrapInfo boot_info;
CHECK_OK(tablet_peer_->Start(boot_info));

Expand Down
18 changes: 9 additions & 9 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ namespace yb {
namespace tserver {

TabletServer::TabletServer(const TabletServerOptions& opts)
: RpcAndWebServerBase("TabletServer", opts, "yb.tabletserver"),
initted_(false),
fail_heartbeats_for_tests_(false),
opts_(opts),
tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
scanner_manager_(new ScannerManager(metric_entity())),
path_handlers_(new TabletServerPathHandlers(this)),
maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)),
master_config_index_(0) {
: RpcAndWebServerBase("TabletServer", opts, "yb.tabletserver"),
initted_(false),
fail_heartbeats_for_tests_(false),
opts_(opts),
tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
scanner_manager_(new ScannerManager(metric_entity())),
path_handlers_(new TabletServerPathHandlers(this)),
maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)),
master_config_index_(0) {
}

TabletServer::~TabletServer() {
Expand Down
Loading

0 comments on commit 6a78735

Please sign in to comment.