Skip to content

Commit a7ca70c

Browse files
committed
implement batching and avoid recalculating commitments when the data is already present
1 parent e30cb11 commit a7ca70c

File tree

5 files changed

+349
-22
lines changed

5 files changed

+349
-22
lines changed

block/internal/da/client.go

Lines changed: 125 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import (
1515
datypes "github.com/evstack/ev-node/pkg/da/types"
1616
)
1717

18+
// DefaultRetrieveBatchSize is the default number of blob IDs to fetch per batch.
19+
const DefaultRetrieveBatchSize = 150
20+
1821
// Config contains configuration for the blob DA client.
1922
type Config struct {
2023
Client *blobrpc.Client
@@ -24,6 +27,7 @@ type Config struct {
2427
DataNamespace string
2528
ForcedInclusionNamespace string
2629
MaxBlobSize uint64
30+
RetrieveBatchSize int
2731
}
2832

2933
// client wraps the blob RPC with namespace handling and error mapping.
@@ -37,6 +41,7 @@ type client struct {
3741
forcedNamespaceBz []byte
3842
hasForcedNamespace bool
3943
maxBlobSize uint64
44+
batchSize int
4045
}
4146

4247
// Ensure client implements the block DA client interface.
@@ -53,6 +58,9 @@ func NewClient(cfg Config) Client {
5358
if cfg.MaxBlobSize == 0 {
5459
cfg.MaxBlobSize = blobrpc.DefaultMaxBlobSize
5560
}
61+
if cfg.RetrieveBatchSize <= 0 {
62+
cfg.RetrieveBatchSize = DefaultRetrieveBatchSize
63+
}
5664

5765
hasForcedNamespace := cfg.ForcedInclusionNamespace != ""
5866
var forcedNamespaceBz []byte
@@ -69,6 +77,7 @@ func NewClient(cfg Config) Client {
6977
forcedNamespaceBz: forcedNamespaceBz,
7078
hasForcedNamespace: hasForcedNamespace,
7179
maxBlobSize: cfg.MaxBlobSize,
80+
batchSize: cfg.RetrieveBatchSize,
7281
}
7382
}
7483

@@ -310,26 +319,58 @@ func (c *client) HasForcedInclusionNamespace() bool {
310319
}
311320

312321
// Get implements a minimal DA surface used by visualization: fetch blobs by IDs.
322+
// IDs are fetched in batches to avoid timeout issues with large ID sets.
323+
// Each batch gets its own timeout context to prevent cumulative timeout exhaustion.
313324
func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
314325
if len(ids) == 0 {
315326
return nil, nil
316327
}
317-
getCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
318-
defer cancel()
319328

329+
ns, err := share.NewNamespaceFromBytes(namespace)
330+
if err != nil {
331+
return nil, fmt.Errorf("invalid namespace: %w", err)
332+
}
333+
334+
res := make([]datypes.Blob, 0, len(ids))
335+
336+
// Process IDs in batches to avoid timeout issues with large ID sets.
337+
for i := 0; i < len(ids); i += c.batchSize {
338+
// Check if parent context was cancelled between batches.
339+
if err := ctx.Err(); err != nil {
340+
return nil, err
341+
}
342+
343+
end := i + c.batchSize
344+
if end > len(ids) {
345+
end = len(ids)
346+
}
347+
batch := ids[i:end]
348+
349+
// Each batch gets its own timeout to prevent cumulative timeout exhaustion.
350+
batchCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
351+
batchRes, err := c.getBatch(batchCtx, batch, ns)
352+
cancel()
353+
354+
if err != nil {
355+
return nil, err
356+
}
357+
res = append(res, batchRes...)
358+
}
359+
360+
return res, nil
361+
}
362+
363+
// getBatch fetches a single batch of blobs by their IDs.
364+
func (c *client) getBatch(ctx context.Context, ids []datypes.ID, ns share.Namespace) ([]datypes.Blob, error) {
320365
res := make([]datypes.Blob, 0, len(ids))
366+
321367
for _, id := range ids {
322368
height, commitment := blobrpc.SplitID(id)
323369
if commitment == nil {
324370
return nil, fmt.Errorf("invalid blob id: %x", id)
325371
}
326372

327-
ns, err := share.NewNamespaceFromBytes(namespace)
328-
if err != nil {
329-
return nil, fmt.Errorf("invalid namespace: %w", err)
330-
}
331-
332-
b, err := c.blobAPI.Get(getCtx, height, ns, commitment)
373+
b, err := c.blobAPI.Get(ctx, height, ns, commitment)
333374
if err != nil {
334375
return nil, err
335376
}
@@ -343,27 +384,59 @@ func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([
343384
}
344385

345386
// GetProofs returns inclusion proofs for the provided IDs.
387+
// IDs are processed in batches to avoid timeout issues with large ID sets.
346388
func (c *client) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) {
347389
if len(ids) == 0 {
348390
return []datypes.Proof{}, nil
349391
}
350392

351-
getCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
352-
defer cancel()
353-
354393
ns, err := share.NewNamespaceFromBytes(namespace)
355394
if err != nil {
356395
return nil, fmt.Errorf("invalid namespace: %w", err)
357396
}
358397

359398
proofs := make([]datypes.Proof, len(ids))
399+
400+
// Process IDs in batches to avoid timeout issues with large ID sets.
401+
for i := 0; i < len(ids); i += c.batchSize {
402+
// Check if parent context was cancelled between batches.
403+
if err := ctx.Err(); err != nil {
404+
return nil, err
405+
}
406+
407+
end := i + c.batchSize
408+
if end > len(ids) {
409+
end = len(ids)
410+
}
411+
batch := ids[i:end]
412+
413+
// Each batch gets its own timeout to prevent cumulative timeout exhaustion.
414+
batchCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
415+
batchProofs, err := c.getProofsBatch(batchCtx, batch, ns)
416+
cancel()
417+
418+
if err != nil {
419+
return nil, err
420+
}
421+
422+
// Copy batch results to the correct position in the output slice.
423+
copy(proofs[i:end], batchProofs)
424+
}
425+
426+
return proofs, nil
427+
}
428+
429+
// getProofsBatch fetches proofs for a single batch of IDs.
430+
func (c *client) getProofsBatch(ctx context.Context, ids []datypes.ID, ns share.Namespace) ([]datypes.Proof, error) {
431+
proofs := make([]datypes.Proof, len(ids))
432+
360433
for i, id := range ids {
361434
height, commitment := blobrpc.SplitID(id)
362435
if commitment == nil {
363436
return nil, fmt.Errorf("invalid blob id: %x", id)
364437
}
365438

366-
proof, err := c.blobAPI.GetProof(getCtx, height, ns, commitment)
439+
proof, err := c.blobAPI.GetProof(ctx, height, ns, commitment)
367440
if err != nil {
368441
return nil, err
369442
}
@@ -379,20 +452,57 @@ func (c *client) GetProofs(ctx context.Context, ids []datypes.ID, namespace []by
379452
}
380453

381454
// Validate mirrors the deprecated DA server logic: it unmarshals proofs and calls Included.
455+
// IDs and proofs are processed in batches to avoid timeout issues with large sets.
382456
func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) {
383457
if len(ids) != len(proofs) {
384458
return nil, errors.New("number of IDs and proofs must match")
385459
}
386460

387-
validateCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
388-
defer cancel()
461+
if len(ids) == 0 {
462+
return []bool{}, nil
463+
}
389464

390465
ns, err := share.NewNamespaceFromBytes(namespace)
391466
if err != nil {
392467
return nil, fmt.Errorf("invalid namespace: %w", err)
393468
}
394469

395470
results := make([]bool, len(ids))
471+
472+
// Process IDs in batches to avoid timeout issues with large ID sets.
473+
for i := 0; i < len(ids); i += c.batchSize {
474+
// Check if parent context was cancelled between batches.
475+
if err := ctx.Err(); err != nil {
476+
return nil, err
477+
}
478+
479+
end := i + c.batchSize
480+
if end > len(ids) {
481+
end = len(ids)
482+
}
483+
batchIDs := ids[i:end]
484+
batchProofs := proofs[i:end]
485+
486+
// Each batch gets its own timeout to prevent cumulative timeout exhaustion.
487+
batchCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
488+
batchResults, err := c.validateBatch(batchCtx, batchIDs, batchProofs, ns)
489+
cancel()
490+
491+
if err != nil {
492+
return nil, err
493+
}
494+
495+
// Copy batch results to the correct position in the output slice.
496+
copy(results[i:end], batchResults)
497+
}
498+
499+
return results, nil
500+
}
501+
502+
// validateBatch validates a single batch of IDs and proofs.
503+
func (c *client) validateBatch(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, ns share.Namespace) ([]bool, error) {
504+
results := make([]bool, len(ids))
505+
396506
for i, id := range ids {
397507
var proof blobrpc.Proof
398508
if err := json.Unmarshal(proofs[i], &proof); err != nil {
@@ -404,7 +514,7 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype
404514
return nil, fmt.Errorf("invalid blob id: %x", id)
405515
}
406516

407-
included, err := c.blobAPI.Included(validateCtx, height, ns, &proof, commitment)
517+
included, err := c.blobAPI.Included(ctx, height, ns, &proof, commitment)
408518
if err != nil {
409519
c.logger.Debug().Err(err).Uint64("height", height).Msg("blob inclusion check returned error")
410520
}

0 commit comments

Comments
 (0)