Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

header: Extract Head method into separate Head interface, make Syncer implement it #978

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions header/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ type Store interface {
// Getter contains the behavior necessary for a component to retrieve
// headers that have been processed during header sync.
type Getter interface {
// Head returns the ExtendedHeader of the chain head.
Head(context.Context) (*ExtendedHeader, error)
Head

// Get returns the ExtendedHeader corresponding to the given hash.
Get(context.Context, tmbytes.HexBytes) (*ExtendedHeader, error)
Expand All @@ -116,3 +115,11 @@ type Getter interface {
// GetRangeByHeight returns the given range [from:to) of ExtendedHeaders.
GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error)
}

// Head contains the behavior necessary for a component to retrieve
// the chain head. Note that "chain head" is subjective to the component
// reporting it.
type Head interface {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
// Head returns the ExtendedHeader of the chain head.
Head(context.Context) (*ExtendedHeader, error)
}
27 changes: 23 additions & 4 deletions header/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -35,6 +36,10 @@ type Syncer struct {
exchange header.Exchange
store header.Store

// blockTime provides a reference point for the Syncer to determine
// whether its subjective head is outdated
blockTime time.Duration

// stateLk protects state which represents the current or latest sync
stateLk sync.RWMutex
state State
Expand All @@ -47,11 +52,12 @@ type Syncer struct {
}

// NewSyncer creates a new instance of Syncer.
func NewSyncer(exchange header.Exchange, store header.Store, sub header.Subscriber) *Syncer {
func NewSyncer(exchange header.Exchange, store header.Store, sub header.Subscriber, blockTime time.Duration) *Syncer {
return &Syncer{
sub: sub,
exchange: exchange,
store: store,
blockTime: blockTime,
triggerSync: make(chan struct{}, 1), // should be buffered
}
}
Expand Down Expand Up @@ -88,6 +94,12 @@ func (s *Syncer) WaitSync(ctx context.Context) error {
return err
}

// Head tries to return the Syncer's view of the objective head of the
// network.
func (s *Syncer) Head(ctx context.Context) (*header.ExtendedHeader, error) {
return s.trustedHead(ctx)
}

// State collects all the information about a sync.
type State struct {
ID uint64 // incrementing ID of a sync
Expand Down Expand Up @@ -133,16 +145,23 @@ func (s *Syncer) trustedHead(ctx context.Context) (*header.ExtendedHeader, error
return nil, err
}

// check if our subjective header is not expired and use it
if !sbj.IsExpired() {
// check if our subjective header is not expired and not too outdated (relative to the
// network's block time) and use it
if !sbj.IsExpired() && (time.Now().Sub(sbj.Time) <= s.blockTime) {
return sbj, nil
}

// otherwise, request head from a trustedPeer or, in other words, do automatic subjective initialization
// otherwise, attempt to request head from a trustedPeer or, in other words, do
// automatic subjective initialization
objHead, err := s.exchange.Head(ctx)
if err != nil {
return nil, err
}
// ensure that head returned from trustedPeer is more recent than subjective head
// as it is possible that the trustedPeer's reported head is not current.
if objHead.Height < sbj.Height {
renaynay marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("trusted peer failed to return current network head")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return sbj head here bc it's highest we know of at that point

}

s.pending.Add(objHead)
return objHead, nil
Expand Down
49 changes: 46 additions & 3 deletions header/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/celestiaorg/celestia-node/header/store"
)

var blockTime = time.Minute

func TestSyncSimpleRequestingHead(t *testing.T) {
// this way we force local head of Syncer to expire, so it requests a new one from trusted peer
header.TrustingPeriod = time.Microsecond
Expand All @@ -33,7 +35,7 @@ func TestSyncSimpleRequestingHead(t *testing.T) {
require.NoError(t, err)

localStore := store.NewTestStore(ctx, t, head)
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{})
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{}, blockTime)
err = syncer.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -67,7 +69,7 @@ func TestSyncCatchUp(t *testing.T) {

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{})
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{}, blockTime)
// 1. Initial sync
err := syncer.Start(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -111,7 +113,7 @@ func TestSyncPendingRangesWithMisses(t *testing.T) {

remoteStore := store.NewTestStore(ctx, t, head)
localStore := store.NewTestStore(ctx, t, head)
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{})
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{}, blockTime)
err := syncer.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -153,3 +155,44 @@ func TestSyncPendingRangesWithMisses(t *testing.T) {
assert.Equal(t, exp.Height, have.Height)
assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used
}

// TestSyncHead tests the Syncer's Head method.
func TestSyncHead(t *testing.T) {
// this way we force local head of Syncer to expire, so it requests a new one from trusted peer
header.TrustingPeriod = time.Microsecond

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

suite := header.NewTestSuite(t, 3)
head := suite.Head()

remoteStore := store.NewTestStore(ctx, t, head)
_, err := remoteStore.Append(ctx, suite.GenExtendedHeaders(100)...)
require.NoError(t, err)

_, err = remoteStore.GetByHeight(ctx, 100)
require.NoError(t, err)

localStore := store.NewTestStore(ctx, t, head)
syncer := NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{}, blockTime)
err = syncer.Start(ctx)
require.NoError(t, err)

_, err = localStore.GetByHeight(ctx, 100)
require.NoError(t, err)

// send some headers through "headersub" to increase pending cache
headers := suite.GenExtendedHeaders(10)
res := syncer.processIncoming(ctx, headers[len(headers)-1])
require.Equal(t, pubsub.ValidationAccept, res)

// compare Syncer's reported head against local store's head
storeHead, err := localStore.Head(ctx)
require.NoError(t, err)
syncHead, err := syncer.Head(ctx)
require.NoError(t, err)

// ensure Syncer reports its head from pending cache rather than the store
assert.Greater(t, syncHead.Height, storeHead.Height)
}
2 changes: 1 addition & 1 deletion node/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func baseComponents(cfg *Config, store Store) fx.Option {
fx.Provide(services.HeaderStore),
fx.Invoke(services.HeaderStoreInit(&cfg.Services)),
fxutil.ProvideAs(services.FraudService, new(fraud.Service), new(fraud.Subscriber)),
fx.Provide(services.HeaderSyncer),
fx.Provide(services.HeaderSyncer(cfg.Services)),
fxutil.ProvideAs(services.P2PSubscriber, new(header.Broadcaster), new(header.Subscriber)),
fx.Provide(services.HeaderP2PExchangeServer),
// p2p components
Expand Down
8 changes: 8 additions & 0 deletions node/config_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func WithTrustedPeers(addr ...string) Option {
}
}

// WithBlockTime overrides the default block time with the given
// duration.
func WithBlockTime(blockTime time.Duration) Option {
return func(sets *settings) {
sets.cfg.Services.BlockTime = blockTime
}
}

// WithPeersLimit overrides default peer limit for peers found during discovery.
func WithPeersLimit(limit uint) Option {
return func(sets *settings) {
Expand Down
3 changes: 2 additions & 1 deletion node/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/celestiaorg/celestia-node/header/local"
"github.com/celestiaorg/celestia-node/header/store"
"github.com/celestiaorg/celestia-node/header/sync"
"github.com/celestiaorg/celestia-node/params"
service "github.com/celestiaorg/celestia-node/service/header"
"github.com/celestiaorg/celestia-node/service/rpc"
"github.com/celestiaorg/celestia-node/service/share"
Expand Down Expand Up @@ -242,7 +243,7 @@ func setupHeaderService(ctx context.Context, t *testing.T) *service.Service {
_, err := localStore.Append(ctx, suite.GenExtendedHeaders(5)...)
require.NoError(t, err)
// create syncer
syncer := sync.NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{})
syncer := sync.NewSyncer(local.NewExchange(remoteStore), localStore, &header.DummySubscriber{}, params.BlockTime)

return service.NewHeaderService(syncer, nil, nil, nil, localStore)
}
Expand Down
3 changes: 3 additions & 0 deletions node/services/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {
// Note: The trusted does *not* imply Headers are not verified, but trusted as reliable to fetch headers
// at any moment.
TrustedPeers []string
// BlockTime is the block time of the network.
BlockTime time.Duration
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still kind of iffy on this being a config value. While it doesn't really impact anything other than the frequency at which the syncer requests a new objective head from its trustedPeer, if we do end up using this config param elsewhere in the code, we would have to be very cautious on how it is used. Wdyt @Wondertan ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not make it part of the config value but a part of params pkg(#790). Taken from the params, the value then gets pass to the Syncer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, I don't think we should make it an option on the node, as by now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls check: 9d89774

// NOTE: All further fields related to share/discovery.
// PeersLimit defines how many peers will be added during discovery.
PeersLimit uint
Expand All @@ -36,6 +38,7 @@ func DefaultConfig() Config {
return Config{
TrustedHash: "",
TrustedPeers: make([]string, 0),
BlockTime: params.BlockTime,
PeersLimit: 3,
DiscoveryInterval: time.Second * 30,
AdvertiseInterval: time.Second * 30,
Expand Down
29 changes: 18 additions & 11 deletions node/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,31 @@ import (
)

// HeaderSyncer creates a new Syncer.
func HeaderSyncer(
func HeaderSyncer(cfg Config) func(
ctx context.Context,
lc fx.Lifecycle,
ex header.Exchange,
store header.Store,
sub header.Subscriber,
fservice fraud.Service,
) (*sync.Syncer, error) {
syncer := sync.NewSyncer(ex, store, sub)
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
lc.Append(fx.Hook{
OnStart: func(startCtx context.Context) error {
return FraudLifecycle(startCtx, lifecycleCtx, fraud.BadEncoding, fservice, syncer.Start, syncer.Stop)
},
OnStop: syncer.Stop,
})

return syncer, nil
return func(ctx context.Context,
lc fx.Lifecycle,
ex header.Exchange,
store header.Store,
sub header.Subscriber,
fservice fraud.Service,
) (*sync.Syncer, error) {
syncer := sync.NewSyncer(ex, store, sub, cfg.BlockTime)
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
lc.Append(fx.Hook{
OnStart: func(startCtx context.Context) error {
return FraudLifecycle(startCtx, lifecycleCtx, fraud.BadEncoding, fservice, syncer.Start, syncer.Stop)
},
OnStop: syncer.Stop,
})
return syncer, nil
}
}

// P2PSubscriber creates a new p2p.Subscriber.
Expand Down
1 change: 1 addition & 0 deletions node/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestNode(t *testing.T, tp Type, opts ...Option) *Node {
WithNetwork(params.Private),
WithRPCPort("0"),
WithKeyringSigner(TestKeyringSigner(t)),
WithBlockTime(time.Microsecond),
)
nd, err := New(tp, store, opts...)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions params/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package params

import (
"errors"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)
Expand All @@ -13,6 +14,8 @@ const (
// Private can be used to set up any private network, including local testing setups.
// Use CELESTIA_PRIVATE_GENESIS env var to enable Private by specifying its genesis block hash.
Private Network = "private"
// TODO @renaynay @Wondertan: Set this once it's agreed upon, ideally this could point at a core const
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockTime = time.Second * 30
)

// Network is a type definition for DA network run by Celestia Node.
Expand Down