Skip to content

Commit

Permalink
move handler goroutine logic from subscriber to blockchain (#1830)
Browse files Browse the repository at this point in the history
* move handler goroutine logic from subscriber to blockchain

* refactor to create pubSubManager with util functions
  • Loading branch information
koseoyoung authored Jan 21, 2020
1 parent c9c9cdf commit 18bbbb1
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 108 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func NewServer(
broadcastHandler: apiCfg.broadcastHandler,
cfg: cfg,
registry: registry,
chainListener: NewChainListener(cfg.BlockSync.BufferSize),
chainListener: NewChainListener(),
gs: gasstation.NewGasStation(chain, sf.SimulateExecution, dao, cfg.API),
electionCommittee: apiCfg.electionCommittee,
}
Expand Down
64 changes: 24 additions & 40 deletions api/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,64 +26,48 @@ type (

// chainListener implements the Listener interface
chainListener struct {
pendingBlks chan *block.Block
cancelChan chan struct{}
streamMap sync.Map // all registered <Responder, chan error>
streamMap sync.Map // all registered <Responder, chan error>
}
)

// NewChainListener returns a new blockchain chainListener
func NewChainListener(bufferSize uint64) Listener {
return &chainListener{
pendingBlks: make(chan *block.Block, bufferSize),
cancelChan: make(chan struct{}),
}
func NewChainListener() Listener {
return &chainListener{}
}

// Start starts the chainListener
func (cl *chainListener) Start() error {
go func() {
for {
select {
case <-cl.cancelChan:
// notify all responders to exit
cl.streamMap.Range(func(key, _ interface{}) bool {
r, ok := key.(Responder)
if !ok {
log.S().Panic("streamMap stores a key which is not a Responder")
}
r.Exit()
cl.streamMap.Delete(key)
return true
})
return
case blk := <-cl.pendingBlks:
// pass the block to every responder
cl.streamMap.Range(func(key, _ interface{}) bool {
r, ok := key.(Responder)
if !ok {
log.S().Panic("streamMap stores a key which is not a Responder")
}
if err := r.Respond(blk); err != nil {
cl.streamMap.Delete(key)
}
return true
})
}
}
}()
return nil
}

// Stop stops the block chainListener
func (cl *chainListener) Stop() error {
close(cl.cancelChan)
// notify all responders to exit
cl.streamMap.Range(func(key, _ interface{}) bool {
r, ok := key.(Responder)
if !ok {
log.S().Panic("streamMap stores a key which is not a Responder")
}
r.Exit()
cl.streamMap.Delete(key)
return true
})
return nil
}

// ReceiveBlock handles the block
func (cl *chainListener) ReceiveBlock(blk *block.Block) error {
cl.pendingBlks <- blk
// pass the block to every responder
cl.streamMap.Range(func(key, _ interface{}) bool {
r, ok := key.(Responder)
if !ok {
log.S().Panic("streamMap stores a key which is not a Responder")
}
if err := r.Respond(blk); err != nil {
cl.streamMap.Delete(key)
}
return true
})
return nil
}

Expand Down
33 changes: 11 additions & 22 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type blockchain struct {
minter Minter
lifecycle lifecycle.Lifecycle
clk clock.Clock
blocklistener []BlockCreationSubscriber
pubSubManager PubSubManager
timerFactory *prometheustimer.TimerFactory

// used by account-based model
Expand Down Expand Up @@ -217,10 +217,11 @@ func RegistryOption(registry *protocol.Registry) Option {
func NewBlockchain(cfg config.Config, dao blockdao.BlockDAO, sf factory.Factory, opts ...Option) Blockchain {
// create the Blockchain
chain := &blockchain{
config: cfg,
dao: dao,
sf: sf,
clk: clock.New(),
config: cfg,
dao: dao,
sf: sf,
clk: clock.New(),
pubSubManager: NewPubSub(cfg.BlockSync.BufferSize),
}
for _, opt := range opts {
if err := opt(chain, cfg); err != nil {
Expand Down Expand Up @@ -307,7 +308,6 @@ func (bc *blockchain) Start(ctx context.Context) error {
func (bc *blockchain) Stop(ctx context.Context) error {
bc.mu.Lock()
defer bc.mu.Unlock()

return bc.lifecycle.OnStop(ctx)
}

Expand Down Expand Up @@ -455,22 +455,15 @@ func (bc *blockchain) AddSubscriber(s BlockCreationSubscriber) error {
if s == nil {
return errors.New("subscriber could not be nil")
}
bc.blocklistener = append(bc.blocklistener, s)

return nil
return bc.pubSubManager.AddBlockListener(s)
}

func (bc *blockchain) RemoveSubscriber(s BlockCreationSubscriber) error {
bc.mu.Lock()
defer bc.mu.Unlock()
for i, sub := range bc.blocklistener {
if sub == s {
bc.blocklistener = append(bc.blocklistener[:i], bc.blocklistener[i+1:]...)
log.L().Info("Successfully unsubscribe block creation.")
return nil
}
}
return errors.New("cannot find subscription")

return bc.pubSubManager.RemoveBlockListener(s)
}

//======================================
Expand Down Expand Up @@ -612,12 +605,8 @@ func (bc *blockchain) commitBlock(blk *block.Block) error {
}

func (bc *blockchain) emitToSubscribers(blk *block.Block) {
if bc.blocklistener == nil {
if bc.pubSubManager == nil {
return
}
for _, s := range bc.blocklistener {
if err := s.ReceiveBlock(blk); err != nil {
log.L().Error("Failed to handle new block.", zap.Error(err))
}
}
bc.pubSubManager.SendBlockToSubscribers(blk)
}
87 changes: 86 additions & 1 deletion blockchain/blockcreationsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,94 @@

package blockchain

import "github.com/iotexproject/iotex-core/blockchain/block"
import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/pkg/log"
)

// BlockCreationSubscriber is an interface which will get notified when a block is created
type BlockCreationSubscriber interface {
ReceiveBlock(*block.Block) error
}

// PubSubManager is an interface which handles multi-thread publisher and subscribers
type PubSubManager interface {
AddBlockListener(BlockCreationSubscriber) error
RemoveBlockListener(BlockCreationSubscriber) error
SendBlockToSubscribers(*block.Block)
}

// pubSubElem includes Subscriber, buffered channel for storing the pending blocks and cancel channel to end the handler thread
type pubSubElem struct {
listener BlockCreationSubscriber
pendingBlksBuffer chan *block.Block
cancel chan interface{}
}

// pubSub defines array of blockListener to handle multi-thread publish/subscribe
type pubSub struct {
blocklisteners []*pubSubElem
pendingBlkBufferSize uint64
}

// NewPubSub creates new pubSub struct with buffersize for pendingBlock buffer channel
func NewPubSub(bufferSize uint64) PubSubManager {
return &pubSub{
blocklisteners: make([]*pubSubElem, 0),
pendingBlkBufferSize: bufferSize,
}
}

// AddBlockListener creates new pubSubElem subscriber and append it to blocklisteners
func (ps *pubSub) AddBlockListener(s BlockCreationSubscriber) error {
pendingBlksChan := make(chan *block.Block, ps.pendingBlkBufferSize)
cancelChan := make(chan interface{})
// create subscriber handler thread to handle pending blocks
go ps.handler(cancelChan, pendingBlksChan, s)

pubSubElem := &pubSubElem{
listener: s,
pendingBlksBuffer: pendingBlksChan,
cancel: cancelChan,
}
ps.blocklisteners = append(ps.blocklisteners, pubSubElem)

return nil
}

// RemoveBlockListener looks up blocklisteners and if exists, close the cancel channel and pop out the element
func (ps *pubSub) RemoveBlockListener(s BlockCreationSubscriber) error {
for i, elem := range ps.blocklisteners {
if elem.listener == s {
close(elem.cancel)
ps.blocklisteners = append(ps.blocklisteners[:i], ps.blocklisteners[i+1:]...)
log.L().Info("Successfully unsubscribe block creation.")
return nil
}
}
return errors.New("cannot find subscription")
}

// SendBlockToSubscribers sends block to every subscriber by using buffer channel
func (ps *pubSub) SendBlockToSubscribers(blk *block.Block) {
for _, elem := range ps.blocklisteners {
elem.pendingBlksBuffer <- blk
}
return
}

func (ps *pubSub) handler(cancelChan <-chan interface{}, pendingBlks <-chan *block.Block, s BlockCreationSubscriber) {
for {
select {
case <-cancelChan:
return
case blk := <-pendingBlks:
if err := s.ReceiveBlock(blk); err != nil {
log.L().Error("Failed to handle new block.", zap.Error(err))
}
}
}
}
59 changes: 22 additions & 37 deletions blockchain/blockdao/indexbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@ type addrIndex map[hash.Hash160]db.CountingIndex

// IndexBuilder defines the index builder
type IndexBuilder struct {
pendingBlks chan *block.Block
cancelChan chan interface{}
timerFactory *prometheustimer.TimerFactory
dao BlockDAO
indexer blockindex.Indexer
}

// NewIndexBuilder instantiates an index builder
func NewIndexBuilder(chainID uint32, dao BlockDAO, indexer blockindex.Indexer, bufferSize uint64) (*IndexBuilder, error) {
func NewIndexBuilder(chainID uint32, dao BlockDAO, indexer blockindex.Indexer) (*IndexBuilder, error) {
timerFactory, err := prometheustimer.New(
"iotex_indexer_batch_time",
"Indexer batch time",
Expand All @@ -57,8 +55,6 @@ func NewIndexBuilder(chainID uint32, dao BlockDAO, indexer blockindex.Indexer, b
return nil, err
}
return &IndexBuilder{
pendingBlks: make(chan *block.Block, bufferSize),
cancelChan: make(chan interface{}),
timerFactory: timerFactory,
dao: dao,
indexer: indexer,
Expand All @@ -74,13 +70,11 @@ func (ib *IndexBuilder) Start(ctx context.Context) error {
return err
}
// start handler to index incoming new block
go ib.handler()
return nil
}

// Stop stops the index builder
func (ib *IndexBuilder) Stop(ctx context.Context) error {
close(ib.cancelChan)
return ib.indexer.Stop(ctx)
}

Expand All @@ -91,37 +85,28 @@ func (ib *IndexBuilder) Indexer() blockindex.Indexer {

// ReceiveBlock handles the block and create the indices for the actions and receipts in it
func (ib *IndexBuilder) ReceiveBlock(blk *block.Block) error {
ib.pendingBlks <- blk
return nil
}

func (ib *IndexBuilder) handler() {
for {
select {
case <-ib.cancelChan:
return
case blk := <-ib.pendingBlks:
timer := ib.timerFactory.NewTimer("indexBlock")
if err := ib.indexer.PutBlock(blk); err != nil {
log.L().Error(
"Error when indexing the block",
zap.Uint64("height", blk.Height()),
zap.Error(err),
)
}
if err := ib.indexer.Commit(); err != nil {
log.L().Error(
"Error when committing the block index",
zap.Uint64("height", blk.Height()),
zap.Error(err),
)
}
timer.End()
if blk.Height()%100 == 0 {
log.L().Info("indexing new block", zap.Uint64("height", blk.Height()))
}
}
timer := ib.timerFactory.NewTimer("indexBlock")
if err := ib.indexer.PutBlock(blk); err != nil {
log.L().Error(
"Error when indexing the block",
zap.Uint64("height", blk.Height()),
zap.Error(err),
)
return err
}
if err := ib.indexer.Commit(); err != nil {
log.L().Error(
"Error when committing the block index",
zap.Uint64("height", blk.Height()),
zap.Error(err),
)
return err
}
timer.End()
if blk.Height()%100 == 0 {
log.L().Info("indexing new block", zap.Uint64("height", blk.Height()))
}
return nil
}

func (ib *IndexBuilder) init() error {
Expand Down
8 changes: 2 additions & 6 deletions blockchain/blockdao/indexbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/action"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockindex"
"github.com/iotexproject/iotex-core/config"
"github.com/iotexproject/iotex-core/db"
Expand Down Expand Up @@ -71,10 +70,8 @@ func TestIndexer(t *testing.T) {
}()

ib := &IndexBuilder{
pendingBlks: make(chan *block.Block, 1),
cancelChan: make(chan interface{}),
dao: dao,
indexer: indexer,
dao: dao,
indexer: indexer,
}
defer ib.Stop(context.Background())

Expand All @@ -93,7 +90,6 @@ func TestIndexer(t *testing.T) {
height, err := ib.indexer.GetBlockchainHeight()
require.NoError(err)
require.EqualValues(2, height)
go ib.handler()

// test handle 1 new block
require.NoError(dao.PutBlock(blks[2]))
Expand Down
Loading

0 comments on commit 18bbbb1

Please sign in to comment.