Skip to content

collection/rpc: convert RPC engine to component.Component #7726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 30 additions & 31 deletions engine/collection/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // required for gRPC compression
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate"
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
)

// Backend defines the core functionality required by the RPC API.
Expand All @@ -41,10 +43,12 @@ type Config struct {
// Engine implements a gRPC server with a simplified version of the Observation
// API to enable receiving transactions into the system.
type Engine struct {
unit *engine.Unit
component.Component
cm *component.ComponentManager

log zerolog.Logger
handler *handler // the gRPC service implementation
server *grpc.Server // the gRPC server
handler *handler
server *grpc.Server
config Config
}

Expand Down Expand Up @@ -83,8 +87,7 @@ func New(
server := grpc.NewServer(grpcOpts...)

e := &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "collection_rpc").Logger(),
log: log.With().Str("engine", "collection_rpc").Logger(),
handler: &handler{
UnimplementedAccessAPIServer: access.UnimplementedAccessAPIServer{},
backend: backend,
Expand All @@ -101,42 +104,38 @@ func New(

access.RegisterAccessAPIServer(e.server, e.handler)

return e
}

// Ready returns a ready channel that is closed once the module has fully
// started. The ingress module is ready when the gRPC server has successfully
// started.
func (e *Engine) Ready() <-chan struct{} {
e.unit.Launch(e.serve)
return e.unit.Ready()
}
e.cm = component.NewComponentManagerBuilder().
AddWorker(e.serveGRPCWorker).
AddWorker(e.shutdownWorker).
Build()
e.Component = e.cm

// Done returns a done channel that is closed once the module has fully stopped.
// It sends a signal to stop the gRPC server, then closes the channel.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done(e.server.GracefulStop)
return e
}

// serve starts the gRPC server .
//
// When this function returns, the server is considered ready.
func (e *Engine) serve() {

e.log.Info().Msgf("starting server on address %s", e.config.ListenAddr)
func (e *Engine) serveGRPCWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
e.log.Info().Str("listen_addr", e.config.ListenAddr).Msg("starting server on address")

l, err := net.Listen("tcp", e.config.ListenAddr)
if err != nil {
e.log.Fatal().Err(err).Msg("failed to start server")
e.log.Err(err).Msg("failed to start server")
ctx.Throw(fmt.Errorf("failed to start server: %w", err))
return
}
ready()

err = e.server.Serve(l)
if err != nil {
if err := e.server.Serve(l); err != nil {
e.log.Error().Err(err).Msg("fatal error in server")
ctx.Throw(fmt.Errorf("error encountered while running grpc server: %w", err))
}
}

func (e *Engine) shutdownWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
<-ctx.Done()
e.server.GracefulStop()
}

// handler implements a subset of the Observation API.
type handler struct {
access.UnimplementedAccessAPIServer
Expand Down
Loading