Skip to content

Commit

Permalink
refactor: model finalised notifier channel after imported notifier ch…
Browse files Browse the repository at this point in the history
…annel (#1816)
  • Loading branch information
edwardmack authored Oct 6, 2021
1 parent 2b1276d commit cf04c2d
Show file tree
Hide file tree
Showing 18 changed files with 114 additions and 334 deletions.
4 changes: 2 additions & 2 deletions dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type BlockState interface {
GetFinalisedHash(uint64, uint64) (common.Hash, error)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo)
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
SubChain(start, end common.Hash) ([]common.Hash, error)
GetBlockBody(hash common.Hash) (*types.Body, error)
Expand Down
47 changes: 21 additions & 26 deletions dot/core/mocks/block_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 4 additions & 12 deletions dot/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ type Handler struct {
grandpaState GrandpaState

// block notification channels
imported chan *types.Block
finalised chan *types.FinalisationInfo
finalisedID byte
imported chan *types.Block
finalised chan *types.FinalisationInfo

// GRANDPA changes
grandpaScheduledChange *grandpaChange
Expand All @@ -75,12 +74,7 @@ type resume struct {
func NewHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState) (*Handler, error) {
imported := blockState.GetImportedBlockNotifierChannel()

finalised := make(chan *types.FinalisationInfo, 16)

fid, err := blockState.RegisterFinalizedChannel(finalised)
if err != nil {
return nil, err
}
finalised := blockState.GetFinalisedNotifierChannel()

ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -92,7 +86,6 @@ func NewHandler(blockState BlockState, epochState EpochState, grandpaState Grand
grandpaState: grandpaState,
imported: imported,
finalised: finalised,
finalisedID: fid,
}, nil
}

Expand All @@ -107,8 +100,7 @@ func (h *Handler) Start() error {
func (h *Handler) Stop() error {
h.cancel()
h.blockState.FreeImportedBlockNotifierChannel(h.imported)
h.blockState.UnregisterFinalisedChannel(h.finalisedID)
close(h.finalised)
h.blockState.FreeFinalisedNotifierChannel(h.finalised)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions dot/digest/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type BlockState interface {
BestBlockHeader() (*types.Header, error)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo)
}

// EpochState is the interface for state.EpochState
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type BlockAPI interface {
GetJustification(hash common.Hash) ([]byte, error)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
GetFinalisedNotifierChannel() chan *types.FinalisationInfo
FreeFinalisedNotifierChannel(ch chan *types.FinalisationInfo)
SubChain(start, end common.Hash) ([]common.Hash, error)
RegisterRuntimeUpdatedChannel(ch chan<- runtime.Version) (uint32, error)
UnregisterRuntimeUpdatedChannel(id uint32) bool
Expand Down
5 changes: 2 additions & 3 deletions dot/rpc/modules/api_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ func NewMockBlockAPI() *modulesmocks.MockBlockAPI {
m.On("GetHighestFinalisedHash").Return(common.Hash{}, nil)
m.On("GetImportedBlockNotifierChannel").Return(make(chan *types.Block, 5))
m.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
m.On("UnregisterImportedChannel", mock.AnythingOfType("uint8"))
m.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")).Return(byte(0), nil)
m.On("UnregisterFinalizedChannel", mock.AnythingOfType("uint8"))
m.On("GetFinalisedNotifierChannel").Return(make(chan *types.FinalisationInfo, 5))
m.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
m.On("GetJustification", mock.AnythingOfType("common.Hash")).Return(make([]byte, 10), nil)
m.On("HasJustification", mock.AnythingOfType("common.Hash")).Return(true, nil)
m.On("SubChain", mock.AnythingOfType("common.Hash"), mock.AnythingOfType("common.Hash")).Return(make([]common.Hash, 0), nil)
Expand Down
47 changes: 21 additions & 26 deletions dot/rpc/modules/mocks/block_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 18 additions & 24 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (l *BlockListener) Stop() error {
type BlockFinalizedListener struct {
channel chan *types.FinalisationInfo
wsconn *WSConn
chanID byte
subID uint32
done chan struct{}
cancel chan struct{}
Expand All @@ -189,7 +188,7 @@ type BlockFinalizedListener struct {
func (l *BlockFinalizedListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.chanID)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.channel)
close(l.done)
}()

Expand Down Expand Up @@ -229,12 +228,11 @@ type AllBlocksListener struct {
finalizedChan chan *types.FinalisationInfo
importedChan chan *types.Block

wsconn *WSConn
finalizedChanID byte
subID uint32
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
wsconn *WSConn
subID uint32
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
}

func newAllBlockListener(conn *WSConn) *AllBlocksListener {
Expand All @@ -243,7 +241,6 @@ func newAllBlockListener(conn *WSConn) *AllBlocksListener {
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
wsconn: conn,
finalizedChan: make(chan *types.FinalisationInfo, DEFAULT_BUFFER_SIZE),
}
}

Expand All @@ -252,9 +249,8 @@ func (l *AllBlocksListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.FreeImportedBlockNotifierChannel(l.importedChan)
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalizedChanID)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.finalizedChan)

close(l.finalizedChan)
close(l.done)
}()

Expand Down Expand Up @@ -307,16 +303,15 @@ func (l *AllBlocksListener) Stop() error {

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn *WSConn
subID uint32
extrinsic types.Extrinsic
importedChan chan *types.Block
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
finalisedChanID byte
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
wsconn *WSConn
subID uint32
extrinsic types.Extrinsic
importedChan chan *types.Block
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
}

// NewExtrinsicSubmitListener constructor to build new ExtrinsicSubmitListener
Expand All @@ -338,7 +333,7 @@ func (l *ExtrinsicSubmitListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.FreeImportedBlockNotifierChannel(l.importedChan)
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalisedChanID)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.finalisedChan)
close(l.done)
close(l.finalisedChan)
}()
Expand Down Expand Up @@ -459,7 +454,6 @@ type GrandpaJustificationListener struct {
done chan struct{}
wsconn *WSConn
subID uint32
finalisedChID byte
finalisedCh chan *types.FinalisationInfo
}

Expand All @@ -468,7 +462,7 @@ func (g *GrandpaJustificationListener) Listen() {
// listen for finalised headers
go func() {
defer func() {
g.wsconn.BlockAPI.UnregisterFinalisedChannel(g.finalisedChID)
g.wsconn.BlockAPI.FreeFinalisedNotifierChannel(g.finalisedCh)
close(g.done)
}()

Expand Down
10 changes: 5 additions & 5 deletions dot/rpc/subscription/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {
defer cancel()

BlockAPI := new(mocks.MockBlockAPI)
BlockAPI.On("UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))

wsconn.BlockAPI = BlockAPI

Expand All @@ -165,7 +165,7 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {
defer func() {
require.NoError(t, bfl.Stop())
time.Sleep(time.Millisecond * 10)
BlockAPI.AssertCalled(t, "UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.AssertCalled(t, "FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
}()

notifyChan <- &types.FinalisationInfo{
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {

BlockAPI := new(mocks.MockBlockAPI)
BlockAPI.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
BlockAPI.On("UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))

wsconn.BlockAPI = BlockAPI

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {
time.Sleep(time.Millisecond * 10)

BlockAPI.AssertCalled(t, "FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
BlockAPI.AssertCalled(t, "UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
BlockAPI.AssertCalled(t, "FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
}()

notifyImportedChan <- block
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestGrandpaJustification_Listen(t *testing.T) {

blockStateMock := new(mocks.MockBlockAPI)
blockStateMock.On("GetJustification", mock.AnythingOfType("common.Hash")).Return(mockedJustBytes, nil)
blockStateMock.On("UnregisterFinalisedChannel", mock.AnythingOfType("uint8"))
blockStateMock.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))
wsconn.BlockAPI = blockStateMock

finchannel := make(chan *types.FinalisationInfo)
Expand Down
Loading

0 comments on commit cf04c2d

Please sign in to comment.