Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Make every gossip thread use its own randomness instance, reduc… #77

Merged
merged 3 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- [`consensus`] Use an independent rng for gossip threads, reducing mutex contention.
([\#3005](https://github.com/cometbft/cometbft/issues/3005)
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
* [#74](https://github.com/osmosis-labs/cometbft/pull/74) perf(consensus): Minor speedup to mark late vote metrics
* [#75](https://github.com/osmosis-labs/cometbft/pull/75) perf(p2p): 4% speedup to readMsg by removing one allocation
* [#76](https://github.com/osmosis-labs/cometbft/pull/76) perf(consensus): Add LRU caches for blockstore operations used in gossip
* [#77](https://github.com/osmosis-labs/cometbft/pull/77) perf(consensus): Make every gossip thread use its own randomness instance, reducing mutex contention

## v0.37.4-v25-osmo-4

* [#69](https://github.com/osmosis-labs/cometbft/pull/69) perf: Make mempool update async from block.Commit (#3008)
* [#67](https://github.com/osmosis-labs/cometbft/pull/67) fix: TimeoutTicker returns wrong value/timeout pair when timeouts are…

## v0.37.4-v25-osmo-3

* [#61](https://github.com/osmosis-labs/cometbft/pull/61) refactor(p2p/connection): Slight refactor to sendManyPackets that helps highlight performance improvements (backport #2953) (#2978)
* [#62](https://github.com/osmosis-labs/cometbft/pull/62) perf(consensus/blockstore): Remove validate basic call from LoadBlock
* [#71](https://github.com/osmosis-labs/cometbft/pull/71) perf(consensus): Make mempool update async from Commit
* [#59](https://github.com/osmosis-labs/cometbft/pull/59) `[blockstore]` Remove a redundant `Header.ValidateBasic` call in `LoadBlockMeta`, 75% reducing this time. ([\#2964](https://github.com/cometbft/cometbft/pull/2964))
* [#59](https://github.com/osmosis-labs/cometbft/pull/59) `[p2p/conn]` Speedup connection.WritePacketMsgTo, by reusing internal buffers rather than re-allocating. ([\#2986](https://github.com/cometbft/cometbft/pull/2986))

Expand Down
45 changes: 25 additions & 20 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"errors"
"fmt"
"math/rand"
"reflect"
"sync"
"time"
Expand All @@ -12,6 +13,7 @@ import (
cmtevents "github.com/cometbft/cometbft/libs/events"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
Expand Down Expand Up @@ -541,6 +543,7 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
for {
Expand All @@ -553,7 +556,7 @@ OUTER_LOOP:

// Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(rng); ok {
part := rs.ProposalBlockParts.GetPart(index)
parts, err := part.ToProto()
if err != nil {
Expand Down Expand Up @@ -592,7 +595,7 @@ OUTER_LOOP:
// continue the loop since prs is a copy and not effected by this initialization
continue OUTER_LOOP
}
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer, rng)
continue OUTER_LOOP
}

Expand Down Expand Up @@ -647,9 +650,9 @@ OUTER_LOOP:
}

func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) {
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer, rng *rand.Rand) {

if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(rng); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
Expand Down Expand Up @@ -700,6 +703,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
var sleeping = 0
Expand All @@ -726,15 +730,15 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps, rng) {
continue OUTER_LOOP
}
}

// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
if ps.PickSendVote(rng, rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand All @@ -747,7 +751,7 @@ OUTER_LOOP:
// Load the block commit for prs.Height,
// which contains precommit signatures for prs.Height.
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ps.PickSendVote(commit) {
if ps.PickSendVote(rng, commit) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand All @@ -757,9 +761,9 @@ OUTER_LOOP:
if sleeping == 0 {
// We sent nothing. Sleep...
sleeping = 1
logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
// logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
// "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
// "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
} else if sleeping == 2 {
// Continued sleep...
sleeping = 1
Expand All @@ -775,19 +779,20 @@ func (conR *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
rng *rand.Rand,
) bool {

// If there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
if ps.PickSendVote(rng, rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if ps.PickSendVote(rng, polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand All @@ -796,29 +801,29 @@ func (conR *Reactor) gossipVotesForHeight(
}
// If there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are prevotes to send...Needed because of validBlock mechanism
if prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if ps.PickSendVote(rng, polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand Down Expand Up @@ -1132,8 +1137,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in

// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
func (ps *PeerState) PickSendVote(rng *rand.Rand, votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(rng, votes); ok {
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.SendEnvelope(p2p.Envelope{
ChannelID: VoteChannel,
Expand All @@ -1152,7 +1157,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
// PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
func (ps *PeerState) PickVoteToSend(rng *rand.Rand, votes types.VoteSetReader) (vote *types.Vote, ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

Expand All @@ -1173,7 +1178,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
if psVotes == nil {
return nil, false // Not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(rng); ok {
return votes.GetByIndex(int32(index)), true
}
return nil, false
Expand Down
8 changes: 4 additions & 4 deletions libs/bits/bit_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"encoding/binary"
"fmt"
"math/bits"
"math/rand"
"regexp"
"strings"
"sync"

cmtmath "github.com/cometbft/cometbft/libs/math"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtprotobits "github.com/cometbft/cometbft/proto/tendermint/libs/bits"
)

Expand Down Expand Up @@ -261,8 +261,8 @@ func (bA *BitArray) IsFull() bool {

// PickRandom returns a random index for a set bit in the bit array.
// If there is no such value, it returns 0, false.
// It uses the global randomness in `random.go` to get this index.
func (bA *BitArray) PickRandom() (int, bool) {
// It uses the provided randomness to get this index.
func (bA *BitArray) PickRandom(r *rand.Rand) (int, bool) {
if bA == nil {
return 0, false
}
Expand All @@ -273,7 +273,7 @@ func (bA *BitArray) PickRandom() (int, bool) {
bA.mtx.Unlock()
return 0, false
}
index := bA.getNthTrueIndex(cmtrand.Intn(numTrueIndices))
index := bA.getNthTrueIndex(r.Intn(numTrueIndices))
bA.mtx.Unlock()
if index == -1 {
return 0, false
Expand Down
7 changes: 5 additions & 2 deletions libs/bits/bit_array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,6 +19,7 @@ var (
empty64Bits = empty16Bits + empty16Bits + empty16Bits + empty16Bits
full16bits = "xxxxxxxxxxxxxxxx"
full64bits = full16bits + full16bits + full16bits + full16bits
grand = rand.New(rand.NewSource(time.Now().UnixNano()))
)

func randBitArray(bits int) *BitArray {
Expand Down Expand Up @@ -139,7 +142,7 @@ func TestPickRandom(t *testing.T) {
var bitArr *BitArray
err := json.Unmarshal([]byte(tc.bA), &bitArr)
require.NoError(t, err)
_, ok := bitArr.PickRandom()
_, ok := bitArr.PickRandom(grand)
require.Equal(t, tc.ok, ok, "PickRandom got an unexpected result on input %s", tc.bA)
}
}
Expand Down Expand Up @@ -395,6 +398,6 @@ func BenchmarkPickRandomBitArray(b *testing.B) {
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = bitArr.PickRandom()
_, _ = bitArr.PickRandom(grand)
}
}
20 changes: 18 additions & 2 deletions libs/rand/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,28 @@ func NewRand() *Rand {
return rand
}

func (r *Rand) init() {
// Make a new stdlib rand source. Its up to the caller to ensure
// that the rand source is not called in parallel.
// The failure mode of calling the returned rand multiple times in parallel is
// repeated values across threads.
func NewStdlibRand() *mrand.Rand {
// G404: Use of weak random number generator (math/rand instead of crypto/rand)
//nolint:gosec
return mrand.New(mrand.NewSource(newSeed()))
}

func newSeed() int64 {
bz := cRandBytes(8)
var seed uint64
for i := 0; i < 8; i++ {
seed |= uint64(bz[i])
seed <<= 8
}
r.reset(int64(seed))
return int64(seed)
}

func (r *Rand) init() {
r.reset(newSeed())
}

func (r *Rand) reset(seed int64) {
Expand Down Expand Up @@ -302,6 +316,8 @@ func (r *Rand) Perm(n int) []int {
// NOTE: This relies on the os's random number generator.
// For real security, we should salt that with some seed.
// See github.com/cometbft/cometbft/crypto for a more secure reader.
// This function is thread safe, see:
// https://stackoverflow.com/questions/75685374/is-golang-crypto-rand-thread-safe
func cRandBytes(numBytes int) []byte {
b := make([]byte, numBytes)
_, err := crand.Read(b)
Expand Down
18 changes: 18 additions & 0 deletions libs/rand/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ func TestRngConcurrencySafety(t *testing.T) {
wg.Wait()
}

// Makes a new stdlib random instance 100 times concurrently.
// Ensures that it is concurrent safe to create rand instances, and call independent rand
// sources in parallel.
func TestStdlibRngConcurrencySafety(_ *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r := NewStdlibRand()
_ = r.Uint64()
<-time.After(time.Millisecond * time.Duration(Intn(100)))
_ = r.Perm(3)
}()
}
wg.Wait()
}

func BenchmarkRandBytes10B(b *testing.B) {
benchmarkRandBytes(b, 10)
}
Expand Down
Loading