Skip to content

Commit

Permalink
[v2] node/relay bug fixes (#908)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 20, 2024
1 parent d6b99b5 commit 88b1ac3
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 91 deletions.
92 changes: 53 additions & 39 deletions api/clients/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,49 +45,44 @@ type RelayClient interface {
type relayClient struct {
config *RelayClientConfig

initOnce map[corev2.RelayKey]*sync.Once
conns map[corev2.RelayKey]*grpc.ClientConn
logger logging.Logger

grpcClients map[corev2.RelayKey]relaygrpc.RelayClient
// initOnce is used to ensure that the connection to each relay is initialized only once.
// It maps relay key to a sync.Once instance: `map[corev2.RelayKey]*sync.Once`
initOnce *sync.Map
// conns maps relay key to the gRPC connection: `map[corev2.RelayKey]*grpc.ClientConn`
conns sync.Map
logger logging.Logger

// grpcClients maps relay key to the gRPC client: `map[corev2.RelayKey]relaygrpc.RelayClient`
grpcClients sync.Map
}

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayClient, error) {
if config == nil || len(config.Sockets) > 0 {
if config == nil || len(config.Sockets) <= 0 {
return nil, fmt.Errorf("invalid config: %v", config)
}

initOnce := make(map[corev2.RelayKey]*sync.Once)
conns := make(map[corev2.RelayKey]*grpc.ClientConn)
grpcClients := make(map[corev2.RelayKey]relaygrpc.RelayClient)
initOnce := sync.Map{}
for key := range config.Sockets {
initOnce[key] = &sync.Once{}
initOnce.Store(key, &sync.Once{})
}
return &relayClient{
config: config,

initOnce: initOnce,
conns: conns,
initOnce: &initOnce,
logger: logger,

grpcClients: grpcClients,
}, nil
}

func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
if err := c.initOnceGrpcConnection(relayKey); err != nil {
client, err := c.getClient(relayKey)
if err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{
BlobKey: blobKey[:],
})
Expand All @@ -102,15 +97,11 @@ func (c *relayClient) GetChunksByRange(ctx context.Context, relayKey corev2.Rela
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
client, err := c.getClient(relayKey)
if err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Expand Down Expand Up @@ -138,13 +129,10 @@ func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.Rela
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
client, err := c.getClient(relayKey)
if err != nil {
return nil, err
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
Expand All @@ -169,9 +157,28 @@ func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.Rela
return res.GetData(), nil
}

func (c *relayClient) getClient(key corev2.RelayKey) (relaygrpc.RelayClient, error) {
if err := c.initOnceGrpcConnection(key); err != nil {
return nil, err
}
maybeClient, ok := c.grpcClients.Load(key)
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", key)
}
client, ok := maybeClient.(relaygrpc.RelayClient)
if !ok {
return nil, fmt.Errorf("invalid grpc client for relay key: %v", key)
}
return client, nil
}

func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
var initErr error
c.initOnce[key].Do(func() {
once, ok := c.initOnce.Load(key)
if !ok {
return fmt.Errorf("unknown relay key: %v", key)
}
once.(*sync.Once).Do(func() {
socket, ok := c.config.Sockets[key]
if !ok {
initErr = fmt.Errorf("unknown relay key: %v", key)
Expand All @@ -183,24 +190,31 @@ func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
initErr = err
return
}
c.conns[key] = conn
c.grpcClients[key] = relaygrpc.NewRelayClient(conn)
c.conns.Store(key, conn)
c.grpcClients.Store(key, relaygrpc.NewRelayClient(conn))
})
return initErr
}

func (c *relayClient) Close() error {
var errList *multierror.Error
for k, conn := range c.conns {
c.conns.Range(func(k, v interface{}) bool {
conn, ok := v.(*grpc.ClientConn)
if !ok {
errList = multierror.Append(errList, fmt.Errorf("invalid connection for relay key: %v", k))
return true
}

if conn != nil {
err := conn.Close()
conn = nil
c.grpcClients[k] = nil
c.conns.Delete(k)
c.grpcClients.Delete(k)
if err != nil {
c.logger.Error("failed to close connection", "err", err)
errList = multierror.Append(errList, err)
}
}
}
return true
})
return errList.ErrorOrNil()
}
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type Config struct {
EthClientConfig geth.EthClientConfig
LoggerConfig common.LoggerConfig
EncoderConfig kzg.KzgConfig

EnableV2 bool
}

// NewConfig parses the Config from the provided flags or environment variables and
Expand Down Expand Up @@ -232,5 +234,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
BLSKeyPassword: ctx.GlobalString(flags.BlsKeyPasswordFlag.Name),
BLSSignerTLSCertFilePath: ctx.GlobalString(flags.BLSSignerCertFileFlag.Name),
BLSRemoteSignerEnabled: blsRemoteSignerEnabled,
EnableV2: ctx.GlobalBool(flags.EnableV2Flag.Name),
}, nil
}
7 changes: 7 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ var (
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"),
}
EnableV2Flag = cli.BoolFlag{
Name: "enable-v2",
Usage: "Enable V2 features",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_V2"),
}

// Test only, DO NOT USE the following flags in production

Expand Down Expand Up @@ -346,6 +352,7 @@ var optionalFlags = []cli.Flag{
BLSRemoteSignerUrlFlag,
BLSPublicKeyHexFlag,
BLSSignerCertFileFlag,
EnableV2Flag,
}

func init() {
Expand Down
1 change: 1 addition & 0 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func makeConfig(t *testing.T) *node.Config {
DbPath: t.TempDir(),
ID: opID,
NumBatchValidators: runtime.GOMAXPROCS(0),
EnableV2: false,
}
}

Expand Down
16 changes: 16 additions & 0 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
if !s.config.EnableV2 {
return nil, api.NewErrorInvalidArg("v2 API is disabled")
}

if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}
batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
Expand All @@ -68,6 +75,7 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
return nil, api.NewErrorInternal(fmt.Sprintf("invalid batch header: %v", err))
}

s.logger.Info("new StoreChunks request", "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:]), "numBlobs", len(batch.BlobCertificates), "referenceBlockNumber", batch.BatchHeader.ReferenceBlockNumber)
operatorState, err := s.node.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), s.node.Config.ID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -136,6 +144,14 @@ func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*core
}

func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
if !s.config.EnableV2 {
return nil, api.NewErrorInvalidArg("v2 API is disabled")
}

if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}

blobKey, err := corev2.BytesToBlobKey(in.GetBlobKey())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid blob key: %v", err))
Expand Down
Loading

0 comments on commit 88b1ac3

Please sign in to comment.