
Commit ab669ea

implement checkpoint for single sequencer
1 parent 21e3ead commit ab669ea

10 files changed: +525 −405 lines changed

apps/evm/cmd/run.go

Lines changed: 1 addition & 1 deletion

@@ -169,7 +169,7 @@ func createSequencer(
 		return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
 	}
 
-	basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, da, datastore, genesis, logger)
+	basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create based sequencer: %w", err)
 	}

apps/grpc/cmd/run.go

Lines changed: 1 addition & 1 deletion

@@ -131,7 +131,7 @@ func createSequencer(
 		return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
 	}
 
-	basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, da, datastore, genesis, logger)
+	basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create based sequencer: %w", err)
 	}

apps/testapp/cmd/run.go

Lines changed: 1 addition & 1 deletion

@@ -131,7 +131,7 @@ func createSequencer(
 		return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
 	}
 
-	basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, da, datastore, genesis, logger)
+	basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create based sequencer: %w", err)
 	}

sequencers/based/sequencer.go

Lines changed: 15 additions & 16 deletions

@@ -29,8 +29,6 @@ var _ coresequencer.Sequencer = (*BasedSequencer)(nil)
 // It uses DA as a queue and only persists a checkpoint of where it is in processing.
 type BasedSequencer struct {
 	fiRetriever ForcedInclusionRetriever
-	da          coreda.DA
-	genesis     genesis.Genesis
 	logger      zerolog.Logger
 
 	daHeight atomic.Uint64
@@ -45,15 +43,12 @@ type BasedSequencer struct {
 func NewBasedSequencer(
 	ctx context.Context,
 	fiRetriever ForcedInclusionRetriever,
-	da coreda.DA,
 	db ds.Batching,
 	genesis genesis.Genesis,
 	logger zerolog.Logger,
 ) (*BasedSequencer, error) {
 	bs := &BasedSequencer{
 		fiRetriever:     fiRetriever,
-		da:              da,
-		genesis:         genesis,
 		logger:          logger.With().Str("component", "based_sequencer").Logger(),
 		checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")),
 	}
@@ -96,11 +91,15 @@ func (s *BasedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.S
 // It treats DA as a queue and only persists where it is in processing
 func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) {
 	// If we have no cached transactions or we've consumed all from the current DA block,
-	// fetch the next DA block
+	// fetch the next DA epoch
+	daHeight := s.GetDAHeight()
 	if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
-		if err := s.fetchNextDABatch(ctx); err != nil {
+		daEndHeight, err := s.fetchNextDAEpoch(ctx)
+		if err != nil {
 			return nil, err
 		}
+
+		daHeight = daEndHeight
 	}
 
 	// Create batch from current position up to MaxBytes
@@ -113,7 +112,7 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
 
 	// If we've consumed all transactions from this DA block, move to next
 	if s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
-		s.checkpoint.DAHeight++
+		s.checkpoint.DAHeight = daHeight + 1
 		s.checkpoint.TxIndex = 0
 		s.currentBatchTxs = nil
 
@@ -135,8 +134,8 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
 	}, nil
 }
 
-// fetchNextDABatch fetches transactions from the next DA block
-func (s *BasedSequencer) fetchNextDABatch(ctx context.Context) error {
+// fetchNextDAEpoch fetches transactions from the next DA epoch
+func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context) (uint64, error) {
 	currentDAHeight := s.checkpoint.DAHeight
 
 	s.logger.Debug().
@@ -148,17 +147,17 @@ func (s *BasedSequencer) fetchNextDABatch(ctx context.Context) error {
 	if err != nil {
 		// Check if forced inclusion is not configured
 		if errors.Is(err, block.ErrForceInclusionNotConfigured) {
-			return errors.New("forced inclusion not configured")
+			return currentDAHeight, block.ErrForceInclusionNotConfigured
 		} else if errors.Is(err, coreda.ErrHeightFromFuture) {
 			// If we get a height from future error, stay at current position
 			// We'll retry the same height on the next call until DA produces that block
 			s.logger.Debug().
 				Uint64("da_height", currentDAHeight).
 				Msg("DA height from future, waiting for DA to produce block")
-			return nil
+			return currentDAHeight, nil
 		}
 		s.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve forced inclusion transactions")
-		return err
+		return currentDAHeight, err
 	}
 
 	// Validate and filter transactions
@@ -184,18 +183,18 @@ func (s *BasedSequencer) fetchNextDABatch(ctx context.Context) error {
 		Uint64("da_height_end", forcedTxsEvent.EndDaHeight).
 		Msg("fetched forced inclusion transactions from DA")
 
-	// Cache the transactions for this DA block
+	// Cache the transactions for this DA epoch
 	s.currentBatchTxs = validTxs
 
 	// If we had a non-zero tx index, we're resuming from a crash mid-block
 	// The transactions starting from that index are what we need
 	if s.checkpoint.TxIndex > 0 {
 		s.logger.Info().
			Uint64("tx_index", s.checkpoint.TxIndex).
-			Msg("resuming from checkpoint within DA block")
+			Msg("resuming from checkpoint within DA epoch")
 	}
 
-	return nil
+	return forcedTxsEvent.EndDaHeight, nil
 }
 
 // createBatchFromCheckpoint creates a batch from the current checkpoint position respecting MaxBytes
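
The behavioural core of the commit is in GetNextBatch and fetchNextDAEpoch: instead of unconditionally incrementing the checkpoint by one DA height, the sequencer now records the end height of the epoch it just consumed and resumes after it. A self-contained sketch of that advancement rule, with the checkpoint and epoch reduced to plain structs (the real types live in sequencers/based and are not fully shown here; field names are simplified for illustration):

package main

import "fmt"

// Simplified model of the checkpoint advance introduced in this commit.
type checkpoint struct {
	DAHeight uint64 // next DA height (epoch start) to fetch
	TxIndex  uint64 // next tx to serve within the cached epoch
}

type epoch struct {
	endDAHeight uint64   // last DA height covered by the fetched epoch
	txs         [][]byte // forced-inclusion txs collected over the epoch
}

// advance mirrors the new GetNextBatch behaviour: once every cached tx has
// been handed out, the checkpoint jumps past the *end* of the epoch rather
// than incrementing by a single DA block.
func advance(cp *checkpoint, ep epoch) {
	if cp.TxIndex >= uint64(len(ep.txs)) {
		cp.DAHeight = ep.endDAHeight + 1
		cp.TxIndex = 0
	}
}

func main() {
	cp := checkpoint{DAHeight: 100}
	ep := epoch{endDAHeight: 104, txs: [][]byte{[]byte("tx1"), []byte("tx2")}}
	cp.TxIndex = uint64(len(ep.txs)) // pretend both txs were already served
	advance(&cp, ep)
	fmt.Println(cp.DAHeight, cp.TxIndex) // 105 0 — resumes after the whole epoch
}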

sequencers/based/sequencer_test.go

Lines changed: 4 additions & 4 deletions

@@ -90,7 +90,7 @@ func createTestSequencer(t *testing.T, mockDA *MockDA, cfg config.Config, gen ge
 	// Create in-memory datastore
 	db := syncds.MutexWrap(ds.NewMapDatastore())
 
-	seq, err := NewBasedSequencer(context.Background(), fiRetriever, mockDA, db, gen, zerolog.Nop())
+	seq, err := NewBasedSequencer(context.Background(), fiRetriever, db, gen, zerolog.Nop())
 	require.NoError(t, err)
 	return seq
 }
@@ -223,7 +223,7 @@ func TestBasedSequencer_GetNextBatch_NotConfigured(t *testing.T) {
 	// Create in-memory datastore
 	db := syncds.MutexWrap(ds.NewMapDatastore())
 
-	seq, err := NewBasedSequencer(context.Background(), fiRetriever, mockDA, db, gen, zerolog.Nop())
+	seq, err := NewBasedSequencer(context.Background(), fiRetriever, db, gen, zerolog.Nop())
 	require.NoError(t, err)
 
 	req := coresequencer.GetNextBatchRequest{
@@ -580,7 +580,7 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) {
 	daClient := block.NewDAClient(mockDA, cfg, zerolog.Nop())
 	fiRetriever := block.NewForcedInclusionRetriever(daClient, gen, zerolog.Nop())
 
-	seq1, err := NewBasedSequencer(context.Background(), fiRetriever, mockDA, db, gen, zerolog.Nop())
+	seq1, err := NewBasedSequencer(context.Background(), fiRetriever, db, gen, zerolog.Nop())
 	require.NoError(t, err)
 
 	req := coresequencer.GetNextBatchRequest{
@@ -595,7 +595,7 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) {
 	assert.Equal(t, 2, len(resp.Batch.Transactions))
 
 	// Create a new sequencer with the same datastore (simulating restart)
-	seq2, err := NewBasedSequencer(context.Background(), fiRetriever, mockDA, db, gen, zerolog.Nop())
+	seq2, err := NewBasedSequencer(context.Background(), fiRetriever, db, gen, zerolog.Nop())
 	require.NoError(t, err)
 
 	// Checkpoint should be loaded from DB
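
TestBasedSequencer_CheckpointPersistence exercises the new behaviour end to end: two sequencers are built over the same in-memory datastore, and the second picks up the checkpoint written by the first, as a process restart would. The CheckpointStore itself is not part of this diff, so the sketch below is only a hypothetical illustration of persisting a {DAHeight, TxIndex} checkpoint under the "/based/checkpoint" key with go-datastore; the JSON encoding and the save/load function names are assumptions, not the repository's API:

package main

import (
	"context"
	"encoding/json"

	ds "github.com/ipfs/go-datastore"
	syncds "github.com/ipfs/go-datastore/sync"
)

// Checkpoint mirrors the two values the based sequencer persists.
type Checkpoint struct {
	DAHeight uint64 `json:"da_height"`
	TxIndex  uint64 `json:"tx_index"`
}

// save writes the checkpoint to the datastore under the given key.
func save(ctx context.Context, store ds.Datastore, key ds.Key, cp Checkpoint) error {
	raw, err := json.Marshal(cp)
	if err != nil {
		return err
	}
	return store.Put(ctx, key, raw)
}

// load reads the checkpoint back; a fresh datastore returns ds.ErrNotFound.
func load(ctx context.Context, store ds.Datastore, key ds.Key) (Checkpoint, error) {
	var cp Checkpoint
	raw, err := store.Get(ctx, key)
	if err != nil {
		return cp, err
	}
	return cp, json.Unmarshal(raw, &cp)
}

func main() {
	ctx := context.Background()
	db := syncds.MutexWrap(ds.NewMapDatastore()) // same wrapping as the tests
	key := ds.NewKey("/based/checkpoint")

	_ = save(ctx, db, key, Checkpoint{DAHeight: 105, TxIndex: 0})
	cp, _ := load(ctx, db, key) // a "restarted" sequencer reads this back
	_ = cp
}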

sequencers/common/size_validation.go

Lines changed: 1 addition & 12 deletions

@@ -12,16 +12,5 @@ const (
 // This checks against the DA layer limit, not the per-batch limit.
 // Returns true if the blob is within the absolute size limit, false otherwise.
 func ValidateBlobSize(blob []byte) bool {
-	return uint64(GetBlobSize(blob)) <= AbsoluteMaxBlobSize
-}
-
-// WouldExceedCumulativeSize checks if adding a blob would exceed the cumulative size limit for a batch.
-// Returns true if adding the blob would exceed the limit, false otherwise.
-func WouldExceedCumulativeSize(currentSize int, blobSize int, maxBytes uint64) bool {
-	return uint64(currentSize)+uint64(blobSize) > maxBytes
-}
-
-// GetBlobSize returns the size of a blob in bytes.
-func GetBlobSize(blob []byte) int {
-	return len(blob)
+	return uint64(len(blob)) <= AbsoluteMaxBlobSize
 }
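
With WouldExceedCumulativeSize and GetBlobSize removed, sequencers/common keeps only the absolute per-blob check against the DA layer limit, now computed directly from len(blob). A minimal runnable sketch of that remaining check; the real AbsoluteMaxBlobSize value is not shown in this diff, so the constant below is a placeholder assumption:

package main

import "fmt"

// Placeholder for sequencers/common.AbsoluteMaxBlobSize; the actual value is
// defined in the repository and not visible in this commit.
const absoluteMaxBlobSize uint64 = 1_900_000

// validateBlobSize mirrors the simplified helper left after this commit.
func validateBlobSize(blob []byte) bool {
	return uint64(len(blob)) <= absoluteMaxBlobSize
}

func main() {
	blob := make([]byte, 2_000_000)
	if !validateBlobSize(blob) {
		fmt.Printf("blob of %d bytes exceeds the absolute DA blob size limit\n", len(blob))
	}
}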

sequencers/common/size_validation_test.go

Lines changed: 0 additions & 92 deletions

@@ -47,95 +47,3 @@ func TestValidateBlobSize(t *testing.T) {
 		})
 	}
 }
-
-func TestWouldExceedCumulativeSize(t *testing.T) {
-	tests := []struct {
-		name        string
-		currentSize int
-		blobSize    int
-		maxBytes    uint64
-		want        bool
-	}{
-		{
-			name:        "empty batch, small blob",
-			currentSize: 0,
-			blobSize:    50,
-			maxBytes:    100,
-			want:        false,
-		},
-		{
-			name:        "would fit exactly",
-			currentSize: 50,
-			blobSize:    50,
-			maxBytes:    100,
-			want:        false,
-		},
-		{
-			name:        "would exceed by one byte",
-			currentSize: 50,
-			blobSize:    51,
-			maxBytes:    100,
-			want:        true,
-		},
-		{
-			name:        "far exceeds",
-			currentSize: 80,
-			blobSize:    100,
-			maxBytes:    100,
-			want:        true,
-		},
-		{
-			name:        "zero max bytes",
-			currentSize: 0,
-			blobSize:    1,
-			maxBytes:    0,
-			want:        true,
-		},
-		{
-			name:        "current already at limit",
-			currentSize: 100,
-			blobSize:    1,
-			maxBytes:    100,
-			want:        true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			got := WouldExceedCumulativeSize(tt.currentSize, tt.blobSize, tt.maxBytes)
-			assert.Equal(t, tt.want, got)
-		})
-	}
-}
-
-func TestGetBlobSize(t *testing.T) {
-	tests := []struct {
-		name     string
-		blobSize int
-		want     int
-	}{
-		{
-			name:     "empty blob",
-			blobSize: 0,
-			want:     0,
-		},
-		{
-			name:     "small blob",
-			blobSize: 42,
-			want:     42,
-		},
-		{
-			name:     "large blob",
-			blobSize: 1024 * 1024,
-			want:     1024 * 1024,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			blob := make([]byte, tt.blobSize)
-			got := GetBlobSize(blob)
-			assert.Equal(t, tt.want, got)
-		})
-	}
-}
