Skip to content

Commit df75dbf

Browse files
committed
cmd, node, rpc: readd inproc RPC client, expose via node
1 parent 900e124 commit df75dbf

File tree

11 files changed

+178
-25
lines changed

11 files changed

+178
-25
lines changed

cmd/geth/js_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"fmt"
2121
"io/ioutil"
2222
"math/big"
23-
"math/rand"
2423
"os"
2524
"path/filepath"
2625
"regexp"
@@ -30,7 +29,6 @@ import (
3029
"time"
3130

3231
"github.com/ethereum/go-ethereum/accounts"
33-
"github.com/ethereum/go-ethereum/cmd/utils"
3432
"github.com/ethereum/go-ethereum/common"
3533
"github.com/ethereum/go-ethereum/common/compiler"
3634
"github.com/ethereum/go-ethereum/common/httpclient"
@@ -96,7 +94,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod
9694
t.Fatal(err)
9795
}
9896
// Create a networkless protocol stack
99-
stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true, IPCPath: fmt.Sprintf("geth-test-%d.ipc", rand.Int63())})
97+
stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true})
10098
if err != nil {
10199
t.Fatalf("failed to create node: %v", err)
102100
}
@@ -142,7 +140,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod
142140
stack.Service(&ethereum)
143141

144142
assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext")
145-
client, err := utils.NewRemoteRPCClientFromString("ipc:" + stack.IPCEndpoint())
143+
client, err := stack.Attach()
146144
if err != nil {
147145
t.Fatalf("failed to attach to node: %v", err)
148146
}

cmd/geth/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ func console(ctx *cli.Context) {
425425
startNode(ctx, node)
426426

427427
// Attach to the newly started node, and either execute script or become interactive
428-
client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint())
428+
client, err := node.Attach()
429429
if err != nil {
430430
utils.Fatalf("Failed to attach to the inproc geth: %v", err)
431431
}
@@ -451,7 +451,7 @@ func execScripts(ctx *cli.Context) {
451451
startNode(ctx, node)
452452

453453
// Attach to the newly started node and execute the given scripts
454-
client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint())
454+
client, err := node.Attach()
455455
if err != nil {
456456
utils.Fatalf("Failed to attach to the inproc geth: %v", err)
457457
}

cmd/utils/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,5 @@ func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) {
5151
if strings.HasPrefix(endpoint, "ws:") {
5252
return rpc.NewWSClient(endpoint)
5353
}
54-
5554
return nil, fmt.Errorf("invalid endpoint")
5655
}

node/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (api *PrivateAdminAPI) StartWS(host string, port int, cors string, apis str
8989
defer api.node.lock.Unlock()
9090

9191
if api.node.wsHandler != nil {
92-
return false, fmt.Errorf("WebSocker RPC already running on %s", api.node.wsEndpoint)
92+
return false, fmt.Errorf("WebSocket RPC already running on %s", api.node.wsEndpoint)
9393
}
9494
if err := api.node.startWS(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil {
9595
return false, err

node/node.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ type Node struct {
5555
serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
5656
services map[reflect.Type]Service // Currently running services
5757

58-
rpcAPIs []rpc.API // List of APIs currently provided by the node
58+
rpcAPIs []rpc.API // List of APIs currently provided by the node
59+
inprocHandler *rpc.Server // In-process RPC request handler to process the API requests
60+
5961
ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled)
6062
ipcListener net.Listener // IPC RPC listener socket to serve API requests
6163
ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
@@ -217,23 +219,51 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
217219
apis = append(apis, service.APIs()...)
218220
}
219221
// Start the various API endpoints, terminating all in case of errors
222+
if err := n.startInProc(apis); err != nil {
223+
return err
224+
}
220225
if err := n.startIPC(apis); err != nil {
226+
n.stopInProc()
221227
return err
222228
}
223229
if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil {
224230
n.stopIPC()
231+
n.stopInProc()
225232
return err
226233
}
227234
if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsDomains); err != nil {
228235
n.stopHTTP()
229236
n.stopIPC()
237+
n.stopInProc()
230238
return err
231239
}
232240
// All API endpoints started successfully
233241
n.rpcAPIs = apis
234242
return nil
235243
}
236244

245+
// startInProc initializes an in-process RPC endpoint.
246+
func (n *Node) startInProc(apis []rpc.API) error {
247+
// Register all the APIs exposed by the services
248+
handler := rpc.NewServer()
249+
for _, api := range apis {
250+
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
251+
return err
252+
}
253+
glog.V(logger.Debug).Infof("InProc registered %T under '%s'", api.Service, api.Namespace)
254+
}
255+
n.inprocHandler = handler
256+
return nil
257+
}
258+
259+
// stopInProc terminates the in-process RPC endpoint.
260+
func (n *Node) stopInProc() {
261+
if n.inprocHandler != nil {
262+
n.inprocHandler.Stop()
263+
n.inprocHandler = nil
264+
}
265+
}
266+
237267
// startIPC initializes and starts the IPC RPC endpoint.
238268
func (n *Node) startIPC(apis []rpc.API) error {
239269
// Short circuit if the IPC endpoint isn't being exposed
@@ -468,6 +498,19 @@ func (n *Node) Restart() error {
468498
return nil
469499
}
470500

501+
// Attach creates an RPC client attached to an in-process API handler.
502+
func (n *Node) Attach() (rpc.Client, error) {
503+
n.lock.RLock()
504+
defer n.lock.RUnlock()
505+
506+
// Short circuit if the node's not running
507+
if n.server == nil {
508+
return nil, ErrNodeStopped
509+
}
510+
// Otherwise attach to the API and return
511+
return rpc.NewInProcRPCClient(n.inprocHandler), nil
512+
}
513+
471514
// Server retrieves the currently running P2P network layer. This method is meant
472515
// only to inspect fields of the currently running server, life cycle management
473516
// should be left to this Node entity.
@@ -506,6 +549,16 @@ func (n *Node) IPCEndpoint() string {
506549
return n.ipcEndpoint
507550
}
508551

552+
// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack.
553+
func (n *Node) HTTPEndpoint() string {
554+
return n.httpEndpoint
555+
}
556+
557+
// WSEndpoint retrieves the current WS endpoint used by the protocol stack.
558+
func (n *Node) WSEndpoint() string {
559+
return n.wsEndpoint
560+
}
561+
509562
// EventMux retrieves the event multiplexer used by all the network services in
510563
// the current protocol stack.
511564
func (n *Node) EventMux() *event.TypeMux {

node/node_test.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ package node
1818

1919
import (
2020
"errors"
21-
"fmt"
2221
"io/ioutil"
23-
"math/rand"
2422
"os"
2523
"reflect"
2624
"testing"
@@ -37,7 +35,6 @@ var (
3735

3836
func testNodeConfig() *Config {
3937
return &Config{
40-
IPCPath: fmt.Sprintf("test-%d.ipc", rand.Int63()),
4138
PrivateKey: testNodeKey,
4239
Name: "test node",
4340
}
@@ -541,10 +538,11 @@ func TestAPIGather(t *testing.T) {
541538
defer stack.Stop()
542539

543540
// Connect to the RPC server and verify the various registered endpoints
544-
ipcClient, err := rpc.NewIPCClient(stack.IPCEndpoint())
541+
client, err := stack.Attach()
545542
if err != nil {
546-
t.Fatalf("failed to connect to the IPC API server: %v", err)
543+
t.Fatalf("failed to connect to the inproc API server: %v", err)
547544
}
545+
defer client.Close()
548546

549547
tests := []struct {
550548
Method string
@@ -556,11 +554,11 @@ func TestAPIGather(t *testing.T) {
556554
{"multi.v2.nested_theOneMethod", "multi.v2.nested"},
557555
}
558556
for i, test := range tests {
559-
if err := ipcClient.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil {
557+
if err := client.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil {
560558
t.Fatalf("test %d: failed to send API request: %v", i, err)
561559
}
562560
reply := new(rpc.JSONSuccessResponse)
563-
if err := ipcClient.Recv(reply); err != nil {
561+
if err := client.Recv(reply); err != nil {
564562
t.Fatalf("test %d: failed to read API reply: %v", i, err)
565563
}
566564
select {

rpc/http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ type httpClient struct {
259259

260260
// NewHTTPClient create a new RPC clients that connection to a geth RPC server
261261
// over HTTP.
262-
func NewHTTPClient(endpoint string) (*httpClient, error) {
262+
func NewHTTPClient(endpoint string) (Client, error) {
263263
url, err := url.Parse(endpoint)
264264
if err != nil {
265265
return nil, err

rpc/inproc.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright 2016 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package rpc
18+
19+
import "encoding/json"
20+
21+
// NewInProcRPCClient creates an in-process buffer stream attachment to a given
22+
// RPC server.
23+
func NewInProcRPCClient(handler *Server) Client {
24+
buffer := &inprocBuffer{
25+
requests: make(chan []byte, 16),
26+
responses: make(chan []byte, 16),
27+
}
28+
client := &inProcClient{
29+
server: handler,
30+
buffer: buffer,
31+
}
32+
go handler.ServeCodec(NewJSONCodec(client.buffer))
33+
return client
34+
}
35+
36+
// inProcClient is an in-process buffer stream attached to an RPC server.
37+
type inProcClient struct {
38+
server *Server
39+
buffer *inprocBuffer
40+
}
41+
42+
// Close tears down the request channel of the in-proc client.
43+
func (c *inProcClient) Close() {
44+
c.buffer.Close()
45+
}
46+
47+
// Send marshals a message into a json format and injects in into the client
48+
// request channel.
49+
func (c *inProcClient) Send(msg interface{}) error {
50+
d, err := json.Marshal(msg)
51+
if err != nil {
52+
return err
53+
}
54+
c.buffer.requests <- d
55+
return nil
56+
}
57+
58+
// Recv reads a message from the response channel and tries to parse it into the
59+
// given msg interface.
60+
func (c *inProcClient) Recv(msg interface{}) error {
61+
data := <-c.buffer.responses
62+
return json.Unmarshal(data, &msg)
63+
}
64+
65+
// Returns the collection of modules the RPC server offers.
66+
func (c *inProcClient) SupportedModules() (map[string]string, error) {
67+
return SupportedModules(c)
68+
}
69+
70+
// inprocBuffer represents the connection between the RPC server and console
71+
type inprocBuffer struct {
72+
readBuf []byte // store remaining request bytes after a partial read
73+
requests chan []byte // list with raw serialized requests
74+
responses chan []byte // list with raw serialized responses
75+
}
76+
77+
// Read will read the next request in json format.
78+
func (b *inprocBuffer) Read(p []byte) (int, error) {
79+
// last read didn't read entire request, return remaining bytes
80+
if len(b.readBuf) > 0 {
81+
n := copy(p, b.readBuf)
82+
if n < len(b.readBuf) {
83+
b.readBuf = b.readBuf[:n]
84+
} else {
85+
b.readBuf = b.readBuf[:0]
86+
}
87+
return n, nil
88+
}
89+
// read next request
90+
req := <-b.requests
91+
n := copy(p, req)
92+
if n < len(req) {
93+
// inprocBuffer too small, store remaining chunk for next read
94+
b.readBuf = req[n:]
95+
}
96+
return n, nil
97+
}
98+
99+
// Write sends the given buffer to the backend.
100+
func (b *inprocBuffer) Write(p []byte) (n int, err error) {
101+
b.responses <- p
102+
return len(p), nil
103+
}
104+
105+
// Close cleans up obtained resources.
106+
func (b *inprocBuffer) Close() error {
107+
close(b.requests)
108+
close(b.responses)
109+
110+
return nil
111+
}

rpc/ipc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type ipcClient struct {
3838
// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded.
3939
// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a
4040
// named pipe.
41-
func NewIPCClient(endpoint string) (*ipcClient, error) {
41+
func NewIPCClient(endpoint string) (Client, error) {
4242
conn, err := newIPCConnection(endpoint)
4343
if err != nil {
4444
return nil, err

rpc/ipc_windows.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,6 @@ func Dial(address string) (*PipeConn, error) {
239239
for {
240240
conn, err := dial(address, nmpwait_wait_forever)
241241
if err == nil {
242-
// Ugly hack working around some async connectivity issues
243-
time.Sleep(100 * time.Millisecond)
244-
245242
return conn, nil
246243
}
247244
if isPipeNotReady(err) {
@@ -363,9 +360,6 @@ func Listen(address string) (*PipeListener, error) {
363360
if err != nil {
364361
return nil, err
365362
}
366-
// Ugly hack working around some async connectivity issues
367-
time.Sleep(100 * time.Millisecond)
368-
369363
return &PipeListener{
370364
addr: PipeAddr(address),
371365
handle: handle,

0 commit comments

Comments
 (0)