Skip to content

Commit eba3376

Browse files
committed
fix: fix da height bumping and improve retriever
1 parent 32c089e commit eba3376

File tree

4 files changed

+54
-59
lines changed

4 files changed

+54
-59
lines changed

block/internal/da/forced_inclusion_retriever.go

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
5353

5454
epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
5555

56-
if daHeight != epochStart {
56+
if daHeight != epochEnd {
5757
r.logger.Debug().
5858
Uint64("da_height", daHeight).
59-
Uint64("epoch_start", epochStart).
60-
Msg("not at epoch start - returning empty transactions")
59+
Uint64("epoch_end", epochEnd).
60+
Msg("not at epoch end - returning empty transactions")
6161

6262
return &ForcedInclusionEvent{
6363
StartDaHeight: daHeight,
@@ -68,16 +68,10 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
6868

6969
event := &ForcedInclusionEvent{
7070
StartDaHeight: epochStart,
71+
EndDaHeight: epochEnd,
7172
Txs: [][]byte{},
7273
}
7374

74-
r.logger.Debug().
75-
Uint64("da_height", daHeight).
76-
Uint64("epoch_start", epochStart).
77-
Uint64("epoch_end", epochEnd).
78-
Uint64("epoch_num", currentEpochNumber).
79-
Msg("retrieving forced included transactions from DA")
80-
8175
epochEndResult := r.client.RetrieveForcedInclusion(ctx, epochEnd)
8276
if epochEndResult.Code == coreda.StatusHeightFromFuture {
8377
r.logger.Debug().
@@ -97,58 +91,59 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
9791
}
9892
}
9993

100-
lastProcessedHeight := epochStart
94+
r.logger.Debug().
95+
Uint64("da_height", daHeight).
96+
Uint64("epoch_start", epochStart).
97+
Uint64("epoch_end", epochEnd).
98+
Uint64("epoch_num", currentEpochNumber).
99+
Msg("retrieving forced included transactions from DA")
101100

102-
if err := r.processForcedInclusionBlobs(event, &lastProcessedHeight, epochStartResult, epochStart); err != nil {
103-
return nil, err
104-
}
101+
var processErrs error
102+
err := r.processForcedInclusionBlobs(event, epochStartResult, epochStart)
103+
processErrs = errors.Join(processErrs, err)
105104

106105
// Process heights between start and end (exclusive)
107106
for epochHeight := epochStart + 1; epochHeight < epochEnd; epochHeight++ {
108107
result := r.client.RetrieveForcedInclusion(ctx, epochHeight)
109108

110-
// If any intermediate height is from future, break early
111-
if result.Code == coreda.StatusHeightFromFuture {
112-
r.logger.Debug().
113-
Uint64("epoch_height", epochHeight).
114-
Uint64("last_processed", lastProcessedHeight).
115-
Msg("reached future DA height within epoch - stopping")
116-
break
117-
}
118-
119-
if err := r.processForcedInclusionBlobs(event, &lastProcessedHeight, result, epochHeight); err != nil {
120-
return nil, err
121-
}
109+
err = r.processForcedInclusionBlobs(event, result, epochHeight)
110+
processErrs = errors.Join(processErrs, err)
122111
}
123112

124113
// Process epoch end (only if different from start)
125114
if epochEnd != epochStart {
126-
if err := r.processForcedInclusionBlobs(event, &lastProcessedHeight, epochEndResult, epochEnd); err != nil {
127-
return nil, err
128-
}
115+
err = r.processForcedInclusionBlobs(event, epochEndResult, epochEnd)
116+
processErrs = errors.Join(processErrs, err)
129117
}
130118

131-
event.EndDaHeight = lastProcessedHeight
119+
// any error during process, need to retry at next call
120+
if processErrs != nil {
121+
r.logger.Warn().
122+
Uint64("da_height", daHeight).
123+
Uint64("epoch_start", epochStart).
124+
Uint64("epoch_end", epochEnd).
125+
Uint64("epoch_num", currentEpochNumber).
126+
Err(processErrs).
127+
Msg("Failed to retrieve DA epoch.. retrying next iteration")
132128

133-
r.logger.Info().
134-
Uint64("epoch_start", epochStart).
135-
Uint64("epoch_end", lastProcessedHeight).
136-
Int("tx_count", len(event.Txs)).
137-
Msg("retrieved forced inclusion transactions")
129+
return &ForcedInclusionEvent{
130+
StartDaHeight: daHeight,
131+
EndDaHeight: daHeight,
132+
Txs: [][]byte{},
133+
}, nil
134+
}
138135

139136
return event, nil
140137
}
141138

142139
// processForcedInclusionBlobs processes blobs from a single DA height for forced inclusion.
143140
func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
144141
event *ForcedInclusionEvent,
145-
lastProcessedHeight *uint64,
146142
result coreda.ResultRetrieve,
147143
height uint64,
148144
) error {
149145
if result.Code == coreda.StatusNotFound {
150146
r.logger.Debug().Uint64("height", height).Msg("no forced inclusion blobs at height")
151-
*lastProcessedHeight = height
152147
return nil
153148
}
154149

@@ -163,8 +158,6 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
163158
}
164159
}
165160

166-
*lastProcessedHeight = height
167-
168161
r.logger.Debug().
169162
Uint64("height", height).
170163
Int("blob_count", len(result.Data)).

sequencers/based/sequencer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (s *BasedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.S
6767
// GetNextBatch retrieves the next batch of transactions from the DA layer
6868
// It fetches forced inclusion transactions and returns them as the next batch
6969
func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) {
70-
currentDAHeight := s.daHeight.Load()
70+
currentDAHeight := s.GetDAHeight()
7171

7272
s.logger.Debug().Uint64("da_height", currentDAHeight).Msg("fetching forced inclusion transactions from DA")
7373

@@ -86,13 +86,13 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
8686
s.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve forced inclusion transactions")
8787
return nil, err
8888
}
89-
}
90-
91-
// Update DA height based on the retrieved event
92-
if forcedTxsEvent.EndDaHeight > currentDAHeight {
93-
s.SetDAHeight(forcedTxsEvent.EndDaHeight)
94-
} else if forcedTxsEvent.StartDaHeight > currentDAHeight {
95-
s.SetDAHeight(forcedTxsEvent.StartDaHeight)
89+
} else {
90+
// Update DA height.
91+
// If we are in between epochs, we still need to bump the da height.
92+
// At the end of an epoch, we need to bump to go to the next epoch.
93+
if forcedTxsEvent.EndDaHeight >= currentDAHeight {
94+
s.SetDAHeight(forcedTxsEvent.EndDaHeight + 1)
95+
}
9696
}
9797

9898
// Add forced inclusion transactions to the queue with validation

sequencers/common/size_validation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const (
1212
// This checks against the DA layer limit, not the per-batch limit.
1313
// Returns true if the blob is within the absolute size limit, false otherwise.
1414
func ValidateBlobSize(blob []byte) bool {
15-
return uint64(len(blob)) <= AbsoluteMaxBlobSize
15+
return uint64(GetBlobSize(blob)) <= AbsoluteMaxBlobSize
1616
}
1717

1818
// WouldExceedCumulativeSize checks if adding a blob would exceed the cumulative size limit for a batch.

sequencers/single/sequencer.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
130130
return nil, ErrInvalidId
131131
}
132132

133-
currentDAHeight := c.daHeight.Load()
133+
currentDAHeight := c.GetDAHeight()
134134

135-
forcedEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight)
135+
forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight)
136136
if err != nil {
137137
if errors.Is(err, coreda.ErrHeightFromFuture) {
138138
c.logger.Debug().
@@ -143,25 +143,27 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
143143
}
144144

145145
// Still create an empty forced inclusion event
146-
forcedEvent = &block.ForcedInclusionEvent{
146+
forcedTxsEvent = &block.ForcedInclusionEvent{
147147
Txs: [][]byte{},
148148
StartDaHeight: currentDAHeight,
149149
EndDaHeight: currentDAHeight,
150150
}
151+
} else {
152+
// Update DA height.
153+
// If we are in between epochs, we still need to bump the da height.
154+
// At the end of an epoch, we need to bump to go to the next epoch.
155+
if forcedTxsEvent.EndDaHeight >= currentDAHeight {
156+
c.SetDAHeight(forcedTxsEvent.EndDaHeight + 1)
157+
}
151158
}
152159

153160
// Always try to process forced inclusion transactions (including pending from previous epochs)
154-
forcedTxs := c.processForcedInclusionTxs(forcedEvent, req.MaxBytes)
155-
if forcedEvent.EndDaHeight > currentDAHeight {
156-
c.SetDAHeight(forcedEvent.EndDaHeight)
157-
} else if forcedEvent.StartDaHeight > currentDAHeight {
158-
c.SetDAHeight(forcedEvent.StartDaHeight)
159-
}
161+
forcedTxs := c.processForcedInclusionTxs(forcedTxsEvent, req.MaxBytes)
160162

161163
c.logger.Debug().
162164
Int("tx_count", len(forcedTxs)).
163-
Uint64("da_height_start", forcedEvent.StartDaHeight).
164-
Uint64("da_height_end", forcedEvent.EndDaHeight).
165+
Uint64("da_height_start", forcedTxsEvent.StartDaHeight).
166+
Uint64("da_height_end", forcedTxsEvent.EndDaHeight).
165167
Msg("retrieved forced inclusion transactions from DA")
166168

167169
// Calculate size used by forced inclusion transactions

0 commit comments

Comments
 (0)