From c3033aa827ac46f099ab6ccfc8e3576259e782d5 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Thu, 7 Nov 2024 09:35:27 -0800 Subject: [PATCH] v2 dispatcher --- core/serialization.go | 21 + disperser/batcher/batcher.go | 10 +- .../v2/blobstore/dynamo_metadata_store.go | 2 +- .../blobstore/dynamo_metadata_store_test.go | 44 ++ disperser/controller/dispatcher.go | 412 ++++++++++++++++++ disperser/controller/dispatcher_test.go | 315 +++++++++++++ disperser/controller/encoding_manager.go | 2 +- disperser/controller/encoding_manager_test.go | 10 +- .../controller/mock_node_client_manager.go | 18 + disperser/controller/node_client_manager.go | 57 +++ .../controller/node_client_manager_test.go | 40 ++ 11 files changed, 915 insertions(+), 16 deletions(-) create mode 100644 disperser/controller/dispatcher.go create mode 100644 disperser/controller/dispatcher_test.go create mode 100644 disperser/controller/mock_node_client_manager.go create mode 100644 disperser/controller/node_client_manager.go create mode 100644 disperser/controller/node_client_manager_test.go diff --git a/core/serialization.go b/core/serialization.go index 534a52cccf..457879d9b6 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -485,6 +485,27 @@ func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) { }, nil } +func SerializeMerkleProof(proof *merkletree.Proof) []byte { + proofBytes := make([]byte, 0) + for _, hash := range proof.Hashes { + proofBytes = append(proofBytes, hash[:]...) + } + return proofBytes +} + +func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) { + proof := &merkletree.Proof{} + if len(data)%32 != 0 { + return nil, fmt.Errorf("invalid proof length") + } + for i := 0; i < len(data); i += 32 { + var hash [32]byte + copy(hash[:], data[i:i+32]) + proof.Hashes = append(proof.Hashes, hash[:]) + } + return proof, nil +} + func encode(obj any) ([]byte, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index f5fef3d38f..d970eb8516 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -302,7 +302,7 @@ func (b *Batcher) updateConfirmationInfo( blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex]) continue } - proof = serializeProof(merkleProof) + proof = core.SerializeMerkleProof(merkleProof) } confirmationInfo := &disperser.ConfirmationInfo{ @@ -563,14 +563,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { return nil } -func serializeProof(proof *merkletree.Proof) []byte { - proofBytes := make([]byte, 0) - for _, hash := range proof.Hashes { - proofBytes = append(proofBytes, hash[:]...) 
- } - return proofBytes -} - func (b *Batcher) parseBatchIDFromReceipt(txReceipt *types.Receipt) (uint32, error) { if len(txReceipt.Logs) == 0 { return 0, errors.New("failed to get transaction receipt with logs") diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index b9c17c8d3f..847dfc93ab 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -157,7 +157,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status Value: strconv.Itoa(int(status)), }, ":updatedAt": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), + Value: strconv.FormatInt(int64(lastUpdatedAt), 10), }}) if err != nil { return nil, err diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index c56f4186b3..cbe192d30b 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -80,6 +80,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Len(t, queued, 1) assert.Equal(t, metadata1, queued[0]) + // query to get newer blobs should result in 0 results + queued, err = blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, metadata1.UpdatedAt+100) + assert.NoError(t, err) + assert.Len(t, queued, 0) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0) assert.NoError(t, err) assert.Len(t, certified, 1) @@ -153,6 +158,45 @@ func TestBlobMetadataStoreCerts(t *testing.T) { err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo) assert.ErrorIs(t, err, common.ErrAlreadyExists) + // get multiple certs + numCerts := 100 + keys := make([]corev2.BlobKey, numCerts) + for i := 0; i < numCerts; i++ { + blobCert := &corev2.BlobCertificate{ + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: uint32(i), + CumulativePayment: big.NewInt(321), + }, + Signature: []byte("signature"), + }, + RelayKeys: []corev2.RelayKey{0}, + } + blobKey, err := blobCert.BlobHeader.BlobKey() + assert.NoError(t, err) + keys[i] = blobKey + err = blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo) + assert.NoError(t, err) + } + + certs, fragmentInfos, err := blobMetadataStore.GetBlobCertificates(ctx, keys) + assert.NoError(t, err) + assert.Len(t, certs, numCerts) + assert.Len(t, fragmentInfos, numCerts) + binIndexes := make(map[uint32]struct{}) + for i := 0; i < numCerts; i++ { + assert.Equal(t, fragmentInfos[i], fragmentInfo) + binIndexes[certs[i].BlobHeader.PaymentMetadata.BinIndex] = struct{}{} + } + assert.Len(t, binIndexes, numCerts) + for i := 0; i < numCerts; i++ { + assert.Contains(t, binIndexes, uint32(i)) + } + deleteItems(t, []commondynamodb.Key{ { "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()}, diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go new file mode 100644 index 0000000000..dbf3119d7b --- /dev/null +++ b/disperser/controller/dispatcher.go @@ -0,0 +1,412 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "math" + "time" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + corev2 
"github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigensdk-go/logging" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/keccak256" +) + +var errNoBlobsToDispatch = errors.New("no blobs to dispatch") + +type DispatcherConfig struct { + PullInterval time.Duration + + FinalizationBlockDelay uint64 + NodeRequestTimeout time.Duration + NumRequestRetries int +} + +type Dispatcher struct { + DispatcherConfig + + blobMetadataStore *blobstore.BlobMetadataStore + pool common.WorkerPool + chainState core.IndexedChainState + aggregator core.SignatureAggregator + nodeClientManager NodeClientManager + logger logging.Logger + + lastUpdatedAt uint64 +} + +type batchData struct { + Batch *corev2.Batch + BatchHeaderHash [32]byte + BlobKeys []corev2.BlobKey + OperatorState *core.IndexedOperatorState +} + +func NewDispatcher( + config DispatcherConfig, + blobMetadataStore *blobstore.BlobMetadataStore, + pool common.WorkerPool, + chainState core.IndexedChainState, + aggregator core.SignatureAggregator, + nodeClientManager NodeClientManager, + logger logging.Logger, +) (*Dispatcher, error) { + return &Dispatcher{ + DispatcherConfig: config, + + blobMetadataStore: blobMetadataStore, + pool: pool, + chainState: chainState, + aggregator: aggregator, + nodeClientManager: nodeClientManager, + logger: logger.With("component", "Dispatcher"), + + lastUpdatedAt: 0, + }, nil +} + +func (d *Dispatcher) Start(ctx context.Context) error { + go func() { + ticker := time.NewTicker(d.PullInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sigChan, batchData, err := d.HandleBatch(ctx) + if err != nil { + if errors.Is(err, errNoBlobsToDispatch) { + d.logger.Warn("no blobs to dispatch") + } else { + d.logger.Error("failed to process a batch", "err", err) + } + continue + } + + go func() { + err := d.HandleSignatures(ctx, batchData, sigChan) + if err != nil { + d.logger.Error("failed to handle signatures", "err", err) + } + }() + } + } + }() + + return nil + +} + +func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, *batchData, error) { + currentBlockNumber, err := d.chainState.GetCurrentBlockNumber() + if err != nil { + return nil, nil, fmt.Errorf("failed to get current block number: %w", err) + } + referenceBlockNumber := uint64(currentBlockNumber) - d.FinalizationBlockDelay + + // Get a batch of blobs to dispatch + // This also writes a batch header and blob verification info for each blob in metadata store + batchData, err := d.NewBatch(ctx, referenceBlockNumber) + if err != nil { + return nil, nil, err + } + + batch := batchData.Batch + state := batchData.OperatorState + sigChan := make(chan core.SigningMessage, len(state.IndexedOperators)) + for opID, op := range state.IndexedOperators { + opID := opID + op := op + host, dispersalPort, _, err := core.ParseOperatorSocket(op.Socket) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse operator socket: %w", err) + } + + client, err := d.nodeClientManager.GetClient(host, dispersalPort) + if err != nil { + d.logger.Error("failed to get node client", "operator", opID, "err", err) + continue + } + + d.pool.Submit(func() { + + req := &corev2.DispersalRequest{ + OperatorID: opID, + // TODO: get OperatorAddress + OperatorAddress: gethcommon.Address{}, + Socket: op.Socket, + 
DispersedAt: uint64(time.Now().UnixNano()), + BatchHeader: *batch.BatchHeader, + } + err := d.blobMetadataStore.PutDispersalRequest(ctx, req) + if err != nil { + d.logger.Error("failed to put dispersal request", "err", err) + return + } + + for i := 0; i < d.NumRequestRetries+1; i++ { + sig, err := d.sendChunks(ctx, client, batch) + if err == nil { + storeErr := d.blobMetadataStore.PutDispersalResponse(ctx, &corev2.DispersalResponse{ + DispersalRequest: req, + RespondedAt: uint64(time.Now().UnixNano()), + Signature: sig.Bytes(), + Error: "", + }) + if storeErr != nil { + d.logger.Error("failed to put dispersal response", "err", storeErr) + } + + sigChan <- core.SigningMessage{ + Signature: sig, + Operator: opID, + BatchHeaderHash: batchData.BatchHeaderHash, + AttestationLatencyMs: 0, // TODO: calculate latency + Err: nil, + } + + break + } + + d.logger.Warn("failed to send chunks", "operator", opID, "NumAttempts", i, "err", err) + time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying + } + }) + } + + return sigChan, batchData, nil +} + +// HandleSignatures receives signatures from operators, validates, and aggregates them +func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error { + quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan) + if err != nil { + return fmt.Errorf("failed to receive and validate signatures: %w", err) + } + quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults)) + i := 0 + for quorumID := range quorumAttestation.QuorumResults { + quorums[i] = quorumID + i++ + } + aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums) + if err != nil { + return fmt.Errorf("failed to aggregate signatures: %w", err) + } + err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{ + BatchHeader: batchData.Batch.BatchHeader, + AttestedAt: uint64(time.Now().UnixNano()), + NonSignerPubKeys: aggSig.NonSigners, + APKG2: aggSig.AggPubKey, + QuorumAPKs: aggSig.QuorumAggPubKeys, + Sigma: aggSig.AggSignature, + QuorumNumbers: quorums, + }) + if err != nil { + return fmt.Errorf("failed to put attestation: %w", err) + } + + err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified) + if err != nil { + return fmt.Errorf("failed to mark blobs as certified: %w", err) + } + + return nil +} + +// NewBatch creates a batch of blobs to dispatch +// Warning: This function is not thread-safe +func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) (*batchData, error) { + blobMetadatas, err := d.blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Encoded, d.lastUpdatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get blob metadata by status: %w", err) + } + + if len(blobMetadatas) == 0 { + return nil, errNoBlobsToDispatch + } + + state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get operator state: %w", err) + } + + keys := make([]corev2.BlobKey, len(blobMetadatas)) + for i, metadata := range blobMetadatas { + if metadata == nil || metadata.BlobHeader == nil { + return nil, fmt.Errorf("invalid blob metadata") + } + blobKey, err := metadata.BlobHeader.BlobKey() + if err != nil { + return nil, fmt.Errorf("failed to get blob key: %w", err) + } + keys[i] = blobKey + if metadata.UpdatedAt > d.lastUpdatedAt { + 
d.lastUpdatedAt = metadata.UpdatedAt + } + } + + certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys) + if err != nil { + return nil, fmt.Errorf("failed to get blob certificates: %w", err) + } + + if len(certs) != len(keys) { + return nil, fmt.Errorf("blob certificates not found for all blob keys") + } + + certsMap := make(map[corev2.BlobKey]*corev2.BlobCertificate, len(certs)) + for _, cert := range certs { + blobKey, err := cert.BlobHeader.BlobKey() + if err != nil { + return nil, fmt.Errorf("failed to get blob key: %w", err) + } + + certsMap[blobKey] = cert + } + + // Keep the order of certs the same as the order of keys + for i, key := range keys { + c, ok := certsMap[key] + if !ok { + return nil, fmt.Errorf("blob certificate not found for blob key %s", key.Hex()) + } + certs[i] = c + } + + batchHeader := &corev2.BatchHeader{ + BatchRoot: [32]byte{}, + ReferenceBlockNumber: referenceBlockNumber, + } + + tree, err := BuildMerkleTree(certs) + if err != nil { + return nil, fmt.Errorf("failed to build merkle tree: %w", err) + } + + copy(batchHeader.BatchRoot[:], tree.Root()) + + batchHeaderHash, err := batchHeader.Hash() + if err != nil { + return nil, fmt.Errorf("failed to hash batch header: %w", err) + } + + err = d.blobMetadataStore.PutBatchHeader(ctx, batchHeader) + if err != nil { + return nil, fmt.Errorf("failed to put batch header: %w", err) + } + + // accumulate verification infos in a map to avoid duplicate entries + // batch write operation fails if there are duplicate entries + verificationInfoMap := make(map[corev2.BlobKey]*corev2.BlobVerificationInfo) + for i, cert := range certs { + if cert == nil || cert.BlobHeader == nil { + return nil, fmt.Errorf("invalid blob certificate") + } + blobKey, err := cert.BlobHeader.BlobKey() + if err != nil { + return nil, fmt.Errorf("failed to get blob key: %w", err) + } + + merkleProof, err := tree.GenerateProofWithIndex(uint64(i), 0) + if err != nil { + return nil, fmt.Errorf("failed to generate merkle proof: %w", err) + } + + verificationInfoMap[blobKey] = &corev2.BlobVerificationInfo{ + BatchHeader: batchHeader, + BlobKey: blobKey, + BlobIndex: uint32(i), + InclusionProof: core.SerializeMerkleProof(merkleProof), + } + } + + verificationInfos := make([]*corev2.BlobVerificationInfo, len(verificationInfoMap)) + i := 0 + for _, v := range verificationInfoMap { + verificationInfos[i] = v + i++ + } + err = d.blobMetadataStore.PutBlobVerificationInfos(ctx, verificationInfos) + if err != nil { + return nil, fmt.Errorf("failed to put blob verification infos: %w", err) + } + + return &batchData{ + Batch: &corev2.Batch{ + BatchHeader: batchHeader, + BlobCertificates: certs, + }, + BatchHeaderHash: batchHeaderHash, + BlobKeys: keys, + OperatorState: state, + }, nil +} + +// GetOperatorState returns the operator state for the given quorums at the given block number +func (d *Dispatcher) GetOperatorState(ctx context.Context, metadatas []*v2.BlobMetadata, blockNumber uint64) (*core.IndexedOperatorState, error) { + quorums := make(map[core.QuorumID]struct{}, 0) + for _, m := range metadatas { + for _, quorum := range m.BlobHeader.QuorumNumbers { + quorums[quorum] = struct{}{} + } + } + + quorumIds := make([]core.QuorumID, len(quorums)) + i := 0 + for id := range quorums { + quorumIds[i] = id + i++ + } + + // GetIndexedOperatorState should return state for valid quorums only + return d.chainState.GetIndexedOperatorState(ctx, uint(blockNumber), quorumIds) +} + +func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClientV2, 
batch *corev2.Batch) (*core.Signature, error) { + ctxWithTimeout, cancel := context.WithTimeout(ctx, d.NodeRequestTimeout) + defer cancel() + + sig, err := client.StoreChunks(ctxWithTimeout, batch) + if err != nil { + return nil, fmt.Errorf("failed to store chunks: %w", err) + } + + return sig, nil +} + +func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKey, status v2.BlobStatus) error { + for _, key := range keys { + err := d.blobMetadataStore.UpdateBlobStatus(ctx, key, status) + if err != nil { + d.logger.Error("failed to update blob status", "blobKey", key.Hex(), "status", status.String(), "err", err) + } + } + return nil +} + +func BuildMerkleTree(certs []*corev2.BlobCertificate) (*merkletree.MerkleTree, error) { + leafs := make([][]byte, len(certs)) + for i, cert := range certs { + leaf, err := cert.Hash() + if err != nil { + return nil, fmt.Errorf("failed to compute blob header hash: %w", err) + } + leafs[i] = leaf[:] + } + + tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New())) + if err != nil { + return nil, err + } + + return tree, nil +} diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go new file mode 100644 index 0000000000..c390f897a6 --- /dev/null +++ b/disperser/controller/dispatcher_test.go @@ -0,0 +1,315 @@ +package controller_test + +import ( + "context" + "crypto/rand" + "encoding/hex" + "math/big" + "testing" + "time" + + clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/disperser/controller" + "github.com/Layr-Labs/eigenda/encoding" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/gammazero/workerpool" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/keccak256" +) + +var ( + opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") + opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") + mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ + 0: { + opId0: 1, + opId1: 1, + }, + 1: { + opId0: 1, + opId1: 3, + }, + }) + finalizationBlockDelay = uint64(10) +) + +type dispatcherComponents struct { + Dispatcher *controller.Dispatcher + BlobMetadataStore *blobstore.BlobMetadataStore + Pool common.WorkerPool + ChainReader *coremock.MockWriter + ChainState *coremock.ChainDataMock + SigAggregator *core.StdSignatureAggregator + NodeClientManager *controller.MockClientManager +} + +func TestDispatcherHandleBatch(t *testing.T) { + components := newDispatcherComponents(t) + objs := setupBlobCerts(t, components.BlobMetadataStore, 2) + ctx := context.Background() + + // Get batch header hash to mock signatures + merkleTree, err := controller.BuildMerkleTree(objs.blobCerts) + require.NoError(t, err) + require.NotNil(t, merkleTree) + require.NotNil(t, merkleTree.Root()) + batchHeader := &corev2.BatchHeader{ + ReferenceBlockNumber: blockNumber - finalizationBlockDelay, + } + copy(batchHeader.BatchRoot[:], merkleTree.Root()) + bhh, err := batchHeader.Hash() + require.NoError(t, err) + + 
mockClient0 := clientsmock.NewNodeClientV2() + sig0 := mockChainState.KeyPairs[opId0].SignMessage(bhh) + mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(sig0, nil) + op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort + op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort + require.NotEqual(t, op0Port, op1Port) + components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil) + mockClient1 := clientsmock.NewNodeClientV2() + sig1 := mockChainState.KeyPairs[opId1].SignMessage(bhh) + mockClient1.On("StoreChunks", mock.Anything, mock.Anything).Return(sig1, nil) + components.NodeClientManager.On("GetClient", mock.Anything, op1Port).Return(mockClient1, nil) + + sigChan, batchData, err := components.Dispatcher.HandleBatch(ctx) + require.NoError(t, err) + err = components.Dispatcher.HandleSignatures(ctx, batchData, sigChan) + require.NoError(t, err) + + // Test that the blob metadata status are updated + bm0, err := components.BlobMetadataStore.GetBlobMetadata(ctx, objs.blobKeys[0]) + require.NoError(t, err) + require.Equal(t, v2.Certified, bm0.BlobStatus) + bm1, err := components.BlobMetadataStore.GetBlobMetadata(ctx, objs.blobKeys[1]) + require.NoError(t, err) + require.Equal(t, v2.Certified, bm1.BlobStatus) + + // Get batch header + vis, err := components.BlobMetadataStore.GetBlobVerificationInfos(ctx, objs.blobKeys[0]) + require.NoError(t, err) + require.Len(t, vis, 1) + bhh, err = vis[0].BatchHeader.Hash() + require.NoError(t, err) + + // Test that attestation is written + att, err := components.BlobMetadataStore.GetAttestation(ctx, bhh) + require.NoError(t, err) + require.NotNil(t, att) + require.Equal(t, vis[0].BatchHeader, att.BatchHeader) + require.Greater(t, att.AttestedAt, uint64(0)) + require.Len(t, att.NonSignerPubKeys, 0) + require.NotNil(t, att.APKG2) + require.Len(t, att.QuorumAPKs, 2) + require.NotNil(t, att.Sigma) + require.ElementsMatch(t, att.QuorumNumbers, []core.QuorumID{0, 1}) +} + +func TestDispatcherNewBatch(t *testing.T) { + components := newDispatcherComponents(t) + objs := setupBlobCerts(t, components.BlobMetadataStore, 2) + require.Len(t, objs.blobHedaers, 2) + require.Len(t, objs.blobKeys, 2) + require.Len(t, objs.blobMetadatas, 2) + require.Len(t, objs.blobCerts, 2) + ctx := context.Background() + + batchData, err := components.Dispatcher.NewBatch(ctx, blockNumber) + require.NoError(t, err) + batch := batchData.Batch + bhh, keys, state := batchData.BatchHeaderHash, batchData.BlobKeys, batchData.OperatorState + require.NotNil(t, batch) + require.NotNil(t, batch.BatchHeader) + require.NotNil(t, bhh) + require.NotNil(t, keys) + require.NotNil(t, state) + require.ElementsMatch(t, keys, objs.blobKeys) + + // Test that the batch header hash is correct + hash, err := batch.BatchHeader.Hash() + require.NoError(t, err) + require.Equal(t, bhh, hash) + + // Test that the batch header is correct + require.Equal(t, blockNumber, batch.BatchHeader.ReferenceBlockNumber) + require.NotNil(t, batch.BatchHeader.BatchRoot) + + // Test that the batch header is written + bh, err := components.BlobMetadataStore.GetBatchHeader(ctx, bhh) + require.NoError(t, err) + require.NotNil(t, bh) + require.Equal(t, bh, batch.BatchHeader) + + // Test that blob verification infos are written + vi0, err := components.BlobMetadataStore.GetBlobVerificationInfo(ctx, objs.blobKeys[0], bhh) + require.NoError(t, err) + require.NotNil(t, vi0) + cert 
:= batch.BlobCertificates[vi0.BlobIndex] + require.Equal(t, objs.blobHedaers[0], cert.BlobHeader) + require.Equal(t, objs.blobKeys[0], vi0.BlobKey) + require.Equal(t, bh, vi0.BatchHeader) + certHash, err := cert.Hash() + require.NoError(t, err) + proof, err := core.DeserializeMerkleProof(vi0.InclusionProof) + require.NoError(t, err) + verified, err := merkletree.VerifyProofUsing(certHash[:], false, proof, [][]byte{vi0.BatchRoot[:]}, keccak256.New()) + require.NoError(t, err) + require.True(t, verified) + + // Attempt to create a batch with the same blobs + _, err = components.Dispatcher.NewBatch(ctx, blockNumber) + require.ErrorContains(t, err, "no blobs to dispatch") +} + +func TestDispatcherBuildMerkleTree(t *testing.T) { + certs := []*corev2.BlobCertificate{ + { + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "account 1", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + RelayKeys: []corev2.RelayKey{0}, + }, + { + BlobHeader: &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0, 1}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "account 2", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + Signature: []byte("signature"), + }, + RelayKeys: []corev2.RelayKey{0, 1, 2}, + }, + } + merkleTree, err := controller.BuildMerkleTree(certs) + require.NoError(t, err) + require.NotNil(t, merkleTree) + require.NotNil(t, merkleTree.Root()) + + proof, err := merkleTree.GenerateProofWithIndex(uint64(0), 0) + require.NoError(t, err) + require.NotNil(t, proof) + hash, err := certs[0].Hash() + require.NoError(t, err) + verified, err := merkletree.VerifyProofUsing(hash[:], false, proof, [][]byte{merkleTree.Root()}, keccak256.New()) + require.NoError(t, err) + require.True(t, verified) + + proof, err = merkleTree.GenerateProofWithIndex(uint64(1), 0) + require.NoError(t, err) + require.NotNil(t, proof) + hash, err = certs[1].Hash() + require.NoError(t, err) + verified, err = merkletree.VerifyProofUsing(hash[:], false, proof, [][]byte{merkleTree.Root()}, keccak256.New()) + require.NoError(t, err) + require.True(t, verified) +} + +type testObjects struct { + blobHedaers []*corev2.BlobHeader + blobKeys []corev2.BlobKey + blobMetadatas []*v2.BlobMetadata + blobCerts []*corev2.BlobCertificate +} + +func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, numObjects int) *testObjects { + ctx := context.Background() + headers := make([]*corev2.BlobHeader, numObjects) + keys := make([]corev2.BlobKey, numObjects) + metadatas := make([]*v2.BlobMetadata, numObjects) + certs := make([]*corev2.BlobCertificate, numObjects) + for i := 0; i < numObjects; i++ { + randomBytes := make([]byte, 16) + _, err := rand.Read(randomBytes) + require.NoError(t, err) + randomBinIndex, err := rand.Int(rand.Reader, big.NewInt(1000)) + require.NoError(t, err) + binIndex := uint32(randomBinIndex.Uint64()) + headers[i] = &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0, 1}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: hex.EncodeToString(randomBytes), + BinIndex: binIndex, + CumulativePayment: big.NewInt(532), + }, + } + key, err := headers[i].BlobKey() + require.NoError(t, err) + keys[i] = key + now := time.Now() + metadatas[i] = &v2.BlobMetadata{ + BlobHeader: headers[i], + BlobStatus: v2.Encoded, + 
Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()) - uint64(i), + } + err = blobMetadataStore.PutBlobMetadata(ctx, metadatas[i]) + require.NoError(t, err) + + certs[i] = &corev2.BlobCertificate{ + BlobHeader: headers[i], + RelayKeys: []corev2.RelayKey{0, 1, 2}, + } + err = blobMetadataStore.PutBlobCertificate(ctx, certs[i], &encoding.FragmentInfo{}) + require.NoError(t, err) + } + + return &testObjects{ + blobHedaers: headers, + blobKeys: keys, + blobMetadatas: metadatas, + blobCerts: certs, + } +} + +func newDispatcherComponents(t *testing.T) *dispatcherComponents { + // logger := logging.NewNoopLogger() + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + require.NoError(t, err) + pool := workerpool.New(5) + + chainReader := &coremock.MockWriter{} + chainReader.On("OperatorIDToAddress").Return(gethcommon.Address{0}, nil) + agg, err := core.NewStdSignatureAggregator(logger, chainReader) + require.NoError(t, err) + nodeClientManager := &controller.MockClientManager{} + mockChainState.On("GetCurrentBlockNumber").Return(uint(blockNumber), nil) + d, err := controller.NewDispatcher(controller.DispatcherConfig{ + PullInterval: 1 * time.Second, + FinalizationBlockDelay: finalizationBlockDelay, + NodeRequestTimeout: 1 * time.Second, + NumRequestRetries: 3, + }, blobMetadataStore, pool, mockChainState, agg, nodeClientManager, logger) + require.NoError(t, err) + return &dispatcherComponents{ + Dispatcher: d, + BlobMetadataStore: blobMetadataStore, + Pool: pool, + ChainReader: chainReader, + ChainState: mockChainState, + SigAggregator: agg, + NodeClientManager: nodeClientManager, + } +} diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 7ee6c72dea..28ce060b7d 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -22,7 +22,7 @@ import ( var errNoBlobsToEncode = errors.New("no blobs to encode") type EncodingManagerConfig struct { - PullInterval time.Duration + PullInterval time.Duration EncodingRequestTimeout time.Duration StoreTimeout time.Duration diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go index 3b48aa6570..f1e652726f 100644 --- a/disperser/controller/encoding_manager_test.go +++ b/disperser/controller/encoding_manager_test.go @@ -22,7 +22,7 @@ import ( ) var ( - blockNumber = uint32(100) + blockNumber = uint64(100) ) type testComponents struct { @@ -91,7 +91,7 @@ func TestGetRelayKeys(t *testing.T) { } } -func TestHandleBatch(t *testing.T) { +func TestEncodingManagerHandleBatch(t *testing.T) { ctx := context.Background() blobHeader1 := &corev2.BlobHeader{ BlobVersion: 0, @@ -141,14 +141,14 @@ func TestHandleBatch(t *testing.T) { assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4)) } -func TestHandleBatchNoBlobs(t *testing.T) { +func TestEncodingManagerHandleBatchNoBlobs(t *testing.T) { ctx := context.Background() c := newTestComponents(t) err := c.EncodingManager.HandleBatch(ctx) assert.ErrorContains(t, err, "no blobs to encode") } -func TestHandleBatchRetrySuccess(t *testing.T) { +func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) { ctx := context.Background() blobHeader1 := &corev2.BlobHeader{ BlobVersion: 0, @@ -200,7 +200,7 @@ func TestHandleBatchRetrySuccess(t *testing.T) { c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2) } -func TestHandleBatchRetryFailure(t *testing.T) { +func TestEncodingManagerHandleBatchRetryFailure(t 
*testing.T) { ctx := context.Background() blobHeader1 := &corev2.BlobHeader{ BlobVersion: 0, diff --git a/disperser/controller/mock_node_client_manager.go b/disperser/controller/mock_node_client_manager.go new file mode 100644 index 0000000000..d9e54f9cc3 --- /dev/null +++ b/disperser/controller/mock_node_client_manager.go @@ -0,0 +1,18 @@ +package controller + +import ( + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/stretchr/testify/mock" +) + +type MockClientManager struct { + mock.Mock +} + +var _ NodeClientManager = (*MockClientManager)(nil) + +func (m *MockClientManager) GetClient(host, port string) (clients.NodeClientV2, error) { + args := m.Called(host, port) + client, _ := args.Get(0).(clients.NodeClientV2) + return client, args.Error(1) +} diff --git a/disperser/controller/node_client_manager.go b/disperser/controller/node_client_manager.go new file mode 100644 index 0000000000..0957428c68 --- /dev/null +++ b/disperser/controller/node_client_manager.go @@ -0,0 +1,57 @@ +package controller + +import ( + "fmt" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigensdk-go/logging" + lru "github.com/hashicorp/golang-lru/v2" +) + +type NodeClientManager interface { + GetClient(host, port string) (clients.NodeClientV2, error) +} + +type nodeClientManager struct { + // nodeClients is a cache of node clients keyed by socket address + nodeClients *lru.Cache[string, clients.NodeClientV2] + logger logging.Logger +} + +var _ NodeClientManager = (*nodeClientManager)(nil) + +func NewNodeClientManager(cacheSize int, logger logging.Logger) (*nodeClientManager, error) { + closeClient := func(socket string, value clients.NodeClientV2) { + if err := value.Close(); err != nil { + logger.Error("failed to close node client", "err", err) + } + } + nodeClients, err := lru.NewWithEvict(cacheSize, closeClient) + if err != nil { + return nil, fmt.Errorf("failed to create LRU cache: %w", err) + } + + return &nodeClientManager{ + nodeClients: nodeClients, + logger: logger, + }, nil +} + +func (m *nodeClientManager) GetClient(host, port string) (clients.NodeClientV2, error) { + socket := fmt.Sprintf("%s:%s", host, port) + client, ok := m.nodeClients.Get(socket) + if !ok { + var err error + client, err = clients.NewNodeClientV2(&clients.NodeClientV2Config{ + Hostname: host, + Port: port, + }) + if err != nil { + return nil, fmt.Errorf("failed to create node client at %s: %w", socket, err) + } + + m.nodeClients.Add(socket, client) + } + + return client, nil +} diff --git a/disperser/controller/node_client_manager_test.go b/disperser/controller/node_client_manager_test.go new file mode 100644 index 0000000000..ffe0dc5a6a --- /dev/null +++ b/disperser/controller/node_client_manager_test.go @@ -0,0 +1,40 @@ +package controller_test + +import ( + "testing" + + "github.com/Layr-Labs/eigenda/disperser/controller" + "github.com/stretchr/testify/require" +) + +func TestNodeClientManager(t *testing.T) { + m, err := controller.NewNodeClientManager(2, nil) + require.NoError(t, err) + + client0, err := m.GetClient("localhost", "0000") + require.NoError(t, err) + require.NotNil(t, client0) + + client1, err := m.GetClient("localhost", "0000") + require.NoError(t, err) + require.NotNil(t, client1) + + require.Same(t, client0, client1) + + // fill up the cache + client2, err := m.GetClient("localhost", "1111") + require.NoError(t, err) + require.NotNil(t, client2) + + // evict client0 + client3, err := m.GetClient("localhost", "2222") + require.NoError(t, err) + require.NotNil(t, client3) + + // 
accessing client0 again should create new client + client4, err := m.GetClient("localhost", "0000") + require.NoError(t, err) + require.NotNil(t, client4) + + require.NotSame(t, client0, client4) +}
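
Usage sketch: the following is a minimal, hypothetical wiring of the Dispatcher and NodeClientManager added in this patch. It assumes the blob metadata store, indexed chain state, and signature aggregator are produced by the existing disperser bootstrap (the tests build the aggregator with core.NewStdSignatureAggregator); the function name runDispatcher and the config values are illustrative, not part of the patch.

package main

import (
	"context"
	"time"

	"github.com/Layr-Labs/eigenda/core"
	"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
	"github.com/Layr-Labs/eigenda/disperser/controller"
	"github.com/Layr-Labs/eigensdk-go/logging"
	"github.com/gammazero/workerpool"
)

// runDispatcher wires up and starts the v2 dispatcher. The store, chain state,
// and aggregator parameters are assumed to be constructed elsewhere in the
// disperser bootstrap; only the pieces introduced by this patch are built here.
func runDispatcher(
	ctx context.Context,
	logger logging.Logger,
	blobMetadataStore *blobstore.BlobMetadataStore,
	chainState core.IndexedChainState,
	aggregator core.SignatureAggregator,
) error {
	// LRU-cached gRPC clients to DA nodes; evicted clients are closed on eviction.
	nodeClientManager, err := controller.NewNodeClientManager(128, logger)
	if err != nil {
		return err
	}

	dispatcher, err := controller.NewDispatcher(controller.DispatcherConfig{
		PullInterval:           2 * time.Second,  // how often to poll for newly Encoded blobs
		FinalizationBlockDelay: 10,               // reference block = current block - delay
		NodeRequestTimeout:     10 * time.Second, // per-operator StoreChunks timeout
		NumRequestRetries:      3,                // retried with exponential backoff
	}, blobMetadataStore, workerpool.New(16), chainState, aggregator, nodeClientManager, logger)
	if err != nil {
		return err
	}

	// Start launches the polling loop: each tick builds a batch from Encoded blobs,
	// disperses chunks to operators via the worker pool, then aggregates the
	// returned signatures into an attestation asynchronously.
	return dispatcher.Start(ctx)
}

Note that Start returns immediately after launching the polling goroutine; per-batch signature handling (HandleSignatures) runs in its own goroutine, and the worker pool bounds concurrent per-operator dispersals.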