Skip to content

Commit

Permalink
feat: eigenda client returns 503 errors (for failover purpose) (#828)
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf authored Oct 30, 2024
1 parent aca3040 commit 96c02de
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 70 deletions.
2 changes: 2 additions & 0 deletions api/clients/codecs/default_blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func NewDefaultBlobCodec() DefaultBlobCodec {
return DefaultBlobCodec{}
}

// EncodeBlob can never return an error, but to maintain the interface it is included
// so that it can be swapped for the IFFTCodec without changing the interface
func (v DefaultBlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
codecBlobHeader := make([]byte, 32)
// first byte is always 0 to ensure the codecBlobHeader is a valid bn254 element
Expand Down
1 change: 1 addition & 0 deletions api/clients/codecs/ifft_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (v IFFTCodec) EncodeBlob(data []byte) ([]byte, error) {
var err error
data, err = v.writeCodec.EncodeBlob(data)
if err != nil {
// this cannot happen, because EncodeBlob never returns an error
return nil, fmt.Errorf("error encoding data: %w", err)
}

Expand Down
23 changes: 21 additions & 2 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ type EigenDAClientConfig struct {
// TODO: we should change this param as its name is quite confusing
ResponseTimeout time.Duration

// The total amount of time that the client will spend waiting for EigenDA
// to "confirm" (include onchain) a blob after it has been dispersed. Note that
// we stick to "confirm" here but this really means InclusionTimeout,
// not confirmation in the sense of confirmation depth.
//
// If ConfirmationTimeout time passes and the blob is not yet confirmed,
// the client will return an api.ErrorFailover to let the caller failover to EthDA.
ConfirmationTimeout time.Duration

// The total amount of time that the client will spend waiting for EigenDA
// to confirm a blob after it has been dispersed
// Note that reasonable values for this field will depend on the value of WaitForFinalization.
Expand Down Expand Up @@ -81,15 +90,25 @@ func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
return fmt.Errorf("EigenDAClientConfig.EthRpcUrl not set. Needed to verify blob confirmed on-chain.")
}

if c.ResponseTimeout == 0 {
c.ResponseTimeout = 30 * time.Second
}
if c.ConfirmationTimeout == 0 {
// batching interval on mainnet is 10 minutes,
// so we set the confirmation timeout to 15 minutes to give some buffer
c.ConfirmationTimeout = 15 * time.Minute
}
if c.StatusQueryRetryInterval == 0 {
c.StatusQueryRetryInterval = 5 * time.Second
}
if c.StatusQueryTimeout == 0 {
c.StatusQueryTimeout = 25 * time.Minute
}
if c.ResponseTimeout == 0 {
c.ResponseTimeout = 30 * time.Second
if c.ConfirmationTimeout > c.StatusQueryTimeout {
// doesn't make sense... confirmation is about onchain inclusion, whereas status query is about reaching finality (after inclusion)
return fmt.Errorf("EigenDAClientConfig.ConfirmationTimeout (%v) > EigenDAClientConfig.StatusQueryTimeout (%v)", c.ConfirmationTimeout, c.StatusQueryTimeout)
}

if len(c.SignerPrivateKeyHex) > 0 && len(c.SignerPrivateKeyHex) != 64 {
return fmt.Errorf("a valid length SignerPrivateKeyHex needs to have 64 bytes")
}
Expand Down
53 changes: 30 additions & 23 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -42,6 +41,8 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b
type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
// DisperseBlobAuthenticated disperses a blob with an authenticated request.
// The BlobStatus returned will always be PROCESSSING if error is nil.
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c *disperserClient) Close() error {
func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
return nil, nil, api.NewErrorFailover(err)
}

ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)
Expand Down Expand Up @@ -154,17 +155,17 @@ func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quo
func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
return nil, nil, api.NewErrorFailover(err)
}

if c.signer == nil {
return nil, nil, fmt.Errorf("uninitialized signer for authenticated dispersal")
return nil, nil, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}

// first check if signer is valid
accountId, err := c.signer.GetAccountID()
if err != nil {
return nil, nil, fmt.Errorf("please configure signer key if you want to use authenticated endpoint %w", err)
return nil, nil, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
}

quorumNumbers := make([]uint32, len(quorums))
Expand All @@ -175,7 +176,10 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
// check every 32 bytes of data are within the valid range for a bn254 field element
_, err = rs.ToFrArray(data)
if err != nil {
return nil, nil, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617, %w", err)
return nil, nil, api.NewErrorInvalidArg(
fmt.Sprintf("encountered an error to convert a 32-bytes into a valid field element, "+
"please use the correct format where every 32bytes(big-endian) is less than "+
"21888242871839275222246405745257275088548364400416034343698204186575808495617, %v", err))
}

request := &disperser_rpc.DisperseBlobRequest{
Expand All @@ -185,19 +189,19 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}

ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)

defer cancel()

stream, err := c.client.DisperseBlobAuthenticated(ctxTimeout)
if err != nil {
// grpc client errors return grpc errors, so we can just wrap the error in a normal wrapError,
// no need to wrap in another grpc error as we do with other errors above.
return nil, nil, fmt.Errorf("error while calling DisperseBlobAuthenticated: %w", err)
}

// Send the initial request
err = stream.Send(&disperser_rpc.AuthenticatedRequest{Payload: &disperser_rpc.AuthenticatedRequest_DisperseRequest{
DisperseRequest: request,
}})

if err != nil {
return nil, nil, fmt.Errorf("failed to send request: %w", err)
}
Expand All @@ -209,18 +213,17 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}
authHeaderReply, ok := reply.Payload.(*disperser_rpc.AuthenticatedReply_BlobAuthHeader)
if !ok {
return nil, nil, errors.New("expected challenge")
return nil, nil, api.NewErrorInternal(fmt.Sprintf("client expected challenge from disperser, instead received: %v", reply))
}

authHeader := core.BlobAuthHeader{
BlobCommitments: encoding.BlobCommitments{},
AccountID: "",
Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter,
}

authData, err := c.signer.SignBlobRequest(authHeader)
if err != nil {
return nil, nil, errors.New("error signing blob request")
return nil, nil, api.NewErrorInternal(fmt.Sprintf("error signing blob request: %v", err))
}

// Process challenge and send back challenge_reply
Expand All @@ -239,12 +242,17 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
}
disperseReply, ok := reply.Payload.(*disperser_rpc.AuthenticatedReply_DisperseReply) // Process the final disperse_reply
if !ok {
return nil, nil, errors.New("expected DisperseReply")
return nil, nil, api.NewErrorInternal(fmt.Sprintf("client expected DisperseReply from disperser, instead received: %v", reply))
}

blobStatus, err := disperser.FromBlobStatusProto(disperseReply.DisperseReply.GetResult())
if err != nil {
return nil, nil, err
return nil, nil, api.NewErrorInternal(fmt.Sprintf("parsing blob status: %v", err))
}

// Assert: only status that makes sense is processing. Anything else is a bug on disperser side.
if *blobStatus != disperser.Processing {
return nil, nil, api.NewErrorInternal(fmt.Sprintf("expected status to be Processing, got %v", *blobStatus))
}

return blobStatus, disperseReply.DisperseReply.GetRequestId(), nil
Expand All @@ -253,7 +261,7 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (*disperser_rpc.BlobStatusReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, fmt.Errorf("error initializing connection: %w", err)
return nil, api.NewErrorInternal(err.Error())
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
Expand All @@ -262,19 +270,13 @@ func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (
request := &disperser_rpc.BlobStatusRequest{
RequestId: requestID,
}

reply, err := c.client.GetBlobStatus(ctxTimeout, request)
if err != nil {
return nil, err
}

return reply, nil
return c.client.GetBlobStatus(ctxTimeout, request)
}

func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, fmt.Errorf("error initializing connection: %w", err)
return nil, api.NewErrorInternal(err.Error())
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
Expand All @@ -289,6 +291,8 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by
return reply.Data, nil
}

// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClient) initOnceGrpcConnection() error {
var initErr error
c.initOnce.Do(func() {
Expand All @@ -302,7 +306,10 @@ func (c *disperserClient) initOnceGrpcConnection() error {
c.conn = conn
c.client = disperser_rpc.NewDisperserClient(conn)
})
return initErr
if initErr != nil {
return fmt.Errorf("initializing grpc connection: %w", initErr)
}
return nil
}

func getGrpcDialOptions(useSecureGrpcFlag bool) []grpc.DialOption {
Expand Down
27 changes: 27 additions & 0 deletions api/clients/disperser_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package clients_test

import (
"context"
"testing"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestPutBlobNoopSigner(t *testing.T) {
config := clients.NewConfig("nohost", "noport", time.Second, false)
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())

test := []byte("test")
test[0] = 0x00 // make sure the first byte of the requst is always 0
quorums := []uint8{0}
_, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
st, isGRPCError := status.FromError(err)
assert.True(t, isGRPCError)
assert.Equal(t, codes.InvalidArgument.String(), st.Code().String())
assert.Equal(t, "please configure signer key if you want to use authenticated endpoint noop signer cannot get accountID", st.Message())
}
Loading

0 comments on commit 96c02de

Please sign in to comment.