Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed GRPC layer from Engine API #7878

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions cl/phase1/execution_client/execution_client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,17 @@ import (
"time"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/engine"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client/rpc_helper"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/jsonrpc"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_types"
"github.com/ledgerwatch/log/v3"
)

const DefaultRPCHTTPTimeout = time.Second * 30

const (
ValidStatus = "VALID"
InvalidStatus = "INVALID"
SyncingStatus = "SYNCING"
AcceptedStatus = "ACCEPTED"
InvalidBlockHashStatus = "INVALID_BLOCK_HASH"
)

type ExecutionClientRpc struct {
client *rpc.Client
ctx context.Context
Expand Down Expand Up @@ -85,7 +76,7 @@ func (cc *ExecutionClientRpc) NewPayload(payload *cltypes.Eth1Block) (invalid bo
return
}

request := jsonrpc.ExecutionPayload{
request := engine_types.ExecutionPayload{
ParentHash: payload.ParentHash,
FeeRecipient: payload.FeeRecipient,
StateRoot: payload.StateRoot,
Expand Down Expand Up @@ -117,7 +108,7 @@ func (cc *ExecutionClientRpc) NewPayload(payload *cltypes.Eth1Block) (invalid bo
*request.ExcessDataGas = hexutil.Uint64(payload.ExcessDataGas)
}

payloadStatus := make(map[string]interface{}) // As it is done in the rpcdaemon
payloadStatus := &engine_types.PayloadStatus{} // As it is done in the rpcdaemon
log.Debug("[ExecutionClientRpc] Calling EL", "method", engineMethod)
err = cc.client.CallContext(cc.ctx, &payloadStatus, engineMethod, request)
if err != nil {
Expand All @@ -128,25 +119,19 @@ func (cc *ExecutionClientRpc) NewPayload(payload *cltypes.Eth1Block) (invalid bo
err = fmt.Errorf("execution Client RPC failed to retrieve the NewPayload status response, err: %w", err)
return
}
var status string
var ok bool
if status, ok = payloadStatus["status"].(string); !ok {
err = fmt.Errorf("invalid response received from NewPayload")
return

}
invalid = status == InvalidStatus || status == InvalidBlockHashStatus
invalid = payloadStatus.Status == engine_types.InvalidStatus || payloadStatus.Status == engine_types.InvalidBlockHashStatus

return
}

func (cc *ExecutionClientRpc) ForkChoiceUpdate(finalized libcommon.Hash, head libcommon.Hash) error {
forkChoiceRequest := jsonrpc.ForkChoiceState{
forkChoiceRequest := engine_types.ForkChoiceState{
HeadHash: head,
SafeBlockHash: head,
FinalizedBlockHash: finalized,
}
forkChoiceResp := &engine.EngineForkChoiceUpdatedResponse{}
forkChoiceResp := &engine_types.ForkChoiceUpdatedResponse{}
log.Debug("[ExecutionClientRpc] Calling EL", "method", rpc_helper.ForkChoiceUpdatedV1)

err := cc.client.CallContext(cc.ctx, forkChoiceResp, rpc_helper.ForkChoiceUpdatedV1, forkChoiceRequest)
Expand Down
60 changes: 30 additions & 30 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/engine"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
Expand Down Expand Up @@ -239,10 +238,10 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {

func EmbeddedServices(ctx context.Context,
erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig,
blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, engineClient engine.EngineClient, txPoolServer txpool.TxpoolServer,
blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer,
miningServer txpool.MiningServer, stateDiffClient StateChangesClient,
logger log.Logger,
) (eth rpchelper.ApiBackend, engine rpchelper.EngineBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, stateCache kvcache.Cache, ff *rpchelper.Filters, err error) {
) (eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, stateCache kvcache.Cache, ff *rpchelper.Filters, err error) {
if stateCacheCfg.CacheSize > 0 {
// notification about new blocks (state stream) doesn't work now inside erigon - because
// erigon does send this stream to privateAPI (erigon with enabled rpc, still have enabled privateAPI).
Expand All @@ -261,7 +260,6 @@ func EmbeddedServices(ctx context.Context,

txPool = direct.NewTxPoolClient(txPoolServer)
mining = direct.NewMiningClient(miningServer)
engine = rpcservices.NewEngineBackend(engineClient)
ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger)

return
Expand All @@ -271,27 +269,26 @@ func EmbeddedServices(ctx context.Context,
// `cfg.WithDatadir` (mode when it on 1 machine with Erigon)
func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, borDb kv.RoDB,
eth rpchelper.ApiBackend, engineBackend rpchelper.EngineBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader,
ff *rpchelper.Filters, agg *libstate.AggregatorV3, err error) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
}
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
}
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
}

remoteBackendClient := remote.NewETHBACKENDClient(conn)
engineBackendClient := engine.NewEngineClient(conn)
remoteKvClient := remote.NewKVClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
}

// Configure DB first
Expand All @@ -304,10 +301,10 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Readonly().Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
}
db = rwKv

Expand All @@ -327,10 +324,10 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
}
return nil
}); err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
if cc == nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
}
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
if !cfg.Snap.Enabled {
Expand All @@ -345,7 +342,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
allSnapshots.LogStat()

if agg, err = libstate.NewAggregatorV3(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger); err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder()

Expand Down Expand Up @@ -396,7 +393,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
logger.Info("HistoryV3", "enable", histV3Enabled)
db, err = temporal.New(rwKv, agg, systemcontracts.SystemContractCodeLookup[cc.ChainName])
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
}
}
stateCache = kvcache.NewDummy()
Expand All @@ -413,14 +410,14 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
// ensure db exist
tmpDb, err := kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
tmpDb.Close()
}
logger.Trace("Creating consensus db", "path", borDbPath)
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Readonly().Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
// Skip the compatibility check, until we have a schema in erigon-lib
borDb = borKv
Expand All @@ -439,7 +436,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
}
}

Expand All @@ -455,7 +452,6 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
remoteEth := rpcservices.NewRemoteBackend(remoteBackendClient, db, blockReader)
blockReader = remoteEth
eth = remoteEth
engineBackend = rpcservices.NewEngineBackend(engineBackendClient)

go func() {
if !remoteKv.EnsureVersionCompatibility() {
Expand All @@ -473,25 +469,29 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
}()

ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger)
return db, borDb, eth, engineBackend, txPool, mining, stateCache, blockReader, ff, agg, err
return db, borDb, eth, txPool, mining, stateCache, blockReader, ff, agg, err
}

func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, authAPI []rpc.API, logger log.Logger) error {
if len(authAPI) > 0 {
engineInfo, err := startAuthenticatedRpcServer(cfg, authAPI, logger)
if err != nil {
return err
}
go stopAuthenticatedRpcServer(ctx, engineInfo, logger)
}

func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
if cfg.Enabled {
return startRegularRpcServer(ctx, cfg, rpcAPI, logger)
}

return nil
}

func StartRpcServerWithJwtAuthentication(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
if len(rpcAPI) == 0 {
return nil
}
engineInfo, err := startAuthenticatedRpcServer(cfg, rpcAPI, logger)
if err != nil {
return err
}
go stopAuthenticatedRpcServer(ctx, engineInfo, logger)
return nil
}

func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
// register apis and create handler stack
httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort)
Expand Down
6 changes: 3 additions & 3 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func main() {
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
logger := debug.SetupCobra(cmd, "sentry")
db, borDb, backend, engineBackend, txPool, mining, stateCache, blockReader, ff, agg, err := cli.RemoteServices(ctx, *cfg, logger, rootCancel)
db, borDb, backend, txPool, mining, stateCache, blockReader, ff, agg, err := cli.RemoteServices(ctx, *cfg, logger, rootCancel)
if err != nil {
logger.Error("Could not connect to DB", "err", err)
return nil
Expand All @@ -30,8 +30,8 @@ func main() {

// TODO: Replace with correct consensus Engine
engine := ethash.NewFaker()
apiList := jsonrpc.APIList(db, borDb, backend, engineBackend, txPool, mining, ff, stateCache, blockReader, agg, *cfg, engine, logger)
if err := cli.StartRpcServer(ctx, *cfg, apiList, nil, logger); err != nil {
apiList := jsonrpc.APIList(db, borDb, backend, txPool, mining, ff, stateCache, blockReader, agg, *cfg, engine, logger)
if err := cli.StartRpcServer(ctx, *cfg, apiList, logger); err != nil {
logger.Error(err.Error())
return nil
}
Expand Down
38 changes: 0 additions & 38 deletions cmd/rpcdaemon/rpcservices/engine_backend.go

This file was deleted.

18 changes: 8 additions & 10 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (
downloader3 "github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/gointerfaces/engine"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
Expand Down Expand Up @@ -139,7 +138,7 @@ type Ethereum struct {
genesisHash libcommon.Hash

ethBackendRPC *privateapi.EthBackendServer
engineBackendRPC engine.EngineClient
engineBackendRPC *engineapi.EngineServer
miningRPC txpool_proto.MiningServer
stateChangesClient txpool.StateChangesClient

Expand Down Expand Up @@ -555,8 +554,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, blockReader, logger, latestBlockBuiltStore)
// intiialize engine backend
engineSrv := engineapi.NewEngineServer(ctx, logger, chainConfig, assembleBlockPOS, backend.chainDB, blockReader, backend.sentriesClient.Hd, config.Miner.EnabledPOS)
backend.engineBackendRPC = direct.NewEngineClient(engineSrv)
backend.engineBackendRPC = engineapi.NewEngineServer(ctx, logger, chainConfig, assembleBlockPOS, backend.chainDB, blockReader, backend.sentriesClient.Hd, config.Miner.EnabledPOS)

miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi, logger)

var creds credentials.TransportCredentials
Expand All @@ -570,7 +569,6 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
backend.privateAPI, err = privateapi.StartGrpc(
kvRPC,
ethBackendRPC,
engineSrv,
backend.txPoolGrpcServer,
miningRPC,
stack.Config().PrivateApiAddr,
Expand Down Expand Up @@ -770,8 +768,8 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
}
// start HTTP API
httpRpcCfg := stack.Config().Http
ethRpcClient, engineClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC,
s.engineBackendRPC, s.txPoolGrpcServer, miningRPC, stateDiffClient, s.logger)
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC,
s.txPoolGrpcServer, miningRPC, stateDiffClient, s.logger)
if err != nil {
return err
}
Expand All @@ -780,14 +778,14 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
if casted, ok := s.engine.(*bor.Bor); ok {
borDb = casted.DB
}
apiList := jsonrpc.APIList(chainKv, borDb, ethRpcClient, engineClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, httpRpcCfg, s.engine, s.logger)
authApiList := jsonrpc.AuthAPIList(chainKv, ethRpcClient, engineClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, httpRpcCfg, s.engine, s.logger)
apiList := jsonrpc.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, httpRpcCfg, s.engine, s.logger)
go func() {
if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, authApiList, s.logger); err != nil {
if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, s.logger); err != nil {
s.logger.Error(err.Error())
return
}
}()
go s.engineBackendRPC.Start(httpRpcCfg, ff, stateCache, s.agg, s.engine, ethRpcClient, txPoolRpcClient, miningRpcClient)

// Register the backend on the node
stack.RegisterLifecycle(s)
Expand Down
Loading