Skip to content

Commit

Permalink
hotfix(submit): early catch and log for empty batch (#997)
Browse files Browse the repository at this point in the history
Co-authored-by: Omri <omritoptix@gmail.com>
  • Loading branch information
danwt and omritoptix committed Aug 13, 2024
1 parent 904138e commit b744805
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 3 deletions.
12 changes: 11 additions & 1 deletion block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,
bytesProduced chan int,
) (err error) {
return SubmitLoopInner(ctx,
m.logger,
bytesProduced,
m.Conf.MaxBatchSkew,
m.Conf.BatchSubmitMaxTime,
Expand All @@ -34,6 +35,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,

// SubmitLoopInner is a unit testable impl of SubmitLoop
func SubmitLoopInner(ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of batches that submitter is allowed to have pending
maxBatchTime time.Duration, // max time to allow between batches
Expand Down Expand Up @@ -102,6 +104,9 @@ func SubmitLoopInner(ctx context.Context,
break
}
nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes))
if errors.Is(err, gerrc.ErrInternal) {
panic(fmt.Sprintf("create and submit batch: %v", err))
}
if err != nil {
return fmt.Errorf("create and submit batch: %w", err)
}
Expand Down Expand Up @@ -143,6 +148,7 @@ func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error
// CreateBatch looks through the store for any unsubmitted blocks and commits and bundles them into a batch
// max size bytes is the maximum size of the serialized batch type
func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeightInclusive uint64) (*types.Batch, error) {
m.logger.Info("Creating batch", "start height", startHeight, "end height", endHeightInclusive)
batchSize := endHeightInclusive - startHeight + 1
batch := &types.Batch{
Blocks: make([]*types.Block, 0, batchSize),
Expand Down Expand Up @@ -176,13 +182,17 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
}
}

if batch.NumBlocks() == 0 {
return nil, fmt.Errorf("empty batch: %w", gerrc.ErrInternal)
}

return batch, nil
}

func (m *Manager) SubmitBatch(batch *types.Batch) error {
resultSubmitToDA := m.DAClient.SubmitBatch(batch)
if resultSubmitToDA.Code != da.StatusSuccess {
return fmt.Errorf("da client submit batch: %s", resultSubmitToDA.Message)
return fmt.Errorf("da client submit batch: %s: %w", resultSubmitToDA.Message, resultSubmitToDA.Error)
}
m.logger.Info("Submitted batch to DA.", "start height", batch.StartHeight(), "end height", batch.EndHeight())

Expand Down
1 change: 1 addition & 0 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func testSubmitLoopInner(

block.SubmitLoopInner(
ctx,
nil,
producedBytesC,
args.batchSkew,
args.maxTime,
Expand Down
16 changes: 14 additions & 2 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS

// TODO(srene): Split batch in multiple blobs if necessary if supported
height, commitment, err := c.submit(data)
if errors.Is(err, gerrc.ErrInternal) {
// no point retrying if it's because of our code being wrong
err = fmt.Errorf("submit: %w", err)
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: err,
},
}
}

if err != nil {
c.logger.Error("Submit blob.", "error", err)
types.RollappConsecutiveFailedDASubmission.Inc()
Expand Down Expand Up @@ -521,11 +533,11 @@ func (c *DataAvailabilityLayerClient) checkBatchAvailability(daMetaData *da.DASu
func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitment, error) {
blobs, commitments, err := c.blobsAndCommitments(daBlob)
if err != nil {
return 0, nil, fmt.Errorf("blobs and commitments: %w", err)
return 0, nil, fmt.Errorf("blobs and commitments: %w: %w", err, gerrc.ErrInternal)
}

if len(commitments) == 0 {
return 0, nil, fmt.Errorf("zero commitments: %w", gerrc.ErrNotFound)
return 0, nil, fmt.Errorf("zero commitments: %w: %w", gerrc.ErrNotFound, gerrc.ErrInternal)
}

blobSizes := make([]uint32, len(blobs))
Expand Down
14 changes: 14 additions & 0 deletions types/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package types

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestBatchSerialization(t *testing.T) {
batch := Batch{}
bz, err := batch.MarshalBinary()
require.Nil(t, err)
require.Empty(t, bz)
}

0 comments on commit b744805

Please sign in to comment.