Skip to content

Commit

Permalink
[yugabyte#19365] DocDB: Enable thread safety for members passed by reference
Browse files Browse the repository at this point in the history

Summary:
Setting `-Wthread-safety-reference`. This flag checks when guarded members are passed by reference.

Fixing up all build errors that arise from enabling this extra thread safety protection.

Fixes yugabyte#19365
Jira: DB-8165

Test Plan: All tests

Reviewers: mbautin, timur

Reviewed By: mbautin

Subscribers: ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D28968
  • Loading branch information
hari90 committed Oct 5, 2023
1 parent 7ffe88d commit f858054
Show file tree
Hide file tree
Showing 24 changed files with 155 additions and 99 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ if(IS_CLANG)
ADD_CXX_FLAGS("-Werror=implicit-fallthrough")
ADD_CXX_FLAGS("-D_LIBCPP_ENABLE_THREAD_SAFETY_ANNOTATIONS")
ADD_CXX_FLAGS("-Wthread-safety-analysis")
ADD_CXX_FLAGS("-Wthread-safety-reference")
ADD_CXX_FLAGS("-Wshorten-64-to-32")
endif()

Expand Down
16 changes: 10 additions & 6 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1181,12 +1181,11 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
void DoCommit(
CoarseTimePoint deadline, SealOnly seal_only, const Status& status,
const YBTransactionPtr& transaction) EXCLUDES(mutex_) {
UniqueLock lock(mutex_);
VLOG_WITH_PREFIX(1)
<< Format("Commit, seal_only: $0, tablets: $1, status: $2",
seal_only, tablets_, status);

UniqueLock lock(mutex_);

if (!status.ok()) {
VLOG_WITH_PREFIX(4) << "Commit failed: " << status;
auto commit_callback = std::move(commit_callback_);
Expand Down Expand Up @@ -1511,9 +1510,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
client::UseCache::kTrue);
}

void LookupTabletDone(const Result<client::internal::RemoteTabletPtr>& result,
const YBTransactionPtr& transaction,
TransactionPromoting promoting) {
void LookupTabletDone(
const Result<client::internal::RemoteTabletPtr>& result, const YBTransactionPtr& transaction,
TransactionPromoting promoting) EXCLUDES(mutex_) {
TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result);

Expand All @@ -1527,7 +1526,12 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {

if (status == TransactionStatus::ABORTED) {
DCHECK(promoting);
SendAbortToOldStatusTabletIfNeeded(TransactionRpcDeadline(), transaction, old_status_tablet_);
decltype(old_status_tablet_) old_status_tablet;
{
SharedLock lock(mutex_);
old_status_tablet = old_status_tablet_;
}
SendAbortToOldStatusTabletIfNeeded(TransactionRpcDeadline(), transaction, old_status_tablet);
} else {
SendHeartbeat(status, metadata_.transaction_id, transaction_->shared_from_this(),
SendHeartbeatToNewTablet(promoting));
Expand Down
5 changes: 5 additions & 0 deletions src/yb/consensus/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,11 @@ Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segment
return Status::OK();
}

// Locking wrapper around GetSegmentsToGCUnlocked(): takes state_lock_ through a
// PerCpuRwSharedLock, then forwards both arguments unchanged and returns the resulting Status.
Status Log::GetSegmentsToGC(int64_t min_op_idx, SegmentSequence* segments_to_gc) const {
// Take state_lock_ before delegating to the unlocked helper.
PerCpuRwSharedLock read_lock(state_lock_);
return GetSegmentsToGCUnlocked(min_op_idx, segments_to_gc);
}

void Log::ApplyTimeRetentionPolicy(SegmentSequence* segments_to_gc) const {
// Don't GC segments that are newer than the configured time-based retention.
int64_t now = GetCurrentTimeMicros() + FLAGS_time_based_wal_gc_clock_delta_usec;
Expand Down
5 changes: 4 additions & 1 deletion src/yb/consensus/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,10 @@ class Log : public RefCountedThreadSafe<Log> {
Status UpdateSegmentReadableOffset() EXCLUDES(active_segment_mutex_);

// Helper method to get the segment sequence to GC based on the provided min_op_idx.
Status GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segments_to_gc) const;
Status GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segments_to_gc) const
REQUIRES_SHARED(state_lock_);
Status GetSegmentsToGC(int64_t min_op_idx, SegmentSequence* segments_to_gc) const
EXCLUDES(state_lock_);

// Discards segments from 'segments_to_gc' if they have not yet met the minimum retention time.
void ApplyTimeRetentionPolicy(SegmentSequence* segments_to_gc) const;
Expand Down
16 changes: 8 additions & 8 deletions src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1788,16 +1788,16 @@ TEST_F(CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime) {
// Since we haven't updated the minimum cdc index, and the elapsed time is less than
// kMaxSecondsToRetain, no log files should be returned.
log::SegmentSequence segment_sequence;
ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
&segment_sequence));
ASSERT_OK(
tablet_peer->log()->GetSegmentsToGC(std::numeric_limits<int64_t>::max(), &segment_sequence));
ASSERT_EQ(segment_sequence.size(), 0);
LOG(INFO) << "No segments to be GCed because less than " << kMaxSecondsToRetain
<< " seconds have elapsed";

SleepFor(time_to_sleep);

ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
&segment_sequence));
ASSERT_OK(
tablet_peer->log()->GetSegmentsToGC(std::numeric_limits<int64_t>::max(), &segment_sequence));
ASSERT_GT(segment_sequence.size(), 0);
const auto& segments_violate =
*(tablet_peer->log()->reader_->TEST_segments_violate_max_time_policy_);
Expand Down Expand Up @@ -1979,14 +1979,14 @@ TEST_F(CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace) {
}

log::SegmentSequence segment_sequence;
ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
&segment_sequence));
ASSERT_OK(
tablet_peer->log()->GetSegmentsToGC(std::numeric_limits<int64_t>::max(), &segment_sequence));
ASSERT_EQ(segment_sequence.size(), 0);

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_simulate_free_space_bytes) = 128;

ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(),
&segment_sequence));
ASSERT_OK(
tablet_peer->log()->GetSegmentsToGC(std::numeric_limits<int64_t>::max(), &segment_sequence));
ASSERT_GT(segment_sequence.size(), 0);
const auto& segments_violate =
*(tablet_peer->log()->reader_->TEST_segments_violate_min_space_policy_);
Expand Down
9 changes: 5 additions & 4 deletions src/yb/master/backfill_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,7 @@ Status BackfillTable::LaunchComputeSafeTimeForRead() {
SCHECK(!res->is_special(), InvalidArgument, "Invalid xCluster safe time for namespace ",
indexed_table_->namespace_id());

LOG_WITH_PREFIX(INFO) << "Using xCluster safe time " << read_time_for_backfill_
<< " as the backfill read time";
LOG_WITH_PREFIX(INFO) << "Using xCluster safe time " << *res << " as the backfill read time";
return SetSafeTimeAndStartBackfill(*res);
} else {
if (res.status().IsNotFound()) {
Expand Down Expand Up @@ -958,8 +957,10 @@ Status BackfillTable::DoBackfill() {
LOG(INFO) << Format("Blocking $0 for $1", __func__, kSpinWait);
SleepFor(kSpinWait);
}
VLOG_WITH_PREFIX(1) << "starting backfill with timestamp: "
<< read_time_for_backfill_;
if (VLOG_IS_ON(1)) {
std::lock_guard l(mutex_);
VLOG_WITH_PREFIX(1) << "starting backfill with timestamp: " << read_time_for_backfill_;
}

num_tablets_.store(tablets.size(), std::memory_order_release);
tablets_pending_.store(tablets.size(), std::memory_order_release);
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/backfill_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class BackfillTable : public std::enable_shared_from_this<BackfillTable> {
void LaunchBackfillOrAbort();
Status WaitForTabletSplitting();
Status DoLaunchBackfill();
Status LaunchComputeSafeTimeForRead();
Status LaunchComputeSafeTimeForRead() EXCLUDES(mutex_);
Status DoBackfill();

Status MarkAllIndexesAsFailed();
Expand Down
5 changes: 3 additions & 2 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2660,7 +2660,7 @@ void CatalogManager::RefreshTablespaceInfoPeriodically() {
return;
}

LeaderEpoch epoch(-1);
LeaderEpoch epoch;
{
SCOPED_LEADER_SHARED_LOCK(l, this);
if (!l.IsInitializedAndIsLeader()) {
Expand Down Expand Up @@ -9830,12 +9830,13 @@ Status CatalogManager::GetUDTypeInfo(const GetUDTypeInfoRequestPB* req,
}

if (req->type().has_type_id()) {
SharedLock lock(mutex_);
tp = FindPtrOrNull(udtype_ids_map_, req->type().type_id());
} else if (req->type().has_type_name() && req->type().has_namespace_()) {
// Lookup the type and verify if it exists.
TRACE("Looking up namespace");
ns = VERIFY_NAMESPACE_FOUND(FindNamespace(req->type().namespace_()), resp);

SharedLock lock(mutex_);
tp = FindPtrOrNull(udtype_names_map_, std::make_pair(ns->id(), req->type().type_name()));
}

Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
Status EnableBgTasks();

// Helper function for RebuildYQLSystemPartitions to get the system.partitions tablet.
Status GetYQLPartitionsVTable(std::shared_ptr<SystemTablet>* tablet);
Status GetYQLPartitionsVTable(std::shared_ptr<SystemTablet>* tablet) REQUIRES(mutex_);
// Background task for automatically rebuilding system.partitions every
// partitions_vtable_cache_refresh_secs seconds.
void RebuildYQLSystemPartitions();
Expand Down
3 changes: 1 addition & 2 deletions src/yb/master/cluster_balance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ bool ClusterLoadBalancer::IsLoadBalancerEnabled() const {
ClusterLoadBalancer::ClusterLoadBalancer(CatalogManager* cm)
: random_(GetRandomSeed32()),
is_enabled_(FLAGS_enable_load_balancing),
cbuf_activities_(FLAGS_load_balancer_num_idle_runs),
epoch_(LeaderEpoch(-1)) {
cbuf_activities_(FLAGS_load_balancer_num_idle_runs) {
ResetGlobalState(false /* initialize_ts_descs */);

catalog_manager_ = cm;
Expand Down
2 changes: 2 additions & 0 deletions src/yb/master/leader_epoch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct LeaderEpoch {
: leader_term(term), pitr_count(pitr_count) {}

// Single-argument form: forwards to the two-argument constructor with a pitr_count of 0.
explicit LeaderEpoch(int64_t term) : LeaderEpoch(term, 0) {}

// Default form: forwards to the two-argument constructor with a term of -1 and a pitr_count of 0.
// NOTE(review): -1 looks like a "no valid term yet" sentinel — confirm against callers.
LeaderEpoch() : LeaderEpoch(-1, 0) {}
};

} // namespace master
Expand Down
41 changes: 22 additions & 19 deletions src/yb/master/master_snapshot_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"
#include "yb/util/stopwatch.h"
#include "yb/util/unique_lock.h"

using std::vector;
using std::string;
Expand Down Expand Up @@ -143,7 +144,7 @@ struct NoOp {
// Finds appropriate entry in passed collection and invokes Done on it.
template <class Collection, class PostProcess = NoOp>
auto MakeDoneCallback(
std::mutex* mutex, const Collection& collection, const typename Collection::key_type& key,
std::mutex* mutex, const Collection* collection, const typename Collection::key_type& key,
const TabletId& tablet_id, const PostProcess& post_process = PostProcess()) {
struct DoneFunctor {
std::mutex& mutex;
Expand All @@ -165,12 +166,12 @@ auto MakeDoneCallback(
}
};

return DoneFunctor {
.mutex = *mutex,
.collection = collection,
.key = key,
.tablet_id = tablet_id,
.post_process = post_process,
return DoneFunctor{
.mutex = *mutex,
.collection = *collection,
.key = key,
.tablet_id = tablet_id,
.post_process = post_process,
};
}

Expand Down Expand Up @@ -335,27 +336,26 @@ class MasterSnapshotCoordinator::Impl {
return Status::OK();
}

void UpdateSnapshotIfPresent(const TxnSnapshotId& id, int64_t leader_term)
NO_THREAD_SAFETY_ANALYSIS EXCLUDES(mutex_) {
std::unique_lock<std::mutex> lock(mutex_);
void UpdateSnapshotIfPresent(const TxnSnapshotId& id, int64_t leader_term) EXCLUDES(mutex_) {
UniqueLock lock(mutex_);
auto it = snapshots_.find(id);
if (it != snapshots_.end()) {
UpdateSnapshot(it->get(), leader_term, &lock);
UpdateSnapshot(it->get(), leader_term, &GetLockForCondition(&lock));
}
}

template <class Pb, class Map>
Status LoadEntryOfType(
tablet::Tablet* tablet, const SysRowEntryType& type, Map* m) REQUIRES(mutex_) {
return EnumerateSysCatalog(tablet, context_.schema(), type,
[this, &m](const Slice& id, const Slice& data) NO_THREAD_SAFETY_ANALYSIS -> Status {
[this, &m](const Slice& id, const Slice& data) REQUIRES(mutex_) -> Status {
return LoadEntry<Pb>(id, data, m);
});
}

Status LoadSnapshotEntry(tablet::Tablet* tablet) REQUIRES(mutex_) {
return EnumerateSysCatalog(tablet, context_.schema(), SysRowEntryType::SNAPSHOT,
[this](const Slice& id, const Slice& data) NO_THREAD_SAFETY_ANALYSIS -> Status {
[this](const Slice& id, const Slice& data) REQUIRES(mutex_) -> Status {
RETURN_NOT_OK(LoadEntry<SysSnapshotEntryPB>(id, data, &snapshots_));
auto snapshot_id = TryFullyDecodeTxnSnapshotId(id);
UpdateCoveringMap(snapshot_id);
Expand Down Expand Up @@ -1262,7 +1262,7 @@ class MasterSnapshotCoordinator::Impl {
const TabletSnapshotOperation& operation, const TabletInfoPtr& tablet_info,
int64_t leader_term) {
auto callback = MakeDoneCallback(
&mutex_, snapshots_, operation.snapshot_id, operation.tablet_id,
&mutex_, &snapshots_, operation.snapshot_id, operation.tablet_id,
std::bind(&Impl::UpdateSnapshot, this, _1, leader_term, _2));
if (!tablet_info) {
callback(STATUS_EC_FORMAT(NotFound, MasterError(MasterErrorPB::TABLET_NOT_RUNNING),
Expand Down Expand Up @@ -1332,7 +1332,7 @@ class MasterSnapshotCoordinator::Impl {
const TabletRestoreOperation& operation, const TabletInfoPtr& tablet_info,
int64_t leader_term) {
auto callback = MakeDoneCallback(
&mutex_, restorations_, operation.restoration_id, operation.tablet_id,
&mutex_, &restorations_, operation.restoration_id, operation.tablet_id,
std::bind(&Impl::FinishRestoration, this, _1, leader_term));
if (!tablet_info) {
callback(STATUS_EC_FORMAT(
Expand Down Expand Up @@ -1429,10 +1429,11 @@ class MasterSnapshotCoordinator::Impl {
}
r->PrepareOperations(&restore_operations, tablets_snapshot, db_oid);
}
for (const auto& id : cleanup_snapshots) {
CleanupObject(leader_term, id, snapshots_, EncodedSnapshotKey(id, &context_));
}
}
for (const auto& id : cleanup_snapshots) {
CleanupObject(leader_term, id, snapshots_, EncodedSnapshotKey(id, &context_));
}

ExecuteOperations(operations, leader_term);
PollSchedulesComplete(schedules_data, l.epoch());
ExecuteRestoreOperations(restore_operations, leader_term);
Expand Down Expand Up @@ -1504,11 +1505,13 @@ class MasterSnapshotCoordinator::Impl {
WARN_NOT_OK(ExecuteScheduleOperation(operation, epoch.leader_term),
Format("Failed to execute operation on $0", operation.schedule_id));
break;
case SnapshotScheduleOperationType::kCleanup:
case SnapshotScheduleOperationType::kCleanup: {
std::lock_guard l(mutex_);
CleanupObject(
epoch.leader_term, operation.schedule_id, schedules_,
SnapshotScheduleState::EncodedKey(operation.schedule_id, &context_));
break;
}
default:
LOG(DFATAL) << "Unexpected operation type: " << operation.type;
break;
Expand Down
1 change: 0 additions & 1 deletion src/yb/master/scoped_leader_shared_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ ScopedLeaderSharedLock::ScopedLeaderSharedLock(
: catalog_(DCHECK_NOTNULL(catalog)),
leader_shared_lock_(catalog->leader_lock_, std::try_to_lock),
start_(std::chrono::steady_clock::now()),
epoch_(LeaderEpoch(-1)),
file_name_(file_name),
line_number_(line_number),
function_name_(function_name) {
Expand Down
14 changes: 9 additions & 5 deletions src/yb/master/ysql_backends_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ Status BackendsCatalogVersionJob::Launch(LeaderEpoch epoch) {
std::lock_guard l(mutex_);

// Commit term now.
epoch_ = std::move(epoch);
epoch_ = epoch;

for (const auto& ts_desc : descs) {
if (!ts_desc->IsLive()) {
Expand All @@ -489,7 +489,7 @@ Status BackendsCatalogVersionJob::Launch(LeaderEpoch epoch) {
}

for (const auto& ts_uuid : ts_uuids) {
RETURN_NOT_OK(LaunchTS(ts_uuid, -1 /* num_lagging_backends */, epoch_));
RETURN_NOT_OK(LaunchTS(ts_uuid, -1 /* num_lagging_backends */, epoch));
}

return Status::OK();
Expand All @@ -498,7 +498,7 @@ Status BackendsCatalogVersionJob::Launch(LeaderEpoch epoch) {
Status BackendsCatalogVersionJob::LaunchTS(
TabletServerId ts_uuid, int num_lagging_backends, const LeaderEpoch& epoch) {
auto task = std::make_shared<BackendsCatalogVersionTS>(
shared_from_this(), ts_uuid, num_lagging_backends, epoch_);
shared_from_this(), ts_uuid, num_lagging_backends, epoch);
Status s = threadpool()->SubmitFunc([this, &ts_uuid, task]() {
Status s = task->Run();
if (!s.ok()) {
Expand Down Expand Up @@ -610,12 +610,14 @@ void BackendsCatalogVersionJob::Update(TabletServerId ts_uuid, Result<int> num_l
auto s = num_lagging_backends.status();
if (s.IsTryAgain()) {
int last_known_num_lagging_backends;
LeaderEpoch epoch;
{
std::lock_guard l(mutex_);
last_known_num_lagging_backends = ts_map_[ts_uuid];
epoch = epoch_;
}
// Ignore returned status since it is already logged/handled.
(void)LaunchTS(ts_uuid, last_known_num_lagging_backends, epoch_);
(void)LaunchTS(ts_uuid, last_known_num_lagging_backends, epoch);
} else {
LOG_WITH_PREFIX(WARNING) << "got bad status " << s.ToString() << " from TS " << ts_uuid;
master_->ysql_backends_manager()->TerminateJob(
Expand All @@ -626,8 +628,10 @@ void BackendsCatalogVersionJob::Update(TabletServerId ts_uuid, Result<int> num_l
DCHECK_GE(*num_lagging_backends, 0);

// Update num_lagging_backends.
LeaderEpoch epoch;
{
std::lock_guard l(mutex_);
epoch = epoch_;

#ifndef NDEBUG
if (ts_map_[ts_uuid] != -1) {
Expand All @@ -644,7 +648,7 @@ void BackendsCatalogVersionJob::Update(TabletServerId ts_uuid, Result<int> num_l
VLOG_WITH_PREFIX(2) << "still waiting on " << *num_lagging_backends << " backends of TS "
<< ts_uuid;
// Ignore returned status since it is already logged/handled.
(void)LaunchTS(ts_uuid, *num_lagging_backends, epoch_);
(void)LaunchTS(ts_uuid, *num_lagging_backends, epoch);
return;
}
DCHECK_EQ(*num_lagging_backends, 0);
Expand Down
Loading

0 comments on commit f858054

Please sign in to comment.