Skip to content

Commit

Permalink
change block creation subscription implementation (#307)
Browse files Browse the repository at this point in the history
* change block creation subscription implementation

* address comments
  • Loading branch information
CoderZhi authored Nov 17, 2018
1 parent cba5ad0 commit 909a28c
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 216 deletions.
4 changes: 2 additions & 2 deletions action/protocols/multichain/mainchain/createdeposit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestValidateDeposit(t *testing.T) {
chain := mock_blockchain.NewMockBlockchain(ctrl)
chain.EXPECT().ChainID().Return(uint32(1)).AnyTimes()
chain.EXPECT().GetFactory().Return(sf).AnyTimes()
chain.EXPECT().SubscribeBlockCreation(gomock.Any()).Return(nil).AnyTimes()
chain.EXPECT().AddSubscriber(gomock.Any()).Return(nil).AnyTimes()

defer func() {
require.NoError(t, sf.Stop(ctx))
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestMutateDeposit(t *testing.T) {
chain := mock_blockchain.NewMockBlockchain(ctrl)
chain.EXPECT().ChainID().Return(uint32(1)).AnyTimes()
chain.EXPECT().GetFactory().Return(sf).AnyTimes()
chain.EXPECT().SubscribeBlockCreation(gomock.Any()).Return(nil).AnyTimes()
chain.EXPECT().AddSubscriber(gomock.Any()).Return(nil).AnyTimes()

defer func() {
require.NoError(t, sf.Stop(ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestHandleStartSubChain(t *testing.T) {
chain := mock_blockchain.NewMockBlockchain(ctrl)
chain.EXPECT().ChainID().Return(uint32(1)).AnyTimes()
chain.EXPECT().GetFactory().Return(sf).AnyTimes()
chain.EXPECT().SubscribeBlockCreation(gomock.Any()).Return(nil).AnyTimes()
chain.EXPECT().AddSubscriber(gomock.Any()).Return(nil).AnyTimes()

defer func() {
require.NoError(t, sf.Stop(ctx))
Expand Down
41 changes: 19 additions & 22 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ type Blockchain interface {
// cause any state change
ExecuteContractRead(ex *action.Execution) (*action.Receipt, error)

// SubscribeBlockCreation make you listen to every single produced block
SubscribeBlockCreation(ch chan *Block) error
// AddSubscriber make you listen to every single produced block
AddSubscriber(BlockCreationSubscriber) error

// UnsubscribeBlockCreation make you listen to every single produced block
UnsubscribeBlockCreation(ch chan *Block) error
// RemoveSubscriber make you listen to every single produced block
RemoveSubscriber(BlockCreationSubscriber) error
}

// blockchain implements the Blockchain interface
Expand All @@ -153,7 +153,7 @@ type blockchain struct {
validator Validator
lifecycle lifecycle.Lifecycle
clk clock.Clock
blocklistener []chan *Block
blocklistener []BlockCreationSubscriber

// used by account-based model
sf factory.Factory
Expand Down Expand Up @@ -708,19 +708,19 @@ func (bc *blockchain) Validator() Validator {
return bc.validator
}

func (bc *blockchain) SubscribeBlockCreation(ch chan *Block) error {
func (bc *blockchain) AddSubscriber(s BlockCreationSubscriber) error {
bc.mu.Lock()
defer bc.mu.Unlock()
logger.Info().Msg("Add a subscriber")
bc.blocklistener = append(bc.blocklistener, ch)
bc.blocklistener = append(bc.blocklistener, s)
return nil
}

func (bc *blockchain) UnsubscribeBlockCreation(ch chan *Block) error {
func (bc *blockchain) RemoveSubscriber(s BlockCreationSubscriber) error {
bc.mu.Lock()
defer bc.mu.Unlock()
for i, handler := range bc.blocklistener {
if ch == handler {
for i, sub := range bc.blocklistener {
if sub == s {
bc.blocklistener = append(bc.blocklistener[:i], bc.blocklistener[i+1:]...)
logger.Info().Msg("Successfully unsubscribe block creation")
return nil
Expand Down Expand Up @@ -944,9 +944,7 @@ func (bc *blockchain) commitBlock(blk *Block) error {
return err
}
// emit block to all block subscribers
if err := bc.emitToSubscribers(blk); err != nil {
return errors.Wrap(err, "failed to emit to block subscribers")
}
bc.emitToSubscribers(blk)
// update tip hash and height
bc.tipHeight = blk.Header.height
bc.tipHash = blk.HashBlock()
Expand Down Expand Up @@ -1000,18 +998,17 @@ func (bc *blockchain) runActions(blk *Block, ws factory.WorkingSet, verify bool)
return root, nil
}

func (bc *blockchain) emitToSubscribers(blk *Block) error {
// return if there is no subscribers
func (bc *blockchain) emitToSubscribers(blk *Block) {
if bc.blocklistener == nil {
return nil
return
}

for _, handler := range bc.blocklistener {
go func(handler chan *Block) {
handler <- blk
}(handler)
for _, s := range bc.blocklistener {
go func(bcs BlockCreationSubscriber, b *Block) {
if err := bcs.HandleBlock(b); err != nil {
logger.Error().Err(err).Msg("Failed to handle new block")
}
}(s, blk)
}
return nil
}

func (bc *blockchain) now() uint64 { return uint64(bc.clk.Now().Unix()) }
52 changes: 21 additions & 31 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ func TestCreateBlockchain(t *testing.T) {
require.Equal(5, int(height))
}

type MockSubscriber struct {
counter int
}

func (ms *MockSubscriber) HandleBlock(blk *Block) error {
tsfs, _, _ := action.ClassifyActions(blk.Actions)
ms.counter += len(tsfs)
return nil
}

func TestLoadBlockchainfromDB(t *testing.T) {
require := require.New(t)
ctx := context.Background()
Expand Down Expand Up @@ -248,27 +258,17 @@ func TestLoadBlockchainfromDB(t *testing.T) {
require.NoError(bc.Start(ctx))
require.NotNil(bc)

var transfers = int(0)
testchan := make(chan *Block)
err = bc.SubscribeBlockCreation(testchan)
require.Nil(err)
go func() {
for {
select {
case blk := <-testchan:
tsfs, _, _ := action.ClassifyActions(blk.Actions)
transfers += len(tsfs)
}
}
}()
require.Equal(0, transfers)
ms := &MockSubscriber{counter: 0}
err = bc.AddSubscriber(ms)
require.Nil(err)
require.Equal(0, ms.counter)

height := bc.TipHeight()
fmt.Printf("Open blockchain pass, height = %d\n", height)
require.Nil(addTestingTsfBlocks(bc))
err = bc.Stop(ctx)
require.NoError(err)
require.Equal(27, transfers)
require.Equal(27, ms.counter)

// Load a blockchain from DB
bc = NewBlockchain(cfg, PrecreatedStateFactoryOption(sf), BoltDBDaoOption())
Expand Down Expand Up @@ -473,29 +473,19 @@ func TestLoadBlockchainfromDBWithoutExplorer(t *testing.T) {
require.NoError(bc.Start(ctx))
require.NotNil(bc)

var transfers = int(0)
testchan := make(chan *Block)
err = bc.SubscribeBlockCreation(testchan)
require.Nil(err)
go func() {
for {
select {
case blk := <-testchan:
tsfs, _, _ := action.ClassifyActions(blk.Actions)
transfers += len(tsfs)
}
}
}()
require.Equal(0, transfers)
err = bc.UnsubscribeBlockCreation(testchan)
ms := &MockSubscriber{counter: 0}
err = bc.AddSubscriber(ms)
require.Nil(err)
require.Equal(0, ms.counter)
err = bc.RemoveSubscriber(ms)
require.Nil(err)

height := bc.TipHeight()
fmt.Printf("Open blockchain pass, height = %d\n", height)
require.Nil(addTestingTsfBlocks(bc))
err = bc.Stop(ctx)
require.NoError(err)
require.Equal(0, transfers)
require.Equal(0, ms.counter)

// Load a blockchain from DB
bc = NewBlockchain(cfg, PrecreatedStateFactoryOption(sf), BoltDBDaoOption())
Expand Down
12 changes: 12 additions & 0 deletions blockchain/blockcreationsubscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2018 IoTeX
// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no
// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent
// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache
// License 2.0 that can be found in the LICENSE file.

package blockchain

// BlockCreationSubscriber is an interface which will get notified when a block is created
type BlockCreationSubscriber interface {
HandleBlock(*Block) error
}
38 changes: 25 additions & 13 deletions chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import (

// ChainService is a blockchain service with all blockchain components.
type ChainService struct {
actpool actpool.ActPool
blocksync blocksync.BlockSync
consensus consensus.Consensus
chain blockchain.Blockchain
explorer *explorer.Server
indexservice *indexservice.Server
protocols []Protocol
actpool actpool.ActPool
blocksync blocksync.BlockSync
consensus consensus.Consensus
chain blockchain.Blockchain
explorer *explorer.Server
indexservice *indexservice.Server
protocols []Protocol
runningStatus bool
}

type optionParams struct {
Expand Down Expand Up @@ -124,12 +125,13 @@ func New(cfg config.Config, p2p network.Overlay, dispatcher dispatcher.Dispatche
}

return &ChainService{
actpool: actPool,
chain: chain,
blocksync: bs,
consensus: consensus,
indexservice: idx,
explorer: exp,
actpool: actPool,
chain: chain,
blocksync: bs,
consensus: consensus,
indexservice: idx,
explorer: exp,
runningStatus: false,
}, nil
}

Expand All @@ -154,11 +156,15 @@ func (cs *ChainService) Start(ctx context.Context) error {
return errors.Wrap(err, "error when starting explorer")
}
}
cs.runningStatus = true
return nil
}

// Stop stops the server
func (cs *ChainService) Stop(ctx context.Context) error {
if !cs.runningStatus {
return nil
}
if cs.explorer != nil {
if err := cs.explorer.Stop(ctx); err != nil {
return errors.Wrap(err, "error when stopping explorer")
Expand All @@ -178,9 +184,15 @@ func (cs *ChainService) Stop(ctx context.Context) error {
if err := cs.chain.Stop(ctx); err != nil {
return errors.Wrap(err, "error when stopping blockchain")
}
cs.runningStatus = false
return nil
}

// IsRunning returns whether the chain service is running
func (cs *ChainService) IsRunning() bool {
return cs.runningStatus
}

// HandleAction handles incoming action request.
func (cs *ChainService) HandleAction(actPb *pb.ActionPb) error {
var act action.Action
Expand Down
5 changes: 5 additions & 0 deletions indexservice/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ var (
ErrAlreadyExist = errors.New("already exist in DB")
)

// HandleBlock is an implementation of interface BlockCreationSubscriber
func (idx *Indexer) HandleBlock(blk *blockchain.Block) error {
return idx.BuildIndex(blk)
}

// BuildIndex build the index for a block
func (idx *Indexer) BuildIndex(blk *blockchain.Block) error {
idx.rds.Transact(func(tx *sql.Tx) error {
Expand Down
43 changes: 14 additions & 29 deletions indexservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,30 @@ import (

// Server is the container of the index service
type Server struct {
cfg config.Config
idx *Indexer
bc blockchain.Blockchain
blockCh chan *blockchain.Block
cfg config.Config
idx *Indexer
bc blockchain.Blockchain
}

// NewServer instantiates an index service
func NewServer(
cfg config.Config,
bc blockchain.Blockchain,
) *Server {
blockCh := make(chan *blockchain.Block)
if err := bc.SubscribeBlockCreation(blockCh); err != nil {
indexer := &Indexer{
cfg: cfg.Indexer,
rds: nil,
hexEncodedNodeAddr: "",
}
if err := bc.AddSubscriber(indexer); err != nil {
logger.Error().Err(err).Msg("error when subscribe to block")
return nil
}

return &Server{
cfg: cfg,
idx: &Indexer{
cfg: cfg.Indexer,
rds: nil,
hexEncodedNodeAddr: "",
},
bc: bc,
blockCh: blockCh,
idx: indexer,
bc: bc,
}
}

Expand All @@ -66,17 +64,6 @@ func (s *Server) Start(ctx context.Context) error {
return errors.Wrap(err, "error when start rds store")
}

go func() {
for {
select {
case blk := <-s.blockCh:
if err := s.idx.BuildIndex(blk); err != nil {
logger.Error().Err(err).Uint64("height", blk.Height()).Msg("failed to build index for block")
}
}
}
}()

return nil
}

Expand All @@ -86,12 +73,10 @@ func (s *Server) Stop(ctx context.Context) error {
return errors.Wrap(err, "error when shutting down explorer http server")
}
logger.Info().Msgf("Unsubscribe block creation for chain %d", s.bc.ChainID())
if err := s.bc.UnsubscribeBlockCreation(s.blockCh); err != nil {
return errors.Wrap(err, "error when un subscribe block creation")
}
close(s.blockCh)
for range s.blockCh {
if err := s.bc.RemoveSubscriber(s.idx); err != nil {
return errors.Wrap(err, "error when unsubscribe block creation")
}

return nil
}

Expand Down
Loading

0 comments on commit 909a28c

Please sign in to comment.