Tried Implementation of Thread Pool #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 2 commits into base: concurrent-execution
48 changes: 44 additions & 4 deletions core/bench_concurrent_test.go
@@ -18,7 +18,9 @@ package core
 
 import (
 	cryptorand "crypto/rand"
+	"errors"
 	"fmt"
+	"io/ioutil"
 	"math/big"
 	"math/rand"
 	"os"
@@ -209,9 +211,8 @@ func generateRandomExecution(b *testing.B, numContracts int, numBlocks int, numT
 	return blocks, index
 }
 
-func benchmarkRandomBlockExecution(b *testing.B, numBlocks int, numTxs int, numContracts int, numKeys int, requireAccessList bool, memdb bool) {
+func benchmarkRandomBlockExecution(b *testing.B, blocks []*types.Block, startIndex int, numBlocks int, numTxs int, numContracts int, numKeys int, requireAccessList bool, memdb bool) {
 	// Generate the slice of blocks whose execution we wish to benchmark.
-	blocks, startIndex := generateRandomExecution(b, numContracts, numBlocks, numTxs, numKeys)
 
 	for i := 0; i < b.N; i++ {
 		var diskdb ethdb.Database
@@ -369,9 +370,48 @@ func BenchmarkSimpleBlockTransactionParallelExecution(b *testing.B) {
 }
 
 func BenchmarkRandomBlockTransactionExecution(b *testing.B) {
-	benchmarkRandomBlockExecution(b, 50, 50, 100, 100, false, true)
+	blocks, startIndex := generateRandomExecution(b, 100, 50, 50, 100)
+	benchmarkRandomBlockExecution(b, blocks, startIndex, 50, 50, 100, 100, false, true)
 }
 
 func BenchmarkRandomBlockTransactionParallelExecution(b *testing.B) {
-	benchmarkRandomBlockExecution(b, 50, 50, 100, 100, true, true)
+	blocks, startIndex := generateRandomExecution(b, 100, 50, 50, 100)
+	benchmarkRandomBlockExecution(b, blocks, startIndex, 50, 50, 100, 100, true, true)
 }

+func GenerateFile(b *testing.B, filepath string, numBlocks int, numTxs int, numContracts int, numKeys int) {
+	blocks, _ := generateRandomExecution(b, numContracts, numBlocks, numTxs, numKeys)
+	file, err := os.Create(filepath)
+	if err != nil {
+		b.Error("error in creating file")
+	}
+	types.WriteBlocks(file, blocks)
+}
+
+func ReadFile(b *testing.B, numBlocks int, numTxs int, numContracts int, numKeys int) []*types.Block {
+	gopath := os.Getenv("GOPATH")
+	filepath := fmt.Sprintf(gopath+"/src/github.com/ethereum/go-ethereum/core/testing_folder/testing_file_numBlocks_%d.bin", numBlocks)
+	if _, err := os.Stat(filepath); errors.Is(err, os.ErrNotExist) {
+		GenerateFile(b, filepath, numBlocks, numTxs, numContracts, numKeys)
+	}
+	buf, err := ioutil.ReadFile(filepath)
+	if err != nil {
+		b.Error("error in reading file")
+	}
+	blocks, err := types.ReadBlocks(buf)
+	if err != nil {
+		b.Error("error in generating blocks")
+	}
+
+	return blocks
+}
+
+func BenchmarkParallelExecutionFromFile(b *testing.B) {
+	blocks := ReadFile(b, 500, 100, 100, 100)
+	benchmarkRandomBlockExecution(b, blocks, len(blocks), 1000, 100, 100, 100, true, true)
+}
+
+func BenchmarkSimpleExecutionFromFile(b *testing.B) {
+	blocks := ReadFile(b, 500, 100, 100, 100)
+	benchmarkRandomBlockExecution(b, blocks, len(blocks), 1000, 100, 100, 100, false, true)
+}
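
Note: types.WriteBlocks and types.ReadBlocks are not part of upstream go-ethereum, so they are presumably added to core/types elsewhere in this PR. A minimal sketch of what such helpers could look like, assuming the block slice is simply RLP-encoded as one list (the names and signatures below are inferred from the call sites above):

package types

import (
	"io"

	"github.com/ethereum/go-ethereum/rlp"
)

// WriteBlocks RLP-encodes blocks to w as a single list.
func WriteBlocks(w io.Writer, blocks []*Block) error {
	return rlp.Encode(w, blocks)
}

// ReadBlocks decodes a slice of blocks previously written by WriteBlocks.
func ReadBlocks(buf []byte) ([]*Block, error) {
	var blocks []*Block
	if err := rlp.DecodeBytes(buf, &blocks); err != nil {
		return nil, err
	}
	return blocks, nil
}

This works because types.Block already implements RLP encoding and decoding, so the fixture file is just the RLP of the generated chain segment.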
3 changes: 3 additions & 0 deletions core/blockchain.go
@@ -1042,6 +1042,9 @@ func (bc *BlockChain) Stop() {
 		triedb := bc.stateCache.TrieDB()
 		triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
 	}
+	if sp, ok := bc.processor.(*StateProcessor); ok {
+		sp.Close()
+	}
 	log.Info("Blockchain stopped")
 }

71 changes: 17 additions & 54 deletions core/parallel/bounded_errgroup.go
@@ -7,7 +7,6 @@
 package parallel
 
 import (
-	"context"
 	"sync"
 )

@@ -16,47 +15,29 @@ import (
 //
 // A zero Group is valid and does not cancel on error.
 type BoundedGroup struct {
-	cancel func()
-
-	workerWG, taskWG sync.WaitGroup
-	workers          chan struct{}
-	tasks            chan func() error // A fixed size channel of
-	closer           chan struct{}
-
-	errOnce sync.Once
-	err     error
-}
-
-func NewBoundedErrGroupWithContext(parentCtx context.Context, numWorkers int, maxPendingTasks int) (*BoundedGroup, context.Context) {
-	bg := NewBoundedErrGroup(numWorkers, maxPendingTasks)
-	ctx, cancel := context.WithCancel(parentCtx)
-	bg.cancel = cancel
-	return bg, ctx
+	workerWG sync.WaitGroup
+	workers  chan struct{}
+	tasks    chan func() // A fixed size channel of
+	closer   chan struct{}
 }
 
-func NewBoundedErrGroup(numWorkers int, maxPendingTasks int) *BoundedGroup {
-	return &BoundedGroup{
+func NewBoundedErrGroup(numWorkers int) *BoundedGroup {
+	res := &BoundedGroup{
 		workers: make(chan struct{}, numWorkers),
-		tasks:   make(chan func() error, maxPendingTasks),
+		tasks:   make(chan func(), 10000),
 		closer:  make(chan struct{}),
 	}
-}
-
-func (g *BoundedGroup) Go(f func() error) {
-	// Start an additional worker, if we have not yet reached the worker thread cap.
-	select {
-	case g.workers <- struct{}{}:
-		g.workerWG.Add(1)
-		go g.startWorker()
-	default:
+	//start the numWorker worker threads
+	for i := 0; i < numWorkers; i++ {
+		res.workerWG.Add(1)
+		go res.startWorker()
 	}
+	return res
 }
 
+func (g *BoundedGroup) Go(f func()) {
 	// Add [f] to the task queue
-	g.taskWG.Add(1)
-	g.tasks <- func() error {
-		defer g.taskWG.Done()
-		return f()
-	}
+	g.tasks <- f
 }
 
 func (g *BoundedGroup) startWorker() {
@@ -68,31 +49,13 @@ func (g *BoundedGroup) startWorker() {
 		case <-g.closer:
 			return
 		case f := <-g.tasks:
-			if err := f(); err != nil {
-				g.errOnce.Do(func() {
-					g.err = err
-					if g.cancel != nil {
-						g.cancel()
-					}
-				})
-			}
+			f()
 		}
 	}
 }
 
-func (g *BoundedGroup) Wait() error {
-	// Wait for all tasks to finish
-	g.taskWG.Wait()
-
+func (g *BoundedGroup) Close() {
 	// Shut down the worker threads
 	close(g.closer)
 	g.workerWG.Wait()
-
-	// Call [cancel] if supplied.
-	if g.cancel != nil {
-		g.cancel()
-	}
-
-	// Return the appropriate error
-	return g.err
 }
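
With Wait() gone, the pool no longer tracks task completion or errors, so both now fall to the caller. A minimal usage sketch under that contract (process is a placeholder for real work):

pool := parallel.NewBoundedErrGroup(4)
defer pool.Close()

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
	i := i // capture the loop variable for the closure
	wg.Add(1)
	pool.Go(func() {
		defer wg.Done()
		process(i)
	})
}
// Wait for completion before Close: Close only stops the workers, and any
// task still sitting in the queue when the workers exit is silently dropped.
wg.Wait()

Two caveats that follow from the diff: Go blocks once 10000 tasks are queued, and the "A zero Group is valid" doc comment is now stale, since a zero BoundedGroup has no workers and a nil tasks channel.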
37 changes: 22 additions & 15 deletions core/state_processor.go
@@ -30,6 +30,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
+	"github.com/ethereum/go-ethereum/core/wrappers"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/params"
@@ -40,17 +41,24 @@
 //
 // StateProcessor implements Processor.
 type StateProcessor struct {
-	config *params.ChainConfig // Chain configuration options
-	bc     *BlockChain         // Canonical block chain
-	engine consensus.Engine    // Consensus engine used for block rewards
+	config     *params.ChainConfig    // Chain configuration options
+	bc         *BlockChain            // Canonical block chain
+	engine     consensus.Engine       // Consensus engine used for block rewards
+	threadpool *parallel.BoundedGroup // threadpool
 }
 
+func (p *StateProcessor) Close() {
+	p.threadpool.Close()
+}
+
 // NewStateProcessor initialises a new StateProcessor.
 func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
+	bg := parallel.NewBoundedErrGroup(runtime.NumCPU() * 2)
 	return &StateProcessor{
-		config: config,
-		bc:     bc,
-		engine: engine,
+		config:     config,
+		bc:         bc,
+		engine:     engine,
+		threadpool: bg,
 	}
 }

@@ -129,14 +137,17 @@ func (p *StateProcessor) processParallel(block *types.Block, statedb *state.Stat
 
 	txLocker := parallel.NewAccessListLocker(block.Transactions())
 
-	eg := parallel.NewBoundedErrGroup(runtime.NumCPU()*2, len(block.Transactions()))
+	var wg sync.WaitGroup
 	// Iterate over and process the individual transactions
 	for i, tx := range block.Transactions() {
 		// Create closure with i and tx, so that the loop does not overwrite the memory used in
 		// the goroutine.
+		var errs wrappers.Errs
 		i := i
 		tx := tx
-		eg.Go(func() error {
+		wg.Add(1)
+		p.threadpool.Go(func() {
+			defer wg.Done()
 			log.Info(fmt.Sprintf("starting goroutine for tx (%s, %d)", tx.Hash(), i))
 			// Grab the locks for every item in the access list. This will block until the transaction
 			// can acquire all the necessary locks.
@@ -147,25 +158,21 @@ func (p *StateProcessor) processParallel(block *types.Block, statedb *state.Stat
 			vmenv := vm.NewEVM(blockContext, vm.TxContext{}, txDB, p.config, cfg)
 			msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee)
 			if err != nil {
-				return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
+				errs.Add(fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err))
 			}
 			receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, txDB, blockNumber, blockHash, tx, usedGas, vmenv)
 			if err != nil {
-				return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
+				errs.Add(fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err))
 			}
 			log.Info(fmt.Sprintf("releasing locks for tx (%s, %d)", tx.Hash(), i))
 			txLocker.Unlock(tx)
 			log.Info(fmt.Sprintf("released locks for tx (%s, %d)", tx.Hash(), i))
 			// Set the receipt and logs at the correct index
 			receipts[i] = receipt
 			txLogs[i] = receipt.Logs
-			return nil
 		})
 	}
-
-	if err := eg.Wait(); err != nil {
-		return nil, nil, 0, err
-	}
+	wg.Wait()
 	// Coalesce the logs
 	for _, logs := range txLogs {
 		allLogs = append(allLogs, logs...)
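
The submission pattern above, condensed into a self-contained sketch. Two details differ from the diff and are assumptions on my part: errs is shared across iterations rather than declared per-iteration (as written, each goroutine gets its own wrappers.Errs that nothing ever reads), and the first recorded error is surfaced after wg.Wait(); txLocker.Lock and applyTx stand in for the lock-acquisition and apply logic shown above:

var (
	wg   sync.WaitGroup
	errs wrappers.Errs
)
for i, tx := range block.Transactions() {
	i, tx := i, tx // capture loop variables for the closures
	wg.Add(1)
	p.threadpool.Go(func() {
		defer wg.Done()
		txLocker.Lock(tx) // blocks until all access-list locks are held
		defer txLocker.Unlock(tx)
		receipt, err := applyTx(i, tx)
		if err != nil {
			errs.Add(fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err))
			return // also avoids touching a nil receipt, unlike the diff
		}
		receipts[i] = receipt
		txLogs[i] = receipt.Logs
	})
}
wg.Wait()
if errs.Errored() {
	return nil, nil, 0, errs.Err
}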
Binary file not shown.
Binary file not shown.
Binary file not shown.
32 changes: 32 additions & 0 deletions core/wrappers/errors.go
@@ -0,0 +1,32 @@
+package wrappers
+
+import (
+	"sync"
+)
+
+type Errs struct {
+	Err error
+	rw  *sync.RWMutex
+}
+
+func (errs *Errs) Errored() bool {
+	errs.rw.RLock()
+	defer errs.rw.RUnlock()
+	return errs.Err != nil
+}
+
+func (errs *Errs) Add(errors ...error) {
+	errs.rw.RLock()
+	defer errs.rw.RUnlock()
+	if errs.Err == nil {
+		for _, err := range errors {
+			if err != nil {
+				errs.rw.RUnlock()
+				errs.rw.Lock()
+				errs.Err = err
+				errs.rw.Unlock()
+				break
+			}
+		}
+	}
+}

Owner commented on the errs.rw.RUnlock() call above:

I believe this will cause a panic if an error is ever added since we have called RUnlock here without re-acquiring the read lock before the deferred RUnlock will execute.
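
One way to fix the flagged panic is to stop juggling the read and write locks entirely: take a single mutex for the whole of Add. A sketch with first-error-wins semantics; it also replaces the *sync.RWMutex field, which the diff never initialises (so the zero Errs above would nil-deref), with a value mutex so the zero Errs is usable:

package wrappers

import "sync"

// Errs records the first non-nil error passed to Add.
// The zero value is ready to use.
type Errs struct {
	Err error
	mu  sync.Mutex
}

func (errs *Errs) Errored() bool {
	errs.mu.Lock()
	defer errs.mu.Unlock()
	return errs.Err != nil
}

func (errs *Errs) Add(errors ...error) {
	errs.mu.Lock()
	defer errs.mu.Unlock()
	if errs.Err != nil {
		return
	}
	for _, err := range errors {
		if err != nil {
			errs.Err = err
			break
		}
	}
}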