More minibatch clean up (#883)
ian-shim authored Nov 13, 2024
1 parent 999fb15 commit c694d41
Showing 7 changed files with 2 additions and 582 deletions.
133 changes: 0 additions & 133 deletions disperser/batcher/grpc/dispatcher.go
@@ -3,7 +3,6 @@ package dispatcher
import (
    "context"
    "errors"
    "fmt"
    "time"

    commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
@@ -148,138 +147,6 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMe
    sig := &core.Signature{G1Point: point}
    return sig, nil
}

// SendBlobsToOperator sends blobs to an operator via the node's StoreBlobs endpoint
// It returns the signatures of the blobs sent to the operator in the same order as the blobs
// with nil values for blobs that were not attested by the operator
func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
    // TODO Add secure Grpc

    conn, err := grpc.Dial(
        core.OperatorSocket(op.Socket).GetDispersalSocket(),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
        return nil, err
    }
    defer conn.Close()

    gc := node.NewDispersalClient(conn)
    ctx, cancel := context.WithTimeout(ctx, c.Timeout)
    defer cancel()
    start := time.Now()
    request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding)
    if err != nil {
        return nil, err
    }
    c.logger.Debug("sending chunks to operator", "operator", op.Socket, "num blobs", len(blobs), "size", totalSize, "request message size", proto.Size(request), "request serialization time", time.Since(start), "use Gnark chunk encoding", c.EnableGnarkBundleEncoding)
    opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
    reply, err := gc.StoreBlobs(ctx, request, opt)

    if err != nil {
        return nil, err
    }

    signaturesInBytes := reply.GetSignatures()
    signatures := make([]*core.Signature, 0, len(signaturesInBytes))
    for _, sigBytes := range signaturesInBytes {
        sig := sigBytes.GetValue()
        if sig != nil {
            point, err := new(core.Signature).Deserialize(sig)
            if err != nil {
                return nil, err
            }
            signatures = append(signatures, &core.Signature{G1Point: point})
        } else {
            signatures = append(signatures, nil)
        }
    }
    return signatures, nil
}
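
For context on the contract described in the doc comment above (one signature slot per blob, nil where the operator did not attest), a caller of the removed method would have looked roughly like the sketch below. This is an illustrative snippet written against the pre-commit Dispatcher interface, not code from this repository; the helper name and logging are assumptions.

package example

import (
    "context"
    "log"

    "github.com/Layr-Labs/eigenda/core"
    "github.com/Layr-Labs/eigenda/disperser"
)

// collectAttestedSignatures is a hypothetical caller of the removed
// SendBlobsToOperator method. The returned slice is parallel to blobs, with a
// nil entry for each blob the operator did not attest.
func collectAttestedSignatures(ctx context.Context, d disperser.Dispatcher, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
    sigs, err := d.SendBlobsToOperator(ctx, blobs, batchHeader, op)
    if err != nil {
        // The whole request failed; there is no per-blob information.
        return nil, err
    }
    attested := make([]*core.Signature, 0, len(sigs))
    for i, sig := range sigs {
        if sig == nil {
            // The operator stored the batch but did not sign for this blob.
            log.Printf("blob %d not attested by operator %s", i, op.Socket)
            continue
        }
        attested = append(attested, sig)
    }
    return attested, nil
}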

func (c *dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
    batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
    if err != nil {
        return nil, err
    }
    responseChan := make(chan core.SigningMessage, len(state.IndexedOperators))

    for id, op := range state.IndexedOperators {
        go func(op core.IndexedOperatorInfo, id core.OperatorID) {
            conn, err := grpc.Dial(
                core.OperatorSocket(op.Socket).GetDispersalSocket(),
                grpc.WithTransportCredentials(insecure.NewCredentials()),
            )
            if err != nil {
                c.logger.Error("disperser cannot connect to operator dispersal socket", "socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
                return
            }
            defer conn.Close()

            nodeClient := node.NewDispersalClient(conn)

            requestedAt := time.Now()
            sig, err := c.SendAttestBatchRequest(ctx, nodeClient, blobHeaderHashes, batchHeader, &op)
            latencyMs := float64(time.Since(requestedAt).Milliseconds())
            if err != nil {
                responseChan <- core.SigningMessage{
                    Err:                  err,
                    Signature:            nil,
                    Operator:             id,
                    BatchHeaderHash:      batchHeaderHash,
                    AttestationLatencyMs: latencyMs,
                }
                c.metrics.ObserveLatency(id.Hex(), false, latencyMs)
            } else {
                responseChan <- core.SigningMessage{
                    Signature:            sig,
                    Operator:             id,
                    BatchHeaderHash:      batchHeaderHash,
                    AttestationLatencyMs: latencyMs,
                    Err:                  nil,
                }
                c.metrics.ObserveLatency(id.Hex(), true, latencyMs)
            }
        }(core.IndexedOperatorInfo{
            PubkeyG1: op.PubkeyG1,
            PubkeyG2: op.PubkeyG2,
            Socket:   op.Socket,
        }, id)
    }

    return responseChan, nil
}
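
A consumer of the channel returned by AttestBatch would have drained at most one SigningMessage per operator; note that the goroutine above returns without writing anything when it cannot dial an operator, so a real consumer needs a deadline. The sketch below is illustrative only; the function name and the timeout handling are assumptions, not code from this repository.

package example

import (
    "time"

    "github.com/Layr-Labs/eigenda/core"
)

// countAttestations is an illustrative consumer for the channel returned by
// the removed AttestBatch method. Because operators that cannot be dialed
// never produce a message, it stops at a deadline instead of waiting for
// exactly numOperators messages.
func countAttestations(responseChan chan core.SigningMessage, numOperators int, deadline time.Duration) int {
    signed := 0
    timeout := time.After(deadline)
    for i := 0; i < numOperators; i++ {
        select {
        case msg := <-responseChan:
            if msg.Err == nil && msg.Signature != nil {
                signed++
            }
        case <-timeout:
            return signed // give up on operators that never responded
        }
    }
    return signed
}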

func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
    ctx, cancel := context.WithTimeout(ctx, c.Timeout)
    defer cancel()
    // start := time.Now()
    hashes := make([][]byte, len(blobHeaderHashes))
    for i, hash := range blobHeaderHashes {
        hashes[i] = hash[:]
    }

    request := &node.AttestBatchRequest{
        BatchHeader:      getBatchHeaderMessage(batchHeader),
        BlobHeaderHashes: hashes,
    }

    c.logger.Debug("sending AttestBatch request to operator", "operator", op.Socket, "numBlobs", len(blobHeaderHashes), "requestMessageSize", proto.Size(request), "referenceBlockNumber", batchHeader.ReferenceBlockNumber)
    opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
    reply, err := nodeDispersalClient.AttestBatch(ctx, request, opt)
    if err != nil {
        return nil, fmt.Errorf("failed to send AttestBatch request to operator %s: %w", core.OperatorSocket(op.Socket).GetDispersalSocket(), err)
    }

    sigBytes := reply.GetSignature()
    point, err := new(core.Signature).Deserialize(sigBytes)
    if err != nil {
        return nil, fmt.Errorf("failed to deserialize signature: %w", err)
    }
    return &core.Signature{G1Point: point}, nil
}

func GetStoreChunksRequest(blobMessages []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) {
    blobs := make([]*node.Blob, len(blobMessages))
    totalSize := int64(0)
59 changes: 0 additions & 59 deletions disperser/batcher/grpc/dispatcher_test.go

This file was deleted.

4 changes: 0 additions & 4 deletions disperser/disperser.go
@@ -13,7 +13,6 @@ import (
"github.com/Layr-Labs/eigenda/encoding"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/api/grpc/node"
gcommon "github.com/ethereum/go-ethereum/common"
)

@@ -219,9 +218,6 @@ type BlobStore interface {

type Dispatcher interface {
    DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
    SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error)
    AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error)
    SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error)
}
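
For reference, the hunk above removes everything but the batch-dispersal entry point, so after this commit the declaration in disperser/disperser.go reduces to roughly the following sketch:

// Post-cleanup shape of the interface (reconstructed from the diff above).
type Dispatcher interface {
    DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
}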

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
25 changes: 0 additions & 25 deletions disperser/mock/dispatcher.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
@@ -65,27 +64,3 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera

    return update
}

func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
    args := d.Called(ctx, blobs, batchHeader, op)
    if args.Get(0) == nil {
        return nil, args.Error(1)
    }
    return args.Get(0).([]*core.Signature), args.Error(1)
}

func (d *Dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
    args := d.Called(ctx, state, blobHeaderHashes, batchHeader)
    if args.Get(0) == nil {
        return nil, args.Error(1)
    }
    return args.Get(0).(chan core.SigningMessage), args.Error(1)
}

func (d *Dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
    args := d.Called(ctx, nodeDispersalClient, blobHeaderHashes, batchHeader, op)
    if args.Get(0) == nil {
        return nil, args.Error(1)
    }
    return args.Get(0).(*core.Signature), args.Error(1)
}
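
These removed mock methods follow the standard testify pattern (d.Called plus typed args.Get assertions), so a pre-commit test could have stubbed them roughly as below. This is a hypothetical sketch: the test name, the zero-value construction of the mock, and the expectations are assumptions, and it only compiles against the interface as it existed before this commit.

package mock_test

import (
    "context"
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"

    "github.com/Layr-Labs/eigenda/core"
    dispersermock "github.com/Layr-Labs/eigenda/disperser/mock"
)

// TestAttestBatchStub shows (hypothetically) how the removed testify-backed
// mock methods would have been programmed before this commit.
func TestAttestBatchStub(t *testing.T) {
    d := &dispersermock.Dispatcher{}

    responses := make(chan core.SigningMessage, 1)
    // Any arguments are accepted; the stub just hands back the prepared channel.
    d.On("AttestBatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
        Return(responses, nil)

    ch, err := d.AttestBatch(context.Background(), nil, nil, nil)
    require.NoError(t, err)
    require.Equal(t, responses, ch)
}
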
55 changes: 0 additions & 55 deletions node/node.go
@@ -12,7 +12,6 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"strings"
"sync"
"time"
@@ -22,9 +21,6 @@ import (
    gethcommon "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/wealdtech/go-merkletree/v2"
    "github.com/wealdtech/go-merkletree/v2/keccak256"
    "google.golang.org/protobuf/proto"

    "github.com/Layr-Labs/eigenda/api/grpc/node"
    "github.com/Layr-Labs/eigenda/common/geth"
@@ -418,57 +414,6 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob
    return nil
}

// ValidateBlobHeadersRoot validates the blob headers root hash
// by comparing it with the merkle tree root hash of the blob headers.
// It also checks if all blob headers have the same reference block number
func (n *Node) ValidateBatchContents(ctx context.Context, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) error {
    leafs := make([][]byte, 0)
    for _, blobHeaderHash := range blobHeaderHashes {
        blobHeaderBytes, err := n.Store.GetBlobHeaderByHeaderHash(ctx, blobHeaderHash)
        if err != nil {
            return fmt.Errorf("failed to get blob header by hash: %w", err)
        }
        if blobHeaderBytes == nil {
            return fmt.Errorf("blob header not found for hash %x", blobHeaderHash)
        }

        var protoBlobHeader node.BlobHeader
        err = proto.Unmarshal(blobHeaderBytes, &protoBlobHeader)
        if err != nil {
            return fmt.Errorf("failed to unmarshal blob header: %w", err)
        }
        if uint32(batchHeader.ReferenceBlockNumber) != protoBlobHeader.GetReferenceBlockNumber() {
            return errors.New("blob headers have different reference block numbers")
        }

        blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
        if err != nil {
            return fmt.Errorf("failed to get blob header from proto: %w", err)
        }

        blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
        if err != nil {
            return fmt.Errorf("failed to get blob header hash: %w", err)
        }
        leafs = append(leafs, blobHeaderHash[:])
    }

    if len(leafs) == 0 {
        return errors.New("no blob headers found")
    }

    tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
    if err != nil {
        return fmt.Errorf("failed to create merkle tree: %w", err)
    }

    if !reflect.DeepEqual(tree.Root(), batchHeader.BatchRoot[:]) {
        return errors.New("invalid batch header")
    }

    return nil
}
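
The core of the deleted check — recomputing a keccak256 merkle root over the blob header hashes and comparing it against the batch root — can be isolated as in the sketch below. It uses the same wealdtech/go-merkletree calls as the deleted code; the helper name is an assumption, and bytes.Equal stands in for reflect.DeepEqual since both operands are byte slices.

package example

import (
    "bytes"
    "fmt"

    "github.com/wealdtech/go-merkletree/v2"
    "github.com/wealdtech/go-merkletree/v2/keccak256"
)

// verifyBatchRoot is an illustrative helper: it rebuilds the keccak256 merkle
// tree over the given leaves (blob header hashes) and checks that its root
// matches the batch root carried in the batch header.
func verifyBatchRoot(leaves [][]byte, batchRoot [32]byte) error {
    if len(leaves) == 0 {
        return fmt.Errorf("no blob headers found")
    }
    tree, err := merkletree.NewTree(merkletree.WithData(leaves), merkletree.WithHashType(keccak256.New()))
    if err != nil {
        return fmt.Errorf("failed to create merkle tree: %w", err)
    }
    if !bytes.Equal(tree.Root(), batchRoot[:]) {
        return fmt.Errorf("invalid batch header: merkle root mismatch")
    }
    return nil
}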

func (n *Node) updateSocketAddress(ctx context.Context, newSocketAddr string) {
    n.mu.Lock()
    defer n.mu.Unlock()