
Caplin: Optimization and Parallelization of processes and reduction of Goroutines #11058

Merged: 18 commits, merged on Jul 16, 2024
Commit message: save
Giulio2002 committed Jul 6, 2024
commit 3039142c5e67d3986211e5db1425ee51f94ff24e
2 changes: 1 addition & 1 deletion cl/cltypes/solid/uint64slice_byte.go
@@ -29,7 +29,7 @@ import (
"github.com/ledgerwatch/erigon/cl/utils"
)

-const treeCacheDepthUint64Slice = 3
+const treeCacheDepthUint64Slice = 4

func convertDepthToChunkSize(d int) int {
return (1 << d) // just power of 2
2 changes: 1 addition & 1 deletion cl/cltypes/solid/validator_set.go
@@ -39,7 +39,7 @@ const (

const (
validatorSetCapacityMultiplier = 1.01 // allocate 1% extra capacity to the validator set when re-allocation is needed.
-validatorTreeCacheGroupLayer = 3 // It will cache group validatorTreeCacheGroupLayer^2 accordingly
+validatorTreeCacheGroupLayer = 4 // It will cache group validatorTreeCacheGroupLayer^2 accordingly
)

// This is all stuff used by phase0 state transition. It makes many operations faster.
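Both hunks above raise a Merkle-tree cache depth from 3 to 4. Since the cached chunk size is 1 << depth (see convertDepthToChunkSize in uint64slice_byte.go), this doubles the number of leaves covered by each cached subtree from 8 to 16, presumably trading fewer cache entries for more re-hashing whenever a single leaf inside a chunk changes. A quick sketch of the arithmetic, assuming nothing beyond the 1 << d relationship shown above; this snippet is not part of the diff:

package main

import "fmt"

// convertDepthToChunkSize mirrors the helper in uint64slice_byte.go:
// a cache depth of d covers 2^d leaves per cached subtree.
func convertDepthToChunkSize(d int) int {
	return 1 << d
}

func main() {
	fmt.Println(convertDepthToChunkSize(3)) // before this commit: 8 leaves per chunk
	fmt.Println(convertDepthToChunkSize(4)) // after this commit: 16 leaves per chunk
}
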
124 changes: 59 additions & 65 deletions cl/phase1/core/state/raw/hashing.go
@@ -17,21 +17,64 @@
package raw

import (
"fmt"
"sync"
"time"

"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/log/v3"
"github.com/ledgerwatch/erigon-lib/types/ssz"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/merkle_tree"
)

type parallelBeaconStateHasher struct {
jobs map[StateLeafIndex]ssz.HashableSSZ
results sync.Map
}

func (p *parallelBeaconStateHasher) run(b *BeaconState) {
wg := sync.WaitGroup{}
if p.jobs == nil {
p.jobs = make(map[StateLeafIndex]ssz.HashableSSZ)
}

for idx, job := range p.jobs {
wg.Add(1)
go func(idx StateLeafIndex, job ssz.HashableSSZ) {
defer wg.Done()
root, err := job.HashSSZ()
if err != nil {
panic(err)
}
p.results.Store(idx, root)
}(idx, job)
}
wg.Wait()
p.results.Range(func(key, value any) bool {
idx := key.(StateLeafIndex)
root := value.([32]byte)
b.updateLeaf(idx, root)
return true
})
}

func (b *parallelBeaconStateHasher) add(idx StateLeafIndex, job ssz.HashableSSZ) {
if b.jobs == nil {
b.jobs = make(map[StateLeafIndex]ssz.HashableSSZ)
}
b.jobs[idx] = job
}

func (b *BeaconState) HashSSZ() (out [32]byte, err error) {
b.mu.Lock()
defer b.mu.Unlock()

start := time.Now()
if err = b.computeDirtyLeaves(); err != nil {
return [32]byte{}, err
}
fmt.Println(time.Since(start))

// for i := 0; i < len(b.leaves); i += 32 {
// fmt.Println(i/32, libcommon.BytesToHash(b.leaves[i:i+32]))
@@ -89,6 +132,8 @@ func preparateRootsForHashing(roots []common.Hash) [][32]byte {
}

func (b *BeaconState) computeDirtyLeaves() error {

parallelHasher := parallelBeaconStateHasher{}
// Update all dirty leaves
// ----
// Field(0): GenesisTime
@@ -142,8 +187,6 @@ func (b *BeaconState) computeDirtyLeaves() error {
b.updateLeaf(StateRootsLeafIndex, root)
}

begin := time.Now()

// Field(7): HistoricalRoots
if b.isLeafDirty(HistoricalRootsLeafIndex) {
root, err := b.historicalRoots.HashSSZ()
@@ -152,7 +195,6 @@ func (b *BeaconState) computeDirtyLeaves() error {
}
b.updateLeaf(HistoricalRootsLeafIndex, root)
}
log.Trace("HistoricalRoots hashing", "elapsed", time.Since(begin))

// Field(8): Eth1Data
if b.isLeafDirty(Eth1DataLeafIndex) {
@@ -177,88 +219,45 @@ func (b *BeaconState) computeDirtyLeaves() error {
b.updateLeaf(Eth1DepositIndexLeafIndex, merkle_tree.Uint64Root(b.eth1DepositIndex))
}

begin = time.Now()

// Field(11): Validators
if b.isLeafDirty(ValidatorsLeafIndex) {
root, err := b.validators.HashSSZ()
if err != nil {
return err
}
b.updateLeaf(ValidatorsLeafIndex, root)

parallelHasher.add(ValidatorsLeafIndex, b.validators)
}
log.Trace("ValidatorSet hashing", "elapsed", time.Since(begin))

begin = time.Now()
// Field(12): Balances
if b.isLeafDirty(BalancesLeafIndex) {
root, err := b.balances.HashSSZ()
if err != nil {
return err
}
b.updateLeaf(BalancesLeafIndex, root)
parallelHasher.add(BalancesLeafIndex, b.balances)
}
log.Trace("Balances hashing", "elapsed", time.Since(begin))

begin = time.Now()
// Field(13): RandaoMixes

if b.isLeafDirty(RandaoMixesLeafIndex) {
root, err := b.randaoMixes.HashSSZ()
if err != nil {
return err
}
b.updateLeaf(RandaoMixesLeafIndex, root)
parallelHasher.add(RandaoMixesLeafIndex, b.randaoMixes)
}
log.Trace("RandaoMixes hashing", "elapsed", time.Since(begin))

begin = time.Now()
// Field(14): Slashings
if b.isLeafDirty(SlashingsLeafIndex) {
root, err := b.slashings.HashSSZ()
if err != nil {
return err
}
b.updateLeaf(SlashingsLeafIndex, root)
parallelHasher.add(SlashingsLeafIndex, b.slashings)
}
log.Trace("Slashings hashing", "elapsed", time.Since(begin))
// Field(15) and Field(16) are special due to the fact that they have different format in Phase0.

begin = time.Now()
// Field(15): PreviousEpochParticipation
if b.isLeafDirty(PreviousEpochParticipationLeafIndex) {
var root libcommon.Hash
var err error
if b.version == clparams.Phase0Version {
root, err = b.previousEpochAttestations.HashSSZ()
parallelHasher.add(PreviousEpochParticipationLeafIndex, b.previousEpochAttestations)
} else {
root, err = b.previousEpochParticipation.HashSSZ()
}
if err != nil {
return err
parallelHasher.add(PreviousEpochParticipationLeafIndex, b.previousEpochParticipation)
}

b.updateLeaf(PreviousEpochParticipationLeafIndex, root)
}
log.Trace("PreviousEpochParticipation hashing", "elapsed", time.Since(begin))

begin = time.Now()

// Field(16): CurrentEpochParticipation
if b.isLeafDirty(CurrentEpochParticipationLeafIndex) {
var root libcommon.Hash
var err error
if b.version == clparams.Phase0Version {
root, err = b.currentEpochAttestations.HashSSZ()
parallelHasher.add(CurrentEpochParticipationLeafIndex, b.currentEpochAttestations)
} else {
root, err = b.currentEpochParticipation.HashSSZ()
}
if err != nil {
return err
parallelHasher.add(CurrentEpochParticipationLeafIndex, b.currentEpochParticipation)
}
b.updateLeaf(CurrentEpochParticipationLeafIndex, root)
}
log.Trace("CurrentEpochParticipation hashing", "elapsed", time.Since(begin))

// Field(17): JustificationBits
if b.isLeafDirty(JustificationBitsLeafIndex) {
@@ -293,18 +292,14 @@ func (b *BeaconState) computeDirtyLeaves() error {
b.updateLeaf(FinalizedCheckpointLeafIndex, checkpointRoot)
}
if b.version == clparams.Phase0Version {
parallelHasher.run(b)
return nil
}
begin = time.Now()

// Field(21): Inactivity Scores
if b.isLeafDirty(InactivityScoresLeafIndex) {
root, err := b.inactivityScores.HashSSZ()
if err != nil {
return err
}
b.updateLeaf(InactivityScoresLeafIndex, root)
parallelHasher.add(InactivityScoresLeafIndex, b.inactivityScores)
}
log.Trace("InactivityScores hashing", "elapsed", time.Since(begin))

// Field(22): CurrentSyncCommittee
if b.isLeafDirty(CurrentSyncCommitteeLeafIndex) {
@@ -324,6 +319,7 @@ func (b *BeaconState) computeDirtyLeaves() error {
b.updateLeaf(NextSyncCommitteeLeafIndex, committeeRoot)
}

parallelHasher.run(b)
if b.version < clparams.BellatrixVersion {
return nil
}
@@ -350,7 +346,6 @@ func (b *BeaconState) computeDirtyLeaves() error {
b.updateLeaf(NextWithdrawalValidatorIndexLeafIndex, merkle_tree.Uint64Root(b.nextWithdrawalValidatorIndex))
}

begin = time.Now()
// Field(27): HistoricalSummaries
if b.isLeafDirty(HistoricalSummariesLeafIndex) {
root, err := b.historicalSummaries.HashSSZ()
@@ -359,7 +354,6 @@ func (b *BeaconState) computeDirtyLeaves() error {
}
b.updateLeaf(HistoricalSummariesLeafIndex, root)
}
log.Trace("HistoricalSummaries hashing", "elapsed", time.Since(begin))

return nil
}
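The rewrite above replaces the sequential per-field hashing (and the interleaved log.Trace timing) with a parallelBeaconStateHasher: computeDirtyLeaves registers each dirty heavyweight field (validators, balances, randao mixes, slashings, participation lists, inactivity scores) via add, then run spawns one goroutine per job, waits on a sync.WaitGroup, and writes the collected roots back through updateLeaf on the caller's goroutine. Below is a minimal, self-contained sketch of the same fan-out/fan-in pattern with simplified types; it is not the erigon API:

package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

type leafIndex int

// hashable stands in for ssz.HashableSSZ.
type hashable interface {
	HashSSZ() ([32]byte, error)
}

type bytesLeaf []byte

func (b bytesLeaf) HashSSZ() ([32]byte, error) { return sha256.Sum256(b), nil }

// parallelHasher mirrors the shape of parallelBeaconStateHasher: jobs are
// collected first, hashed concurrently, and the roots are applied back
// only after every worker goroutine has finished.
type parallelHasher struct {
	jobs    map[leafIndex]hashable
	results sync.Map
}

func (p *parallelHasher) add(idx leafIndex, job hashable) {
	if p.jobs == nil {
		p.jobs = make(map[leafIndex]hashable)
	}
	p.jobs[idx] = job
}

func (p *parallelHasher) run(apply func(leafIndex, [32]byte)) {
	var wg sync.WaitGroup
	for idx, job := range p.jobs {
		wg.Add(1)
		go func(idx leafIndex, job hashable) {
			defer wg.Done()
			root, err := job.HashSSZ()
			if err != nil {
				panic(err) // the committed code also panics on hashing errors
			}
			p.results.Store(idx, root)
		}(idx, job)
	}
	wg.Wait()
	p.results.Range(func(k, v any) bool {
		apply(k.(leafIndex), v.([32]byte))
		return true
	})
}

func main() {
	var p parallelHasher
	p.add(11, bytesLeaf("validators"))
	p.add(12, bytesLeaf("balances"))
	p.run(func(idx leafIndex, root [32]byte) {
		fmt.Printf("leaf %d -> %x\n", idx, root[:4])
	})
}

Because results is only read after wg.Wait(), the state itself is still mutated from a single goroutine, which matches how the commit keeps all updateLeaf calls out of the workers. Note that run has two call sites in computeDirtyLeaves: one just before the early return for Phase0 states, and one after the Altair-and-later fields have been queued.
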
2 changes: 1 addition & 1 deletion cl/sentinel/gossip.go
@@ -519,7 +519,7 @@ type GossipSubscription struct {
func (sub *GossipSubscription) Listen() {
go func() {
var err error
-checkingInterval := time.NewTicker(100 * time.Millisecond)
+checkingInterval := time.NewTicker(5 * time.Millisecond)
for {
select {
case <-sub.ctx.Done():
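The last hunk shortens the ticker in GossipSubscription.Listen from 100ms to 5ms, so the loop re-checks the subscription up to 20 times more often and reacts to pending work with at most roughly 5ms of polling delay instead of up to 100ms, at the cost of more frequent wake-ups. A minimal sketch of such a ticker-driven check loop, using hypothetical names (listen, checkFn) rather than the sentinel code, assuming the tick period only bounds how quickly the loop notices new work:

package main

import (
	"context"
	"fmt"
	"time"
)

// listen polls checkFn on every tick until the context is cancelled.
// A shorter interval lowers the worst-case delay before pending work is
// noticed, but wakes the goroutine proportionally more often.
func listen(ctx context.Context, interval time.Duration, checkFn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			checkFn()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	ticks := 0
	listen(ctx, 5*time.Millisecond, func() { ticks++ })
	fmt.Println("ticks in 100ms at a 5ms interval:", ticks) // roughly 20
}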