From f55b9f415e23e7ad4e02f6de4d1cb96e5e604f12 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Thu, 9 May 2024 00:14:58 -0700 Subject: [PATCH] perf: Make every gossip thread use its own randomness instance, reducing contention (#3006) Closes #3005 As noted in that issue, we currently incur extra CPU overhead and mutex contention when getting a random number. This PR removes this overhead by making every performance-sensitive thread have its own rand instance. In a subsequent PR, we can clean up all the testing usages, and likely just entirely delete our internal randomness package. I didn't do that in this PR to keep it straightforward to verify. --- - [x] Tests written/updated - [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments - [ ] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec --- ...thread-independent-randomness-in-gossip.md | 2 + consensus/reactor.go | 45 ++++++++++--------- libs/bits/bit_array.go | 8 ++-- libs/bits/bit_array_test.go | 7 ++- libs/rand/random.go | 20 ++++++++- libs/rand/random_test.go | 18 ++++++++ 6 files changed, 72 insertions(+), 28 deletions(-) create mode 100644 .changelog/unreleased/improvements/3006-use-thread-independent-randomness-in-gossip.md diff --git a/.changelog/unreleased/improvements/3006-use-thread-independent-randomness-in-gossip.md b/.changelog/unreleased/improvements/3006-use-thread-independent-randomness-in-gossip.md new file mode 100644 index 0000000000..290da0b1c8 --- /dev/null +++ b/.changelog/unreleased/improvements/3006-use-thread-independent-randomness-in-gossip.md @@ -0,0 +1,2 @@ +- [`consensus`] Use an independent rng for gossip threads, reducing mutex contention. 
+ ([\#3005](https://github.com/cometbft/cometbft/issues/3005)) diff --git a/consensus/reactor.go b/consensus/reactor.go index 1c5de7f1f6..bf7642bd49 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -3,6 +3,7 @@ package consensus import ( "errors" "fmt" + "math/rand" "reflect" "sync" "time" @@ -12,6 +13,7 @@ import ( cmtevents "github.com/cometbft/cometbft/libs/events" cmtjson "github.com/cometbft/cometbft/libs/json" "github.com/cometbft/cometbft/libs/log" + cmtrand "github.com/cometbft/cometbft/libs/rand" cmtsync "github.com/cometbft/cometbft/libs/sync" "github.com/cometbft/cometbft/p2p" cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus" @@ -541,6 +543,7 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState { func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + rng := cmtrand.NewStdlibRand() OUTER_LOOP: for { @@ -553,7 +556,7 @@ OUTER_LOOP: // Send proposal Block parts? if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) { - if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { + if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(rng); ok { part := rs.ProposalBlockParts.GetPart(index) parts, err := part.ToProto() if err != nil { @@ -592,7 +595,7 @@ OUTER_LOOP: // continue the loop since prs is a copy and not effected by this initialization continue OUTER_LOOP } - conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer) + conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer, rng) continue OUTER_LOOP } @@ -647,9 +650,9 @@ OUTER_LOOP: } func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState, - prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) { + prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer, rng *rand.Rand) { - if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { + if index, ok := 
prs.ProposalBlockParts.Not().PickRandom(rng); ok { // Ensure that the peer's PartSetHeader is correct blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) if blockMeta == nil { @@ -700,6 +703,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + rng := cmtrand.NewStdlibRand() // Simple hack to throttle logs upon sleep. var sleeping = 0 @@ -726,7 +730,7 @@ OUTER_LOOP: // If height matches, then send LastCommit, Prevotes, Precommits. if rs.Height == prs.Height { heightLogger := logger.With("height", prs.Height) - if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) { + if conR.gossipVotesForHeight(heightLogger, rs, prs, ps, rng) { continue OUTER_LOOP } } @@ -734,7 +738,7 @@ OUTER_LOOP: // Special catchup logic. // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { - if ps.PickSendVote(rs.LastCommit) { + if ps.PickSendVote(rng, rs.LastCommit) { logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } @@ -747,7 +751,7 @@ OUTER_LOOP: // Load the block commit for prs.Height, // which contains precommit signatures for prs.Height. if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil { - if ps.PickSendVote(commit) { + if ps.PickSendVote(rng, commit) { logger.Debug("Picked Catchup commit to send", "height", prs.Height) continue OUTER_LOOP } @@ -757,9 +761,9 @@ OUTER_LOOP: if sleeping == 0 { // We sent nothing. Sleep... 
sleeping = 1 - logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, - "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, - "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) + // logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, + // "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, + // "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) } else if sleeping == 2 { // Continued sleep... sleeping = 1 @@ -775,11 +779,12 @@ func (conR *Reactor) gossipVotesForHeight( rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, + rng *rand.Rand, ) bool { // If there are lastCommits to send... if prs.Step == cstypes.RoundStepNewHeight { - if ps.PickSendVote(rs.LastCommit) { + if ps.PickSendVote(rng, rs.LastCommit) { logger.Debug("Picked rs.LastCommit to send") return true } @@ -787,7 +792,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POL prevotes to send... if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if ps.PickSendVote(rng, polPrevotes) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -796,21 +801,21 @@ func (conR *Reactor) gossipVotesForHeight( } // If there are prevotes to send... if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } } // If there are precommits to send... 
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + if ps.PickSendVote(rng, rs.Votes.Precommits(prs.Round)) { logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) return true } } // If there are prevotes to send...Needed because of validBlock mechanism if prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } @@ -818,7 +823,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POLPrevotes to send... if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if ps.PickSendVote(rng, polPrevotes) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -1132,8 +1137,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in // PickSendVote picks a vote and sends it to the peer. // Returns true if vote was sent. -func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { - if vote, ok := ps.PickVoteToSend(votes); ok { +func (ps *PeerState) PickSendVote(rng *rand.Rand, votes types.VoteSetReader) bool { + if vote, ok := ps.PickVoteToSend(rng, votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) if ps.peer.SendEnvelope(p2p.Envelope{ ChannelID: VoteChannel, @@ -1152,7 +1157,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { // PickVoteToSend picks a vote to send to the peer. // Returns true if a vote was picked. // NOTE: `votes` must be the correct Size() for the Height(). 
-func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { +func (ps *PeerState) PickVoteToSend(rng *rand.Rand, votes types.VoteSetReader) (vote *types.Vote, ok bool) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1173,7 +1178,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote if psVotes == nil { return nil, false // Not something worth sending } - if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok { + if index, ok := votes.BitArray().Sub(psVotes).PickRandom(rng); ok { return votes.GetByIndex(int32(index)), true } return nil, false diff --git a/libs/bits/bit_array.go b/libs/bits/bit_array.go index f9744f9c7b..e186b05b8d 100644 --- a/libs/bits/bit_array.go +++ b/libs/bits/bit_array.go @@ -4,12 +4,12 @@ import ( "encoding/binary" "fmt" "math/bits" + "math/rand" "regexp" "strings" "sync" cmtmath "github.com/cometbft/cometbft/libs/math" - cmtrand "github.com/cometbft/cometbft/libs/rand" cmtprotobits "github.com/cometbft/cometbft/proto/tendermint/libs/bits" ) @@ -261,8 +261,8 @@ func (bA *BitArray) IsFull() bool { // PickRandom returns a random index for a set bit in the bit array. // If there is no such value, it returns 0, false. -// It uses the global randomness in `random.go` to get this index. -func (bA *BitArray) PickRandom() (int, bool) { +// It uses the provided randomness to get this index. 
+func (bA *BitArray) PickRandom(r *rand.Rand) (int, bool) { if bA == nil { return 0, false } @@ -273,7 +273,7 @@ func (bA *BitArray) PickRandom() (int, bool) { bA.mtx.Unlock() return 0, false } - index := bA.getNthTrueIndex(cmtrand.Intn(numTrueIndices)) + index := bA.getNthTrueIndex(r.Intn(numTrueIndices)) bA.mtx.Unlock() if index == -1 { return 0, false diff --git a/libs/bits/bit_array_test.go b/libs/bits/bit_array_test.go index 99b1d97a34..04ee4547bc 100644 --- a/libs/bits/bit_array_test.go +++ b/libs/bits/bit_array_test.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/json" "fmt" + "math/rand" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,6 +19,7 @@ var ( empty64Bits = empty16Bits + empty16Bits + empty16Bits + empty16Bits full16bits = "xxxxxxxxxxxxxxxx" full64bits = full16bits + full16bits + full16bits + full16bits + grand = rand.New(rand.NewSource(time.Now().UnixNano())) ) func randBitArray(bits int) *BitArray { @@ -139,7 +142,7 @@ func TestPickRandom(t *testing.T) { var bitArr *BitArray err := json.Unmarshal([]byte(tc.bA), &bitArr) require.NoError(t, err) - _, ok := bitArr.PickRandom() + _, ok := bitArr.PickRandom(grand) require.Equal(t, tc.ok, ok, "PickRandom got an unexpected result on input %s", tc.bA) } } @@ -395,6 +398,6 @@ func BenchmarkPickRandomBitArray(b *testing.B) { require.NoError(b, err) b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = bitArr.PickRandom() + _, _ = bitArr.PickRandom(grand) } } diff --git a/libs/rand/random.go b/libs/rand/random.go index 49b12a3b20..0919e122da 100644 --- a/libs/rand/random.go +++ b/libs/rand/random.go @@ -37,14 +37,28 @@ func NewRand() *Rand { return rand } -func (r *Rand) init() { +// Make a new stdlib rand source. It's up to the caller to ensure +// that the rand source is not called in parallel. +// The failure mode of calling the returned rand multiple times in parallel is +// repeated values across threads. 
+func NewStdlibRand() *mrand.Rand { + // G404: Use of weak random number generator (math/rand instead of crypto/rand) + //nolint:gosec + return mrand.New(mrand.NewSource(newSeed())) +} + +func newSeed() int64 { bz := cRandBytes(8) var seed uint64 for i := 0; i < 8; i++ { seed |= uint64(bz[i]) seed <<= 8 } - r.reset(int64(seed)) + return int64(seed) +} + +func (r *Rand) init() { + r.reset(newSeed()) } func (r *Rand) reset(seed int64) { @@ -302,6 +316,8 @@ func (r *Rand) Perm(n int) []int { // NOTE: This relies on the os's random number generator. // For real security, we should salt that with some seed. // See github.com/cometbft/cometbft/crypto for a more secure reader. +// This function is thread safe, see: +// https://stackoverflow.com/questions/75685374/is-golang-crypto-rand-thread-safe func cRandBytes(numBytes int) []byte { b := make([]byte, numBytes) _, err := crand.Read(b) diff --git a/libs/rand/random_test.go b/libs/rand/random_test.go index 10bb601b5e..1be112e1c9 100644 --- a/libs/rand/random_test.go +++ b/libs/rand/random_test.go @@ -83,6 +83,24 @@ func TestRngConcurrencySafety(t *testing.T) { wg.Wait() } +// Makes a new stdlib random instance 100 times concurrently. +// Ensures that it is concurrent safe to create rand instances, and call independent rand +// sources in parallel. +func TestStdlibRngConcurrencySafety(_ *testing.T) { + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + r := NewStdlibRand() + _ = r.Uint64() + <-time.After(time.Millisecond * time.Duration(Intn(100))) + _ = r.Perm(3) + }() + } + wg.Wait() +} + func BenchmarkRandBytes10B(b *testing.B) { benchmarkRandBytes(b, 10) }