Commit 14d7771

minor change
1 parent eb5966a commit 14d7771

File tree

1 file changed: +163 -40 lines

block/retriever.go

Lines changed: 163 additions & 40 deletions
@@ -40,8 +40,16 @@ type RetrievalResult struct {
 	LastAttempt time.Time
 }
 
-// ParallelRetriever manages parallel retrieval of blocks from DA layer with simplified design
-type ParallelRetriever struct {
+// RetryInfo tracks retry state persistently across attempts
+type RetryInfo struct {
+	RetryCount         int
+	LastAttempt        time.Time
+	NextRetryTime      time.Time
+	IsHeightFromFuture bool
+}
+
+// Retriever manages parallel retrieval of blocks from DA layer with simplified design
+type Retriever struct {
 	manager        *Manager
 	prefetchWindow int
 	ctx            context.Context
@@ -59,13 +67,20 @@ type ParallelRetriever struct {
 	inFlight   map[uint64]*RetrievalResult
 	inFlightMu sync.RWMutex
 
+	// Retry tracking - persistent across attempts
+	retryInfo   map[uint64]*RetryInfo
+	retryInfoMu sync.RWMutex
+
+	// Processing synchronization - prevents concurrent processing races
+	processingMu sync.Mutex
+
 	// Goroutine lifecycle
 	dispatcher sync.WaitGroup
 	processor  sync.WaitGroup
 }
 
-// NewParallelRetriever creates a new parallel retriever instance
-func NewParallelRetriever(manager *Manager, parentCtx context.Context) *ParallelRetriever {
+// NewRetriever creates a new parallel retriever instance
+func NewRetriever(manager *Manager, parentCtx context.Context) *Retriever {
 	ctx, cancel := context.WithCancel(parentCtx)
 
 	// Use test override if set, otherwise use default
@@ -76,7 +91,7 @@ func NewParallelRetriever(manager *Manager, parentCtx context.Context) *Parallel
 
 	startHeight := manager.daHeight.Load()
 
-	return &ParallelRetriever{
+	return &Retriever{
 		manager:        manager,
 		prefetchWindow: prefetchWindow,
 		ctx:            ctx,
@@ -85,12 +100,13 @@ func NewParallelRetriever(manager *Manager, parentCtx context.Context) *Parallel
 		scheduledUntil: startHeight - 1, // Will start scheduling from startHeight
 		nextToProcess:  startHeight,
 		inFlight:       make(map[uint64]*RetrievalResult),
+		retryInfo:      make(map[uint64]*RetryInfo),
 	}
 }
 
 // RetrieveLoop is responsible for interacting with DA layer using parallel retrieval.
 func (m *Manager) RetrieveLoop(ctx context.Context) {
-	retriever := NewParallelRetriever(m, ctx)
+	retriever := NewRetriever(m, ctx)
 	defer retriever.Stop()
 
 	// Start the parallel retrieval process
@@ -101,7 +117,7 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
 }
 
 // Start begins the parallel retrieval process
-func (pr *ParallelRetriever) Start() {
+func (pr *Retriever) Start() {
 	// Start dispatcher goroutine to schedule heights
 	pr.dispatcher.Add(1)
 	go pr.dispatchHeights()
@@ -115,7 +131,7 @@ func (pr *ParallelRetriever) Start() {
 }
 
 // Stop gracefully shuts down the parallel retriever
-func (pr *ParallelRetriever) Stop() {
+func (pr *Retriever) Stop() {
 	// Cancel context to signal all goroutines to stop
 	pr.cancel()
 
@@ -125,7 +141,7 @@ func (pr *ParallelRetriever) Stop() {
 }
 
 // dispatchHeights manages height scheduling based on processing state
-func (pr *ParallelRetriever) dispatchHeights() {
+func (pr *Retriever) dispatchHeights() {
 	defer pr.dispatcher.Done()
 	ticker := time.NewTicker(100 * time.Millisecond)
 	defer ticker.Stop()
@@ -147,6 +163,41 @@ func (pr *ParallelRetriever) dispatchHeights() {
 			targetHeight := nextToProcess + uint64(pr.prefetchWindow) - 1
 			pr.mu.Unlock()
 
+			// First, check for heights that need to be retried
+			now := time.Now()
+			pr.retryInfoMu.RLock()
+			var heightsToRetry []uint64
+			for height, info := range pr.retryInfo {
+				if height >= nextToProcess && height <= targetHeight && now.After(info.NextRetryTime) {
+					// Check if not already in flight
+					pr.inFlightMu.RLock()
+					_, inFlight := pr.inFlight[height]
+					pr.inFlightMu.RUnlock()
+
+					if !inFlight {
+						heightsToRetry = append(heightsToRetry, height)
+					}
+				}
+			}
+			pr.retryInfoMu.RUnlock()
+
+			// Schedule retries first (they're more important)
+			for _, height := range heightsToRetry {
+				// Try to acquire concurrency permit
+				if !pr.concurrencyLimit.TryAcquire(1) {
+					// Concurrency limit reached, try again later
+					break
+				}
+
+				// Launch retrieval goroutine for this height
+				go pr.retrieveHeight(height)
+
+				// Update pending jobs metric
+				if pr.manager.metrics != nil {
+					pr.manager.metrics.ParallelRetrievalPendingJobs.Add(1)
+				}
+			}
+
 			// Schedule any unscheduled heights within the window
 			for height := scheduledUntil + 1; height <= targetHeight; height++ {
 				// Check if already in flight
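A note on the retry scan added above: a height is rescheduled only when it sits inside the prefetch window, its backoff deadline has passed, and it is not already in flight. The following is a minimal, self-contained sketch of that selection logic, with RetryInfo trimmed to the fields the scan reads and the in-flight set reduced to a plain map (the names here are illustrative, not the package's API):

package main

import (
	"fmt"
	"time"
)

// RetryInfo is trimmed to the fields the dispatcher scan reads.
type RetryInfo struct {
	RetryCount    int
	NextRetryTime time.Time
}

// dueForRetry mirrors the scan above: a height qualifies only if it lies in
// the [nextToProcess, targetHeight] window, its backoff has elapsed, and it
// is not already in flight.
func dueForRetry(retryInfo map[uint64]*RetryInfo, inFlight map[uint64]bool, nextToProcess, targetHeight uint64, now time.Time) []uint64 {
	var heights []uint64
	for h, info := range retryInfo {
		if h >= nextToProcess && h <= targetHeight && now.After(info.NextRetryTime) && !inFlight[h] {
			heights = append(heights, h)
		}
	}
	return heights
}

func main() {
	now := time.Now()
	retryInfo := map[uint64]*RetryInfo{
		10: {RetryCount: 1, NextRetryTime: now.Add(-time.Second)},    // backoff elapsed
		11: {RetryCount: 2, NextRetryTime: now.Add(5 * time.Second)}, // still waiting
	}
	inFlight := map[uint64]bool{}
	fmt.Println(dueForRetry(retryInfo, inFlight, 10, 20, now)) // [10]
}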
@@ -158,6 +209,15 @@ func (pr *ParallelRetriever) dispatchHeights() {
 					continue
 				}
 
+				// Check if it's in retry state (already handled above)
+				pr.retryInfoMu.RLock()
+				_, hasRetryInfo := pr.retryInfo[height]
+				pr.retryInfoMu.RUnlock()
+
+				if hasRetryInfo {
+					continue
+				}
+
 				// Try to acquire concurrency permit
 				if !pr.concurrencyLimit.TryAcquire(1) {
 					// Concurrency limit reached, try again later
@@ -181,7 +241,7 @@ func (pr *ParallelRetriever) dispatchHeights() {
 }
 
 // retrieveHeight retrieves data for a specific height with retry logic
-func (pr *ParallelRetriever) retrieveHeight(height uint64) {
+func (pr *Retriever) retrieveHeight(height uint64) {
 	defer pr.concurrencyLimit.Release(1)
 	defer func() {
 		// Update pending jobs metric
@@ -190,9 +250,25 @@ func (pr *ParallelRetriever) retrieveHeight(height uint64) {
 		}
 	}()
 
+	// Get or create retry info for this height
+	pr.retryInfoMu.Lock()
+	info, exists := pr.retryInfo[height]
+	if !exists {
+		info = &RetryInfo{
+			RetryCount:  0,
+			LastAttempt: time.Now(),
+		}
+		pr.retryInfo[height] = info
+	}
+	pr.retryInfoMu.Unlock()
+
 	// Fetch the height with concurrent namespace calls
 	result := pr.fetchHeightConcurrently(pr.ctx, height)
 
+	// Update result with persistent retry count
+	result.RetryCount = info.RetryCount
+	result.LastAttempt = time.Now()
+
 	// Store result in in-flight map
 	pr.inFlightMu.Lock()
 	pr.inFlight[height] = result
@@ -201,36 +277,49 @@ func (pr *ParallelRetriever) retrieveHeight(height uint64) {
 	// Trigger processing check
 	pr.checkAndProcessResults()
 
-	// If error (except height-from-future), schedule retry with backoff
-	if result.Error != nil && !pr.manager.areAllErrorsHeightFromFuture(result.Error) {
-		result.RetryCount++
-		result.LastAttempt = time.Now()
-
-		// Calculate backoff duration
-		backoff := time.Duration(result.RetryCount) * time.Second
-		if backoff > maxRetryBackoff {
-			backoff = maxRetryBackoff
+	// Handle errors and schedule retries
+	if result.Error != nil && pr.ctx.Err() == nil {
+		isHeightFromFuture := pr.manager.areAllErrorsHeightFromFuture(result.Error)
+
+		// Update retry info
+		pr.retryInfoMu.Lock()
+		info.RetryCount++
+		info.LastAttempt = time.Now()
+		info.IsHeightFromFuture = isHeightFromFuture
+
+		if isHeightFromFuture {
+			// For height-from-future, use a longer backoff
+			backoff := 2 * time.Second
+			info.NextRetryTime = time.Now().Add(backoff)
+		} else if info.RetryCount < dAFetcherRetries {
+			// For other errors, use exponential backoff
+			backoff := time.Duration(info.RetryCount) * time.Second
+			if backoff > maxRetryBackoff {
+				backoff = maxRetryBackoff
+			}
+			info.NextRetryTime = time.Now().Add(backoff)
 		}
+		pr.retryInfoMu.Unlock()
 
-		// Schedule retry after backoff
-		time.AfterFunc(backoff, func() {
-			// Check if still needed and context not done
-			pr.mu.Lock()
-			stillNeeded := height >= pr.nextToProcess
-			pr.mu.Unlock()
-
-			if stillNeeded && pr.ctx.Err() == nil {
-				// Remove from in-flight to allow re-scheduling
+		// Schedule retry removal from in-flight
+		if isHeightFromFuture || info.RetryCount < dAFetcherRetries {
+			// Remove from in-flight after a short delay to allow processing to complete
+			time.AfterFunc(100*time.Millisecond, func() {
 				pr.inFlightMu.Lock()
 				delete(pr.inFlight, height)
 				pr.inFlightMu.Unlock()
-			}
-		})
+			})
+		}
+	} else if result.Error == nil {
+		// Success - clean up retry info
+		pr.retryInfoMu.Lock()
+		delete(pr.retryInfo, height)
+		pr.retryInfoMu.Unlock()
 	}
 }
 
 // processResults monitors for results to process in order
-func (pr *ParallelRetriever) processResults() {
+func (pr *Retriever) processResults() {
 	defer pr.processor.Done()
 
 	ticker := time.NewTicker(50 * time.Millisecond)
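The retry policy introduced in this hunk boils down to: height-from-future errors always wait a fixed 2 seconds, other errors wait RetryCount seconds capped at maxRetryBackoff, and once RetryCount reaches dAFetcherRetries no further retry is scheduled. A standalone sketch of that schedule follows; the constant values below are placeholders for illustration, not the package's real settings:

package main

import (
	"fmt"
	"time"
)

// Placeholder values for illustration; the real constants are defined
// elsewhere in the block package and are not part of this diff.
const (
	maxRetryBackoff  = 30 * time.Second
	dAFetcherRetries = 10
)

// nextBackoff mirrors the policy in retrieveHeight: a fixed 2s wait for
// height-from-future errors, otherwise RetryCount seconds capped at
// maxRetryBackoff, and no retry at all once the retry budget is spent.
func nextBackoff(retryCount int, heightFromFuture bool) (time.Duration, bool) {
	if heightFromFuture {
		return 2 * time.Second, true
	}
	if retryCount >= dAFetcherRetries {
		return 0, false // out of retries; the processor advances past this height
	}
	backoff := time.Duration(retryCount) * time.Second
	if backoff > maxRetryBackoff {
		backoff = maxRetryBackoff
	}
	return backoff, true
}

func main() {
	for attempt := 1; attempt <= 4; attempt++ {
		d, retry := nextBackoff(attempt, false)
		fmt.Printf("attempt %d: backoff=%v retry=%v\n", attempt, d, retry)
	}
}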
@@ -247,7 +336,11 @@ func (pr *ParallelRetriever) processResults() {
 }
 
 // checkAndProcessResults processes available results in order
-func (pr *ParallelRetriever) checkAndProcessResults() {
+func (pr *Retriever) checkAndProcessResults() {
+	// Use processingMu to prevent concurrent execution races
+	pr.processingMu.Lock()
+	defer pr.processingMu.Unlock()
+
 	for {
 		pr.mu.Lock()
 		nextToProcess := pr.nextToProcess
@@ -263,28 +356,49 @@ func (pr *ParallelRetriever) checkAndProcessResults() {
 			break
 		}
 
+		// Get retry info for this height
+		pr.retryInfoMu.RLock()
+		retryInfo, hasRetryInfo := pr.retryInfo[nextToProcess]
+		pr.retryInfoMu.RUnlock()
+
 		// Process based on result status
 		shouldAdvance := false
+		shouldRetry := false
 
 		if result.Error != nil && pr.ctx.Err() == nil {
+			// Use persistent retry count if available
+			retryCount := result.RetryCount
+			if hasRetryInfo {
+				retryCount = retryInfo.RetryCount
+			}
+
 			// Check if it's a height-from-future error
 			if pr.manager.areAllErrorsHeightFromFuture(result.Error) {
-				// Don't advance for height-from-future errors
+				// Don't advance for height-from-future errors, but do retry
 				pr.manager.logger.Debug().
 					Uint64("daHeight", result.Height).
+					Int("retries", retryCount).
 					Msg("height from future, will retry")
-			} else if result.RetryCount >= dAFetcherRetries {
+				shouldRetry = true
+			} else if retryCount >= dAFetcherRetries {
 				// Max retries reached, log error but advance
 				pr.manager.logger.Error().
 					Uint64("daHeight", result.Height).
 					Str("errors", result.Error.Error()).
-					Int("retries", result.RetryCount).
+					Int("retries", retryCount).
 					Msg("failed to retrieve data from DALC after max retries")
 				shouldAdvance = true
+			} else {
+				// Still have retries left, will retry
+				pr.manager.logger.Debug().
+					Uint64("daHeight", result.Height).
+					Int("retries", retryCount).
+					Int("maxRetries", dAFetcherRetries).
+					Msg("retrieval failed, will retry")
+				shouldRetry = true
 			}
-			// Otherwise, let retry logic handle it
-		} else {
-			// Success or NotFound - process and advance
+		} else if result.Error == nil {
+			// Success - process and advance
 			if len(result.Data) > 0 {
 				pr.manager.processRetrievedData(pr.ctx, result.Data, result.Height)
 			} else {
@@ -299,13 +413,22 @@ func (pr *ParallelRetriever) checkAndProcessResults() {
 			delete(pr.inFlight, nextToProcess)
 			pr.inFlightMu.Unlock()
 
+			// Clean up retry info
+			pr.retryInfoMu.Lock()
+			delete(pr.retryInfo, nextToProcess)
+			pr.retryInfoMu.Unlock()
+
 			// Advance both pointers
 			pr.mu.Lock()
 			pr.nextToProcess++
 			pr.mu.Unlock()
 
 			// Update manager's DA height
 			pr.manager.daHeight.Store(nextToProcess + 1)
+		} else if shouldRetry {
+			// Can't advance yet, but ensure retry is scheduled
+			// The retry scheduling is already handled in retrieveHeight
+			break
 		} else {
 			// Can't advance yet, stop processing
 			break
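The processing hunks above preserve the in-order guarantee: nextToProcess (and the manager's DA height) only advances across a contiguous run of successfully retrieved heights and stops at the first gap or still-retrying height. A trimmed sketch of that draining behaviour, using a plain map in place of the real in-flight tracking and ignoring the retry branches:

package main

import "fmt"

// result stands in for RetrievalResult, keeping only what the advance
// decision needs.
type result struct {
	err error
}

// drainInOrder advances past consecutive, error-free heights and stops at the
// first missing or still-failing one, mirroring checkAndProcessResults.
func drainInOrder(inFlight map[uint64]*result, nextToProcess uint64) uint64 {
	for {
		r, ok := inFlight[nextToProcess]
		if !ok || r.err != nil {
			return nextToProcess // gap or pending retry: do not advance further
		}
		delete(inFlight, nextToProcess)
		nextToProcess++
	}
}

func main() {
	inFlight := map[uint64]*result{
		100: {},
		101: {},
		103: {}, // 102 is still missing
	}
	fmt.Println(drainInOrder(inFlight, 100)) // 102: cannot advance past the gap
}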
@@ -314,7 +437,7 @@ func (pr *ParallelRetriever) checkAndProcessResults() {
 }
 
 // fetchHeightConcurrently retrieves blobs from a specific DA height using concurrent namespace calls
-func (pr *ParallelRetriever) fetchHeightConcurrently(ctx context.Context, height uint64) *RetrievalResult {
+func (pr *Retriever) fetchHeightConcurrently(ctx context.Context, height uint64) *RetrievalResult {
 	start := time.Now()
 	fetchCtx, cancel := context.WithTimeout(ctx, dAefetcherTimeout)
 	defer cancel()
@@ -507,7 +630,7 @@ func (m *Manager) fetchBlobsConcurrently(ctx context.Context, daHeight uint64) (
 }
 
 // updateMetrics periodically updates metrics for parallel retrieval
-func (pr *ParallelRetriever) updateMetrics() {
+func (pr *Retriever) updateMetrics() {
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 