Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Node Client StoreChunks method #852

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions api/clients/mock/node_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

type MockNodeClientV2 struct {
mock.Mock
}

var _ clients.NodeClientV2 = (*MockNodeClientV2)(nil)

func NewNodeClientV2() *MockNodeClientV2 {
return &MockNodeClientV2{}
}

func (c *MockNodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
args := c.Called()
var signature *core.Signature
if args.Get(0) != nil {
signature = (args.Get(0)).(*core.Signature)
}
return signature, args.Error(1)
}

func (c *MockNodeClientV2) Close() error {
args := c.Called()
return args.Error(0)
}
116 changes: 116 additions & 0 deletions api/clients/node_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package clients

import (
"context"
"fmt"
"sync"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
nodegrpc "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"google.golang.org/grpc"
)

type NodeClientV2Config struct {
Hostname string
Port string
UseSecureGrpcFlag bool
}

type NodeClientV2 interface {
StoreChunks(ctx context.Context, certs *corev2.Batch) (*core.Signature, error)
Close() error
}

type nodeClientV2 struct {
config *NodeClientV2Config
initOnce sync.Once
conn *grpc.ClientConn

dispersalClient nodegrpc.DispersalClient
}

var _ NodeClientV2 = (*nodeClientV2)(nil)

func NewNodeClientV2(config *NodeClientV2Config) (*nodeClientV2, error) {
if config == nil || config.Hostname == "" || config.Port == "" {
return nil, fmt.Errorf("invalid config: %v", config)
}
return &nodeClientV2{
config: config,
}, nil
}

func (c *nodeClientV2) StoreChunks(ctx context.Context, batch *corev2.Batch) (*core.Signature, error) {
if len(batch.BlobCertificates) == 0 {
return nil, fmt.Errorf("no blob certificates in the batch")
}

if err := c.initOnceGrpcConnection(); err != nil {
return nil, err
}

blobCerts := make([]*commonpb.BlobCertificate, len(batch.BlobCertificates))
for i, cert := range batch.BlobCertificates {
var err error
blobCerts[i], err = cert.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob certificate to protobuf: %v", err)
}
}

// Call the gRPC method to store chunks
response, err := c.dispersalClient.StoreChunks(ctx, &nodegrpc.StoreChunksRequest{
Batch: &commonpb.Batch{
Header: &commonpb.BatchHeader{
BatchRoot: batch.BatchHeader.BatchRoot[:],
ReferenceBlockNumber: batch.BatchHeader.ReferenceBlockNumber,
},
BlobCertificates: blobCerts,
},
})
if err != nil {
return nil, err
}

// Extract signatures from the response
if response == nil {
return nil, fmt.Errorf("received nil response from StoreChunks")
}

sigBytes := response.GetSignature()
point, err := new(core.Signature).Deserialize(sigBytes)
if err != nil {
return nil, fmt.Errorf("failed to deserialize signature: %v", err)
}
return &core.Signature{G1Point: point}, nil
}

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *nodeClientV2) Close() error {
if c.conn != nil {
err := c.conn.Close()
c.conn = nil
c.dispersalClient = nil
return err
}
return nil
}

func (c *nodeClientV2) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
conn, err := grpc.Dial(addr, dialOptions...)
if err != nil {
initErr = err
return
}
c.conn = conn
c.dispersalClient = nodegrpc.NewDispersalClient(conn)
})
return initErr
}
8 changes: 8 additions & 0 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,14 @@ func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeVal
return nil
}

func (pm *PaymentMetadata) ToProtobuf() *commonpb.PaymentHeader {
return &commonpb.PaymentHeader{
AccountId: pm.AccountID,
BinIndex: pm.BinIndex,
CumulativePayment: pm.CumulativePayment.Bytes(),
}
}

// ConvertPaymentHeader converts a protobuf payment header to a PaymentMetadata
func ConvertPaymentHeader(header *commonpb.PaymentHeader) *PaymentMetadata {
return &PaymentMetadata{
Expand Down
46 changes: 44 additions & 2 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package v2
import (
"encoding/hex"
"errors"
"fmt"
"math"
"math/big"
"strings"

pb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254"
Expand Down Expand Up @@ -72,7 +73,7 @@ type BlobHeader struct {
Signature []byte
}

func NewBlobHeader(proto *pb.BlobHeader) (*BlobHeader, error) {
func NewBlobHeader(proto *commonpb.BlobHeader) (*BlobHeader, error) {
commitment, err := new(encoding.G1Commitment).Deserialize(proto.GetCommitment().GetCommitment())
if err != nil {
return nil, err
Expand Down Expand Up @@ -126,6 +127,26 @@ func NewBlobHeader(proto *pb.BlobHeader) (*BlobHeader, error) {
}, nil
}

func (b *BlobHeader) ToProtobuf() (*commonpb.BlobHeader, error) {
quorums := make([]uint32, len(b.QuorumNumbers))
for i, q := range b.QuorumNumbers {
quorums[i] = uint32(q)
}

commitments, err := b.BlobCommitments.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob commitments to protobuf: %v", err)
}

return &commonpb.BlobHeader{
Version: uint32(b.BlobVersion),
QuorumNumbers: quorums,
Commitment: commitments,
PaymentHeader: b.PaymentMetadata.ToProtobuf(),
Signature: b.Signature,
}, nil
}

func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) {
params := ParametersMap[b.BlobVersion]

Expand Down Expand Up @@ -296,6 +317,27 @@ type BlobCertificate struct {
RelayKeys []RelayKey
}

func (c *BlobCertificate) ToProtobuf() (*commonpb.BlobCertificate, error) {
if c.BlobHeader == nil {
return nil, fmt.Errorf("blob header is nil")
}

blobHeader, err := c.BlobHeader.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob header to protobuf: %v", err)
}

relays := make([]uint32, len(c.RelayKeys))
for i, r := range c.RelayKeys {
relays[i] = uint32(r)
}

return &commonpb.BlobCertificate{
BlobHeader: blobHeader,
Relays: relays,
}, nil
}

type BatchHeader struct {
BatchRoot [32]byte
ReferenceBlockNumber uint64
Expand Down
4 changes: 2 additions & 2 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestV2DisperseBlob(t *testing.T) {
assert.NoError(t, err)
accountID, err := c.Signer.GetAccountID()
assert.NoError(t, err)
commitmentProto, err := commitments.ToProfobuf()
commitmentProto, err := commitments.ToProtobuf()
assert.NoError(t, err)
blobHeaderProto := &pbcommonv2.BlobHeader{
Version: 0,
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) {
BlobHeader: invalidReqProto,
})
assert.ErrorContains(t, err, "blob header must contain commitments")
commitmentProto, err := commitments.ToProfobuf()
commitmentProto, err := commitments.ToProtobuf()
assert.NoError(t, err)

// request with too many quorums
Expand Down
2 changes: 1 addition & 1 deletion encoding/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type BlobCommitments struct {
}

// ToProfobuf converts the BlobCommitments to protobuf format
func (c *BlobCommitments) ToProfobuf() (*pbcommon.BlobCommitment, error) {
func (c *BlobCommitments) ToProtobuf() (*pbcommon.BlobCommitment, error) {
commitData, err := c.Commitment.Serialize()
if err != nil {
return nil, err
Expand Down
Loading