Skip to content

Commit

Permalink
replica_rac2: inline Replica mutex assertions
Browse files Browse the repository at this point in the history
These are now accomplished via the ReplicaMutexAsserter struct, instead
of an interface.

go build -gcflags "-m -m" produces output like:

./processor.go:1142:46: inlining call to ReplicaMutexAsserter.RaftMuAssertHeld
./processor.go:1143:49: inlining call to ReplicaMutexAsserter.ReplicaMuAssertHeld
./processor.go:1142:46: inlining call to syncutil.(*Mutex).AssertHeld
./processor.go:1143:49: inlining call to syncutil.(*RWMutex).AssertHeld

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Oct 23, 2024
1 parent ee6710a commit be0923c
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 116 deletions.
14 changes: 2 additions & 12 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,19 +455,9 @@ func (n noopReplicaFlowControlIntegration) handle() (kvflowcontrol.Handle, bool)

type replicaForRACv2 Replica

var _ replica_rac2.Replica = &replicaForRACv2{}
var _ replica_rac2.ReplicaForTesting = &replicaForRACv2{}

// RaftMuAssertHeld implements replica_rac2.Replica.
func (r *replicaForRACv2) RaftMuAssertHeld() {
r.raftMu.AssertHeld()
}

// MuAssertHeld implements replica_rac2.Replica.
func (r *replicaForRACv2) MuAssertHeld() {
r.mu.AssertHeld()
}

// IsScratchRange implements replica_rac2.Replica.
// IsScratchRange implements replica_rac2.ReplicaForTesting.
func (r *replicaForRACv2) IsScratchRange() bool {
return (*Replica)(r).IsScratchRange()
}
Expand Down
80 changes: 52 additions & 28 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,8 @@ import (
"github.com/cockroachdb/errors"
)

// Replica abstracts kvserver.Replica. It exposes internal implementation
// details of Replica, specifically the locking behavior, since it is
// essential to reason about correctness.
//
// TODO(sumeer): because the mutex assertions are hidden behind an interface,
// they are not free for production builds. Fix, and then add more assertions.
type Replica interface {
// RaftMuAssertHeld asserts that Replica.raftMu is held.
RaftMuAssertHeld()
// MuAssertHeld asserts that Replica.mu is held.
MuAssertHeld()
// ReplicaForTesting abstracts kvserver.Replica for testing.
type ReplicaForTesting interface {
// IsScratchRange returns true if this is range is a scratch range (i.e.
// overlaps with the scratch span and has a start key <=
// keys.ScratchRangeMin).
Expand Down Expand Up @@ -142,6 +133,37 @@ type RangeControllerFactory interface {
New(ctx context.Context, state rangeControllerInitState) rac2.RangeController
}

// ReplicaMutexAsserter must only be used to assert that mutexes are held.
// This is a concrete struct so that the assertions can be compiled away in
// production code.
type ReplicaMutexAsserter struct {
raftMu *syncutil.Mutex
replicaMu *syncutil.RWMutex
}

func MakeReplicaMutexAsserter(
raftMu *syncutil.Mutex, replicaMu *syncutil.RWMutex,
) ReplicaMutexAsserter {
return ReplicaMutexAsserter{
raftMu: raftMu,
replicaMu: replicaMu,
}
}

// RaftMuAssertHeld asserts that Replica.raftMu is held.
//
// gcassert:inline
func (rmu ReplicaMutexAsserter) RaftMuAssertHeld() {
rmu.raftMu.AssertHeld()
}

// ReplicaMuAssertHeld asserts that Replica.mu is held for writing.
//
// gcassert:inline
func (rmu ReplicaMutexAsserter) ReplicaMuAssertHeld() {
rmu.replicaMu.AssertHeld()
}

// ProcessorOptions are specified when creating a new Processor.
type ProcessorOptions struct {
// Various constant fields that are duplicated from Replica, since we
Expand All @@ -154,7 +176,8 @@ type ProcessorOptions struct {
RangeID roachpb.RangeID
ReplicaID roachpb.ReplicaID

Replica Replica
ReplicaForTesting ReplicaForTesting
ReplicaMutexAsserter ReplicaMutexAsserter
RaftScheduler RaftScheduler
AdmittedPiggybacker AdmittedPiggybacker
ACWorkQueue ACWorkQueue
Expand Down Expand Up @@ -519,8 +542,8 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
func (p *processorImpl) InitRaftLocked(
ctx context.Context, rn rac2.RaftInterface, logMark rac2.LogMark,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
if p.desc.replicas != nil {
log.Fatalf(ctx, "initializing RaftNode after replica is initialized")
}
Expand All @@ -530,7 +553,7 @@ func (p *processorImpl) InitRaftLocked(

// OnDestroyRaftMuLocked implements Processor.
func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.destroyed = true
p.closeLeaderStateRaftMuLocked(ctx)
// Release some memory.
Expand All @@ -541,7 +564,7 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked(
ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel, state RaftNodeBasicState,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed || p.enabledWhenLeader >= level {
return
}
Expand Down Expand Up @@ -572,8 +595,8 @@ func descToReplicaSet(desc *roachpb.RangeDescriptor) rac2.ReplicaSet {
func (p *processorImpl) OnDescChangedLocked(
ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
initialization := p.desc.replicas == nil
if initialization {
// Replica is initialized, in that we now have a descriptor.
Expand Down Expand Up @@ -754,7 +777,7 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
func (p *processorImpl) HandleRaftReadyRaftMuLocked(
ctx context.Context, state RaftNodeBasicState, e rac2.RaftEvent,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
// Register all snapshots / log appends without exception. If the replica is
// being destroyed, this should be a no-op, but there is no harm in
// registering the write just in case.
Expand Down Expand Up @@ -786,7 +809,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(
p.maybeSendAdmittedRaftMuLocked(ctx)
if rc := p.leader.rc; rc != nil {
if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges ||
p.opts.Replica.IsScratchRange() {
p.opts.ReplicaForTesting.IsScratchRange() {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
Expand Down Expand Up @@ -873,6 +896,7 @@ func (p *processorImpl) registerStorageAppendRaftMuLocked(ctx context.Context, e

// AdmitRaftEntriesRaftMuLocked implements Processor.
func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2.RaftEvent) bool {
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
// Return false only if we're not destroyed and not using V2.
if p.destroyed || !p.isLeaderUsingV2ProcLocked() {
return p.destroyed
Expand Down Expand Up @@ -1003,7 +1027,7 @@ func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(

// ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor.
func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed {
return
}
Expand Down Expand Up @@ -1040,7 +1064,7 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte
func (p *processorImpl) SideChannelForPriorityOverrideAtFollowerRaftMuLocked(
info SideChannelInfoUsingRaftMessageRequest,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed {
return
}
Expand Down Expand Up @@ -1090,7 +1114,7 @@ func (p *processorImpl) AdmittedState() rac2.AdmittedVector {
func (p *processorImpl) AdmitRaftMuLocked(
ctx context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
// NB: rc is always updated while raftMu is held.
if rc := p.leader.rc; rc != nil {
rc.AdmitRaftMuLocked(ctx, replicaID, av)
Expand All @@ -1099,16 +1123,16 @@ func (p *processorImpl) AdmitRaftMuLocked(

// MaybeSendPingsRaftMuLocked implements Processor.
func (p *processorImpl) MaybeSendPingsRaftMuLocked() {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if rc := p.leader.rc; rc != nil {
rc.MaybeSendPingsRaftMuLocked()
}
}

// HoldsSendTokensLocked implements Processor.
func (p *processorImpl) HoldsSendTokensLocked() bool {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
if rc := p.leader.rc; rc != nil {
return rc.HoldsSendTokensLocked()
}
Expand Down Expand Up @@ -1142,7 +1166,7 @@ func (p *processorImpl) AdmitForEval(
func (p *processorImpl) ProcessSchedulerEventRaftMuLocked(
ctx context.Context, mode rac2.RaftMsgAppMode, logSnapshot raft.LogSnapshot,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed {
return
}
Expand All @@ -1153,7 +1177,7 @@ func (p *processorImpl) ProcessSchedulerEventRaftMuLocked(

// InspectRaftMuLocked implements Processor.
func (p *processorImpl) InspectRaftMuLocked(ctx context.Context) (kvflowinspectpb.Handle, bool) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.leader.rcReferenceUpdateMu.RLock()
defer p.leader.rcReferenceUpdateMu.RUnlock()
if p.leader.rc == nil {
Expand Down
55 changes: 42 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ import (
)

type testReplica struct {
mu syncutil.RWMutex
raftNode *testRaftNode
b *strings.Builder

leaseholder roachpb.ReplicaID
}

var _ Replica = &testReplica{}
var _ ReplicaForTesting = &testReplica{}

func newTestReplica(b *strings.Builder) *testReplica {
return &testReplica{b: b}
Expand All @@ -57,14 +56,6 @@ func (r *testReplica) initRaft(stable rac2.LogMark) {
}
}

func (r *testReplica) RaftMuAssertHeld() {
fmt.Fprintf(r.b, " Replica.RaftMuAssertHeld\n")
}

func (r *testReplica) MuAssertHeld() {
fmt.Fprintf(r.b, " Replica.MuAssertHeld\n")
}

func (r *testReplica) IsScratchRange() bool {
return true
}
Expand Down Expand Up @@ -249,6 +240,28 @@ func (c *testRangeController) SendStreamStats(stats *rac2.RangeSendStreamStats)
fmt.Fprintf(c.b, " RangeController.SendStreamStats\n")
}

func makeTestMutexAsserter() ReplicaMutexAsserter {
var raftMu syncutil.Mutex
var replicaMu syncutil.RWMutex
return MakeReplicaMutexAsserter(&raftMu, &replicaMu)
}

func LockRaftMuAndReplicaMu(mu *ReplicaMutexAsserter) (unlockFunc func()) {
mu.raftMu.Lock()
mu.replicaMu.Lock()
return func() {
mu.replicaMu.Unlock()
mu.raftMu.Unlock()
}
}

func LockRaftMu(mu *ReplicaMutexAsserter) (unlockFunc func()) {
mu.raftMu.Lock()
return func() {
mu.raftMu.Unlock()
}
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -264,6 +277,7 @@ func TestProcessorBasic(t *testing.T) {
var rcFactory testRangeControllerFactory
var p *processorImpl
tenantID := roachpb.MustMakeTenantID(4)
muAsserter := makeTestMutexAsserter()
reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) {
b.Reset()
r = newTestReplica(&b)
Expand All @@ -276,7 +290,8 @@ func TestProcessorBasic(t *testing.T) {
StoreID: 2,
RangeID: 3,
ReplicaID: replicaID,
Replica: r,
ReplicaForTesting: r,
ReplicaMutexAsserter: muAsserter,
RaftScheduler: &sched,
AdmittedPiggybacker: &piggybacker,
ACWorkQueue: &q,
Expand Down Expand Up @@ -311,10 +326,10 @@ func TestProcessorBasic(t *testing.T) {
var mark rac2.LogMark
d.ScanArgs(t, "log-term", &mark.Term)
d.ScanArgs(t, "log-index", &mark.Index)
r.mu.Lock()
r.initRaft(mark)
unlockFunc := LockRaftMuAndReplicaMu(&muAsserter)
p.InitRaftLocked(ctx, r.raftNode, r.raftNode.mark)
r.mu.Unlock()
unlockFunc()
return builderStr()

case "set-raft-state":
Expand Down Expand Up @@ -357,7 +372,9 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "on-destroy":
unlockFunc := LockRaftMu(&muAsserter)
p.OnDestroyRaftMuLocked(ctx)
unlockFunc()
return builderStr()

case "set-enabled-level":
Expand All @@ -372,7 +389,9 @@ func TestProcessorBasic(t *testing.T) {
Leaseholder: r.leaseholder,
}
}
unlockFunc := LockRaftMu(&muAsserter)
p.SetEnabledWhenLeaderRaftMuLocked(ctx, enabledLevel, state)
unlockFunc()
return builderStr()

case "get-enabled-level":
Expand All @@ -382,7 +401,9 @@ func TestProcessorBasic(t *testing.T) {

case "on-desc-changed":
desc := parseRangeDescriptor(t, d)
unlockFunc := LockRaftMuAndReplicaMu(&muAsserter)
p.OnDescChangedLocked(ctx, &desc, tenantID)
unlockFunc()
return builderStr()

case "handle-raft-ready-and-admit":
Expand All @@ -409,6 +430,7 @@ func TestProcessorBasic(t *testing.T) {
Leaseholder: r.leaseholder,
}
}
unlockFunc := LockRaftMu(&muAsserter)
p.HandleRaftReadyRaftMuLocked(ctx, state, event)
fmt.Fprintf(&b, ".....\n")
if len(event.Entries) > 0 {
Expand All @@ -417,6 +439,7 @@ func TestProcessorBasic(t *testing.T) {
fmt.Fprintf(&b, "destroyed-or-leader-using-v2: %t\n", destroyedOrV2)
printLogTracker()
}
unlockFunc()
return builderStr()

case "enqueue-piggybacked-admitted":
Expand All @@ -440,7 +463,9 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "process-piggybacked-admitted":
unlockFunc := LockRaftMu(&muAsserter)
p.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx)
unlockFunc()
return builderStr()

case "side-channel":
Expand All @@ -464,7 +489,9 @@ func TestProcessorBasic(t *testing.T) {
Last: last,
LowPriOverride: lowPriOverride,
}
unlockFunc := LockRaftMu(&muAsserter)
p.SideChannelForPriorityOverrideAtFollowerRaftMuLocked(info)
unlockFunc()
return builderStr()

case "admitted-log-entry":
Expand Down Expand Up @@ -502,7 +529,9 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "inspect":
unlockFunc := LockRaftMu(&muAsserter)
p.InspectRaftMuLocked(ctx)
unlockFunc()
return builderStr()

case "send-stream-stats":
Expand Down
Loading

0 comments on commit be0923c

Please sign in to comment.