From c694d41ae37ddd4b2e42a9b51417e8093970cb38 Mon Sep 17 00:00:00 2001
From: Ian Shim <100327837+ian-shim@users.noreply.github.com>
Date: Tue, 12 Nov 2024 18:26:15 -0800
Subject: [PATCH] More minibatch clean up (#883)

---
 disperser/batcher/grpc/dispatcher.go      | 133 ------------------
 disperser/batcher/grpc/dispatcher_test.go |  59 --------
 disperser/disperser.go                    |   4 -
 disperser/mock/dispatcher.go              |  25 ----
 node/node.go                              |  55 --------
 node/store.go                             | 151 +--------------------
 node/store_test.go                        | 157 ----------------------
 7 files changed, 2 insertions(+), 582 deletions(-)
 delete mode 100644 disperser/batcher/grpc/dispatcher_test.go

diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go
index 495dee377a..4de5bbd0bb 100644
--- a/disperser/batcher/grpc/dispatcher.go
+++ b/disperser/batcher/grpc/dispatcher.go
@@ -3,7 +3,6 @@ package dispatcher
 import (
     "context"
     "errors"
-    "fmt"
     "time"
 
     commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
@@ -148,138 +147,6 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMe
     sig := &core.Signature{G1Point: point}
     return sig, nil
 }
-
-// SendBlobsToOperator sends blobs to an operator via the node's StoreBlobs endpoint
-// It returns the signatures of the blobs sent to the operator in the same order as the blobs
-// with nil values for blobs that were not attested by the operator
-func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
-    // TODO Add secure Grpc
-
-    conn, err := grpc.Dial(
-        core.OperatorSocket(op.Socket).GetDispersalSocket(),
-        grpc.WithTransportCredentials(insecure.NewCredentials()),
-    )
-    if err != nil {
-        c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
-        return nil, err
-    }
-    defer conn.Close()
-
-    gc := node.NewDispersalClient(conn)
-    ctx, cancel := context.WithTimeout(ctx, c.Timeout)
-    defer cancel()
-    start := time.Now()
-    request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding)
-    if err != nil {
-        return nil, err
-    }
-    c.logger.Debug("sending chunks to operator", "operator", op.Socket, "num blobs", len(blobs), "size", totalSize, "request message size", proto.Size(request), "request serialization time", time.Since(start), "use Gnark chunk encoding", c.EnableGnarkBundleEncoding)
-    opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
-    reply, err := gc.StoreBlobs(ctx, request, opt)
-
-    if err != nil {
-        return nil, err
-    }
-
-    signaturesInBytes := reply.GetSignatures()
-    signatures := make([]*core.Signature, 0, len(signaturesInBytes))
-    for _, sigBytes := range signaturesInBytes {
-        sig := sigBytes.GetValue()
-        if sig != nil {
-            point, err := new(core.Signature).Deserialize(sig)
-            if err != nil {
-                return nil, err
-            }
-            signatures = append(signatures, &core.Signature{G1Point: point})
-        } else {
-            signatures = append(signatures, nil)
-        }
-    }
-    return signatures, nil
-}
-
-func (c *dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
-    batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
-    if err != nil {
-        return nil, err
-    }
-    responseChan := make(chan core.SigningMessage, len(state.IndexedOperators))
-
-    for id, op := range state.IndexedOperators {
-        go func(op core.IndexedOperatorInfo, id core.OperatorID) {
-            conn, err := grpc.Dial(
-                core.OperatorSocket(op.Socket).GetDispersalSocket(),
-                grpc.WithTransportCredentials(insecure.NewCredentials()),
-            )
-            if err != nil {
-                c.logger.Error("disperser cannot connect to operator dispersal socket", "socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
-                return
-            }
-            defer conn.Close()
-
-            nodeClient := node.NewDispersalClient(conn)
-
-            requestedAt := time.Now()
-            sig, err := c.SendAttestBatchRequest(ctx, nodeClient, blobHeaderHashes, batchHeader, &op)
-            latencyMs := float64(time.Since(requestedAt).Milliseconds())
-            if err != nil {
-                responseChan <- core.SigningMessage{
-                    Err: err,
-                    Signature: nil,
-                    Operator: id,
-                    BatchHeaderHash: batchHeaderHash,
-                    AttestationLatencyMs: latencyMs,
-                }
-                c.metrics.ObserveLatency(id.Hex(), false, latencyMs)
-            } else {
-                responseChan <- core.SigningMessage{
-                    Signature: sig,
-                    Operator: id,
-                    BatchHeaderHash: batchHeaderHash,
-                    AttestationLatencyMs: latencyMs,
-                    Err: nil,
-                }
-                c.metrics.ObserveLatency(id.Hex(), true, latencyMs)
-            }
-        }(core.IndexedOperatorInfo{
-            PubkeyG1: op.PubkeyG1,
-            PubkeyG2: op.PubkeyG2,
-            Socket: op.Socket,
-        }, id)
-    }
-
-    return responseChan, nil
-}
-
-func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
-    ctx, cancel := context.WithTimeout(ctx, c.Timeout)
-    defer cancel()
-    // start := time.Now()
-    hashes := make([][]byte, len(blobHeaderHashes))
-    for i, hash := range blobHeaderHashes {
-        hashes[i] = hash[:]
-    }
-
-    request := &node.AttestBatchRequest{
-        BatchHeader: getBatchHeaderMessage(batchHeader),
-        BlobHeaderHashes: hashes,
-    }
-
-    c.logger.Debug("sending AttestBatch request to operator", "operator", op.Socket, "numBlobs", len(blobHeaderHashes), "requestMessageSize", proto.Size(request), "referenceBlockNumber", batchHeader.ReferenceBlockNumber)
-    opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
-    reply, err := nodeDispersalClient.AttestBatch(ctx, request, opt)
-    if err != nil {
-        return nil, fmt.Errorf("failed to send AttestBatch request to operator %s: %w", core.OperatorSocket(op.Socket).GetDispersalSocket(), err)
-    }
-
-    sigBytes := reply.GetSignature()
-    point, err := new(core.Signature).Deserialize(sigBytes)
-    if err != nil {
-        return nil, fmt.Errorf("failed to deserialize signature: %w", err)
-    }
-    return &core.Signature{G1Point: point}, nil
-}
-
 func GetStoreChunksRequest(blobMessages []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) {
     blobs := make([]*node.Blob, len(blobMessages))
     totalSize := int64(0)
diff --git a/disperser/batcher/grpc/dispatcher_test.go b/disperser/batcher/grpc/dispatcher_test.go
deleted file mode 100644
index 0a6ea539f2..0000000000
--- a/disperser/batcher/grpc/dispatcher_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package dispatcher_test
-
-import (
-    "context"
-    "math/big"
-    "testing"
-    "time"
-
-    grpcMock "github.com/Layr-Labs/eigenda/api/grpc/mock"
-    "github.com/Layr-Labs/eigenda/api/grpc/node"
-    "github.com/Layr-Labs/eigenda/common"
-    "github.com/Layr-Labs/eigenda/core"
-    "github.com/Layr-Labs/eigenda/disperser"
-    "github.com/Layr-Labs/eigenda/disperser/batcher"
-    dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc"
-    "github.com/consensys/gnark-crypto/ecc/bn254"
-    "github.com/consensys/gnark-crypto/ecc/bn254/fp"
-    "github.com/stretchr/testify/assert"
-)
-
-func newDispatcher(t *testing.T, config *dispatcher.Config) disperser.Dispatcher {
-    loggerConfig := common.DefaultLoggerConfig()
-    logger, err := common.NewLogger(loggerConfig)
-    assert.NoError(t, err)
-    metrics := batcher.NewMetrics("9091", logger)
-    return dispatcher.NewDispatcher(config, logger, metrics.DispatcherMetrics)
-}
-
-func TestSendAttestBatchRequest(t *testing.T) {
-    dispatcher := newDispatcher(t, &dispatcher.Config{
-        Timeout: 5 * time.Second,
-    })
-    nodeClient := grpcMock.NewMockDispersalClient()
-    var X, Y fp.Element
-    X = *X.SetBigInt(big.NewInt(1))
-    Y = *Y.SetBigInt(big.NewInt(2))
-    signature := &core.Signature{
-        G1Point: &core.G1Point{
-            G1Affine: &bn254.G1Affine{
-                X: X,
-                Y: Y,
-            },
-        },
-    }
-    sigBytes := signature.Bytes()
-    nodeClient.On("AttestBatch").Return(&node.AttestBatchReply{
-        Signature: sigBytes[:],
-    }, nil)
-    sigReply, err := dispatcher.SendAttestBatchRequest(context.Background(), nodeClient, [][32]byte{{1}}, &core.BatchHeader{
-        ReferenceBlockNumber: 10,
-        BatchRoot: [32]byte{1},
-    }, &core.IndexedOperatorInfo{
-        PubkeyG1: nil,
-        PubkeyG2: nil,
-        Socket: "localhost:8080",
-    })
-    assert.NoError(t, err)
-    assert.Equal(t, signature, sigReply)
-}
diff --git a/disperser/disperser.go b/disperser/disperser.go
index 0d939e7640..e093e9f930 100644
--- a/disperser/disperser.go
+++ b/disperser/disperser.go
@@ -13,7 +13,6 @@ import (
     "github.com/Layr-Labs/eigenda/encoding"
 
     disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
-    "github.com/Layr-Labs/eigenda/api/grpc/node"
 
     gcommon "github.com/ethereum/go-ethereum/common"
 )
@@ -219,9 +218,6 @@ type BlobStore interface {
 
 type Dispatcher interface {
     DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
-    SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error)
-    AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error)
-    SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error)
 }
 
 // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go
index e17dbd0d15..59d13f686e 100644
--- a/disperser/mock/dispatcher.go
+++ b/disperser/mock/dispatcher.go
@@ -4,7 +4,6 @@ import (
     "context"
     "errors"
 
-    "github.com/Layr-Labs/eigenda/api/grpc/node"
     "github.com/Layr-Labs/eigenda/core"
     coremock "github.com/Layr-Labs/eigenda/core/mock"
     "github.com/Layr-Labs/eigenda/disperser"
@@ -65,27 +64,3 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera
 
     return update
 }
-
-func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
-    args := d.Called(ctx, blobs, batchHeader, op)
-    if args.Get(0) == nil {
-        return nil, args.Error(1)
-    }
-    return args.Get(0).([]*core.Signature), args.Error(1)
-}
-
-func (d *Dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
-    args := d.Called(ctx, state, blobHeaderHashes, batchHeader)
-    if args.Get(0) == nil {
-        return nil, args.Error(1)
-    }
-    return args.Get(0).(chan core.SigningMessage), args.Error(1)
-}
-
-func (d *Dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
-    args := d.Called(ctx, nodeDispersalClient, blobHeaderHashes, batchHeader, op)
-    if args.Get(0) == nil {
-        return nil, args.Error(1)
-    }
-    return args.Get(0).(*core.Signature), args.Error(1)
-}
diff --git a/node/node.go b/node/node.go
index c657d791bf..4d9690632b 100644
--- a/node/node.go
+++ b/node/node.go
@@ -12,7 +12,6 @@ import (
     "net/http"
     "net/url"
     "os"
-    "reflect"
     "strings"
     "sync"
     "time"
@@ -22,9 +21,6 @@ import (
     gethcommon "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/crypto"
     "github.com/prometheus/client_golang/prometheus"
-    "github.com/wealdtech/go-merkletree/v2"
-    "github.com/wealdtech/go-merkletree/v2/keccak256"
-    "google.golang.org/protobuf/proto"
 
     "github.com/Layr-Labs/eigenda/api/grpc/node"
     "github.com/Layr-Labs/eigenda/common/geth"
@@ -418,57 +414,6 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob
     return nil
 }
 
-// ValidateBlobHeadersRoot validates the blob headers root hash
-// by comparing it with the merkle tree root hash of the blob headers.
-// It also checks if all blob headers have the same reference block number
-func (n *Node) ValidateBatchContents(ctx context.Context, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) error {
-    leafs := make([][]byte, 0)
-    for _, blobHeaderHash := range blobHeaderHashes {
-        blobHeaderBytes, err := n.Store.GetBlobHeaderByHeaderHash(ctx, blobHeaderHash)
-        if err != nil {
-            return fmt.Errorf("failed to get blob header by hash: %w", err)
-        }
-        if blobHeaderBytes == nil {
-            return fmt.Errorf("blob header not found for hash %x", blobHeaderHash)
-        }
-
-        var protoBlobHeader node.BlobHeader
-        err = proto.Unmarshal(blobHeaderBytes, &protoBlobHeader)
-        if err != nil {
-            return fmt.Errorf("failed to unmarshal blob header: %w", err)
-        }
-        if uint32(batchHeader.ReferenceBlockNumber) != protoBlobHeader.GetReferenceBlockNumber() {
-            return errors.New("blob headers have different reference block numbers")
-        }
-
-        blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
-        if err != nil {
-            return fmt.Errorf("failed to get blob header from proto: %w", err)
-        }
-
-        blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
-        if err != nil {
-            return fmt.Errorf("failed to get blob header hash: %w", err)
-        }
-        leafs = append(leafs, blobHeaderHash[:])
-    }
-
-    if len(leafs) == 0 {
-        return errors.New("no blob headers found")
-    }
-
-    tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
-    if err != nil {
-        return fmt.Errorf("failed to create merkle tree: %w", err)
-    }
-
-    if !reflect.DeepEqual(tree.Root(), batchHeader.BatchRoot[:]) {
-        return errors.New("invalid batch header")
-    }
-
-    return nil
-}
-
 func (n *Node) updateSocketAddress(ctx context.Context, newSocketAddr string) {
     n.mu.Lock()
     defer n.mu.Unlock()
diff --git a/node/store.go b/node/store.go
index 752557463b..03d0ab775f 100644
--- a/node/store.go
+++ b/node/store.go
@@ -6,9 +6,10 @@ import (
     "encoding/binary"
     "errors"
     "fmt"
+    "time"
+
     "github.com/Layr-Labs/eigenda/common/kvstore"
     "github.com/Layr-Labs/eigenda/common/kvstore/leveldb"
-    "time"
 
     "github.com/Layr-Labs/eigenda/api/grpc/node"
     "github.com/Layr-Labs/eigenda/core"
@@ -475,154 +476,6 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
     return &keys, nil
 }
 
-func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobsProto []*node.Blob) (*[][]byte, error) {
-    storeBlobsStart := time.Now()
-    keys := make([][]byte, 0)
-    batch := s.db.NewBatch()
-
-    expirationTime := s.expirationTime()
-
-    // Generate key/value pairs for all blob headers and blob chunks .
-    size := int64(0)
-    var serializationDuration, encodingDuration time.Duration
-    for idx, blob := range blobs {
-        rawBlob := blobsProto[idx]
-        if len(rawBlob.GetBundles()) != len(blob.Bundles) {
-            return nil, fmt.Errorf("internal error: the number of bundles in parsed blob (%d) must be the same as in raw blob (%d)", len(rawBlob.GetBundles()), len(blob.Bundles))
-        }
-
-        blobHeaderHash, err := blob.BlobHeader.GetBlobHeaderHash()
-        if err != nil {
-            return nil, fmt.Errorf("failed to get blob header hash: %w", err)
-        }
-
-        // expiration key
-        expirationKey := EncodeBlobExpirationKey(expirationTime, blobHeaderHash)
-        keys = append(keys, expirationKey)
-        batch.Put(expirationKey, blobHeaderHash[:])
-
-        // blob header
-        blobHeaderKey := EncodeBlobHeaderKeyByHash(blobHeaderHash)
-        if s.HasKey(ctx, blobHeaderKey) {
-            s.logger.Warn("Blob already exists", "blobHeaderHash", hexutil.Encode(blobHeaderHash[:]))
-            continue
-        }
-
-        blobHeaderBytes, err := proto.Marshal(blobsProto[idx].GetHeader())
-        if err != nil {
-            return nil, fmt.Errorf("failed to serialize the blob header proto: %w", err)
-        }
-        keys = append(keys, blobHeaderKey)
-        batch.Put(blobHeaderKey, blobHeaderBytes)
-
-        // Get raw chunks
-        start := time.Now()
-        format := GetBundleEncodingFormat(rawBlob)
-        rawBundles := make(map[core.QuorumID][]byte)
-        rawChunks := make(map[core.QuorumID][][]byte)
-        for i, bundle := range rawBlob.GetBundles() {
-            quorumID := uint8(rawBlob.GetHeader().GetQuorumHeaders()[i].GetQuorumId())
-            if format == core.GnarkBundleEncodingFormat {
-                if len(bundle.GetChunks()) > 0 && len(bundle.GetChunks()[0]) > 0 {
-                    return nil, errors.New("chunks of a bundle are encoded together already")
-                }
-                rawBundles[quorumID] = bundle.GetBundle()
-            } else {
-                rawChunks[quorumID] = make([][]byte, len(bundle.GetChunks()))
-                for j, chunk := range bundle.GetChunks() {
-                    rawChunks[quorumID][j] = chunk
-                }
-            }
-        }
-        serializationDuration += time.Since(start)
-        start = time.Now()
-        // blob chunks
-        for quorumID, bundle := range blob.Bundles {
-            key, err := EncodeBlobKeyByHash(blobHeaderHash, quorumID)
-            if err != nil {
-                return nil, fmt.Errorf("failed to generate the key for storing blob: %w", err)
-            }
-
-            if format == core.GnarkBundleEncodingFormat {
-                rawBundle, ok := rawBundles[quorumID]
-                if ok {
-                    size += int64(len(rawBundle))
-                    keys = append(keys, key)
-                    batch.Put(key, rawBundle)
-                }
-            } else if format == core.GobBundleEncodingFormat {
-                if len(rawChunks[quorumID]) != len(bundle) {
-                    return nil, errors.New("internal error: the number of chunks in parsed blob bundle must be the same as in raw blob bundle")
-                }
-                chunksBytes, ok := rawChunks[quorumID]
-                if ok {
-
-                    bundleRaw := make([][]byte, len(bundle))
-                    for i := 0; i < len(bundle); i++ {
-                        bundleRaw[i] = chunksBytes[i]
-                    }
-                    chunkBytes, err := EncodeChunks(bundleRaw)
-                    if err != nil {
-                        return nil, err
-                    }
-                    size += int64(len(chunkBytes))
-                    keys = append(keys, key)
-                    batch.Put(key, chunkBytes)
-                }
-            } else {
-                return nil, fmt.Errorf("invalid bundle encoding format: %d", format)
-            }
-        }
-        encodingDuration += time.Since(start)
-    }
-
-    start := time.Now()
-    // Write all the key/value pairs to the local database atomically.
-    err := batch.Apply()
-    if err != nil {
-        return nil, fmt.Errorf("failed to write the batch into local database: %w", err)
-    }
-    throughput := float64(size) / time.Since(start).Seconds()
-    s.metrics.DBWriteThroughput.Set(throughput)
-    s.logger.Debug("StoreBlobs succeeded", "chunk serialization duration", serializationDuration, "bytes encoding duration", encodingDuration, "num blobs", len(blobs), "num of key-value pair entries", len(keys), "write batch duration", time.Since(start), "write throughput (MB/s)", throughput/1000_000, "total store batch duration", time.Since(storeBlobsStart), "total bytes", size)
-
-    return &keys, nil
-}
-
-func (s *Store) StoreBatchBlobMapping(ctx context.Context, batchHeader *core.BatchHeader, blobHeaderHashes [][32]byte) error {
-    start := time.Now()
-    batch := s.db.NewBatch()
-
-    batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
-    if err != nil {
-        return fmt.Errorf("failed to get the batch header hash: %w", err)
-    }
-
-    expirationTime := s.expirationTime()
-    expirationKey := EncodeBatchMappingExpirationKey(expirationTime, batchHeaderHash)
-    batch.Put(expirationKey, batchHeaderHash[:])
-
-    for blobIndex, blobHeaderHash := range blobHeaderHashes {
-        blobIndexKey := EncodeBlobIndexKey(batchHeaderHash, blobIndex)
-        batch.Put(blobIndexKey, blobHeaderHash[:])
-    }
-
-    // Generate the key/value pair for batch header.
-    batchHeaderKey := EncodeBatchHeaderKey(batchHeaderHash)
-    batchHeaderBytes, err := batchHeader.Serialize()
-    if err != nil {
-        return fmt.Errorf("failed to serialize the batch header: %w", err)
-    }
-    batch.Put(batchHeaderKey, batchHeaderBytes)
-
-    err = batch.Apply()
-    if err != nil {
-        return fmt.Errorf("failed to write the blob index mappings into local database: %w", err)
-    }
-    s.logger.Debug("StoreBatchBlobMapping succeeded", "duration", time.Since(start))
-    return nil
-}
-
 func (s *Store) expirationTime() int64 {
     // Setting the expiration time for the batch.
     curr := time.Now().Unix()
diff --git a/node/store_test.go b/node/store_test.go
index af3c786e3b..ef64a78e90 100644
--- a/node/store_test.go
+++ b/node/store_test.go
@@ -260,15 +260,6 @@ func TestStoreBatchInvalidBlob(t *testing.T) {
     assert.EqualError(t, err, "chunks of a bundle are encoded together already")
 }
 
-func TestStoreBlobsInvalidBlob(t *testing.T) {
-    s := createStore(t)
-    ctx := context.Background()
-    _, blobs, blobsProto := CreateBatchWith(t, true)
-    blobsProto[0].Bundles[0].Chunks = [][]byte{[]byte{1}}
-    _, err := s.StoreBlobs(ctx, blobs, blobsProto)
-    assert.EqualError(t, err, "chunks of a bundle are encoded together already")
-}
-
 func TestStoreBatchSuccess(t *testing.T) {
     s := createStore(t)
     ctx := context.Background()
@@ -360,154 +351,6 @@ func TestStoreBatchSuccess(t *testing.T) {
     assert.False(t, s.HasKey(ctx, blobKey2))
 }
 
-func TestStoreBlobsSuccess(t *testing.T) {
-    s := createStore(t)
-    ctx := context.Background()
-
-    // Empty store
-    blobKey := []byte{1, 2}
-    assert.False(t, s.HasKey(ctx, blobKey))
-
-    // Prepare data to store.
-    _, blobs, blobsProto := CreateBatch(t)
-
-    // Store a batch.
-    _, err := s.StoreBlobs(ctx, blobs, blobsProto)
-    assert.Nil(t, err)
-
-    // Check existence: blob headers.
-    blobHeaderHash0, err := blobs[0].BlobHeader.GetBlobHeaderHash()
-    assert.Nil(t, err)
-    blobHeaderKey0 := node.EncodeBlobHeaderKeyByHash(blobHeaderHash0)
-    assert.True(t, s.HasKey(ctx, blobHeaderKey0))
-    blobHeaderHash1, err := blobs[1].BlobHeader.GetBlobHeaderHash()
-    assert.Nil(t, err)
-    blobHeaderKey1 := node.EncodeBlobHeaderKeyByHash(blobHeaderHash1)
-    assert.True(t, s.HasKey(ctx, blobHeaderKey1))
-    blobHeaderBytes0, err := s.GetBlobHeaderByHeaderHash(ctx, blobHeaderHash0)
-    assert.Nil(t, err)
-    expected, err := proto.Marshal(blobsProto[0].GetHeader())
-    assert.Nil(t, err)
-    assert.True(t, bytes.Equal(blobHeaderBytes0, expected))
-    blobHeaderBytes1, err := s.GetBlobHeaderByHeaderHash(ctx, blobHeaderHash1)
-    assert.Nil(t, err)
-    expected, err = proto.Marshal(blobsProto[1].GetHeader())
-    assert.Nil(t, err)
-    assert.True(t, bytes.Equal(blobHeaderBytes1, expected))
-
-    // Check existence: blob chunks.
-    blobKey0, err := node.EncodeBlobKeyByHash(blobHeaderHash0, 0)
-    assert.Nil(t, err)
-    assert.True(t, s.HasKey(ctx, blobKey0))
-    blobKey1, err := node.EncodeBlobKeyByHash(blobHeaderHash1, 0)
-    assert.Nil(t, err)
-    assert.True(t, s.HasKey(ctx, blobKey1))
-
-    // Expire the batches.
-    curTime := time.Now().Unix() + int64(staleMeasure+storeDuration)*12
-    // Try to expire at a time before expiry, so nothing will be expired.
-    numBatchesDeleted, numMappingsDeleted, numBlobsDeleted, err := s.DeleteExpiredEntries(curTime-10, 5)
-    assert.Nil(t, err)
-    assert.Equal(t, numBatchesDeleted, 0)
-    assert.Equal(t, numMappingsDeleted, 0)
-    assert.Equal(t, numBlobsDeleted, 0)
-    assert.True(t, s.HasKey(ctx, blobHeaderKey0))
-    // Then expire it at a time post expiry, so the batch will get purged.
-    numBatchesDeleted, numMappingsDeleted, numBlobsDeleted, err = s.DeleteExpiredEntries(curTime+10, 5)
-    assert.Nil(t, err)
-    assert.Equal(t, numBatchesDeleted, 0)
-    assert.Equal(t, numMappingsDeleted, 0)
-    assert.Equal(t, numBlobsDeleted, 2)
-    assert.False(t, s.HasKey(ctx, blobHeaderKey0))
-    assert.False(t, s.HasKey(ctx, blobHeaderKey1))
-    assert.False(t, s.HasKey(ctx, blobKey0))
-    assert.False(t, s.HasKey(ctx, blobKey1))
-}
-
-func TestStoreBatchBlobMapping(t *testing.T) {
-    s := createStore(t)
-    ctx := context.Background()
-
-    // Empty store
-    blobKey := []byte{1, 2}
-    assert.False(t, s.HasKey(ctx, blobKey))
-
-    // Prepare data to store.
-    batchHeader, blobs, blobsProto := CreateBatch(t)
-    batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
-    assert.Nil(t, err)
-    blobHeaderHash0, err := blobs[0].BlobHeader.GetBlobHeaderHash()
-    assert.Nil(t, err)
-    blobHeaderHash1, err := blobs[1].BlobHeader.GetBlobHeaderHash()
-    assert.Nil(t, err)
-
-    // Store a batch.
-    _, err = s.StoreBlobs(ctx, blobs, blobsProto)
-    assert.Nil(t, err)
-    err = s.StoreBatchBlobMapping(ctx, batchHeader, [][32]byte{blobHeaderHash0, blobHeaderHash1})
-    assert.Nil(t, err)
-
-    // Check existence: batch header.
-    batchHeaderKey := node.EncodeBatchHeaderKey(batchHeaderHash)
-    assert.True(t, s.HasKey(ctx, batchHeaderKey))
-    batchHeaderBytes, err := s.GetBatchHeader(ctx, batchHeaderHash)
-    assert.Nil(t, err)
-    expectedBatchHeaderBytes, err := batchHeader.Serialize()
-    assert.Nil(t, err)
-    assert.True(t, bytes.Equal(batchHeaderBytes, expectedBatchHeaderBytes))
-
-    // Check existence: blob index mapping
-    blobIndexKey0 := node.EncodeBlobIndexKey(batchHeaderHash, 0)
-    blobIndexKey1 := node.EncodeBlobIndexKey(batchHeaderHash, 1)
-    assert.True(t, s.HasKey(ctx, blobIndexKey0))
-    assert.True(t, s.HasKey(ctx, blobIndexKey1))
-
-    bhh0, err := s.GetBlobHeaderHashAtIndex(ctx, batchHeaderHash, 0)
-    assert.Nil(t, err)
-    assert.Equal(t, blobHeaderHash0, bhh0)
-    bhh1, err := s.GetBlobHeaderHashAtIndex(ctx, batchHeaderHash, 1)
-    assert.Nil(t, err)
-    assert.Equal(t, blobHeaderHash1, bhh1)
-
-    // Check blob headers by GetBlobHeader method
-    var protoBlobHeader pb.BlobHeader
-    blobHeaderBytes0, err := s.GetBlobHeader(ctx, batchHeaderHash, 0)
-    assert.Nil(t, err)
-    err = proto.Unmarshal(blobHeaderBytes0, &protoBlobHeader)
-    assert.Nil(t, err)
-    blobHeader0, err := core.BlobHeaderFromProto(&protoBlobHeader)
-    assert.Nil(t, err)
-
-    assert.Equal(t, blobHeader0, blobs[0].BlobHeader)
-    blobHeaderBytes1, err := s.GetBlobHeader(ctx, batchHeaderHash, 1)
-    assert.Nil(t, err)
-    err = proto.Unmarshal(blobHeaderBytes1, &protoBlobHeader)
-    assert.Nil(t, err)
-    blobHeader1, err := core.BlobHeaderFromProto(&protoBlobHeader)
-    assert.Nil(t, err)
-    assert.Equal(t, blobHeader1, blobs[1].BlobHeader)
-    blobHeaderBytes2, err := s.GetBlobHeader(ctx, batchHeaderHash, 2)
-    assert.ErrorIs(t, err, node.ErrKeyNotFound)
-    assert.Nil(t, blobHeaderBytes2)
-
-    // Expire the batches.
-    curTime := time.Now().Unix() + int64(staleMeasure+storeDuration)*12
-    // Try to expire at a time before expiry, so nothing will be expired.
-    numBatchesDeleted, numMappingsDeleted, _, err := s.DeleteExpiredEntries(curTime-10, 5)
-    assert.Nil(t, err)
-    assert.Equal(t, numBatchesDeleted, 0)
-    assert.Equal(t, numMappingsDeleted, 0)
-    assert.True(t, s.HasKey(ctx, blobIndexKey0))
-    assert.True(t, s.HasKey(ctx, blobIndexKey1))
-    // Then expire it at a time post expiry, so the batch will get purged.
-    numBatchesDeleted, numMappingsDeleted, _, err = s.DeleteExpiredEntries(curTime+10, 5)
-    assert.Nil(t, err)
-    assert.Equal(t, numBatchesDeleted, 1)
-    assert.Equal(t, numMappingsDeleted, 2)
-    assert.False(t, s.HasKey(ctx, blobIndexKey0))
-    assert.False(t, s.HasKey(ctx, blobIndexKey1))
-}
-
 func decodeChunks(t *testing.T, s *node.Store, batchHeaderHash [32]byte, blobIdx int, chunkEncoding pb.ChunkEncodingFormat) []*encoding.Frame {
     ctx := context.Background()
     chunks, format, err := s.GetChunks(ctx, batchHeaderHash, blobIdx, 0)