Skip to content

Commit

Permalink
put/commit block in order into indexer by using buffer (#1827)
Browse files Browse the repository at this point in the history
* In subscriber, not using goroutine but just in-order handling block 

* In indexbuilder and listener, change to use buffer channel
  • Loading branch information
koseoyoung authored Jan 15, 2020
1 parent b90ebc7 commit 4292c6d
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 71 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func NewServer(
broadcastHandler: apiCfg.broadcastHandler,
cfg: cfg,
registry: registry,
chainListener: NewChainListener(),
chainListener: NewChainListener(cfg.BlockSync.BufferSize),
gs: gasstation.NewGasStation(chain, sf.SimulateExecution, dao, cfg.API),
electionCommittee: apiCfg.electionCommittee,
}
Expand Down
10 changes: 5 additions & 5 deletions api/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type (
Listener interface {
Start() error
Stop() error
HandleBlock(*block.Block) error
ReceiveBlock(*block.Block) error
AddResponder(Responder) error
}

Expand All @@ -33,9 +33,9 @@ type (
)

// NewChainListener returns a new blockchain chainListener
func NewChainListener() Listener {
func NewChainListener(bufferSize uint64) Listener {
return &chainListener{
pendingBlks: make(chan *block.Block, 4),
pendingBlks: make(chan *block.Block, bufferSize),
cancelChan: make(chan struct{}),
}
}
Expand Down Expand Up @@ -81,8 +81,8 @@ func (cl *chainListener) Stop() error {
return nil
}

// HandleBlock handles the block
func (cl *chainListener) HandleBlock(blk *block.Block) error {
// ReceiveBlock handles the block
func (cl *chainListener) ReceiveBlock(blk *block.Block) error {
cl.pendingBlks <- blk
return nil
}
Expand Down
8 changes: 3 additions & 5 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,9 @@ func (bc *blockchain) emitToSubscribers(blk *block.Block) {
return
}
for _, s := range bc.blocklistener {
go func(bcs BlockCreationSubscriber, b *block.Block) {
if err := bcs.HandleBlock(b); err != nil {
log.L().Error("Failed to handle new block.", zap.Error(err))
}
}(s, blk)
if err := s.ReceiveBlock(blk); err != nil {
log.L().Error("Failed to handle new block.", zap.Error(err))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ type MockSubscriber struct {
mu sync.RWMutex
}

func (ms *MockSubscriber) HandleBlock(blk *block.Block) error {
func (ms *MockSubscriber) ReceiveBlock(blk *block.Block) error {
ms.mu.Lock()
tsfs, _ := action.ClassifyActions(blk.Actions)
ms.counter += len(tsfs)
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockcreationsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import "github.com/iotexproject/iotex-core/blockchain/block"

// BlockCreationSubscriber is an interface which will get notified when a block is created
type BlockCreationSubscriber interface {
HandleBlock(*block.Block) error
ReceiveBlock(*block.Block) error
}
8 changes: 4 additions & 4 deletions blockchain/blockdao/indexbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type IndexBuilder struct {
}

// NewIndexBuilder instantiates an index builder
func NewIndexBuilder(chainID uint32, dao BlockDAO, indexer blockindex.Indexer) (*IndexBuilder, error) {
func NewIndexBuilder(chainID uint32, dao BlockDAO, indexer blockindex.Indexer, bufferSize uint64) (*IndexBuilder, error) {
timerFactory, err := prometheustimer.New(
"iotex_indexer_batch_time",
"Indexer batch time",
Expand All @@ -57,7 +57,7 @@ func NewIndexBuilder(chainID uint32, dao BlockDAO, indexer blockindex.Indexer) (
return nil, err
}
return &IndexBuilder{
pendingBlks: make(chan *block.Block, 8),
pendingBlks: make(chan *block.Block, bufferSize),
cancelChan: make(chan interface{}),
timerFactory: timerFactory,
dao: dao,
Expand Down Expand Up @@ -89,8 +89,8 @@ func (ib *IndexBuilder) Indexer() blockindex.Indexer {
return ib.indexer
}

// HandleBlock handles the block and create the indices for the actions and receipts in it
func (ib *IndexBuilder) HandleBlock(blk *block.Block) error {
// ReceiveBlock handles the block and create the indices for the actions and receipts in it
func (ib *IndexBuilder) ReceiveBlock(blk *block.Block) error {
ib.pendingBlks <- blk
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions blockchain/blockdao/indexbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestIndexer(t *testing.T) {
}()

ib := &IndexBuilder{
pendingBlks: make(chan *block.Block, 8),
pendingBlks: make(chan *block.Block, 1),
cancelChan: make(chan interface{}),
dao: dao,
indexer: indexer,
Expand All @@ -98,7 +98,7 @@ func TestIndexer(t *testing.T) {
// test handle 1 new block
require.NoError(dao.PutBlock(blks[2]))
require.NoError(dao.Commit())
ib.HandleBlock(blks[2])
ib.ReceiveBlock(blks[2])
time.Sleep(500 * time.Millisecond)

height, err = ib.indexer.GetBlockchainHeight()
Expand Down
2 changes: 1 addition & 1 deletion chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func New(
// config asks for a standalone indexer
var indexBuilder *blockdao.IndexBuilder
if gateway && cfg.Chain.EnableAsyncIndexWrite {
if indexBuilder, err = blockdao.NewIndexBuilder(chain.ChainID(), dao, indexer); err != nil {
if indexBuilder, err = blockdao.NewIndexBuilder(chain.ChainID(), dao, indexer, cfg.BlockSync.BufferSize); err != nil {
return nil, errors.Wrap(err, "failed to create index builder")
}
if err := chain.AddSubscriber(indexBuilder); err != nil {
Expand Down
30 changes: 0 additions & 30 deletions test/mock/mock_blockchain/mock_blockchain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 30 additions & 15 deletions test/mock/mock_blockdao/mock_blockdao.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4292c6d

Please sign in to comment.