Remove requests from BFT memory pool when syncing
When a BFT node commits a block, it goes through the transactions in the block, checks whether each one exists in the in-memory request pool, and if so removes it from the pool.

When a follower node syncs blocks from another node, however, the requests of those blocks remain in its request pool. They not only take up memory, but also hold the semaphore resources that needlessly throttle the client.

This commit goes through the requests of each block committed via synchronization and removes them from the pool as well.
This is needed because the library only sees the last block committed during synchronization, not the entire range.

Signed-off-by: Yacov Manevich <yacov.manevich@ibm.com>
yacovm authored and C0rWin committed May 3, 2023
1 parent 0f2c2af commit c53daa3
Showing 3 changed files with 126 additions and 5 deletions.
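Conceptually, the change boils down to: for every transaction in a block obtained via synchronization, derive its request ID and evict it from the request pool. A minimal sequential sketch of that idea — the RequestInspector and RequestPool interfaces below are illustrative stand-ins, not the actual SmartBFT library API; the real change in the diff additionally fans the work out across CPUs:

package prune

// Hypothetical stand-ins for the SmartBFT request inspector and pool;
// the real types live in the smartbft consensus library.
type RequestInspector interface {
	RequestID(rawReq []byte) string // derive a pool key from raw tx bytes
}

type RequestPool interface {
	RemoveRequest(id string) // evict a request; a no-op if it is absent
}

// pruneBlock removes every transaction of a synced block from the pool,
// so stale requests stop holding memory and semaphore slots.
func pruneBlock(txs [][]byte, insp RequestInspector, pool RequestPool) {
	for _, tx := range txs {
		pool.RemoveRequest(insp.RequestID(tx))
	}
}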
44 changes: 39 additions & 5 deletions orderer/consensus/smartbft/chain.go
@@ -10,6 +10,7 @@ import (
 	"encoding/base64"
 	"fmt"
 	"reflect"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -197,11 +198,14 @@ func bftSmartConsensusBuild(
 	sync := &Synchronizer{
 		selfID:          rtc.id,
 		BlockToDecision: c.blockToDecision,
-		OnCommit:        c.updateRuntimeConfig,
-		Support:         c.support,
-		BlockPuller:     c.BlockPuller,
-		ClusterSize:     clusterSize,
-		Logger:          c.Logger,
+		OnCommit: func(block *cb.Block) types.Reconfig {
+			c.pruneCommittedRequests(block)
+			return c.updateRuntimeConfig(block)
+		},
+		Support:     c.support,
+		BlockPuller: c.BlockPuller,
+		ClusterSize: clusterSize,
+		Logger:      c.Logger,
 		LatestConfig: func() (types.Configuration, []uint64) {
 			rtc := c.RuntimeConfig.Load().(RuntimeConfig)
 			return rtc.BFTConfig, rtc.Nodes
@@ -272,6 +276,36 @@ func bftSmartConsensusBuild(
 	return consensus
 }

+func (c *BFTChain) pruneCommittedRequests(block *cb.Block) {
+	workerNum := runtime.NumCPU()
+
+	var workers []*worker
+
+	for i := 0; i < workerNum; i++ {
+		workers = append(workers, &worker{
+			id:        i,
+			work:      block.Data.Data,
+			workerNum: workerNum,
+			f: func(tx []byte) {
+				ri := c.verifier.ReqInspector.RequestID(tx)
+				c.consensus.Pool.RemoveRequest(ri)
+			},
+		})
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(workers))
+
+	for i := 0; i < len(workers); i++ {
+		go func(w *worker) {
+			defer wg.Done()
+			w.doWork()
+		}(workers[i])
+	}
+
+	wg.Wait()
+}
+
 func (c *BFTChain) submit(env *cb.Envelope, configSeq uint64) error {
 	reqBytes, err := proto.Marshal(env)
 	if err != nil {
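The new pruneCommittedRequests is a plain fan-out/fan-in: one worker per CPU, each claiming the transactions whose index equals its id modulo the worker count, with a WaitGroup as the barrier. A standalone sketch of the same skeleton, using only the standard library — the item contents and the print statement are placeholders for the real per-transaction work:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	work := [][]byte{[]byte("tx0"), []byte("tx1"), []byte("tx2"), []byte("tx3")}
	workerNum := runtime.NumCPU()

	var wg sync.WaitGroup
	wg.Add(workerNum)

	for id := 0; id < workerNum; id++ {
		go func(id int) {
			defer wg.Done()
			// Stripes are disjoint (index mod workerNum), so the
			// goroutines never touch the same item and need no lock.
			for i := id; i < len(work); i += workerNum {
				fmt.Printf("worker %d handles item %d\n", id, i)
			}
		}(id)
	}

	wg.Wait() // fan-in: wait for every stripe to drain
}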
30 changes: 30 additions & 0 deletions orderer/consensus/smartbft/util.go
@@ -466,3 +466,33 @@ func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *cb.Block)
 	}
 	return cluster.ErrNotInChannel
 }

+type worker struct {
+	work      [][]byte
+	f         func([]byte)
+	workerNum int
+	id        int
+}
+
+func (w *worker) doWork() {
+	// sanity check
+	if w.workerNum == 0 {
+		panic("worker number is not defined")
+	}
+
+	if w.f == nil {
+		panic("worker function is not defined")
+	}
+
+	if len(w.work) == 0 {
+		panic("work is not defined")
+	}
+
+	for i, datum := range w.work {
+		if i%w.workerNum != w.id {
+			continue
+		}
+
+		w.f(datum)
+	}
+}
57 changes: 57 additions & 0 deletions orderer/consensus/smartbft/util_test.go
@@ -0,0 +1,57 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package smartbft
+
+import (
+	"encoding/binary"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWorker(t *testing.T) {
+	work := make([][]byte, 13)
+
+	for i := 0; i < 13; i++ {
+		work[i] = make([]byte, 2)
+		binary.BigEndian.PutUint16(work[i], uint16(i))
+	}
+
+	workDone := make(map[int]struct{})
+
+	var lock sync.Mutex
+
+	var workers []worker
+	for i := 0; i < 7; i++ {
+		workers = append(workers, worker{
+			workerNum: 7,
+			work:      work,
+			id:        i,
+			f: func(data []byte) {
+				lock.Lock()
+				defer lock.Unlock()
+
+				workDone[int(binary.BigEndian.Uint16(data))] = struct{}{}
+			},
+		})
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(7)
+
+	for i := 0; i < len(workers); i++ {
+		go func(i int, w worker) {
+			defer wg.Done()
+			w.doWork()
+		}(i, workers[i])
+	}
+
+	wg.Wait()
+
+	assert.Len(t, workDone, 13)
+}
