From 05597cb19569277af2ab318d68f1cbc52e367c2c Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Wed, 17 May 2023 17:36:15 +0100 Subject: [PATCH] [devnet tool] separate logging (#7531) Co-authored-by: Alex Sharp --- cmd/erigon-el/backend/backend.go | 8 ++- cmd/rpcdaemon/cli/config.go | 63 ++++++++++--------- cmd/rpcdaemon/commands/eth_block_test.go | 2 +- cmd/rpcdaemon/commands/eth_call_test.go | 2 +- cmd/rpcdaemon/commands/eth_filters_test.go | 7 ++- cmd/rpcdaemon/commands/eth_mining_test.go | 7 ++- cmd/rpcdaemon/commands/eth_subscribe_test.go | 2 +- .../commands/send_transaction_test.go | 2 +- cmd/rpcdaemon/commands/txpool_api_test.go | 2 +- cmd/rpcdaemon/main.go | 2 +- eth/backend.go | 8 ++- ethdb/privateapi/all.go | 6 +- node/rpcstack.go | 12 ++-- rpc/client_test.go | 6 +- rpc/websocket.go | 24 +++---- rpc/websocket_test.go | 10 ++- turbo/rpchelper/filters.go | 34 +++++----- turbo/rpchelper/filters_deadlock_test.go | 4 +- turbo/rpchelper/filters_test.go | 11 ++-- 19 files changed, 115 insertions(+), 97 deletions(-) diff --git a/cmd/erigon-el/backend/backend.go b/cmd/erigon-el/backend/backend.go index bf2faa6d606..4079b81c905 100644 --- a/cmd/erigon-el/backend/backend.go +++ b/cmd/erigon-el/backend/backend.go @@ -528,7 +528,8 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( stack.Config().PrivateApiAddr, stack.Config().PrivateApiRateLimit, creds, - stack.Config().HealthCheck) + stack.Config().HealthCheck, + logger) if err != nil { return nil, fmt.Errorf("private api: %w", err) } @@ -651,7 +652,8 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( } // start HTTP API httpRpcCfg := stack.Config().Http - ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, backend.blockReader, ethBackendRPC, backend.txPool2GrpcServer, miningRPC, stateDiffClient) + ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, backend.blockReader, ethBackendRPC, + backend.txPool2GrpcServer, miningRPC, stateDiffClient, logger) if err != nil { return nil, err } @@ -663,7 +665,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) ( apiList := commands.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, backend.blockReader, backend.agg, httpRpcCfg, backend.engine, logger) authApiList := commands.AuthAPIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, backend.blockReader, backend.agg, httpRpcCfg, backend.engine, logger) go func() { - if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, authApiList); err != nil { + if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, authApiList, logger); err != nil { logger.Error(err.Error()) return } diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index b9b61f4623d..35ef30816d7 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -245,6 +245,7 @@ func EmbeddedServices(ctx context.Context, erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig, blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer, miningServer txpool.MiningServer, stateDiffClient StateChangesClient, + logger log.Logger, ) (eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, stateCache kvcache.Cache, ff *rpchelper.Filters, err error) { if stateCacheCfg.CacheSize > 0 { //
notification about new blocks (state stream) doesn't work now inside erigon - because @@ -263,7 +264,7 @@ func EmbeddedServices(ctx context.Context, eth = rpcservices.NewRemoteBackend(directClient, erigonDB, blockReader) txPool = direct.NewTxPoolClient(txPoolServer) mining = direct.NewMiningClient(miningServer) - ff = rpchelper.New(ctx, eth, txPool, mining, func() {}) + ff = rpchelper.New(ctx, eth, txPool, mining, func() {}, logger) return } @@ -473,31 +474,31 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, } }() - ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot) + ff = rpchelper.New(ctx, eth, txPool, mining, onNewSnapshot, logger) return db, borDb, eth, txPool, mining, stateCache, blockReader, ff, agg, err } -func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, authAPI []rpc.API) error { +func StartRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, authAPI []rpc.API, logger log.Logger) error { if len(authAPI) > 0 { - engineInfo, err := startAuthenticatedRpcServer(cfg, authAPI) + engineInfo, err := startAuthenticatedRpcServer(cfg, authAPI, logger) if err != nil { return err } - go stopAuthenticatedRpcServer(ctx, engineInfo) + go stopAuthenticatedRpcServer(ctx, engineInfo, logger) } if cfg.Enabled { - return startRegularRpcServer(ctx, cfg, rpcAPI) + return startRegularRpcServer(ctx, cfg, rpcAPI, logger) } return nil } -func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API) error { +func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error { // register apis and create handler stack httpEndpoint := fmt.Sprintf("%s:%d", cfg.HttpListenAddress, cfg.HttpPort) - log.Trace("TraceRequests = %t\n", cfg.TraceRequests) + logger.Trace("TraceRequests = %t\n", cfg.TraceRequests) srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable) allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath) @@ -523,14 +524,14 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp } } - if err := node.RegisterApisFromWhitelist(defaultAPIList, apiFlags, srv, false); err != nil { + if err := node.RegisterApisFromWhitelist(defaultAPIList, apiFlags, srv, false, logger); err != nil { return fmt.Errorf("could not start register RPC apis: %w", err) } httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression) var wsHandler http.Handler if cfg.WebsocketEnabled { - wsHandler = srv.WebsocketHandler([]string{"*"}, nil, cfg.WebsocketCompression) + wsHandler = srv.WebsocketHandler([]string{"*"}, nil, cfg.WebsocketCompression, logger) } graphQLHandler := graphql.CreateHandler(defaultAPIList) @@ -555,10 +556,10 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp defer tcpListener.Close() err := srv.ServeListener(tcpListener) if err != nil { - log.Error("TCP Listener Fatal Error", "err", err) + logger.Error("TCP Listener Fatal Error", "err", err) } }() - log.Info("TCP Endpoint opened", "url", tcpEndpoint) + logger.Info("TCP Endpoint opened", "url", tcpEndpoint) } info := []interface{}{ @@ -586,14 +587,14 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp info = append(info, "grpc.port", cfg.GRPCPort) } - log.Info("HTTP endpoint opened", info...) + logger.Info("HTTP endpoint opened", info...) 
defer func() { srv.Stop() shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = listener.Shutdown(shutdownCtx) - log.Info("HTTP endpoint closed", "url", httpAddr) + logger.Info("HTTP endpoint closed", "url", httpAddr) if cfg.GRPCServerEnabled { if cfg.GRPCHealthCheckEnabled { @@ -601,11 +602,11 @@ func startRegularRpcServer(ctx context.Context, cfg httpcfg.HttpCfg, rpcAPI []rp } grpcServer.GracefulStop() _ = grpcListener.Close() - log.Info("GRPC endpoint closed", "url", grpcEndpoint) + logger.Info("GRPC endpoint closed", "url", grpcEndpoint) } }() <-ctx.Done() - log.Info("Exiting...") + logger.Info("Exiting...") return nil } @@ -616,18 +617,18 @@ type engineInfo struct { EngineHttpEndpoint string } -func startAuthenticatedRpcServer(cfg httpcfg.HttpCfg, rpcAPI []rpc.API) (*engineInfo, error) { - log.Trace("TraceRequests = %t\n", cfg.TraceRequests) +func startAuthenticatedRpcServer(cfg httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) (*engineInfo, error) { + logger.Trace("TraceRequests = %t\n", cfg.TraceRequests) srv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable) - engineListener, engineSrv, engineHttpEndpoint, err := createEngineListener(cfg, rpcAPI) + engineListener, engineSrv, engineHttpEndpoint, err := createEngineListener(cfg, rpcAPI, logger) if err != nil { return nil, fmt.Errorf("could not start RPC api for engine: %w", err) } return &engineInfo{Srv: srv, EngineSrv: engineSrv, EngineListener: engineListener, EngineHttpEndpoint: engineHttpEndpoint}, nil } -func stopAuthenticatedRpcServer(ctx context.Context, engineInfo *engineInfo) { +func stopAuthenticatedRpcServer(ctx context.Context, engineInfo *engineInfo, logger log.Logger) { defer func() { engineInfo.Srv.Stop() if engineInfo.EngineSrv != nil { @@ -638,11 +639,11 @@ func stopAuthenticatedRpcServer(ctx context.Context, engineInfo *engineInfo) { if engineInfo.EngineListener != nil { _ = engineInfo.EngineListener.Shutdown(shutdownCtx) - log.Info("Engine HTTP endpoint close", "url", engineInfo.EngineHttpEndpoint) + logger.Info("Engine HTTP endpoint close", "url", engineInfo.EngineHttpEndpoint) } }() <-ctx.Done() - log.Info("Exiting Engine...") + logger.Info("Exiting Engine...") } // isWebsocket checks the header of a http request for a websocket upgrade request. @@ -654,9 +655,9 @@ func isWebsocket(r *http.Request) bool { // obtainJWTSecret loads the jwt-secret, either from the provided config, // or from the default location. If neither of those are present, it generates // a new secret and stores to the default location. -func obtainJWTSecret(cfg httpcfg.HttpCfg) ([]byte, error) { +func obtainJWTSecret(cfg httpcfg.HttpCfg, logger log.Logger) ([]byte, error) { // try reading from file - log.Info("Reading JWT secret", "path", cfg.JWTSecretPath) + logger.Info("Reading JWT secret", "path", cfg.JWTSecretPath) // If we run the rpcdaemon and datadir is not specified we just use jwt.hex in current directory. 
if len(cfg.JWTSecretPath) == 0 { cfg.JWTSecretPath = "jwt.hex" @@ -666,7 +667,7 @@ func obtainJWTSecret(cfg httpcfg.HttpCfg) ([]byte, error) { if len(jwtSecret) == 32 { return jwtSecret, nil } - log.Error("Invalid JWT secret", "path", cfg.JWTSecretPath, "length", len(jwtSecret)) + logger.Error("Invalid JWT secret", "path", cfg.JWTSecretPath, "length", len(jwtSecret)) return nil, errors.New("invalid JWT secret") } // Need to generate one @@ -676,7 +677,7 @@ func obtainJWTSecret(cfg httpcfg.HttpCfg) ([]byte, error) { if err := os.WriteFile(cfg.JWTSecretPath, []byte(hexutility.Encode(jwtSecret)), 0600); err != nil { return nil, err } - log.Info("Generated JWT secret", "path", cfg.JWTSecretPath) + logger.Info("Generated JWT secret", "path", cfg.JWTSecretPath) return jwtSecret, nil } @@ -705,21 +706,21 @@ func createHandler(cfg httpcfg.HttpCfg, apiList []rpc.API, httpHandler http.Hand return handler, nil } -func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API) (*http.Server, *rpc.Server, string, error) { +func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API, logger log.Logger) (*http.Server, *rpc.Server, string, error) { engineHttpEndpoint := fmt.Sprintf("%s:%d", cfg.AuthRpcHTTPListenAddress, cfg.AuthRpcPort) engineSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, true) - if err := node.RegisterApisFromWhitelist(engineApi, nil, engineSrv, true); err != nil { + if err := node.RegisterApisFromWhitelist(engineApi, nil, engineSrv, true, logger); err != nil { return nil, nil, "", fmt.Errorf("could not start register RPC engine api: %w", err) } - jwtSecret, err := obtainJWTSecret(cfg) + jwtSecret, err := obtainJWTSecret(cfg, logger) if err != nil { return nil, nil, "", err } - wsHandler := engineSrv.WebsocketHandler([]string{"*"}, jwtSecret, cfg.WebsocketCompression) + wsHandler := engineSrv.WebsocketHandler([]string{"*"}, jwtSecret, cfg.WebsocketCompression, logger) engineHttpHandler := node.NewHTTPHandlerStack(engineSrv, nil /* authCors */, cfg.AuthRpcVirtualHost, cfg.HttpCompression) @@ -736,7 +737,7 @@ func createEngineListener(cfg httpcfg.HttpCfg, engineApi []rpc.API) (*http.Serve } engineInfo := []interface{}{"url", engineAddr, "ws", true, "ws.compression", cfg.WebsocketCompression} - log.Info("HTTP endpoint opened for Engine API", engineInfo...) + logger.Info("HTTP endpoint opened for Engine API", engineInfo...) 
return engineListener, engineSrv, engineAddr.String(), nil } diff --git a/cmd/rpcdaemon/commands/eth_block_test.go b/cmd/rpcdaemon/commands/eth_block_test.go index 81f3f5bf7ff..a32a1ffd4fb 100644 --- a/cmd/rpcdaemon/commands/eth_block_test.go +++ b/cmd/rpcdaemon/commands/eth_block_test.go @@ -79,7 +79,7 @@ func TestGetBlockByNumberWithPendingTag(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}) + ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) expected := 1 header := &types.Header{ diff --git a/cmd/rpcdaemon/commands/eth_call_test.go b/cmd/rpcdaemon/commands/eth_call_test.go index 18831d5df1d..c6812f1ff5f 100644 --- a/cmd/rpcdaemon/commands/eth_call_test.go +++ b/cmd/rpcdaemon/commands/eth_call_test.go @@ -43,7 +43,7 @@ func TestEstimateGas(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}) + ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 100_000, log.New()) var from = libcommon.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = libcommon.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") diff --git a/cmd/rpcdaemon/commands/eth_filters_test.go b/cmd/rpcdaemon/commands/eth_filters_test.go index 0e13a5de5fb..30d617238c7 100644 --- a/cmd/rpcdaemon/commands/eth_filters_test.go +++ b/cmd/rpcdaemon/commands/eth_filters_test.go @@ -31,7 +31,7 @@ func TestNewFilters(t *testing.T) { stateCache := kvcache.New(kvcache.DefaultCoherentConfig) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}) + ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) api := NewEthAPI(NewBaseApi(ff, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, nil, nil, 5000000, 100_000, log.New()) ptf, err := api.NewPendingTransactionFilter(ctx) @@ -57,9 +57,10 @@ func TestNewFilters(t *testing.T) { } func TestLogsSubscribeAndUnsubscribe_WithoutConcurrentMapIssue(t *testing.T) { - ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) + m := stages.Mock(t) + ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}) + ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) // generate some random topics topics := make([][]libcommon.Hash, 0) diff --git a/cmd/rpcdaemon/commands/eth_mining_test.go b/cmd/rpcdaemon/commands/eth_mining_test.go index a7c77cf4f45..d5ca063fe75 100644 --- a/cmd/rpcdaemon/commands/eth_mining_test.go +++ b/cmd/rpcdaemon/commands/eth_mining_test.go @@ -24,7 +24,7 @@ func TestPendingBlock(t *testing.T) { m := stages.Mock(t) ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}) + ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) engine := ethash.NewFaker() api := NewEthAPI(NewBaseApi(ff, stateCache, snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, 
m.TransactionsV3), nil, false, rpccfg.DefaultEvmCallTimeout, engine, @@ -48,9 +48,10 @@ func TestPendingBlock(t *testing.T) { } func TestPendingLogs(t *testing.T) { - ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) + m := stages.Mock(t) + ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) mining := txpool.NewMiningClient(conn) - ff := rpchelper.New(ctx, nil, nil, mining, func() {}) + ff := rpchelper.New(ctx, nil, nil, mining, func() {}, m.Log) expect := []byte{211} ch, id := ff.SubscribePendingLogs(1) diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index 6aa4f77fbf8..a2eee7fc56a 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -46,7 +46,7 @@ func TestEthSubscribe(t *testing.T) { backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, br, nil, nil, nil, false, logger) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, br) - ff := rpchelper.New(ctx, backend, nil, nil, func() {}) + ff := rpchelper.New(ctx, backend, nil, nil, func() {}, m.Log) newHeads, id := ff.SubscribeNewHeads(16) defer ff.UnsubscribeHeads(id) diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go index 59d167bfc13..306fe28474c 100644 --- a/cmd/rpcdaemon/commands/send_transaction_test.go +++ b/cmd/rpcdaemon/commands/send_transaction_test.go @@ -73,7 +73,7 @@ func TestSendRawTransaction(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}) + ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) stateCache := kvcache.New(kvcache.DefaultCoherentConfig) br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := commands.NewEthAPI(commands.NewBaseApi(ff, stateCache, br, nil, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, nil, txPool, nil, 5000000, 100_000, logger) diff --git a/cmd/rpcdaemon/commands/txpool_api_test.go b/cmd/rpcdaemon/commands/txpool_api_test.go index fe6fd07fd4c..55c6a6f3eb9 100644 --- a/cmd/rpcdaemon/commands/txpool_api_test.go +++ b/cmd/rpcdaemon/commands/txpool_api_test.go @@ -34,7 +34,7 @@ func TestTxPoolContent(t *testing.T) { ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) - ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}) + ff := rpchelper.New(ctx, nil, txPool, txpool.NewMiningClient(conn), func() {}, m.Log) agg := m.HistoryV3Components() br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3) api := NewTxPoolAPI(NewBaseApi(ff, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs), m.DB, txPool) diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index 3f53d5f66b1..ba4522e1da9 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -37,7 +37,7 @@ func main() { // TODO: Replace with correct consensus Engine engine := ethash.NewFaker() apiList := commands.APIList(db, borDb, backend, txPool, mining, ff, stateCache, blockReader, agg, *cfg, engine, logger) - if err := cli.StartRpcServer(ctx, *cfg, apiList, nil); err != nil { + if err := cli.StartRpcServer(ctx, *cfg, apiList, nil, logger); err != nil { logger.Error(err.Error()) return nil } 
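The change applied across the hunks above is plain dependency injection of the logger: functions that previously wrote through the package-global log now accept a log.Logger argument and emit through it, so each devnet node (and each test) can own its output. A minimal sketch of the before/after shape, using hypothetical names rather than anything from the Erigon codebase:

package main

import (
	"github.com/ledgerwatch/log/v3"
)

// Before: output goes through the package-global logger, which every
// component in the process shares.
func startServiceOld(addr string) {
	log.Info("service starting", "addr", addr)
}

// After: the caller supplies the logger; nothing here touches global
// state, so two nodes in one process can log to different handlers.
func startService(addr string, logger log.Logger) {
	logger.Info("service starting", "addr", addr)
}

func main() {
	logger := log.New() // callers can attach their own handlers to this root logger
	startServiceOld("127.0.0.1:8545")
	startService("127.0.0.1:8545", logger)
}

The rest of the patch below follows the same shape: each signature grows a trailing logger parameter, and every log call inside switches from the log package to that parameter.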
diff --git a/eth/backend.go b/eth/backend.go index 4b1ec015345..e8a10f5eac7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -564,7 +564,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere stack.Config().PrivateApiAddr, stack.Config().PrivateApiRateLimit, creds, - stack.Config().HealthCheck) + stack.Config().HealthCheck, + logger) if err != nil { return nil, fmt.Errorf("private api: %w", err) } @@ -740,7 +741,8 @@ func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error } // start HTTP API httpRpcCfg := stack.Config().Http - ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC, backend.txPool2GrpcServer, miningRPC, stateDiffClient) + ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC, + backend.txPool2GrpcServer, miningRPC, stateDiffClient, backend.logger) if err != nil { return err } @@ -752,7 +754,7 @@ func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error apiList := commands.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, backend.agg, httpRpcCfg, backend.engine, backend.logger) authApiList := commands.AuthAPIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, backend.agg, httpRpcCfg, backend.engine, backend.logger) go func() { - if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, authApiList); err != nil { + if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, authApiList, backend.logger); err != nil { backend.logger.Error(err.Error()) return } diff --git a/ethdb/privateapi/all.go b/ethdb/privateapi/all.go index d374074732f..817dec06152 100644 --- a/ethdb/privateapi/all.go +++ b/ethdb/privateapi/all.go @@ -17,8 +17,8 @@ import ( func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto.MiningServer, addr string, rateLimit uint32, creds credentials.TransportCredentials, - healthCheck bool) (*grpc.Server, error) { - log.Info("Starting private RPC server", "on", addr) + healthCheck bool, logger log.Logger) (*grpc.Server, error) { + logger.Info("Starting private RPC server", "on", addr) lis, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("could not create listener: %w, addr=%s", err, addr) @@ -43,7 +43,7 @@ func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txP defer healthServer.Shutdown() } if err := grpcServer.Serve(lis); err != nil { - log.Error("private RPC server fail", "err", err) + logger.Error("private RPC server fail", "err", err) } }() diff --git a/node/rpcstack.go b/node/rpcstack.go index 86240416d51..c5f3f99ac71 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -176,7 +176,7 @@ func (h *httpServer) start() error { for _, path := range paths { name := h.handlerNames[path] if !logged[name] { - log.Info(name+" enabled", "url", "http://"+listener.Addr().String()+path) + h.log.Info(name+" enabled", "url", "http://"+listener.Addr().String()+path) logged[name] = true } } @@ -267,7 +267,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig, allowList rpc. // Create RPC server and handler. 
srv := rpc.NewServer(50, false /* traceRequests */, true) srv.SetAllowList(allowList) - if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil { + if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false, h.log); err != nil { return err } h.httpConfig = config @@ -300,12 +300,12 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig, allowList rpc.All // Create RPC server and handler. srv := rpc.NewServer(50, false /* traceRequests */, true) srv.SetAllowList(allowList) - if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil { + if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false, h.log); err != nil { return err } h.wsConfig = config h.wsHandler.Store(&rpcHandler{ - Handler: srv.WebsocketHandler(config.Origins, nil, false), + Handler: srv.WebsocketHandler(config.Origins, nil, false, h.log), server: srv, }) return nil @@ -451,9 +451,9 @@ func newGzipHandler(next http.Handler) http.Handler { // RegisterApisFromWhitelist checks the given modules' availability, generates a whitelist based on the allowed modules, // and then registers all of the APIs exposed by the services. -func RegisterApisFromWhitelist(apis []rpc.API, modules []string, srv *rpc.Server, exposeAll bool) error { +func RegisterApisFromWhitelist(apis []rpc.API, modules []string, srv *rpc.Server, exposeAll bool, logger log.Logger) error { if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { - log.Error("Non-existing modules in HTTP API list, please remove it", "non-existing", bad, "existing", available) + logger.Error("Non-existing modules in HTTP API list, please remove it", "non-existing", bad, "existing", available) } // Generate the whitelist based on the allowed modules whitelist := make(map[string]bool) diff --git a/rpc/client_test.go b/rpc/client_test.go index 5af53af3538..46e95fcb487 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -502,13 +502,14 @@ func TestClientHTTP(t *testing.T) { } func TestClientReconnect(t *testing.T) { + logger := log.New() startServer := func(addr string) (*Server, net.Listener) { srv := newTestServer() l, err := net.Listen("tcp", addr) if err != nil { t.Fatal("can't listen:", err) } - go http.Serve(l, srv.WebsocketHandler([]string{"*"}, nil, false)) + go http.Serve(l, srv.WebsocketHandler([]string{"*"}, nil, false, logger)) return srv, l } @@ -569,11 +570,12 @@ func TestClientReconnect(t *testing.T) { } func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) { + logger := log.New() // Create the HTTP server. var hs *httptest.Server switch transport { case "ws": - hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}, nil, false)) + hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}, nil, false, logger)) case "http": hs = httptest.NewUnstartedServer(srv) default: diff --git a/rpc/websocket.go b/rpc/websocket.go index cb2415918d8..b86a7cd71e8 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -46,13 +46,13 @@ var wsBufferPool = new(sync.Pool) // // allowedOrigins should be a comma-separated list of allowed origin URLs. // To allow connections with any origin, pass "*". 
-func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, compression bool) http.Handler { +func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, compression bool, logger log.Logger) http.Handler { upgrader := websocket.Upgrader{ EnableCompression: compression, ReadBufferSize: wsReadBuffer, WriteBufferSize: wsWriteBuffer, WriteBufferPool: wsBufferPool, - CheckOrigin: wsHandshakeValidator(allowedOrigins), + CheckOrigin: wsHandshakeValidator(allowedOrigins, logger), } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if jwtSecret != nil && !CheckJwtSecret(w, r, jwtSecret) { @@ -60,7 +60,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, com } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - log.Warn("WebSocket upgrade failed", "err", err) + logger.Warn("WebSocket upgrade failed", "err", err) return } codec := newWebsocketCodec(conn) @@ -71,7 +71,7 @@ func (s *Server) WebsocketHandler(allowedOrigins []string, jwtSecret []byte, com // wsHandshakeValidator returns a handler that verifies the origin during the // websocket upgrade process. When a '*' is specified as an allowed origins all // connections are accepted. -func wsHandshakeValidator(allowedOrigins []string) func(*http.Request) bool { +func wsHandshakeValidator(allowedOrigins []string, logger log.Logger) func(*http.Request) bool { origins := mapset.NewSet() allowAllOrigins := false @@ -90,7 +90,7 @@ func wsHandshakeValidator(allowedOrigins []string) func(*http.Request) bool { origins.Add("http://" + hostname) } } - log.Trace(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice())) + logger.Trace(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice())) f := func(req *http.Request) bool { // Skip origin verification if no Origin header is present. The origin check @@ -102,10 +102,10 @@ func wsHandshakeValidator(allowedOrigins []string) func(*http.Request) bool { } // Verify origin against whitelist. 
origin := strings.ToLower(req.Header.Get("Origin")) - if allowAllOrigins || originIsAllowed(origins, origin) { + if allowAllOrigins || originIsAllowed(origins, origin, logger) { return true } - log.Warn("Rejected WebSocket connection", "origin", origin) + logger.Warn("Rejected WebSocket connection", "origin", origin) return false } @@ -125,17 +125,17 @@ func (e wsHandshakeError) Error() string { return s } -func originIsAllowed(allowedOrigins mapset.Set, browserOrigin string) bool { +func originIsAllowed(allowedOrigins mapset.Set, browserOrigin string, logger log.Logger) bool { it := allowedOrigins.Iterator() for origin := range it.C { - if ruleAllowsOrigin(origin.(string), browserOrigin) { + if ruleAllowsOrigin(origin.(string), browserOrigin, logger) { return true } } return false } -func ruleAllowsOrigin(allowedOrigin string, browserOrigin string) bool { +func ruleAllowsOrigin(allowedOrigin string, browserOrigin string, logger log.Logger) bool { var ( allowedScheme, allowedHostname, allowedPort string browserScheme, browserHostname, browserPort string @@ -143,12 +143,12 @@ func ruleAllowsOrigin(allowedOrigin string, browserOrigin string) bool { ) allowedScheme, allowedHostname, allowedPort, err = parseOriginURL(allowedOrigin) if err != nil { - log.Warn("Error parsing allowed origin specification", "spec", allowedOrigin, "err", err) + logger.Warn("Error parsing allowed origin specification", "spec", allowedOrigin, "err", err) return false } browserScheme, browserHostname, browserPort, err = parseOriginURL(browserOrigin) if err != nil { - log.Warn("Error parsing browser 'Origin' field", "Origin", browserOrigin, "err", err) + logger.Warn("Error parsing browser 'Origin' field", "Origin", browserOrigin, "err", err) return false } if allowedScheme != "" && allowedScheme != browserScheme { diff --git a/rpc/websocket_test.go b/rpc/websocket_test.go index cd0fbb1c0b6..178ba956072 100644 --- a/rpc/websocket_test.go +++ b/rpc/websocket_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/ledgerwatch/log/v3" ) func TestWebsocketClientHeaders(t *testing.T) { @@ -51,9 +52,10 @@ func TestWebsocketClientHeaders(t *testing.T) { func TestWebsocketOriginCheck(t *testing.T) { t.Parallel() + logger := log.New() var ( srv = newTestServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"http://example.com"}, nil, false)) + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"http://example.com"}, nil, false, logger)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() @@ -80,10 +82,11 @@ func TestWebsocketOriginCheck(t *testing.T) { // This test checks whether calls exceeding the request size limit are rejected. func TestWebsocketLargeCall(t *testing.T) { t.Parallel() + logger := log.New() var ( srv = newTestServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"}, nil, false)) + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"}, nil, false, logger)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() @@ -162,9 +165,10 @@ func TestClientWebsocketPing(t *testing.T) { // This checks that the websocket transport can deal with large messages. 
func TestClientWebsocketLargeMessage(t *testing.T) { + logger := log.New() var ( srv = NewServer(50, false /* traceRequests */, true) - httpsrv = httptest.NewServer(srv.WebsocketHandler(nil, nil, false)) + httpsrv = httptest.NewServer(srv.WebsocketHandler(nil, nil, false, logger)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() diff --git a/turbo/rpchelper/filters.go b/turbo/rpchelper/filters.go index 690010902a7..48234a2eec4 100644 --- a/turbo/rpchelper/filters.go +++ b/turbo/rpchelper/filters.go @@ -43,10 +43,11 @@ type Filters struct { logsStores *SyncMap[LogsSubID, []*types.Log] pendingHeadsStores *SyncMap[HeadsSubID, []*types.Header] pendingTxsStores *SyncMap[PendingTxsSubID, [][]types.Transaction] + logger log.Logger } -func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func()) *Filters { - log.Info("rpc filters: subscribing to Erigon events") +func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, onNewSnapshot func(), logger log.Logger) *Filters { + logger.Info("rpc filters: subscribing to Erigon events") ff := &Filters{ headsSubs: NewSyncMap[HeadsSubID, Sub[*types.Header]](), @@ -58,6 +59,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, logsStores: NewSyncMap[LogsSubID, []*types.Log](), pendingHeadsStores: NewSyncMap[HeadsSubID, []*types.Header](), pendingTxsStores: NewSyncMap[PendingTxsSubID, [][]types.Transaction](), + logger: logger, } go func() { @@ -81,7 +83,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, time.Sleep(3 * time.Second) continue } - log.Warn("rpc filters: error subscribing to events", "err", err) + logger.Warn("rpc filters: error subscribing to events", "err", err) } } }() @@ -106,7 +108,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, time.Sleep(3 * time.Second) continue } - log.Warn("rpc filters: error subscribing to logs", "err", err) + logger.Warn("rpc filters: error subscribing to logs", "err", err) } } }() @@ -129,7 +131,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, time.Sleep(3 * time.Second) continue } - log.Warn("rpc filters: error subscribing to pending transactions", "err", err) + logger.Warn("rpc filters: error subscribing to pending transactions", "err", err) } } }() @@ -152,7 +154,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, time.Sleep(3 * time.Second) continue } - log.Warn("rpc filters: error subscribing to pending blocks", "err", err) + logger.Warn("rpc filters: error subscribing to pending blocks", "err", err) } } }() @@ -173,7 +175,7 @@ func New(ctx context.Context, ethBackend ApiBackend, txPool txpool.TxpoolClient, time.Sleep(3 * time.Second) continue } - log.Warn("rpc filters: error subscribing to pending logs", "err", err) + logger.Warn("rpc filters: error subscribing to pending logs", "err", err) } } }() @@ -197,7 +199,7 @@ func (ff *Filters) subscribeToPendingTransactions(ctx context.Context, txPool tx for { event, err := subscription.Recv() if errors.Is(err, io.EOF) { - log.Debug("rpcdaemon: the subscription to pending transactions channel was closed") + ff.logger.Debug("rpcdaemon: the subscription to pending transactions channel was closed") break } if err != nil { @@ -223,7 +225,7 @@ func (ff *Filters) subscribeToPendingBlocks(ctx context.Context, mining txpool.M event, err := subscription.Recv() 
if errors.Is(err, io.EOF) { - log.Debug("rpcdaemon: the subscription to pending blocks channel was closed") + ff.logger.Debug("rpcdaemon: the subscription to pending blocks channel was closed") break } if err != nil { @@ -241,7 +243,7 @@ func (ff *Filters) HandlePendingBlock(reply *txpool.OnPendingBlockReply) { return } if err := rlp.Decode(bytes.NewReader(reply.RplBlock), b); err != nil { - log.Warn("OnNewPendingBlock rpc filters, unprocessable payload", "err", err) + ff.logger.Warn("OnNewPendingBlock rpc filters, unprocessable payload", "err", err) } ff.mu.Lock() @@ -267,7 +269,7 @@ func (ff *Filters) subscribeToPendingLogs(ctx context.Context, mining txpool.Min } event, err := subscription.Recv() if errors.Is(err, io.EOF) { - log.Debug("rpcdaemon: the subscription to pending logs channel was closed") + ff.logger.Debug("rpcdaemon: the subscription to pending logs channel was closed") break } if err != nil { @@ -285,7 +287,7 @@ func (ff *Filters) HandlePendingLogs(reply *txpool.OnPendingLogsReply) { } l := []*types.Log{} if err := rlp.Decode(bytes.NewReader(reply.RplLogs), &l); err != nil { - log.Warn("OnNewPendingLogs rpc filters, unprocessable payload", "err", err) + ff.logger.Warn("OnNewPendingLogs rpc filters, unprocessable payload", "err", err) } ff.pendingLogsSubs.Range(func(k PendingLogsSubID, v Sub[types.Logs]) error { v.Send(l) @@ -402,7 +404,7 @@ func (ff *Filters) SubscribeLogs(size int, crit filters.FilterCriteria) (<-chan loaded := ff.loadLogsRequester() if loaded != nil { if err := loaded.(func(*remote.LogsFilterRequest) error)(lfr); err != nil { - log.Warn("Could not update remote logs filter", "err", err) + ff.logger.Warn("Could not update remote logs filter", "err", err) ff.logsSubs.removeLogsFilter(id) } } @@ -433,7 +435,7 @@ func (ff *Filters) UnsubscribeLogs(id LogsSubID) bool { loaded := ff.loadLogsRequester() if loaded != nil { if err := loaded.(func(*remote.LogsFilterRequest) error)(lfr); err != nil { - log.Warn("Could not update remote logs filter", "err", err) + ff.logger.Warn("Could not update remote logs filter", "err", err) return isDeleted || ff.logsSubs.removeLogsFilter(id) } } @@ -451,7 +453,7 @@ func (ff *Filters) deleteLogStore(id LogsSubID) { func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) { err := ff.onNewEvent(event) if err != nil { - log.Warn("OnNewEvent Filters", "event", event.Type, "err", err) + ff.logger.Warn("OnNewEvent Filters", "event", event.Type, "err", err) } } @@ -529,7 +531,7 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) { txs[i], decodeErr = types.DecodeWrappedTransaction(rlpTx) if decodeErr != nil { // ignoring what we can't unmarshal - log.Warn("OnNewTx rpc filters, unprocessable payload", "err", decodeErr, "data", hex.EncodeToString(rlpTx)) + ff.logger.Warn("OnNewTx rpc filters, unprocessable payload", "err", decodeErr, "data", hex.EncodeToString(rlpTx)) break } } diff --git a/turbo/rpchelper/filters_deadlock_test.go b/turbo/rpchelper/filters_deadlock_test.go index b0ac84c4319..7e6ec6838e2 100644 --- a/turbo/rpchelper/filters_deadlock_test.go +++ b/turbo/rpchelper/filters_deadlock_test.go @@ -12,10 +12,12 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/filters" "github.com/ledgerwatch/erigon/turbo/rpchelper" + "github.com/ledgerwatch/log/v3" ) func TestFiltersDeadlock_Test(t *testing.T) { - f := rpchelper.New(context.TODO(), nil, nil, nil, func() {}) + logger := log.New() + f := rpchelper.New(context.TODO(), nil, nil, nil, func() {}, logger) crit := filters.FilterCriteria{ 
Addresses: nil, Topics: [][]libcommon.Hash{}, diff --git a/turbo/rpchelper/filters_test.go b/turbo/rpchelper/filters_test.go index b8cc1918d85..bdf61f98fb3 100644 --- a/turbo/rpchelper/filters_test.go +++ b/turbo/rpchelper/filters_test.go @@ -11,6 +11,7 @@ import ( types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon/eth/filters" + "github.com/ledgerwatch/log/v3" ) func createLog() *remote.SubscribeLogsReply { @@ -55,7 +56,7 @@ func TestFilters_GenerateSubscriptionID(t *testing.T) { } func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { - f := New(context.TODO(), nil, nil, nil, func() {}) + f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -86,7 +87,7 @@ func TestFilters_SingleSubscription_OnlyTopicsSubscribedAreBroadcast(t *testing. } func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAreBroadcast(t *testing.T) { - f := New(context.TODO(), nil, nil, nil, func() {}) + f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) var nilTopic libcommon.Hash subbedTopic := libcommon.BytesToHash([]byte{10, 20}) @@ -118,7 +119,7 @@ func TestFilters_SingleSubscription_EmptyTopicsInCriteria_OnlyTopicsSubscribedAr } func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { - f := New(context.TODO(), nil, nil, nil, func() {}) + f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -157,7 +158,7 @@ func TestFilters_TwoSubscriptionsWithDifferentCriteria(t *testing.T) { } func TestFilters_ThreeSubscriptionsWithDifferentCriteria(t *testing.T) { - f := New(context.TODO(), nil, nil, nil, func() {}) + f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) criteria1 := filters.FilterCriteria{ Addresses: nil, @@ -231,7 +232,7 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) { return nil } - f := New(context.TODO(), nil, nil, nil, func() {}) + f := New(context.TODO(), nil, nil, nil, func() {}, log.New()) f.logsRequestor.Store(loadRequester) // first request has no filters
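The rpchelper.Filters and test hunks above show the other half of the pattern: a long-lived object stores the injected logger as a field and uses it from its goroutines and callbacks, while tests build their own root logger with log.New() or reuse the mock's m.Log. A small illustrative sketch along those lines, with made-up names, not the actual Filters implementation:

package main

import (
	"context"

	"github.com/ledgerwatch/log/v3"
)

// subscriber stands in for a component like Filters: the constructor takes
// a logger, keeps it as a field, and every later log call goes through it.
type subscriber struct {
	logger log.Logger
}

func newSubscriber(ctx context.Context, logger log.Logger) *subscriber {
	logger.Info("subscriber: subscribing to events")
	s := &subscriber{logger: logger}
	go func() {
		<-ctx.Done()
		// Uses the stored field, never the package-global log.
		s.logger.Info("subscriber: shutting down")
	}()
	return s
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_ = newSubscriber(ctx, log.New())
}

This is what lets each test call site above pass m.Log or log.New() instead of relying on however the global logger happens to be configured.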