Skip to content

Commit

Permalink
perf: Make every gossip thread use its own randomness instance, reduc…
Browse files Browse the repository at this point in the history
…ing contention (cometbft#3006)

Closes cometbft#3005

As noted in that issue, we currently incur extra CPU overhead and
mutex contention every time we get a random number. This PR removes
that overhead by giving every performance-sensitive thread its own rand
instance.

In a subsequent PR, we can clean up all the testing usages, and likely
just entirely delete our internal randomness package. I didn't do that
in this PR, to keep it straightforward to verify.

---

- [x] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [ ] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
  • Loading branch information
ValarDragon committed May 26, 2024
1 parent a9d1091 commit f55b9f4
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- [`consensus`] Use an independent rng for gossip threads, reducing mutex contention.
([\#3005](https://github.com/cometbft/cometbft/issues/3005))
45 changes: 25 additions & 20 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"errors"
"fmt"
"math/rand"
"reflect"
"sync"
"time"
Expand All @@ -12,6 +13,7 @@ import (
cmtevents "github.com/cometbft/cometbft/libs/events"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
Expand Down Expand Up @@ -541,6 +543,7 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
for {
Expand All @@ -553,7 +556,7 @@ OUTER_LOOP:

// Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(rng); ok {
part := rs.ProposalBlockParts.GetPart(index)
parts, err := part.ToProto()
if err != nil {
Expand Down Expand Up @@ -592,7 +595,7 @@ OUTER_LOOP:
// continue the loop since prs is a copy and not affected by this initialization
continue OUTER_LOOP
}
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer, rng)
continue OUTER_LOOP
}

Expand Down Expand Up @@ -647,9 +650,9 @@ OUTER_LOOP:
}

func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) {
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer, rng *rand.Rand) {

if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(rng); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
Expand Down Expand Up @@ -700,6 +703,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
var sleeping = 0
Expand All @@ -726,15 +730,15 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps, rng) {
continue OUTER_LOOP
}
}

// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
if ps.PickSendVote(rng, rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand All @@ -747,7 +751,7 @@ OUTER_LOOP:
// Load the block commit for prs.Height,
// which contains precommit signatures for prs.Height.
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ps.PickSendVote(commit) {
if ps.PickSendVote(rng, commit) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand All @@ -757,9 +761,9 @@ OUTER_LOOP:
if sleeping == 0 {
// We sent nothing. Sleep...
sleeping = 1
logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
// logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
// "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
// "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
} else if sleeping == 2 {
// Continued sleep...
sleeping = 1
Expand All @@ -775,19 +779,20 @@ func (conR *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
rng *rand.Rand,
) bool {

// If there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
if ps.PickSendVote(rng, rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if ps.PickSendVote(rng, polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand All @@ -796,29 +801,29 @@ func (conR *Reactor) gossipVotesForHeight(
}
// If there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are prevotes to send...Needed because of validBlock mechanism
if prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if ps.PickSendVote(rng, polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand Down Expand Up @@ -1132,8 +1137,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in

// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
func (ps *PeerState) PickSendVote(rng *rand.Rand, votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(rng, votes); ok {
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.SendEnvelope(p2p.Envelope{
ChannelID: VoteChannel,
Expand All @@ -1152,7 +1157,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
// PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
func (ps *PeerState) PickVoteToSend(rng *rand.Rand, votes types.VoteSetReader) (vote *types.Vote, ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

Expand All @@ -1173,7 +1178,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
if psVotes == nil {
return nil, false // Not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(rng); ok {
return votes.GetByIndex(int32(index)), true
}
return nil, false
Expand Down
8 changes: 4 additions & 4 deletions libs/bits/bit_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"encoding/binary"
"fmt"
"math/bits"
"math/rand"
"regexp"
"strings"
"sync"

cmtmath "github.com/cometbft/cometbft/libs/math"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtprotobits "github.com/cometbft/cometbft/proto/tendermint/libs/bits"
)

Expand Down Expand Up @@ -261,8 +261,8 @@ func (bA *BitArray) IsFull() bool {

// PickRandom returns a random index for a set bit in the bit array.
// If there is no such value, it returns 0, false.
// It uses the global randomness in `random.go` to get this index.
func (bA *BitArray) PickRandom() (int, bool) {
// It uses the provided randomness to get this index.
func (bA *BitArray) PickRandom(r *rand.Rand) (int, bool) {
if bA == nil {
return 0, false
}
Expand All @@ -273,7 +273,7 @@ func (bA *BitArray) PickRandom() (int, bool) {
bA.mtx.Unlock()
return 0, false
}
index := bA.getNthTrueIndex(cmtrand.Intn(numTrueIndices))
index := bA.getNthTrueIndex(r.Intn(numTrueIndices))
bA.mtx.Unlock()
if index == -1 {
return 0, false
Expand Down
7 changes: 5 additions & 2 deletions libs/bits/bit_array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,6 +19,7 @@ var (
empty64Bits = empty16Bits + empty16Bits + empty16Bits + empty16Bits
full16bits = "xxxxxxxxxxxxxxxx"
full64bits = full16bits + full16bits + full16bits + full16bits
grand = rand.New(rand.NewSource(time.Now().UnixNano()))
)

func randBitArray(bits int) *BitArray {
Expand Down Expand Up @@ -139,7 +142,7 @@ func TestPickRandom(t *testing.T) {
var bitArr *BitArray
err := json.Unmarshal([]byte(tc.bA), &bitArr)
require.NoError(t, err)
_, ok := bitArr.PickRandom()
_, ok := bitArr.PickRandom(grand)
require.Equal(t, tc.ok, ok, "PickRandom got an unexpected result on input %s", tc.bA)
}
}
Expand Down Expand Up @@ -395,6 +398,6 @@ func BenchmarkPickRandomBitArray(b *testing.B) {
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = bitArr.PickRandom()
_, _ = bitArr.PickRandom(grand)
}
}
20 changes: 18 additions & 2 deletions libs/rand/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,28 @@ func NewRand() *Rand {
return rand
}

func (r *Rand) init() {
// NewStdlibRand returns a new stdlib math/rand generator whose source is
// seeded with the value returned by newSeed.
//
// The generator is handed back as-is, with no locking added here, so it's up
// to the caller to ensure that it is not called in parallel. The failure mode
// of calling the returned rand multiple times in parallel is repeated values
// across threads.
func NewStdlibRand() *mrand.Rand {
	seed := newSeed()
	// G404: Use of weak random number generator (math/rand instead of crypto/rand)
	//nolint:gosec
	src := mrand.NewSource(seed)
	//nolint:gosec
	return mrand.New(src)
}

// newSeed builds a 64-bit seed from the 8 bytes returned by cRandBytes(8).
//
// The bytes are packed big-endian: the accumulator is shifted left by one
// byte *before* the next byte is OR-ed in, so every one of the 8 bytes is
// present in the result. Shifting after the OR instead would push the first
// byte out of the 64-bit value on the final iteration and leave the low 8 bits
// permanently zero, wasting 8 bits of the seed.
func newSeed() int64 {
	var seed uint64
	for _, b := range cRandBytes(8) {
		seed = seed<<8 | uint64(b)
	}
	return int64(seed)
}

// init seeds the receiver: it obtains a seed from newSeed and hands it to
// r.reset.
func (r *Rand) init() {
	seed := newSeed()
	r.reset(seed)
}

func (r *Rand) reset(seed int64) {
Expand Down Expand Up @@ -302,6 +316,8 @@ func (r *Rand) Perm(n int) []int {
// NOTE: This relies on the os's random number generator.
// For real security, we should salt that with some seed.
// See github.com/cometbft/cometbft/crypto for a more secure reader.
// This function is thread safe, see:
// https://stackoverflow.com/questions/75685374/is-golang-crypto-rand-thread-safe
func cRandBytes(numBytes int) []byte {
b := make([]byte, numBytes)
_, err := crand.Read(b)
Expand Down
18 changes: 18 additions & 0 deletions libs/rand/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ func TestRngConcurrencySafety(t *testing.T) {
wg.Wait()
}

// TestStdlibRngConcurrencySafety starts 100 goroutines. Each one creates its
// own generator via NewStdlibRand, draws a Uint64 from it, pauses for
// Intn(100) milliseconds, and then draws a Perm(3) from the same generator.
//
// The test makes no assertions of its own; it only waits for every goroutine
// to finish. It is meant to show that creating rand instances concurrently,
// and calling independent rand sources in parallel, is safe (presumably most
// useful when run under the race detector).
func TestStdlibRngConcurrencySafety(_ *testing.T) {
	const workers = 100

	var wg sync.WaitGroup
	wg.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer wg.Done()
			rng := NewStdlibRand()
			_ = rng.Uint64()
			pause := time.Millisecond * time.Duration(Intn(100))
			time.Sleep(pause)
			_ = rng.Perm(3)
		}()
	}
	wg.Wait()
}

func BenchmarkRandBytes10B(b *testing.B) {
benchmarkRandBytes(b, 10)
}
Expand Down

0 comments on commit f55b9f4

Please sign in to comment.