Skip to content

Commit e8e8946

Browse files
alpejulienrbrt
andauthored
feat: DA hints in p2p (#2891)
* wip * x * Review feedback * Encapsulate hint in sync package * Async DA pull * Minor cleanup * Indipendent types for p2p store * Merge updates * Bump sonic version * Make tidy-all * Use envelope for p2p store * Minor cleanup * Better test data (cherry picked from commit ad3e21b) * Linter * Resolve merge conflicts * Tidy all * Integrate changes * updates * build fixes * fixes * cleanup * updates * implement setting * limit abstractions * linting * use same key as sequencer * fix unit test * rename working pool to avoid naming confusion * persist da/height for da synced nodes as well for consistency * fixes * use height instead of hashes * fixes * add docs * remove worker pool * fix mocks * cleanup (syncer should not care about da included height) * rename for clarity * Update syncer_test.go * speed-up queue * uncomment replaces * go mod tidy --------- Co-authored-by: Julien Robert <julien@rbrt.fr>
1 parent 2482622 commit e8e8946

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1672
-393
lines changed

.mockery.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ packages:
6161
dir: ./block/internal/syncing
6262
pkgname: syncing
6363
filename: syncer_mock.go
64+
6465
github.com/evstack/ev-node/block/internal/common:
6566
interfaces:
6667
Broadcaster:

apps/evm/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm
22

33
go 1.25.0
44

5-
//replace (
6-
// github.com/evstack/ev-node => ../../
7-
// github.com/evstack/ev-node/execution/evm => ../../execution/evm
8-
//)
5+
replace (
6+
github.com/evstack/ev-node => ../../
7+
github.com/evstack/ev-node/execution/evm => ../../execution/evm
8+
)
99

1010
require (
1111
github.com/ethereum/go-ethereum v1.16.8

apps/evm/go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,8 @@ github.com/ethereum/go-ethereum v1.16.8 h1:LLLfkZWijhR5m6yrAXbdlTeXoqontH+Ga2f9i
409409
github.com/ethereum/go-ethereum v1.16.8/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk=
410410
github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8=
411411
github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
412-
github.com/evstack/ev-node v1.0.0-rc.2 h1:gUQzLTkCj6D751exm/FIR/yw2aXWiW2aEREEwtxMvw0=
413-
github.com/evstack/ev-node v1.0.0-rc.2/go.mod h1:Qa2nN1D6PJQRU2tiarv6X5Der5OZg/+2QGY/K2mA760=
414412
github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE=
415413
github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
416-
github.com/evstack/ev-node/execution/evm v1.0.0-rc.2 h1:t7os7ksmPhf2rWY2psVBowyc+iuneMDPwBGQaxSckus=
417-
github.com/evstack/ev-node/execution/evm v1.0.0-rc.2/go.mod h1:ahxKQfPlJ5C7g15Eq9Mjn2tQnn59T0kIm9B10zDhcTI=
418414
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
419415
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
420416
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=

apps/grpc/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/grpc
22

33
go 1.25.0
44

5-
//replace (
6-
// github.com/evstack/ev-node => ../../
7-
// github.com/evstack/ev-node/execution/grpc => ../../execution/grpc
8-
//)
5+
replace (
6+
github.com/evstack/ev-node => ../../
7+
github.com/evstack/ev-node/execution/grpc => ../../execution/grpc
8+
)
99

1010
require (
1111
github.com/evstack/ev-node v1.0.0-rc.2

apps/grpc/go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,8 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni
365365
github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss=
366366
github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs=
367367
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
368-
github.com/evstack/ev-node v1.0.0-rc.2 h1:gUQzLTkCj6D751exm/FIR/yw2aXWiW2aEREEwtxMvw0=
369-
github.com/evstack/ev-node v1.0.0-rc.2/go.mod h1:Qa2nN1D6PJQRU2tiarv6X5Der5OZg/+2QGY/K2mA760=
370368
github.com/evstack/ev-node/core v1.0.0-rc.1 h1:Dic2PMUMAYUl5JW6DkDj6HXDEWYzorVJQuuUJOV0FjE=
371369
github.com/evstack/ev-node/core v1.0.0-rc.1/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
372-
github.com/evstack/ev-node/execution/grpc v1.0.0-rc.1 h1:OzrWLDDY6/9+LWx0XmUqPzxs/CHZRJICOwQ0Me/i6dY=
373-
github.com/evstack/ev-node/execution/grpc v1.0.0-rc.1/go.mod h1:Pr/sF6Zx8am9ZeWFcoz1jYPs0kXmf+OmL8Tz2Gyq7E4=
374370
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
375371
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
376372
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=

block/components.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77

8+
"github.com/celestiaorg/go-header"
89
"github.com/rs/zerolog"
910

1011
"github.com/evstack/ev-node/block/internal/cache"
@@ -20,6 +21,7 @@ import (
2021
"github.com/evstack/ev-node/pkg/genesis"
2122
"github.com/evstack/ev-node/pkg/signer"
2223
"github.com/evstack/ev-node/pkg/store"
24+
"github.com/evstack/ev-node/pkg/sync"
2325
"github.com/evstack/ev-node/pkg/telemetry"
2426
"github.com/evstack/ev-node/types"
2527
)
@@ -127,8 +129,10 @@ func NewSyncComponents(
127129
store store.Store,
128130
exec coreexecutor.Executor,
129131
daClient da.Client,
130-
headerStore common.Broadcaster[*types.SignedHeader],
131-
dataStore common.Broadcaster[*types.Data],
132+
headerStore header.Store[*types.P2PSignedHeader],
133+
dataStore header.Store[*types.P2PData],
134+
headerDAHintAppender submitting.DAHintAppender,
135+
dataDAHintAppender submitting.DAHintAppender,
132136
logger zerolog.Logger,
133137
metrics *Metrics,
134138
blockOpts BlockOptions,
@@ -163,7 +167,7 @@ func NewSyncComponents(
163167
}
164168

165169
// Create submitter for sync nodes (no signer, only DA inclusion processing)
166-
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
170+
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerDAHintAppender, dataDAHintAppender)
167171
if config.Instrumentation.IsTracingEnabled() {
168172
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
169173
}
@@ -200,8 +204,8 @@ func NewAggregatorComponents(
200204
sequencer coresequencer.Sequencer,
201205
daClient da.Client,
202206
signer signer.Signer,
203-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
204-
dataBroadcaster common.Broadcaster[*types.Data],
207+
headerSyncService *sync.HeaderSyncService,
208+
dataSyncService *sync.DataSyncService,
205209
logger zerolog.Logger,
206210
metrics *Metrics,
207211
blockOpts BlockOptions,
@@ -229,8 +233,8 @@ func NewAggregatorComponents(
229233
metrics,
230234
config,
231235
genesis,
232-
headerBroadcaster,
233-
dataBroadcaster,
236+
headerSyncService,
237+
dataSyncService,
234238
logger,
235239
blockOpts,
236240
errorCh,
@@ -266,7 +270,7 @@ func NewAggregatorComponents(
266270
}, nil
267271
}
268272

269-
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
273+
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerSyncService, dataSyncService)
270274
if config.Instrumentation.IsTracingEnabled() {
271275
daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
272276
}

block/components_test.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,17 @@ import (
2222
"github.com/evstack/ev-node/pkg/signer/noop"
2323
"github.com/evstack/ev-node/pkg/store"
2424
testmocks "github.com/evstack/ev-node/test/mocks"
25+
extmocks "github.com/evstack/ev-node/test/mocks/external"
26+
"github.com/evstack/ev-node/types"
2527
)
2628

29+
// noopDAHintAppender is a no-op implementation of DAHintAppender for testing
30+
type noopDAHintAppender struct{}
31+
32+
func (n noopDAHintAppender) AppendDAHint(ctx context.Context, daHeight uint64, heights ...uint64) error {
33+
return nil
34+
}
35+
2736
func TestBlockComponents_ExecutionClientFailure_StopsNode(t *testing.T) {
2837
// Test the error channel mechanism works as intended
2938

@@ -86,6 +95,14 @@ func TestNewSyncComponents_Creation(t *testing.T) {
8695
daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
8796
daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()
8897

98+
// Create mock P2P stores
99+
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
100+
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
101+
102+
// Create noop DAHintAppenders for testing
103+
headerHintAppender := noopDAHintAppender{}
104+
dataHintAppender := noopDAHintAppender{}
105+
89106
// Just test that the constructor doesn't panic - don't start the components
90107
// to avoid P2P store dependencies
91108
components, err := NewSyncComponents(
@@ -94,8 +111,10 @@ func TestNewSyncComponents_Creation(t *testing.T) {
94111
memStore,
95112
mockExec,
96113
daClient,
97-
nil,
98-
nil,
114+
mockHeaderStore,
115+
mockDataStore,
116+
headerHintAppender,
117+
dataHintAppender,
99118
zerolog.Nop(),
100119
NopMetrics(),
101120
DefaultBlockOptions(),

block/internal/common/event.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,7 @@ type DAHeightEvent struct {
2020
DaHeight uint64
2121
// Source indicates where this event originated from (DA or P2P)
2222
Source EventSource
23+
24+
// Optional DA height hints from P2P. first is the DA height hint for the header, second is the DA height hint for the data
25+
DaHeightHints [2]uint64
2326
}

block/internal/common/expected_interfaces.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@ package common
33
import (
44
"context"
55

6+
"github.com/evstack/ev-node/types"
67
pubsub "github.com/libp2p/go-libp2p-pubsub"
78

89
"github.com/celestiaorg/go-header"
910
)
1011

12+
type (
13+
HeaderP2PBroadcaster = Broadcaster[*types.P2PSignedHeader]
14+
DataP2PBroadcaster = Broadcaster[*types.P2PData]
15+
)
16+
1117
// Broadcaster interface for P2P broadcasting
1218
type Broadcaster[H header.Header[H]] interface {
1319
WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error

block/internal/executing/executor.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ type Executor struct {
4242
metrics *common.Metrics
4343

4444
// Broadcasting
45-
headerBroadcaster common.Broadcaster[*types.SignedHeader]
46-
dataBroadcaster common.Broadcaster[*types.Data]
45+
headerBroadcaster common.HeaderP2PBroadcaster
46+
dataBroadcaster common.DataP2PBroadcaster
4747

4848
// Configuration
4949
config config.Config
@@ -90,8 +90,8 @@ func NewExecutor(
9090
metrics *common.Metrics,
9191
config config.Config,
9292
genesis genesis.Genesis,
93-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
94-
dataBroadcaster common.Broadcaster[*types.Data],
93+
headerBroadcaster common.HeaderP2PBroadcaster,
94+
dataBroadcaster common.DataP2PBroadcaster,
9595
logger zerolog.Logger,
9696
options common.BlockOptions,
9797
errorCh chan<- error,
@@ -547,9 +547,13 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
547547
e.setLastState(newState)
548548

549549
// broadcast header and data to P2P network
550-
g, broadcastCtx := errgroup.WithContext(ctx)
551-
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, header) })
552-
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, data) })
550+
g, broadcastCtx := errgroup.WithContext(e.ctx)
551+
g.Go(func() error {
552+
return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header})
553+
})
554+
g.Go(func() error {
555+
return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data})
556+
})
553557
if err := g.Wait(); err != nil {
554558
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
555559
// don't fail block production on broadcast error

0 commit comments

Comments
 (0)