diff --git a/cmd/XDC/main.go b/cmd/XDC/main.go index 864e5e1cd087..dcaad38cd9c2 100644 --- a/cmd/XDC/main.go +++ b/cmd/XDC/main.go @@ -147,7 +147,9 @@ var ( utils.RPCGlobalGasCapFlag, utils.RPCListenAddrFlag, utils.RPCPortFlag, + utils.RPCHttpReadTimeoutFlag, utils.RPCHttpWriteTimeoutFlag, + utils.RPCHttpIdleTimeoutFlag, utils.RPCApiFlag, utils.WSEnabledFlag, utils.WSListenAddrFlag, diff --git a/cmd/XDC/usage.go b/cmd/XDC/usage.go index b724155162a5..39a7c4367320 100644 --- a/cmd/XDC/usage.go +++ b/cmd/XDC/usage.go @@ -147,7 +147,9 @@ var AppHelpFlagGroups = []flagGroup{ utils.RPCGlobalGasCapFlag, utils.RPCListenAddrFlag, utils.RPCPortFlag, + utils.RPCHttpReadTimeoutFlag, utils.RPCHttpWriteTimeoutFlag, + utils.RPCHttpIdleTimeoutFlag, utils.RPCApiFlag, utils.WSEnabledFlag, utils.WSListenAddrFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5d661fa0e77c..8041c2f337af 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -435,10 +435,20 @@ var ( Usage: "HTTP-RPC server listening port", Value: node.DefaultHTTPPort, } + RPCHttpReadTimeoutFlag = cli.DurationFlag{ + Name: "rpcreadtimeout", + Usage: "HTTP-RPC server read timeout", + Value: rpc.DefaultHTTPTimeouts.ReadTimeout, + } RPCHttpWriteTimeoutFlag = cli.DurationFlag{ Name: "rpcwritetimeout", - Usage: "HTTP-RPC server write timeout (default = 10s)", - Value: node.DefaultHTTPWriteTimeOut, + Usage: "HTTP-RPC server write timeout", + Value: rpc.DefaultHTTPTimeouts.WriteTimeout, + } + RPCHttpIdleTimeoutFlag = cli.DurationFlag{ + Name: "rpcidletimeout", + Usage: "HTTP-RPC server idle timeout", + Value: rpc.DefaultHTTPTimeouts.IdleTimeout, } RPCCORSDomainFlag = cli.StringFlag{ Name: "rpccorsdomain", @@ -779,8 +789,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) { if ctx.GlobalIsSet(RPCPortFlag.Name) { cfg.HTTPPort = ctx.GlobalInt(RPCPortFlag.Name) } + if ctx.GlobalIsSet(RPCHttpReadTimeoutFlag.Name) { + cfg.HTTPTimeouts.ReadTimeout = ctx.GlobalDuration(RPCHttpReadTimeoutFlag.Name) + } if ctx.GlobalIsSet(RPCHttpWriteTimeoutFlag.Name) { - cfg.HTTPWriteTimeout = ctx.GlobalDuration(RPCHttpWriteTimeoutFlag.Name) + cfg.HTTPTimeouts.WriteTimeout = ctx.GlobalDuration(RPCHttpWriteTimeoutFlag.Name) + } + if ctx.GlobalIsSet(RPCHttpIdleTimeoutFlag.Name) { + cfg.HTTPTimeouts.IdleTimeout = ctx.GlobalDuration(RPCHttpIdleTimeoutFlag.Name) } if ctx.GlobalIsSet(RPCCORSDomainFlag.Name) { cfg.HTTPCors = splitAndTrim(ctx.GlobalString(RPCCORSDomainFlag.Name)) diff --git a/node/api.go b/node/api.go index 8a3611524418..4db65d762fd5 100644 --- a/node/api.go +++ b/node/api.go @@ -158,7 +158,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis } } - if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts); err != nil { + if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts); err != nil { return false, err } return true, nil diff --git a/node/config.go b/node/config.go index 848d563e9fa2..1984120f9ff3 100644 --- a/node/config.go +++ b/node/config.go @@ -23,7 +23,6 @@ import ( "path/filepath" "runtime" "strings" - "time" "github.com/XinFinOrg/XDPoSChain/accounts" "github.com/XinFinOrg/XDPoSChain/accounts/keystore" @@ -33,6 +32,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/p2p" "github.com/XinFinOrg/XDPoSChain/p2p/discover" + "github.com/XinFinOrg/XDPoSChain/rpc" ) const ( @@ -100,9 +100,6 @@ type Config struct { // for ephemeral nodes). HTTPPort int `toml:",omitempty"` - // HTTPWriteTimeout is the write timeout for the HTTP RPC server. - HTTPWriteTimeout time.Duration `toml:",omitempty"` - // HTTPCors is the Cross-Origin Resource Sharing header to send to requesting // clients. Please be aware that CORS is a browser enforced security, it's fully // useless for custom HTTP clients. @@ -122,6 +119,10 @@ type Config struct { // exposed. HTTPModules []string `toml:",omitempty"` + // HTTPTimeouts allows for customization of the timeout values used by the HTTP RPC + // interface. + HTTPTimeouts rpc.HTTPTimeouts + // WSHost is the host interface on which to start the websocket RPC server. If // this field is empty, no websocket API endpoint will be started. WSHost string `toml:",omitempty"` diff --git a/node/defaults.go b/node/defaults.go index f9f4f8f2688e..ff9348929554 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -21,27 +21,26 @@ import ( "os/user" "path/filepath" "runtime" - "time" "github.com/XinFinOrg/XDPoSChain/p2p" "github.com/XinFinOrg/XDPoSChain/p2p/nat" + "github.com/XinFinOrg/XDPoSChain/rpc" ) const ( - DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server - DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server - DefaultHTTPWriteTimeOut = 10 * time.Second // Default write timeout for the HTTP RPC server - DefaultWSHost = "localhost" // Default host interface for the websocket RPC server - DefaultWSPort = 8546 // Default TCP port for the websocket RPC server + DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server + DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server + DefaultWSHost = "localhost" // Default host interface for the websocket RPC server + DefaultWSPort = 8546 // Default TCP port for the websocket RPC server ) // DefaultConfig contains reasonable default settings. var DefaultConfig = Config{ DataDir: DefaultDataDir(), HTTPPort: DefaultHTTPPort, - HTTPWriteTimeout: DefaultHTTPWriteTimeOut, HTTPModules: []string{"net", "web3"}, HTTPVirtualHosts: []string{"localhost"}, + HTTPTimeouts: rpc.DefaultHTTPTimeouts, WSPort: DefaultWSPort, WSModules: []string{"net", "web3"}, P2P: p2p.Config{ diff --git a/node/node.go b/node/node.go index a9f767bf28f1..c5adec4451bd 100644 --- a/node/node.go +++ b/node/node.go @@ -140,7 +140,7 @@ func (n *Node) Register(constructor ServiceConstructor) error { return nil } -// Start create a live P2P node and starts running it. +// Start creates a live P2P node and starts running it. func (n *Node) Start() error { n.lock.Lock() defer n.lock.Unlock() @@ -203,7 +203,7 @@ func (n *Node) Start() error { return convertFileLockError(err) } // Start each of the services - started := []reflect.Type{} + var started []reflect.Type for kind, service := range services { // Start the next service, stopping all previous upon failure if err := service.Start(running); err != nil { @@ -217,7 +217,7 @@ func (n *Node) Start() error { // Mark the service started for potential cleanup started = append(started, kind) } - // Lastly start the configured RPC interfaces + // Lastly, start the configured RPC interfaces if err := n.startRPC(services); err != nil { for _, service := range services { service.Stop() @@ -252,7 +252,7 @@ func (n *Node) openDataDir() error { return nil } -// startRPC is a helper method to start all the various RPC endpoint during node +// startRPC is a helper method to start all the various RPC endpoints during node // startup. It's not meant to be called at any time afterwards as it makes certain // assumptions about the state of the node. func (n *Node) startRPC(services map[reflect.Type]Service) error { @@ -269,7 +269,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { n.stopInProc() return err } - if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil { + if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts); err != nil { n.stopIPC() n.stopInProc() return err @@ -293,7 +293,7 @@ func (n *Node) startInProc(apis []rpc.API) error { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace) + n.log.Debug("InProc registered", "namespace", api.Namespace) } n.inprocHandler = handler return nil @@ -320,51 +320,16 @@ func (n *Node) RegisterAPIs(apis []rpc.API) { // startIPC initializes and starts the IPC RPC endpoint. func (n *Node) startIPC(apis []rpc.API) error { - // Short circuit if the IPC endpoint isn't being exposed if n.ipcEndpoint == "" { - return nil + return nil // IPC disabled. } - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - n.log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace) - } - // All APIs registered, start the IPC listener - var ( - listener net.Listener - err error - ) - if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { + listener, handler, err := rpc.StartIPCEndpoint(n.ipcEndpoint, apis) + if err != nil { return err } - go func() { - n.log.Info("IPC endpoint opened", "url", n.ipcEndpoint) - - for { - conn, err := listener.Accept() - if err != nil { - // Terminate if the listener was closed - n.lock.RLock() - closed := n.ipcListener == nil - n.lock.RUnlock() - if closed { - return - } - // Not closed, just some error; report and continue - n.log.Error("IPC accept failed", "err", err) - continue - } - log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) - go handler.ServeCodec(rpc.NewCodec(conn), 0) - } - }() - // All listeners booted successfully n.ipcListener = listener n.ipcHandler = handler - + log.Info("IPC endpoint opened", "url", n.ipcEndpoint) return nil } @@ -374,7 +339,7 @@ func (n *Node) stopIPC() { n.ipcListener.Close() n.ipcListener = nil - n.log.Info("IPC endpoint closed", "endpoint", n.ipcEndpoint) + n.log.Info("IPC endpoint closed", "url", n.ipcEndpoint) } if n.ipcHandler != nil { n.ipcHandler.Stop() @@ -383,36 +348,18 @@ func (n *Node) stopIPC() { } // startHTTP initializes and starts the HTTP RPC endpoint. -func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error { +func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error { // Short circuit if the HTTP endpoint isn't being exposed if endpoint == "" { return nil } - // Generate the whitelist based on the allowed modules - whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - } - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - n.log.Debug("HTTP registered", "service", api.Service, "namespace", api.Namespace) - } - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { + listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts) + if err != nil { return err } - go rpc.NewHTTPServer(cors, vhosts, handler, n.config.HTTPWriteTimeout).Serve(listener) - n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ",")) + n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", listener.Addr()), + "cors", strings.Join(cors, ","), + "vhosts", strings.Join(vhosts, ",")) // All listeners booted successfully n.httpEndpoint = endpoint n.httpListener = listener @@ -424,10 +371,10 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors // stopHTTP terminates the HTTP RPC endpoint. func (n *Node) stopHTTP() { if n.httpListener != nil { + url := fmt.Sprintf("http://%v/", n.httpListener.Addr()) n.httpListener.Close() n.httpListener = nil - - n.log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", n.httpEndpoint)) + n.log.Info("HTTP endpoint closed", "url", url) } if n.httpHandler != nil { n.httpHandler.Stop() @@ -441,32 +388,11 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig if endpoint == "" { return nil } - // Generate the whitelist based on the allowed modules - whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - } - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - n.log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) - } - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { + listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll) + if err != nil { return err } - go rpc.NewWSServer(wsOrigins, handler).Serve(listener) n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr())) - // All listeners booted successfully n.wsEndpoint = endpoint n.wsListener = listener @@ -645,11 +571,23 @@ func (n *Node) IPCEndpoint() string { // HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack. func (n *Node) HTTPEndpoint() string { + n.lock.Lock() + defer n.lock.Unlock() + + if n.httpListener != nil { + return n.httpListener.Addr().String() + } return n.httpEndpoint } // WSEndpoint retrieves the current WS endpoint used by the protocol stack. func (n *Node) WSEndpoint() string { + n.lock.Lock() + defer n.lock.Unlock() + + if n.wsListener != nil { + return n.wsListener.Addr().String() + } return n.wsEndpoint } diff --git a/rpc/endpoints.go b/rpc/endpoints.go index 1f5770a45d50..21802c9542db 100644 --- a/rpc/endpoints.go +++ b/rpc/endpoints.go @@ -23,6 +23,64 @@ import ( "github.com/XinFinOrg/XDPoSChain/log" ) +// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules +func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string, timeouts HTTPTimeouts) (net.Listener, *Server, error) { + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("HTTP registered", "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + go NewHTTPServer(cors, vhosts, timeouts, handler).Serve(listener) + return listener, handler, err +} + +// StartWSEndpoint starts a websocket endpoint +func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) { + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := NewServer() + for _, api := range apis { + if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + go NewWSServer(wsOrigins, handler).Serve(listener) + return listener, handler, err +} + // StartIPCEndpoint starts an IPC endpoint. func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { // Register all the APIs exposed by the services. @@ -36,6 +94,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, er log.Info("IPC registration failed", "namespace", api.Namespace, "error", err) return nil, nil, err } + log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace) if _, ok := regMap[api.Namespace]; !ok { registered = append(registered, api.Namespace) regMap[api.Namespace] = struct{}{} diff --git a/rpc/http.go b/rpc/http.go index 6f267aa9d66e..2416d66753b4 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -240,17 +240,35 @@ func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil } // NewHTTPServer creates a new HTTP RPC server around an API provider. // // Deprecated: Server implements http.Handler -func NewHTTPServer(cors []string, vhosts []string, srv *Server, writeTimeout time.Duration) *http.Server { +func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv *Server) *http.Server { // Wrap the CORS-handler within a host-handler handler := newCorsHandler(srv, cors) handler = newVHostHandler(vhosts, handler) - handler = http.TimeoutHandler(handler, writeTimeout, `{"error":"http server timeout"}`) - log.Info("NewHTTPServer", "writeTimeout", writeTimeout) + + // Make sure timeout values are meaningful + if timeouts.ReadTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout) + timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout + } + if timeouts.WriteTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout) + timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout + } + if timeouts.IdleTimeout < time.Second { + log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout) + timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout + } + + // PR #469: return http code 503 and error message to client when timeout + handler = http.TimeoutHandler(handler, timeouts.WriteTimeout, `{"error":"http server timeout"}`) + log.Info("NewHTTPServer", "writeTimeout", timeouts.WriteTimeout) + + // Bundle and start the HTTP server return &http.Server{ Handler: handler, - ReadTimeout: 5 * time.Second, - WriteTimeout: writeTimeout + time.Second, - IdleTimeout: 120 * time.Second, + ReadTimeout: timeouts.ReadTimeout, + WriteTimeout: timeouts.WriteTimeout + time.Second, + IdleTimeout: timeouts.IdleTimeout, } } diff --git a/rpc/ipc.go b/rpc/ipc.go index 77e06b491950..e617e37ca7c1 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -24,23 +24,17 @@ import ( "github.com/XinFinOrg/XDPoSChain/p2p/netutil" ) -// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on -// Windows this is a named pipe -func CreateIPCListener(endpoint string) (net.Listener, error) { - return ipcListen(endpoint) -} - -// ServeListener accepts connections on l, serving JSON-RPC on them. +// ServeListener accepts connections on l, serving IPC-RPC on them. func (s *Server) ServeListener(l net.Listener) error { for { conn, err := l.Accept() if netutil.IsTemporaryError(err) { - log.Warn("RPC accept error", "err", err) + log.Warn("IPC accept error", "err", err) continue } else if err != nil { return err } - log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) + log.Trace("IPC accepted connection") go s.ServeCodec(NewCodec(conn), 0) } }