Skip to content

Commit

Permalink
External Subscriber Support (#1457)
Browse files Browse the repository at this point in the history
* add external subscriber + update morpheuscli

rebase main

* add external subscriber tests

* address cr nits

* remove chainID, networkID from initialization message

* nit

* undo snowctx diff

* nit

* clean up integration tests + nits

* nits

* Update external subscriber log to use zap.Uint64 instead of Any
  • Loading branch information
RodrigoVillar authored Aug 30, 2024
1 parent 2aaa5b4 commit 802f358
Show file tree
Hide file tree
Showing 18 changed files with 870 additions and 35 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ awscpu
.vscode*
.env

*.pb*

# db*

*cpu[0-9]*
Expand Down
28 changes: 3 additions & 25 deletions api/ws/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ const (
)

var (
_ api.HandlerFactory[api.VM] = (*WebSocketServerFactory)(nil)
_ event.Subscription[struct{}] = (*subscriptionFunc[struct{}])(nil)
_ event.SubscriptionFactory[struct{}] = (*subscriptionFuncFactory[struct{}])(nil)
_ api.HandlerFactory[api.VM] = (*WebSocketServerFactory)(nil)

ErrExpired = errors.New("expired")
)
Expand Down Expand Up @@ -75,13 +73,13 @@ func OptionFunc(v *vm.VM, configBytes []byte) error {
)

webSocketFactory := NewWebSocketServerFactory(handler)
txRemovedSubscription := subscriptionFuncFactory[vm.TxRemovedEvent]{
txRemovedSubscription := event.SubscriptionFuncFactory[vm.TxRemovedEvent]{
AcceptF: func(event vm.TxRemovedEvent) error {
return server.RemoveTx(event.TxID, event.Err)
},
}

blockSubscription := subscriptionFuncFactory[*chain.StatelessBlock]{
blockSubscription := event.SubscriptionFuncFactory[*chain.StatelessBlock]{
AcceptF: func(event *chain.StatelessBlock) error {
return server.AcceptBlock(event)
},
Expand All @@ -94,26 +92,6 @@ func OptionFunc(v *vm.VM, configBytes []byte) error {
return nil
}

type subscriptionFuncFactory[T any] struct {
AcceptF func(t T) error
}

func (s subscriptionFuncFactory[T]) New() (event.Subscription[T], error) {
return subscriptionFunc[T](s), nil
}

type subscriptionFunc[T any] struct {
AcceptF func(t T) error
}

func (s subscriptionFunc[T]) Accept(t T) error {
return s.AcceptF(t)
}

func (subscriptionFunc[_]) Close() error {
return nil
}

func NewWebSocketServerFactory(server *pubsub.Server) *WebSocketServerFactory {
return &WebSocketServerFactory{
handler: server,
Expand Down
25 changes: 25 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

package event

var (
_ Subscription[struct{}] = (*SubscriptionFunc[struct{}])(nil)
_ SubscriptionFactory[struct{}] = (*SubscriptionFuncFactory[struct{}])(nil)
)

// SubscriptionFactory returns an instance of a concrete Subscription
type SubscriptionFactory[T any] interface {
New() (Subscription[T], error)
Expand All @@ -15,3 +20,23 @@ type Subscription[T any] interface {
// Close returns fatal errors
Close() error
}

type SubscriptionFuncFactory[T any] struct {
AcceptF func(t T) error
}

func (s SubscriptionFuncFactory[T]) New() (Subscription[T], error) {
return SubscriptionFunc[T](s), nil
}

type SubscriptionFunc[T any] struct {
AcceptF func(t T) error
}

func (s SubscriptionFunc[T]) Accept(t T) error {
return s.AcceptF(t)
}

func (SubscriptionFunc[_]) Close() error {
return nil
}
10 changes: 10 additions & 0 deletions examples/morpheusvm/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package controller

import (
"context"
"encoding/json"
"strings"

"github.com/ava-labs/hypersdk/api/jsonrpc"
Expand Down Expand Up @@ -113,3 +114,12 @@ func (*Parser) StateManager() chain.StateManager {
func NewParser(genesis *genesis.DefaultGenesis) chain.Parser {
return &Parser{genesis: genesis}
}

// Used as a lambda function for creating ExternalSubscriberServer parser
func CreateParser(genesisBytes []byte) (chain.Parser, error) {
var genesis genesis.DefaultGenesis
if err := json.Unmarshal(genesisBytes, &genesis); err != nil {
return nil, err
}
return NewParser(&genesis), nil
}
4 changes: 3 additions & 1 deletion examples/morpheusvm/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (
"github.com/ava-labs/hypersdk/examples/morpheusvm/consts"
"github.com/ava-labs/hypersdk/examples/morpheusvm/registry"
"github.com/ava-labs/hypersdk/examples/morpheusvm/storage"
"github.com/ava-labs/hypersdk/extension/externalsubscriber"
"github.com/ava-labs/hypersdk/genesis"
"github.com/ava-labs/hypersdk/vm"
)

// New returns a VM with the indexer, websocket, and rpc apis enabled.
// New returns a VM with the indexer, websocket, rpc, and external subscriber apis enabled.
func New(options ...vm.Option) (*vm.VM, error) {
opts := append([]vm.Option{
indexer.With(),
ws.With(),
jsonrpc.With(),
With(), // Add Controller API
externalsubscriber.With(),
}, options...)

return NewWithOptions(opts...)
Expand Down
3 changes: 1 addition & 2 deletions examples/morpheusvm/tests/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ var _ = ginkgo.BeforeSuite(func() {
genesis, workloadFactory, err := morpheusWorkload.New(0 /* minBlockGap: 0ms */)
require.NoError(err)

parser := controller.NewParser(genesis)
genesisBytes, err := json.Marshal(genesis)
require.NoError(err)

Expand All @@ -42,7 +41,7 @@ var _ = ginkgo.BeforeSuite(func() {
controller.New,
genesisBytes,
lconsts.ID,
parser,
controller.CreateParser,
controller.JSONRPCEndpoint,
workloadFactory,
randomEd25519AuthFactory,
Expand Down
82 changes: 82 additions & 0 deletions extension/externalsubscriber/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package externalsubscriber

import (
"context"

"github.com/ava-labs/avalanchego/utils/logging"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/event"

pb "github.com/ava-labs/hypersdk/proto/pb/externalsubscriber"
)

var _ event.Subscription[*chain.StatelessBlock] = (*ExternalSubscriberClient)(nil)

type ExternalSubscriberClient struct {
conn *grpc.ClientConn
client pb.ExternalSubscriberClient
log logging.Logger
}

func NewExternalSubscriberClient(
ctx context.Context,
log logging.Logger,
serverAddr string,
genesisBytes []byte,
) (*ExternalSubscriberClient, error) {
// Establish connection to server
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
client := pb.NewExternalSubscriberClient(conn)
// Initialize parser
_, err = client.Initialize(
ctx,
&pb.InitializeRequest{
Genesis: genesisBytes,
},
)
if err != nil {
return nil, err
}
log.Debug("connected to external subscriber server", zap.String("address", serverAddr))
return &ExternalSubscriberClient{
conn: conn,
client: client,
log: log,
}, nil
}

func (e *ExternalSubscriberClient) Accept(blk *chain.StatelessBlock) error {
blockBytes, err := blk.Marshal()
if err != nil {
return err
}
resultsMarshaled, err := chain.MarshalResults(blk.Results())
if err != nil {
return err
}

req := &pb.BlockRequest{
BlockData: blockBytes,
Results: resultsMarshaled,
}
e.log.Debug("sending accepted block to server",
zap.Stringer("blockID", blk.ID()),
zap.Uint64("blockHeight", blk.Hght),
)
_, err = e.client.AcceptBlock(context.TODO(), req)
return err
}

func (e *ExternalSubscriberClient) Close() error {
return e.conn.Close()
}
60 changes: 60 additions & 0 deletions extension/externalsubscriber/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package externalsubscriber

import (
"context"
"encoding/json"

"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/event"
"github.com/ava-labs/hypersdk/vm"
)

const (
Namespace = "externalSubscriber"
)

type Config struct {
Enabled bool `json:"enabled"`
ServerAddress string `json:"serverAddress"`
}

func NewDefaultConfig() Config {
return Config{}
}

func With() vm.Option {
return vm.NewOption(Namespace, OptionFunc)
}

func OptionFunc(v *vm.VM, configBytes []byte) error {
config := NewDefaultConfig()
if len(configBytes) > 0 {
if err := json.Unmarshal(configBytes, &config); err != nil {
return err
}
}
if !config.Enabled {
return nil
}
server, err := NewExternalSubscriberClient(
context.TODO(),
v.Logger(),
config.ServerAddress,
v.GenesisBytes,
)
if err != nil {
return err
}

blockSubscription := event.SubscriptionFuncFactory[*chain.StatelessBlock]{
AcceptF: func(blk *chain.StatelessBlock) error {
return server.Accept(blk)
},
}

vm.WithBlockSubscriptions(blockSubscription)(v)
return nil
}
Loading

0 comments on commit 802f358

Please sign in to comment.