Skip to content

Commit ea9c055

Browse files
committed
refactor: Combine cached and sync DA fetch results
1 parent 59b33b1 commit ea9c055

File tree

1 file changed

+41
-59
lines changed

1 file changed

+41
-59
lines changed

block/internal/da/forced_inclusion_retriever.go

Lines changed: 41 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -132,39 +132,59 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
132132
missingHeights = append(missingHeights, h)
133133
continue
134134
}
135-
if block == nil {
136-
// Cache miss
135+
if block == nil { // Cache miss
137136
missingHeights = append(missingHeights, h)
138-
} else {
139-
// Cache hit
137+
} else { // Cache hit
140138
cachedBlocks[h] = block
141-
r.logger.Debug().
142-
Uint64("height", h).
143-
Int("blob_count", len(block.Blobs)).
144-
Msg("using cached block from async fetcher")
145139
}
146140
}
147141

148-
// Fetch missing heights synchronously
142+
// Fetch missing heights synchronously and store in map
143+
syncFetchedBlocks := make(map[uint64]*BlockData)
149144
var processErrs error
150145
for _, h := range missingHeights {
151146
result := r.client.Retrieve(ctx, h, r.client.GetForcedInclusionNamespace())
152-
153147
if result.Code == datypes.StatusHeightFromFuture {
154148
r.logger.Debug().
155149
Uint64("height", h).
156150
Msg("height not yet available on DA - backoff required")
157151
return nil, fmt.Errorf("%w: height %d not yet available", datypes.ErrHeightFromFuture, h)
158152
}
159153

160-
err := r.processRetrieveResult(event, result, h)
161-
processErrs = errors.Join(processErrs, err)
154+
if result.Code == datypes.StatusNotFound {
155+
r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height")
156+
continue
157+
}
158+
159+
if result.Code != datypes.StatusSuccess {
160+
err := fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", h, result.Message)
161+
processErrs = errors.Join(processErrs, err)
162+
continue
163+
}
164+
165+
// Store the sync-fetched block data
166+
syncFetchedBlocks[h] = &BlockData{
167+
Blobs: result.Data,
168+
Timestamp: result.Timestamp,
169+
}
162170
}
163171

164-
// Process cached blocks in order
172+
// Process all blocks in height order
165173
for _, h := range heights {
166-
if block, ok := cachedBlocks[h]; ok {
167-
// Add blobs from cached block
174+
var block *BlockData
175+
var source string
176+
177+
// Check cached blocks first, then sync-fetched
178+
if cachedBlock, ok := cachedBlocks[h]; ok {
179+
block = cachedBlock
180+
source = "cache"
181+
} else if syncBlock, ok := syncFetchedBlocks[h]; ok {
182+
block = syncBlock
183+
source = "sync"
184+
}
185+
186+
if block != nil {
187+
// Add blobs from block
168188
for _, blob := range block.Blobs {
169189
if len(blob) > 0 {
170190
event.Txs = append(event.Txs, blob)
@@ -179,8 +199,13 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
179199
r.logger.Debug().
180200
Uint64("height", h).
181201
Int("blob_count", len(block.Blobs)).
182-
Msg("added blobs from cached block")
202+
Str("source", source).
203+
Msg("added blobs from block")
183204
}
205+
206+
// Clean up maps to prevent unbounded memory growth
207+
delete(cachedBlocks, h)
208+
delete(syncFetchedBlocks, h)
184209
}
185210

186211
// any error during process, need to retry at next call
@@ -200,48 +225,5 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
200225
}, nil
201226
}
202227

203-
r.logger.Info().
204-
Uint64("da_height", daHeight).
205-
Uint64("epoch_start", epochStart).
206-
Uint64("epoch_end", epochEnd).
207-
Int("tx_count", len(event.Txs)).
208-
Int("cached_blocks", len(cachedBlocks)).
209-
Int("sync_fetched_blocks", len(missingHeights)).
210-
Msg("successfully retrieved forced inclusion epoch")
211-
212228
return event, nil
213229
}
214-
215-
// processRetrieveResult processes the result from a DA retrieve operation.
216-
func (r *ForcedInclusionRetriever) processRetrieveResult(
217-
event *ForcedInclusionEvent,
218-
result datypes.ResultRetrieve,
219-
height uint64,
220-
) error {
221-
if result.Code == datypes.StatusNotFound {
222-
r.logger.Debug().Uint64("height", height).Msg("no forced inclusion blobs at height")
223-
return nil
224-
}
225-
226-
if result.Code != datypes.StatusSuccess {
227-
return fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", height, result.Message)
228-
}
229-
230-
// Process each blob as a transaction
231-
for _, blob := range result.Data {
232-
if len(blob) > 0 {
233-
event.Txs = append(event.Txs, blob)
234-
}
235-
}
236-
237-
if result.Timestamp.After(event.Timestamp) {
238-
event.Timestamp = result.Timestamp
239-
}
240-
241-
r.logger.Debug().
242-
Uint64("height", height).
243-
Int("blob_count", len(result.Data)).
244-
Msg("processed forced inclusion blobs")
245-
246-
return nil
247-
}

0 commit comments

Comments
 (0)