Skip to content

Commit

Permalink
Clean up unused node methods (#874)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 11, 2024
1 parent 3453468 commit 0761b56
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 473 deletions.
4 changes: 4 additions & 0 deletions api/grpc/node/node_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/proto/node/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ service Dispersal {
// StoreBlobs is similar to StoreChunks, but it stores the blobs using a different storage schema
// so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch.
// StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method.
// DEPRECATED: StoreBlobs method is not used
rpc StoreBlobs(StoreBlobsRequest) returns (StoreBlobsReply) {}
// AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch.
// It will return a signature at the end to attest to the aggregated batch.
// DEPRECATED: AttestBatch method is not used
rpc AttestBatch(AttestBatchRequest) returns (AttestBatchReply) {}
// Retrieve node info metadata
rpc NodeInfo(NodeInfoRequest) returns (NodeInfoReply) {}
Expand Down
119 changes: 2 additions & 117 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ import (
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/shirou/gopsutil/mem"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"

_ "go.uber.org/automaxprocs"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// Server implements the Node proto APIs.
Expand Down Expand Up @@ -170,125 +168,12 @@ func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*p
return reply, err
}

// validateStoreBlobsRequest performs sanity checks on an incoming
// StoreBlobsRequest before any processing happens. It requires a non-zero
// reference block number and at least one blob; for every blob it requires a
// header with valid points, a non-empty quorum-header list whose length
// matches the bundle count, quorum IDs within [0, core.MaxQuorumID], valid
// security parameters, and a reference block number matching the request's.
// The first violation found is returned as an InvalidArg API error (or as
// the security-param validation error itself).
func (s *Server) validateStoreBlobsRequest(in *pb.StoreBlobsRequest) error {
	if in.GetReferenceBlockNumber() == 0 {
		return api.NewErrorInvalidArg("missing reference_block_number in request")
	}

	blobs := in.GetBlobs()
	if len(blobs) == 0 {
		return api.NewErrorInvalidArg("missing blobs in request")
	}

	for _, b := range blobs {
		header := b.GetHeader()
		if header == nil {
			return api.NewErrorInvalidArg("missing blob header in request")
		}
		if node.ValidatePointsFromBlobHeader(header) != nil {
			return api.NewErrorInvalidArg("invalid points contained in the blob header in request")
		}

		quorums := header.GetQuorumHeaders()
		if len(quorums) == 0 {
			return api.NewErrorInvalidArg("missing quorum headers in request")
		}
		// Every quorum must come with exactly one bundle.
		if len(quorums) != len(b.GetBundles()) {
			return api.NewErrorInvalidArg("the number of quorums must be the same as the number of bundles")
		}

		for _, qh := range quorums {
			if qh.GetQuorumId() > core.MaxQuorumID {
				return api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, qh.GetQuorumId()))
			}
			if err := core.ValidateSecurityParam(qh.GetConfirmationThreshold(), qh.GetAdversaryThreshold()); err != nil {
				return err
			}
		}

		// All blobs in one request must agree on the reference block number.
		if in.GetReferenceBlockNumber() != header.GetReferenceBlockNumber() {
			return api.NewErrorInvalidArg("reference_block_number must be the same for all blobs")
		}
	}
	return nil
}

// StoreBlobs is a deprecated RPC that is retained only so existing clients
// receive a well-defined error instead of a transport failure.
//
// DEPRECATED: the StoreBlobs/AttestBatch flow is no longer used (see the
// proto definition); all calls are rejected with an Unimplemented API error.
// The former implementation (request validation, deserialization, and
// ProcessBlobs signing) was dead code left behind by the deprecation and has
// been removed; the unreachable duplicate return statement is gone as well.
func (s *Server) StoreBlobs(ctx context.Context, in *pb.StoreBlobsRequest) (*pb.StoreBlobsReply, error) {
	return &pb.StoreBlobsReply{}, api.NewErrorUnimplemented()
}

// AttestBatch is a deprecated RPC that is retained only so existing clients
// receive a well-defined error instead of a transport failure.
//
// DEPRECATED: the StoreBlobs/AttestBatch flow is no longer used (see the
// proto definition); all calls are rejected with an Unimplemented API error.
// The former implementation (batch-root validation, batch/blob mapping
// storage, and batch-header signing) was dead code left behind by the
// deprecation and has been removed; the unreachable duplicate return
// statement is gone as well.
func (s *Server) AttestBatch(ctx context.Context, in *pb.AttestBatchRequest) (*pb.AttestBatchReply, error) {
	return &pb.AttestBatchReply{}, api.NewErrorUnimplemented()
}

func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksRequest) (*pb.RetrieveChunksReply, error) {
Expand Down
181 changes: 0 additions & 181 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,187 +381,6 @@ func TestStoreChunksRequestValidation(t *testing.T) {
assert.True(t, strings.Contains(err.Error(), "adversary threshold equals 0"))
}

// TestStoreBlobs stores a batch of blobs through the StoreBlobs RPC and
// checks that the node returns one signature per blob header, each of which
// verifies (against the test key pair's G2 public key) over the hash of a
// single-blob batch header at reference block 1.
func TestStoreBlobs(t *testing.T) {
	server := newTestServer(t, true)

	chunksReq, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33)
	chunksReq.BatchHeader = nil
	resp, err := server.StoreBlobs(context.Background(), &pb.StoreBlobsRequest{
		Blobs:                chunksReq.Blobs,
		ReferenceBlockNumber: 1,
	})
	assert.NoError(t, err)

	sigs := resp.GetSignatures()
	assert.NotNil(t, sigs)
	assert.Len(t, sigs, len(blobHeaders))

	for i, sig := range sigs {
		assert.NotNil(t, sig)
		assert.NotNil(t, sig.Value)

		// Each signature attests to a batch containing only this blob.
		bh := &core.BatchHeader{
			ReferenceBlockNumber: 1,
			BatchRoot:            [32]byte{},
		}
		_, err := bh.SetBatchRoot([]*core.BlobHeader{blobHeaders[i]})
		assert.NoError(t, err)
		hash, err := bh.GetBatchHeaderHash()
		assert.NoError(t, err)

		point, err := new(core.Signature).Deserialize(sig.Value)
		assert.NoError(t, err)
		verified := (&core.Signature{G1Point: point}).Verify(keyPair.GetPubKeyG2(), hash)
		assert.True(t, verified)
	}
}

// TestMinibatchDispersalAndRetrieval exercises the minibatch flow end to end:
// StoreBlobs, then AttestBatch over both blob headers, then GetBlobHeader
// (with merkle-proof verification against the batch root) and RetrieveChunks
// for each blob, including the error paths for a non-existent blob index and
// a quorum absent from the blob header.
func TestMinibatchDispersalAndRetrieval(t *testing.T) {
server := newTestServer(t, true)

// Store two blobs at reference block 1.
reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33)
reqToCopy.BatchHeader = nil
req := &pb.StoreBlobsRequest{
Blobs: reqToCopy.Blobs,
ReferenceBlockNumber: 1,
}
ctx := context.Background()
reply, err := server.StoreBlobs(ctx, req)
assert.NoError(t, err)
assert.NotNil(t, reply.GetSignatures())

// Build a batch header whose root covers both blob headers, then ask the
// node to attest to it.
assert.Len(t, blobHeaders, 2)
bhh0, err := blobHeaders[0].GetBlobHeaderHash()
assert.NoError(t, err)
bhh1, err := blobHeaders[1].GetBlobHeaderHash()
assert.NoError(t, err)
batchHeader := &core.BatchHeader{
ReferenceBlockNumber: 1,
BatchRoot: [32]byte{},
}
_, err = batchHeader.SetBatchRoot([]*core.BlobHeader{blobHeaders[0], blobHeaders[1]})
assert.NoError(t, err)
attestReq := &pb.AttestBatchRequest{
BatchHeader: &pb.BatchHeader{
BatchRoot: batchHeader.BatchRoot[:],
ReferenceBlockNumber: 1,
},
BlobHeaderHashes: [][]byte{bhh0[:], bhh1[:]},
}
attestReply, err := server.AttestBatch(ctx, attestReq)
assert.NotNil(t, reply)
assert.NoError(t, err)
sig := attestReply.GetSignature()
assert.NotNil(t, sig)
// The attestation signature must verify over the batch header hash.
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
assert.NoError(t, err)
point, err := new(core.Signature).Deserialize(sig)
assert.NoError(t, err)
s := &core.Signature{G1Point: point}
ok := s.Verify(keyPair.GetPubKeyG2(), batchHeaderHash)
assert.True(t, ok)

// Get blob headers
// Blob 0: header round-trips through the proto and its merkle proof
// verifies against the batch root.
blobHeaderReply, err := server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.NotNil(t, blobHeaderReply)
blobHeader, err := node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
assert.NoError(t, err)
assert.Equal(t, blobHeader, blobHeaders[0])
proof := &merkletree.Proof{
Hashes: blobHeaderReply.GetProof().GetHashes(),
Index: uint64(blobHeaderReply.GetProof().GetIndex()),
}
ok, err = merkletree.VerifyProofUsing(bhh0[:], false, proof, [][]byte{batchHeader.BatchRoot[:]}, keccak256.New())
assert.NoError(t, err)
assert.True(t, ok)

// Blob 1: same checks.
blobHeaderReply, err = server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 0,
})
assert.NoError(t, err)
assert.NotNil(t, blobHeaderReply)
blobHeader, err = node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
assert.NoError(t, err)
assert.Equal(t, blobHeader, blobHeaders[1])
proof = &merkletree.Proof{
Hashes: blobHeaderReply.GetProof().GetHashes(),
Index: uint64(blobHeaderReply.GetProof().GetIndex()),
}
ok, err = merkletree.VerifyProofUsing(bhh1[:], false, proof, [][]byte{batchHeader.BatchRoot[:]}, keccak256.New())
assert.NoError(t, err)
assert.True(t, ok)

// non-existent blob index
_, err = server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 2,
QuorumId: 0,
})
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "commit not found in db"))

// Test GetChunks
// RetrieveChunks requires a peer in the context.
p := &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 3000,
},
}
ctx = peer.NewContext(context.Background(), p)
// Each retrieval should return one GOB-encoded chunk that matches the
// encodedChunk fixture (presumably a package-level test fixture — defined
// elsewhere in this file).
retrieveChunksReply, err := server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err := new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err := new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err = new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

// Retrieving blob 0 again should be stable (same chunk returned).
retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err = new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

// Quorum 1 is not present in the blob header, so retrieval must fail.
_, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 1,
})
assert.ErrorContains(t, err, "quorum ID 1 not found in blob header")
}

func TestRetrieveChunks(t *testing.T) {
server := newTestServer(t, true)
batchHeaderHash, _, _, _ := storeChunks(t, server, false)
Expand Down
Loading

0 comments on commit 0761b56

Please sign in to comment.