Skip to content

Commit

Permalink
dot, dot/rpc: implement basic websocket support for RPC (#791)
Browse files Browse the repository at this point in the history
* implement basic websocket support for RPC

* remove redundant code

* update tests, lint

* added tests

* merge conflicts

* lint issues
  • Loading branch information
edwardmack authored Apr 23, 2020
1 parent 2eb3fb4 commit 6f62c61
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 13 deletions.
5 changes: 5 additions & 0 deletions cmd/gossamer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ func TestRPCConfigFromFlags(t *testing.T) {
Port: testCfg.RPC.Port,
Host: testCfg.RPC.Host,
Modules: testCfg.RPC.Modules,
WSPort: testCfg.RPC.WSPort,
},
},
{
Expand All @@ -425,6 +426,7 @@ func TestRPCConfigFromFlags(t *testing.T) {
Port: testCfg.RPC.Port,
Host: testCfg.RPC.Host,
Modules: testCfg.RPC.Modules,
WSPort: testCfg.RPC.WSPort,
},
},
{
Expand All @@ -436,6 +438,7 @@ func TestRPCConfigFromFlags(t *testing.T) {
Port: testCfg.RPC.Port,
Host: "testhost",
Modules: testCfg.RPC.Modules,
WSPort: testCfg.RPC.WSPort,
},
},
{
Expand All @@ -447,6 +450,7 @@ func TestRPCConfigFromFlags(t *testing.T) {
Port: 5678,
Host: testCfg.RPC.Host,
Modules: testCfg.RPC.Modules,
WSPort: testCfg.RPC.WSPort,
},
},
{
Expand All @@ -458,6 +462,7 @@ func TestRPCConfigFromFlags(t *testing.T) {
Port: testCfg.RPC.Port,
Host: testCfg.RPC.Host,
Modules: []string{"mod1", "mod2"},
WSPort: testCfg.RPC.WSPort,
},
},
}
Expand Down
3 changes: 3 additions & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type RPCConfig struct {
Port uint32 `toml:"port"`
Host string `toml:"host"`
Modules []string `toml:"modules"`
WSPort uint32 `toml:"ws-port"`
}

// String will return the json representation for a Config
Expand Down Expand Up @@ -128,6 +129,7 @@ func GssmrConfig() *Config {
Port: gssmr.DefaultRPCHTTPPort,
Host: gssmr.DefaultRPCHTTPHost,
Modules: gssmr.DefaultRPCModules,
WSPort: gssmr.DefaultRPCWSPort,
},
}
}
Expand Down Expand Up @@ -162,6 +164,7 @@ func KsmccConfig() *Config {
Port: ksmcc.DefaultRPCHTTPPort,
Host: ksmcc.DefaultRPCHTTPHost,
Modules: ksmcc.DefaultRPCModules,
WSPort: ksmcc.DefaultRPCWSPort,
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions dot/rpc/dot_up_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rpc

import (
"fmt"
"net/http"
"strings"
"unicode"
Expand Down Expand Up @@ -72,6 +73,9 @@ func (c *DotUpCodecRequest) Method() (string, error) {
m, err := c.CodecRequest.Method()
if len(m) > 1 && err == nil {
parts := strings.Split(m, "_")
if len(parts) < 2 {
return "", fmt.Errorf("rpc error method %s not found", m)
}
service, method := parts[0], parts[1]
r, n := utf8.DecodeRuneInString(method) // get the first rune, and it's length
if unicode.IsLower(r) {
Expand Down
25 changes: 17 additions & 8 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package rpc

import (
"fmt"

"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"

"net/http"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"

log "github.com/ChainSafe/log15"
)
Expand All @@ -43,7 +41,8 @@ type HTTPServerConfig struct {
CoreAPI modules.CoreAPI
TransactionQueueAPI modules.TransactionQueueAPI
Host string
Port uint32
RPCPort uint32
WSPort uint32
Modules []string
}

Expand Down Expand Up @@ -85,19 +84,29 @@ func (h *HTTPServer) RegisterModules(mods []string) {
}
}

// Start registers the rpc handler function and starts the server listening on `h.port`
// Start registers the rpc handler function and starts the rpc http and websocket server
func (h *HTTPServer) Start() error {
// use our DotUpCodec which will capture methods passed in json as _x that is
// underscore followed by lower case letter, instead of default RPC calls which
// use . followed by Upper case letter
h.rpcServer.RegisterCodec(NewDotUpCodec(), "application/json")
h.rpcServer.RegisterCodec(NewDotUpCodec(), "application/json;charset=UTF-8")

log.Debug("[rpc] Starting HTTP Server...", "host", h.serverConfig.Host, "port", h.serverConfig.Port)
log.Info("[rpc] Starting HTTP Server...", "host", h.serverConfig.Host, "port", h.serverConfig.RPCPort)
r := mux.NewRouter()
r.Handle("/", h.rpcServer)
go func() {
err := http.ListenAndServe(fmt.Sprintf(":%d", h.serverConfig.Port), r)
err := http.ListenAndServe(fmt.Sprintf(":%d", h.serverConfig.RPCPort), r)
if err != nil {
log.Error("[rpc] http error", "err", err)
}
}()

log.Info("[rpc] Starting WebSocket Server...", "host", h.serverConfig.Host, "port", h.serverConfig.WSPort)
ws := mux.NewRouter()
ws.Handle("/", h)
go func() {
err := http.ListenAndServe(fmt.Sprintf(":%d", h.serverConfig.WSPort), ws)
if err != nil {
log.Error("[rpc] http error", "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestNewHTTPServer(t *testing.T) {

cfg := &HTTPServerConfig{
Modules: []string{"system"},
Port: 8545,
RPCPort: 8545,
}
s := NewHTTPServer(cfg)
err := s.Start()
Expand Down
82 changes: 82 additions & 0 deletions dot/rpc/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package rpc

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/websocket"
)

// ServeHTTP implemented to handle WebSocket connections
func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var upg = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
ws, err := upg.Upgrade(w, r, nil)
if err != nil {
log.Error("[rpc] websocket upgrade failed", "error", err)
return
}
for {
rpcHost := fmt.Sprintf("http://%s:%d/", h.serverConfig.Host, h.serverConfig.RPCPort)
for {
_, mbytes, err := ws.ReadMessage()
if err != nil {
log.Error("[rpc] websocket failed to read message", "error", err)
return
}
log.Trace("[rpc] websocket received", "message", fmt.Sprintf("%s", mbytes))
client := &http.Client{}
buf := &bytes.Buffer{}
_, err = buf.Write(mbytes)
if err != nil {
log.Error("[rpc] failed to write message to buffer", "error", err)
return
}

req, err := http.NewRequest("POST", rpcHost, buf)
if err != nil {
log.Error("[rpc] failed request to rpc service", "error", err)
return
}

req.Header.Set("Content-Type", "application/json;")

res, err := client.Do(req)
if err != nil {
log.Error("[rpc] websocket error calling rpc", "error", err)
return
}

body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Error("[rpc] error reading response body", "error", err)
return
}

err = res.Body.Close()
if err != nil {
log.Error("[rpc] error closing response body", "error", err)
return
}
var wsSend interface{}
err = json.Unmarshal(body, &wsSend)
if err != nil {
log.Error("[rpc] error unmarshal rpc response", "error", err)
return
}

err = ws.WriteJSON(wsSend)
if err != nil {
log.Error("[rpc] error writing json response", "error", err)
return
}
}
}
}
54 changes: 54 additions & 0 deletions dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package rpc

import (
"flag"
"log"
"net/url"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)

var addr = flag.String("addr", "localhost:8546", "http service address")
var testCalls = []struct {
call []byte
expected []byte
}{
{[]byte(`{"jsonrpc":"2.0","method":"system_name","params":[],"id":1}`), []byte(`{"id":1,"jsonrpc":"2.0","result":"gossamer v0.0"}` + "\n")}, // working request
{[]byte(`{"jsonrpc":"2.0","method":"unknown","params":[],"id":1}`), []byte(`{"error":{"code":-32000,"data":null,"message":"rpc error method unknown not found"},"id":1,"jsonrpc":"2.0"}` + "\n")}, // unknown method
{[]byte{}, []byte(`{"error":{"code":-32700,"data":{"id":null,"jsonrpc":"","method":"","params":null},"message":"EOF"},"id":null,"jsonrpc":"2.0"}` + "\n")}, // empty request
}

func TestNewWebSocketServer(t *testing.T) {

cfg := &HTTPServerConfig{
Modules: []string{"system"},
RPCPort: 8545,
WSPort: 8546,
}
s := NewHTTPServer(cfg)
err := s.Start()
require.Nil(t, err)

time.Sleep(time.Second) // give server a second to start

u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
log.Printf("connecting to %s", u.String())

c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()

for _, item := range testCalls {
err = c.WriteMessage(websocket.TextMessage, item.call)
require.Nil(t, err)

_, message, err := c.ReadMessage()
require.Nil(t, err)
require.Equal(t, item.expected, message)
}
}
7 changes: 4 additions & 3 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/runtime"

log "github.com/ChainSafe/log15"
)

Expand Down Expand Up @@ -149,8 +148,9 @@ func createRPCService(cfg *Config, stateSrvc *state.Service, coreSrvc *core.Serv
log.Info(
"[dot] creating rpc service...",
"host", cfg.RPC.Host,
"port", cfg.RPC.Port,
"rpc port", cfg.RPC.Port,
"mods", cfg.RPC.Modules,
"ws port", cfg.RPC.WSPort,
)

rpcConfig := &rpc.HTTPServerConfig{
Expand All @@ -160,7 +160,8 @@ func createRPCService(cfg *Config, stateSrvc *state.Service, coreSrvc *core.Serv
CoreAPI: coreSrvc,
TransactionQueueAPI: stateSrvc.TransactionQueue,
Host: cfg.RPC.Host,
Port: cfg.RPC.Port,
RPCPort: cfg.RPC.Port,
WSPort: cfg.RPC.WSPort,
Modules: cfg.RPC.Modules,
}

Expand Down
2 changes: 1 addition & 1 deletion dot/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/lib/utils"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -64,6 +63,7 @@ func NewTestConfig(t *testing.T) *Config {
Host: string("localhost"),
Port: uint32(8545),
Modules: []string{"system", "author", "chain"},
WSPort: uint32(8546),
},
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/google/go-cmp v0.3.1 // indirect
github.com/gorilla/mux v1.7.4
github.com/gorilla/rpc v1.2.0
github.com/gorilla/websocket v1.4.1
github.com/gtank/merlin v0.1.1 // indirect
github.com/ipfs/go-datastore v0.3.1
github.com/jcelliott/lumber v0.0.0-20160324203708-dd349441af25 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jcelliott/lumber v0.0.0-20160324203708-dd349441af25 h1:EFT6MH3igZK/dIVqgGbTqWVvkZ7wJ5iGN03SVtvvdd8=
github.com/jcelliott/lumber v0.0.0-20160324203708-dd349441af25/go.mod h1:sWkGw/wsaHtRsT9zGQ/WyJCotGWG/Anow/9hsAcBWRw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a h1:GmsqmapfzSJkm28dhRoHz2tLRbJmqhU86IPgBtN3mmk=
Expand Down Expand Up @@ -505,6 +507,8 @@ github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wS
github.com/multiformats/go-varint v0.0.1 h1:TR/0rdQtnNxuN2IhiB639xC3tWM4IUi7DkTBVTdGW/M=
github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nanobox-io/golang-scribble v0.0.0-20190309225732-aa3e7c118975 h1:zm/Rb2OsnLWCY88Njoqgo4X6yt/lx3oBNWhepX0AOMU=
github.com/nanobox-io/golang-scribble v0.0.0-20190309225732-aa3e7c118975/go.mod h1:4Mct/lWCFf1jzQTTAaWtOI7sXqmG+wBeiBfT4CxoaJk=
github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
Expand Down
1 change: 1 addition & 0 deletions node/gssmr/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ enabled = false
port = 8545
host = "localhost"
modules = ["system", "author", "chain"]
ws-port = 8546
2 changes: 2 additions & 0 deletions node/gssmr/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ var (
DefaultRPCHTTPPort = uint32(8545)
// DefaultRPCModules rpc modules
DefaultRPCModules = []string{"system", "author", "chain"}
// DefaultRPCWSPort rpc websocket port
DefaultRPCWSPort = uint32(8546)
)
1 change: 1 addition & 0 deletions node/ksmcc/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ enabled = false
port = 8545
host = "localhost"
modules = ["system"]
ws-port = 8546
2 changes: 2 additions & 0 deletions node/ksmcc/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ var (
DefaultRPCHTTPPort = uint32(8545)
// DefaultRPCModules rpc modules
DefaultRPCModules = []string{"system"}
// DefaultRPCWSPort rpc websocket port
DefaultRPCWSPort = uint32(8546)
)

0 comments on commit 6f62c61

Please sign in to comment.