Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion gossip/evmstore/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package evmstore

import (
"fmt"
"math/big"

cc "github.com/Fantom-foundation/Carmen/go/common"
carmen "github.com/Fantom-foundation/Carmen/go/state"
_ "github.com/Fantom-foundation/Carmen/go/state/gostate"
"github.com/Fantom-foundation/go-opera/inter/state"
"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/ethereum/go-ethereum/common"
"math/big"
)

// NoArchiveError is an error returned by implementation of the State interface
// for archive operations if no archive is maintained by this implementation.
const NoArchiveError = carmen.NoArchiveError

// GetLiveStateDb obtains StateDB for block processing - the live writable state
func (s *Store) GetLiveStateDb(stateRoot hash.Hash) (state.StateDB, error) {
if s.liveStateDb == nil {
Expand Down
29 changes: 23 additions & 6 deletions gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type feedUpdate struct {
logs []*types.Log
}

type ArchiveBlockHeightSource interface {
GetArchiveBlockHeight() (uint64, bool, error)
}

func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription {
return f.scope.Track(f.newEpoch.Subscribe(ch))
}
Expand All @@ -89,7 +93,7 @@ func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscripti
return f.scope.Track(f.newLogs.Subscribe(ch))
}

func (f *ServiceFeed) Start(store *evmstore.Store) {
func (f *ServiceFeed) Start(store ArchiveBlockHeightSource) {
incoming := make(chan feedUpdate, 1024)
f.incomingUpdates = incoming
stop := make(chan struct{})
Expand Down Expand Up @@ -122,12 +126,20 @@ func (f *ServiceFeed) Start(store *evmstore.Store) {

height, empty, err := store.GetArchiveBlockHeight()
if err != nil {
log.Error("failed to get archive block height", "err", err)
continue
}
if empty {
continue
// If there is no archive, set height to the last block
// and send all notifications
if errors.Is(err, evmstore.NoArchiveError) {
height = pending[len(pending)-1].block.Number.Uint64()
} else {
log.Error("failed to get archive block height", "err", err)
continue
}
} else {
if empty {
continue
}
}

for _, update := range pending {
if update.block.Number.Uint64() > height {
break
Expand All @@ -144,6 +156,10 @@ func (f *ServiceFeed) notifyAboutNewBlock(
block *evmcore.EvmBlock,
logs []*types.Log,
) {
// ignore updates if not started
if f.incomingUpdates == nil {
return
}
f.incomingUpdates <- feedUpdate{
block: block,
logs: logs,
Expand Down Expand Up @@ -517,6 +533,7 @@ func (s *Service) Start() error {
if s.store.evm.CheckLiveStateHash(blockState.LastBlock.Idx, blockState.FinalizedStateRoot) != nil {
return errors.New("fullsync isn't possible because state root is missing")
}

// start notification feeder
s.feed.Start(s.store.evm)

Expand Down
55 changes: 55 additions & 0 deletions gossip/service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

178 changes: 178 additions & 0 deletions gossip/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package gossip

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/Fantom-foundation/go-opera/evmcore"
"github.com/Fantom-foundation/go-opera/gossip/evmstore"
"go.uber.org/mock/gomock"
)

func TestServiceFeed_SubscribeNewBlock(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockArchiveBlockHeightSource(ctrl)

store.EXPECT().GetArchiveBlockHeight().Return(uint64(12), false, nil).AnyTimes()

feed := ServiceFeed{}
feed.Start(store)

consumer := make(chan evmcore.ChainHeadNotify, 1)
feed.SubscribeNewBlock(consumer)

// There should be no signal delivered until there is a notification.
select {
case <-consumer:
t.Fatal("expected no notification to be sent")
case <-time.After(100 * time.Millisecond):
// all good
}

feed.notifyAboutNewBlock(&evmcore.EvmBlock{
EvmHeader: evmcore.EvmHeader{
Number: big.NewInt(12),
},
}, nil)

// The notification should be delivered.
select {
case notification := <-consumer:
if notification.Block.Number.Cmp(big.NewInt(12)) != 0 {
t.Fatalf("expected block number 12, got %d", notification.Block.Number)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("expected notification to be sent")
}

feed.Stop()
}

func TestServiceFeed_BlocksInOrder(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockArchiveBlockHeightSource(ctrl)

var startBlocknumber uint64 = 5
mockBlockNumber := startBlocknumber
expectedBlockNumber := startBlocknumber + 1

// Increment expected block height
store.EXPECT().GetArchiveBlockHeight().DoAndReturn(func() (uint64, bool, error) {
mockBlockNumber++
return mockBlockNumber, false, nil
}).AnyTimes()

feed := ServiceFeed{}
feed.Start(store)

consumer := make(chan evmcore.ChainHeadNotify, 5)
feed.SubscribeNewBlock(consumer)

// Emit blocks
blockNumbers := []int64{8, 6, 7, 10, 9}
for _, blockNumber := range blockNumbers {
feed.notifyAboutNewBlock(&evmcore.EvmBlock{
EvmHeader: evmcore.EvmHeader{
Number: big.NewInt(blockNumber),
},
}, nil)
}

// The notification should be delivered in correct order
for expectedBlockNumber <= startBlocknumber+uint64(len(blockNumbers)) {
select {
case notification := <-consumer:
if notification.Block.Number.Cmp(big.NewInt(int64(expectedBlockNumber))) != 0 {
t.Fatalf("expected block number %d, got %d", expectedBlockNumber, notification.Block.Number)
}
expectedBlockNumber++

case <-time.After(3 * time.Second):
t.Fatal("expected notification should be already received")
}
}

feed.Stop()
}

type expectedBlockNotification struct {
blockNumber uint64
}

func TestServiceFeed_ArchiveState(t *testing.T) {

tests := map[string]struct {
blockHeight uint64
emptyArchive bool
err error
expectedNotification *expectedBlockNotification
}{
"empty archive": {
blockHeight: 0,
emptyArchive: true,
err: nil,
expectedNotification: nil,
},
"non-empty archive": {
blockHeight: 12,
emptyArchive: false,
err: nil,
expectedNotification: &expectedBlockNotification{blockNumber: 12},
},
"non-existing archive": {
blockHeight: 12,
emptyArchive: true,
err: evmstore.NoArchiveError,
expectedNotification: &expectedBlockNotification{blockNumber: 12},
},
"different archive error": {
blockHeight: 12,
emptyArchive: false,
err: fmt.Errorf("some other error"),
expectedNotification: nil,
},
}

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {

ctrl := gomock.NewController(t)
store := NewMockArchiveBlockHeightSource(ctrl)

store.EXPECT().GetArchiveBlockHeight().Return(test.blockHeight, test.emptyArchive, test.err).AnyTimes()

feed := ServiceFeed{}
feed.Start(store)

consumer := make(chan evmcore.ChainHeadNotify, 1)
feed.SubscribeNewBlock(consumer)

feed.notifyAboutNewBlock(&evmcore.EvmBlock{
EvmHeader: evmcore.EvmHeader{
Number: big.NewInt(int64(test.blockHeight)),
},
}, nil)

// The notification should be delivered.
select {
case notification := <-consumer:
if test.expectedNotification == nil {
t.Fatal("expected notification to be sent")
} else {
if notification.Block.Number.Cmp(big.NewInt(int64(test.expectedNotification.blockNumber))) != 0 {
t.Fatalf("expected block number %d, got %d", test.expectedNotification.blockNumber, notification.Block.Number)
}
}
// no notification should be received
case <-time.After(100 * time.Millisecond):
if test.expectedNotification != nil {
t.Fatal("expected no notification to be sent")
}
}

feed.Stop()
})
}
}