From 877f9c48b6ea903884138609ccb068f2f7eb3773 Mon Sep 17 00:00:00 2001 From: Ian Cottrell Date: Tue, 9 Feb 2021 22:45:26 -0500 Subject: [PATCH] internal/jsonrpc2_v2: an updated jsonrpc2 library Change-Id: I609173baa6842d33068a7e9596d54f03d89c5401 Reviewed-on: https://go-review.googlesource.com/c/tools/+/292169 Run-TryBot: Ian Cottrell gopls-CI: kokoro TryBot-Result: Go Bot Trust: Ian Cottrell Reviewed-by: Robert Findley --- internal/jsonrpc2_v2/conn.go | 486 ++++++++++++++++++++++++++ internal/jsonrpc2_v2/frame.go | 179 ++++++++++ internal/jsonrpc2_v2/jsonrpc2.go | 84 +++++ internal/jsonrpc2_v2/jsonrpc2_test.go | 389 +++++++++++++++++++++ internal/jsonrpc2_v2/messages.go | 181 ++++++++++ internal/jsonrpc2_v2/net.go | 129 +++++++ internal/jsonrpc2_v2/serve.go | 208 +++++++++++ internal/jsonrpc2_v2/serve_test.go | 144 ++++++++ internal/jsonrpc2_v2/wire.go | 74 ++++ internal/jsonrpc2_v2/wire_test.go | 118 +++++++ 10 files changed, 1992 insertions(+) create mode 100644 internal/jsonrpc2_v2/conn.go create mode 100644 internal/jsonrpc2_v2/frame.go create mode 100644 internal/jsonrpc2_v2/jsonrpc2.go create mode 100644 internal/jsonrpc2_v2/jsonrpc2_test.go create mode 100644 internal/jsonrpc2_v2/messages.go create mode 100644 internal/jsonrpc2_v2/net.go create mode 100644 internal/jsonrpc2_v2/serve.go create mode 100644 internal/jsonrpc2_v2/serve_test.go create mode 100644 internal/jsonrpc2_v2/wire.go create mode 100644 internal/jsonrpc2_v2/wire_test.go diff --git a/internal/jsonrpc2_v2/conn.go b/internal/jsonrpc2_v2/conn.go new file mode 100644 index 00000000000..6d92c0c9ce7 --- /dev/null +++ b/internal/jsonrpc2_v2/conn.go @@ -0,0 +1,486 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "context" + "encoding/json" + "fmt" + "io" + "sync/atomic" + + "golang.org/x/tools/internal/event" + "golang.org/x/tools/internal/event/label" + "golang.org/x/tools/internal/lsp/debug/tag" + errors "golang.org/x/xerrors" +) + +// Binder builds a connection configuration. +// This may be used in servers to generate a new configuration per connection. +// ConnectionOptions itself implements Binder returning itself unmodified, to +// allow for the simple cases where no per connection information is needed. +type Binder interface { + // Bind is invoked when creating a new connection. + // The connection is not ready to use when Bind is called. + Bind(context.Context, *Connection) (ConnectionOptions, error) +} + +// ConnectionOptions holds the options for new connections. +type ConnectionOptions struct { + // Framer allows control over the message framing and encoding. + // If nil, HeaderFramer will be used. + Framer Framer + // Preempter allows registration of a pre-queue message handler. + // If nil, no messages will be preempted. + Preempter Preempter + // Handler is used as the queued message handler for inbound messages. + // If nil, all responses will be ErrNotHandled. + Handler Handler +} + +// Connection manages the jsonrpc2 protocol, connecting responses back to their +// calls. +// Connection is bidirectional; it does not have a designated server or client +// end. +type Connection struct { + seq int64 // must only be accessed using atomic operations + closer io.Closer + writerBox chan Writer + outgoingBox chan map[ID]chan<- *Response + incomingBox chan map[ID]*incoming + async async +} + +type AsyncCall struct { + id ID + response chan *Response // the channel a response will be delivered on + resultBox chan asyncResult + endSpan func() // close the tracing span when all processing for the message is complete +} + +type asyncResult struct { + result []byte + err error +} + +// incoming is used to track an incoming request as it is being handled +type incoming struct { + request *Request // the request being processed + baseCtx context.Context // a base context for the message processing + done func() // a function called when all processing for the message is complete + handleCtx context.Context // the context for handling the message, child of baseCtx + cancel func() // a function that cancels the handling context +} + +// Bind returns the options unmodified. +func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) { + return o, nil +} + +// newConnection creates a new connection and runs it. +// This is used by the Dial and Serve functions to build the actual connection. +func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) { + c := &Connection{ + closer: rwc, + writerBox: make(chan Writer, 1), + outgoingBox: make(chan map[ID]chan<- *Response, 1), + incomingBox: make(chan map[ID]*incoming, 1), + } + + options, err := binder.Bind(ctx, c) + if err != nil { + return nil, err + } + if options.Framer == nil { + options.Framer = HeaderFramer() + } + if options.Preempter == nil { + options.Preempter = defaultHandler{} + } + if options.Handler == nil { + options.Handler = defaultHandler{} + } + c.outgoingBox <- make(map[ID]chan<- *Response) + c.incomingBox <- make(map[ID]*incoming) + c.async.init() + // the goroutines started here will continue until the underlying stream is closed + reader := options.Framer.Reader(rwc) + readToQueue := make(chan *incoming) + queueToDeliver := make(chan *incoming) + go c.readIncoming(ctx, reader, readToQueue) + go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver) + go c.deliverMessages(ctx, options.Handler, queueToDeliver) + // releaseing the writer must be the last thing we do in case any requests + // are blocked waiting for the connection to be ready + c.writerBox <- options.Framer.Writer(rwc) + return c, nil +} + +// Notify invokes the target method but does not wait for a response. +// The params will be marshaled to JSON before sending over the wire, and will +// be handed to the method invoked. +func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error { + notify, err := NewNotification(method, params) + if err != nil { + return errors.Errorf("marshaling notify parameters: %v", err) + } + ctx, done := event.Start(ctx, method, + tag.Method.Of(method), + tag.RPCDirection.Of(tag.Outbound), + ) + event.Metric(ctx, tag.Started.Of(1)) + err = c.write(ctx, notify) + switch { + case err != nil: + event.Label(ctx, tag.StatusCode.Of("ERROR")) + default: + event.Label(ctx, tag.StatusCode.Of("OK")) + } + done() + return err +} + +// Call invokes the target method and returns an object that can be used to await the response. +// The params will be marshaled to JSON before sending over the wire, and will +// be handed to the method invoked. +// You do not have to wait for the response, it can just be ignored if not needed. +// If sending the call failed, the response will be ready and have the error in it. +func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { + result := &AsyncCall{ + id: Int64ID(atomic.AddInt64(&c.seq, 1)), + resultBox: make(chan asyncResult, 1), + } + // generate a new request identifier + call, err := NewCall(result.id, method, params) + if err != nil { + //set the result to failed + result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)} + return result + } + ctx, endSpan := event.Start(ctx, method, + tag.Method.Of(method), + tag.RPCDirection.Of(tag.Outbound), + tag.RPCID.Of(fmt.Sprintf("%q", result.id)), + ) + result.endSpan = endSpan + event.Metric(ctx, tag.Started.Of(1)) + // We have to add ourselves to the pending map before we send, otherwise we + // are racing the response. + // rchan is buffered in case the response arrives without a listener. + result.response = make(chan *Response, 1) + pending := <-c.outgoingBox + pending[result.id] = result.response + c.outgoingBox <- pending + // now we are ready to send + if err := c.write(ctx, call); err != nil { + // sending failed, we will never get a response, so deliver a fake one + r, _ := NewResponse(result.id, nil, err) + c.incomingResponse(r) + } + return result +} + +// ID used for this call. +// This can be used to cancel the call if needed. +func (a *AsyncCall) ID() ID { return a.id } + +// IsReady can be used to check if the result is already prepared. +// This is guaranteed to return true on a result for which Await has already +// returned, or a call that failed to send in the first place. +func (a *AsyncCall) IsReady() bool { + select { + case r := <-a.resultBox: + a.resultBox <- r + return true + default: + return false + } +} + +// Await the results of a Call. +// The response will be unmarshaled from JSON into the result. +func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { + defer a.endSpan() + var r asyncResult + select { + case response := <-a.response: + // response just arrived, prepare the result + switch { + case response.Error != nil: + r.err = response.Error + event.Label(ctx, tag.StatusCode.Of("ERROR")) + default: + r.result = response.Result + event.Label(ctx, tag.StatusCode.Of("OK")) + } + case r = <-a.resultBox: + // result already available + case <-ctx.Done(): + event.Label(ctx, tag.StatusCode.Of("CANCELLED")) + return ctx.Err() + } + // refill the box for the next caller + a.resultBox <- r + // and unpack the result + if r.err != nil { + return r.err + } + if result == nil || len(r.result) == 0 { + return nil + } + return json.Unmarshal(r.result, result) +} + +// Respond deliverers a response to an incoming Call. +// It is an error to not call this exactly once for any message for which a +// handler has previously returned ErrAsyncResponse. It is also an error to +// call this for any other message. +func (c *Connection) Respond(id ID, result interface{}, rerr error) error { + pending := <-c.incomingBox + defer func() { c.incomingBox <- pending }() + entry, found := pending[id] + if !found { + return nil + } + delete(pending, id) + return c.respond(entry, result, rerr) +} + +// Cancel is used to cancel an inbound message by ID, it does not cancel +// outgoing messages. +// This is only used inside a message handler that is layering a +// cancellation protocol on top of JSON RPC 2. +// It will not complain if the ID is not a currently active message, and it will +// not cause any messages that have not arrived yet with that ID to be +// cancelled. +func (c *Connection) Cancel(id ID) { + pending := <-c.incomingBox + defer func() { c.incomingBox <- pending }() + if entry, found := pending[id]; found && entry.cancel != nil { + entry.cancel() + entry.cancel = nil + } +} + +// Wait blocks until the connection is fully closed, but does not close it. +func (c *Connection) Wait() error { + return c.async.wait() +} + +// Close can be used to close the underlying stream, and then wait for the connection to +// fully shut down. +// This does not cancel in flight requests, but waits for them to gracefully complete. +func (c *Connection) Close() error { + // close the underlying stream + if err := c.closer.Close(); err != nil && !isClosingError(err) { + return err + } + // and then wait for it to cause the connection to close + if err := c.Wait(); err != nil && !isClosingError(err) { + return err + } + return nil +} + +// readIncoming collects inbound messages from the reader and delivers them, either responding +// to outgoing calls or feeding requests to the queue. +func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) { + defer close(toQueue) + for { + // get the next message + // no lock is needed, this is the only reader + msg, n, err := reader.Read(ctx) + if err != nil { + // The stream failed, we cannot continue + c.async.setError(err) + return + } + switch msg := msg.(type) { + case *Request: + entry := &incoming{ + request: msg, + } + // add a span to the context for this request + labels := append(make([]label.Label, 0, 3), // make space for the id if present + tag.Method.Of(msg.Method), + tag.RPCDirection.Of(tag.Inbound), + ) + if msg.IsCall() { + labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID))) + } + entry.baseCtx, entry.done = event.Start(ctx, msg.Method, labels...) + event.Metric(entry.baseCtx, + tag.Started.Of(1), + tag.ReceivedBytes.Of(n)) + // in theory notifications cannot be cancelled, but we build them a cancel context anyway + entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx) + // if the request is a call, add it to the incoming map so it can be + // cancelled by id + if msg.IsCall() { + pending := <-c.incomingBox + c.incomingBox <- pending + pending[msg.ID] = entry + } + // send the message to the incoming queue + toQueue <- entry + case *Response: + // If method is not set, this should be a response, in which case we must + // have an id to send the response back to the caller. + c.incomingResponse(msg) + } + } +} + +func (c *Connection) incomingResponse(msg *Response) { + pending := <-c.outgoingBox + response, ok := pending[msg.ID] + if ok { + delete(pending, msg.ID) + } + c.outgoingBox <- pending + if response != nil { + response <- msg + } +} + +// manageQueue reads incoming requests, attempts to proccess them with the preempter, or queue them +// up for normal handling. +func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) { + defer close(toDeliver) + q := []*incoming{} + ok := true + for { + var nextReq *incoming + if len(q) == 0 { + // no messages in the queue + // if we were closing, then we are done + if !ok { + return + } + // not closing, but nothing in the queue, so just block waiting for a read + nextReq, ok = <-fromRead + } else { + // we have a non empty queue, so pick whichever of reading or delivering + // that we can make progress on + select { + case nextReq, ok = <-fromRead: + case toDeliver <- q[0]: + //TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction? + q = q[1:] + } + } + if nextReq != nil { + // TODO: should we allow to limit the queue size? + var result interface{} + rerr := nextReq.handleCtx.Err() + if rerr == nil { + // only preempt if not already cancelled + result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request) + } + switch { + case rerr == ErrNotHandled: + // message not handled, add it to the queue for the main handler + q = append(q, nextReq) + case rerr == ErrAsyncResponse: + // message handled but the response will come later + default: + // anything else means the message is fully handled + c.reply(nextReq, result, rerr) + } + } + } +} + +func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) { + defer c.async.done() + for entry := range fromQueue { + // cancel any messages in the queue that we have a pending cancel for + var result interface{} + rerr := entry.handleCtx.Err() + if rerr == nil { + // only deliver if not already cancelled + result, rerr = handler.Handle(entry.handleCtx, entry.request) + } + switch { + case rerr == ErrNotHandled: + // message not handled, report it back to the caller as an error + c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method)) + case rerr == ErrAsyncResponse: + // message handled but the response will come later + default: + c.reply(entry, result, rerr) + } + } +} + +// reply is used to reply to an incoming request that has just been handled +func (c *Connection) reply(entry *incoming, result interface{}, rerr error) { + if entry.request.IsCall() { + // we have a call finishing, remove it from the incoming map + pending := <-c.incomingBox + defer func() { c.incomingBox <- pending }() + delete(pending, entry.request.ID) + } + if err := c.respond(entry, result, rerr); err != nil { + // no way to propagate this error + //TODO: should we do more than just log it? + event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err) + } +} + +// respond sends a response. +// This is the code shared between reply and SendResponse. +func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error { + var err error + if entry.request.IsCall() { + // send the response + if result == nil && rerr == nil { + // call with no response, send an error anyway + rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method) + } + var response *Response + response, err = NewResponse(entry.request.ID, result, rerr) + if err == nil { + // we write the response with the base context, in case the message was cancelled + err = c.write(entry.baseCtx, response) + } + } else { + switch { + case rerr != nil: + // notification failed + err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr) + rerr = nil + case result != nil: + //notification produced a response, which is an error + err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method) + default: + // normal notification finish + } + } + switch { + case rerr != nil || err != nil: + event.Label(entry.baseCtx, tag.StatusCode.Of("ERROR")) + default: + event.Label(entry.baseCtx, tag.StatusCode.Of("OK")) + } + // and just to be clean, invoke and clear the cancel if needed + if entry.cancel != nil { + entry.cancel() + entry.cancel = nil + } + // mark the entire request processing as done + entry.done() + return err +} + +// write is used by all things that write outgoing messages, including replies. +// it makes sure that writes are atomic +func (c *Connection) write(ctx context.Context, msg Message) error { + writer := <-c.writerBox + defer func() { c.writerBox <- writer }() + n, err := writer.Write(ctx, msg) + event.Metric(ctx, tag.SentBytes.Of(n)) + return err +} diff --git a/internal/jsonrpc2_v2/frame.go b/internal/jsonrpc2_v2/frame.go new file mode 100644 index 00000000000..634717c73e2 --- /dev/null +++ b/internal/jsonrpc2_v2/frame.go @@ -0,0 +1,179 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "strconv" + "strings" + + errors "golang.org/x/xerrors" +) + +// Reader abstracts the transport mechanics from the JSON RPC protocol. +// A Conn reads messages from the reader it was provided on construction, +// and assumes that each call to Read fully transfers a single message, +// or returns an error. +// A reader is not safe for concurrent use, it is expected it will be used by +// a single Conn in a safe manner. +type Reader interface { + // Read gets the next message from the stream. + Read(context.Context) (Message, int64, error) +} + +// Writer abstracts the transport mechanics from the JSON RPC protocol. +// A Conn writes messages using the writer it was provided on construction, +// and assumes that each call to Write fully transfers a single message, +// or returns an error. +// A writer is not safe for concurrent use, it is expected it will be used by +// a single Conn in a safe manner. +type Writer interface { + // Write sends a message to the stream. + Write(context.Context, Message) (int64, error) +} + +// Framer wraps low level byte readers and writers into jsonrpc2 message +// readers and writers. +// It is responsible for the framing and encoding of messages into wire form. +type Framer interface { + // Reader wraps a byte reader into a message reader. + Reader(rw io.Reader) Reader + // Writer wraps a byte writer into a message writer. + Writer(rw io.Writer) Writer +} + +// RawFramer returns a new Framer. +// The messages are sent with no wrapping, and rely on json decode consistency +// to determine message boundaries. +func RawFramer() Framer { return rawFramer{} } + +type rawFramer struct{} +type rawReader struct{ in *json.Decoder } +type rawWriter struct{ out io.Writer } + +func (rawFramer) Reader(rw io.Reader) Reader { + return &rawReader{in: json.NewDecoder(rw)} +} + +func (rawFramer) Writer(rw io.Writer) Writer { + return &rawWriter{out: rw} +} + +func (r *rawReader) Read(ctx context.Context) (Message, int64, error) { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + default: + } + var raw json.RawMessage + if err := r.in.Decode(&raw); err != nil { + return nil, 0, err + } + msg, err := DecodeMessage(raw) + return msg, int64(len(raw)), err +} + +func (w *rawWriter) Write(ctx context.Context, msg Message) (int64, error) { + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } + data, err := EncodeMessage(msg) + if err != nil { + return 0, errors.Errorf("marshaling message: %v", err) + } + n, err := w.out.Write(data) + return int64(n), err +} + +// HeaderFramer returns a new Framer. +// The messages are sent with HTTP content length and MIME type headers. +// This is the format used by LSP and others. +func HeaderFramer() Framer { return headerFramer{} } + +type headerFramer struct{} +type headerReader struct{ in *bufio.Reader } +type headerWriter struct{ out io.Writer } + +func (headerFramer) Reader(rw io.Reader) Reader { + return &headerReader{in: bufio.NewReader(rw)} +} + +func (headerFramer) Writer(rw io.Writer) Writer { + return &headerWriter{out: rw} +} + +func (r *headerReader) Read(ctx context.Context) (Message, int64, error) { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + default: + } + var total, length int64 + // read the header, stop on the first empty line + for { + line, err := r.in.ReadString('\n') + total += int64(len(line)) + if err != nil { + return nil, total, errors.Errorf("failed reading header line: %w", err) + } + line = strings.TrimSpace(line) + // check we have a header line + if line == "" { + break + } + colon := strings.IndexRune(line, ':') + if colon < 0 { + return nil, total, errors.Errorf("invalid header line %q", line) + } + name, value := line[:colon], strings.TrimSpace(line[colon+1:]) + switch name { + case "Content-Length": + if length, err = strconv.ParseInt(value, 10, 32); err != nil { + return nil, total, errors.Errorf("failed parsing Content-Length: %v", value) + } + if length <= 0 { + return nil, total, errors.Errorf("invalid Content-Length: %v", length) + } + default: + // ignoring unknown headers + } + } + if length == 0 { + return nil, total, errors.Errorf("missing Content-Length header") + } + data := make([]byte, length) + n, err := io.ReadFull(r.in, data) + total += int64(n) + if err != nil { + return nil, total, err + } + msg, err := DecodeMessage(data) + return msg, total, err +} + +func (w *headerWriter) Write(ctx context.Context, msg Message) (int64, error) { + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } + data, err := EncodeMessage(msg) + if err != nil { + return 0, errors.Errorf("marshaling message: %v", err) + } + n, err := fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data)) + total := int64(n) + if err == nil { + n, err = w.out.Write(data) + total += int64(n) + } + return total, err +} diff --git a/internal/jsonrpc2_v2/jsonrpc2.go b/internal/jsonrpc2_v2/jsonrpc2.go new file mode 100644 index 00000000000..49f32cbdf82 --- /dev/null +++ b/internal/jsonrpc2_v2/jsonrpc2.go @@ -0,0 +1,84 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec. +// https://www.jsonrpc.org/specification +// It is intended to be compatible with other implementations at the wire level. +package jsonrpc2 + +import ( + "context" + "errors" +) + +var ( + // ErrIdleTimeout is returned when serving timed out waiting for new connections. + ErrIdleTimeout = errors.New("timed out waiting for new connections") + // ErrNotHandled is returned from a handler to indicate it did not handle the + // message. + ErrNotHandled = errors.New("JSON RPC not handled") + // ErrAsyncResponse is returned from a handler to indicate it will generate a + // response asynchronously. + ErrAsyncResponse = errors.New("JSON RPC asynchronous response") +) + +// Preempter handles messages on a connection before they are queued to the main +// handler. +// Primarily this is used for cancel handlers or notifications for which out of +// order processing is not an issue. +type Preempter interface { + // Preempt is invoked for each incoming request before it is queued. + // If the request is a call, it must return a value or an error for the reply. + // Preempt should not block or start any new messages on the connection. + Preempt(ctx context.Context, req *Request) (interface{}, error) +} + +// Handler handles messages on a connection. +type Handler interface { + // Handle is invoked for each incoming request. + // If the request is a call, it must return a value or an error for the reply. + Handle(ctx context.Context, req *Request) (interface{}, error) +} + +type defaultHandler struct{} + +func (defaultHandler) Preempt(context.Context, *Request) (interface{}, error) { + return nil, ErrNotHandled +} + +func (defaultHandler) Handle(context.Context, *Request) (interface{}, error) { + return nil, ErrNotHandled +} + +// async is a small helper for things with an asynchronous result that you can +// wait for. +type async struct { + ready chan struct{} + errBox chan error +} + +func (a *async) init() { + a.ready = make(chan struct{}) + a.errBox = make(chan error, 1) + a.errBox <- nil +} + +func (a *async) done() { + close(a.ready) +} + +func (a *async) wait() error { + <-a.ready + err := <-a.errBox + a.errBox <- err + return err +} + +func (a *async) setError(err error) { + storedErr := <-a.errBox + if storedErr == nil { + storedErr = err + } + a.errBox <- storedErr +} diff --git a/internal/jsonrpc2_v2/jsonrpc2_test.go b/internal/jsonrpc2_v2/jsonrpc2_test.go new file mode 100644 index 00000000000..8f2eca1d01d --- /dev/null +++ b/internal/jsonrpc2_v2/jsonrpc2_test.go @@ -0,0 +1,389 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2_test + +import ( + "context" + "encoding/json" + "fmt" + "path" + "reflect" + "testing" + "time" + + "golang.org/x/tools/internal/event/export/eventtest" + jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2" + "golang.org/x/tools/internal/stack/stacktest" + errors "golang.org/x/xerrors" +) + +var callTests = []invoker{ + call{"no_args", nil, true}, + call{"one_string", "fish", "got:fish"}, + call{"one_number", 10, "got:10"}, + call{"join", []string{"a", "b", "c"}, "a/b/c"}, + sequence{"notify", []invoker{ + notify{"set", 3}, + notify{"add", 5}, + call{"get", nil, 8}, + }}, + sequence{"preempt", []invoker{ + async{"a", "wait", "a"}, + notify{"unblock", "a"}, + collect{"a", true, false}, + }}, + sequence{"basic cancel", []invoker{ + async{"b", "wait", "b"}, + cancel{"b"}, + collect{"b", nil, true}, + }}, + sequence{"queue", []invoker{ + async{"a", "wait", "a"}, + notify{"set", 1}, + notify{"add", 2}, + notify{"add", 3}, + notify{"add", 4}, + call{"peek", nil, 0}, // accumulator will not have any adds yet + notify{"unblock", "a"}, + collect{"a", true, false}, + call{"get", nil, 10}, // accumulator now has all the adds + }}, + sequence{"fork", []invoker{ + async{"a", "fork", "a"}, + notify{"set", 1}, + notify{"add", 2}, + notify{"add", 3}, + notify{"add", 4}, + call{"get", nil, 10}, // fork will not have blocked the adds + notify{"unblock", "a"}, + collect{"a", true, false}, + }}, +} + +type binder struct { + framer jsonrpc2.Framer + runTest func(*handler) +} + +type handler struct { + conn *jsonrpc2.Connection + accumulator int + waitersBox chan map[string]chan struct{} + calls map[string]*jsonrpc2.AsyncCall +} + +type invoker interface { + Name() string + Invoke(t *testing.T, ctx context.Context, h *handler) +} + +type notify struct { + method string + params interface{} +} + +type call struct { + method string + params interface{} + expect interface{} +} + +type async struct { + name string + method string + params interface{} +} + +type collect struct { + name string + expect interface{} + fails bool +} + +type cancel struct { + name string +} + +type sequence struct { + name string + tests []invoker +} + +type echo call + +type cancelParams struct{ ID int64 } + +func TestConnectionRaw(t *testing.T) { + testConnection(t, jsonrpc2.RawFramer()) +} + +func TestConnectionHeader(t *testing.T) { + testConnection(t, jsonrpc2.HeaderFramer()) +} + +func testConnection(t *testing.T, framer jsonrpc2.Framer) { + stacktest.NoLeak(t) + ctx := eventtest.NewContext(context.Background(), t) + listener, err := jsonrpc2.NetPipe(ctx) + if err != nil { + t.Fatal(err) + } + server, err := jsonrpc2.Serve(ctx, listener, binder{framer, nil}, jsonrpc2.ServeOptions{}) + if err != nil { + t.Fatal(err) + } + defer func() { + listener.Close() + server.Wait() + }() + + for _, test := range callTests { + t.Run(test.Name(), func(t *testing.T) { + client, err := jsonrpc2.Dial(ctx, + listener.Dialer(), binder{framer, func(h *handler) { + defer h.conn.Close() + ctx := eventtest.NewContext(ctx, t) + test.Invoke(t, ctx, h) + if call, ok := test.(*call); ok { + // also run all simple call tests in echo mode + (*echo)(call).Invoke(t, ctx, h) + } + }}) + if err != nil { + t.Fatal(err) + } + client.Wait() + }) + } +} + +func (test notify) Name() string { return test.method } +func (test notify) Invoke(t *testing.T, ctx context.Context, h *handler) { + if err := h.conn.Notify(ctx, test.method, test.params); err != nil { + t.Fatalf("%v:Notify failed: %v", test.method, err) + } +} + +func (test call) Name() string { return test.method } +func (test call) Invoke(t *testing.T, ctx context.Context, h *handler) { + results := newResults(test.expect) + if err := h.conn.Call(ctx, test.method, test.params).Await(ctx, results); err != nil { + t.Fatalf("%v:Call failed: %v", test.method, err) + } + verifyResults(t, test.method, results, test.expect) +} + +func (test echo) Invoke(t *testing.T, ctx context.Context, h *handler) { + results := newResults(test.expect) + if err := h.conn.Call(ctx, "echo", []interface{}{test.method, test.params}).Await(ctx, results); err != nil { + t.Fatalf("%v:Echo failed: %v", test.method, err) + } + verifyResults(t, test.method, results, test.expect) +} + +func (test async) Name() string { return test.name } +func (test async) Invoke(t *testing.T, ctx context.Context, h *handler) { + h.calls[test.name] = h.conn.Call(ctx, test.method, test.params) +} + +func (test collect) Name() string { return test.name } +func (test collect) Invoke(t *testing.T, ctx context.Context, h *handler) { + o := h.calls[test.name] + results := newResults(test.expect) + err := o.Await(ctx, results) + switch { + case test.fails && err == nil: + t.Fatalf("%v:Collect was supposed to fail", test.name) + case !test.fails && err != nil: + t.Fatalf("%v:Collect failed: %v", test.name, err) + } + verifyResults(t, test.name, results, test.expect) +} + +func (test cancel) Name() string { return test.name } +func (test cancel) Invoke(t *testing.T, ctx context.Context, h *handler) { + o := h.calls[test.name] + if err := h.conn.Notify(ctx, "cancel", &cancelParams{o.ID().Raw().(int64)}); err != nil { + t.Fatalf("%v:Collect failed: %v", test.name, err) + } +} + +func (test sequence) Name() string { return test.name } +func (test sequence) Invoke(t *testing.T, ctx context.Context, h *handler) { + for _, child := range test.tests { + child.Invoke(t, ctx, h) + } +} + +// newResults makes a new empty copy of the expected type to put the results into +func newResults(expect interface{}) interface{} { + switch e := expect.(type) { + case []interface{}: + var r []interface{} + for _, v := range e { + r = append(r, reflect.New(reflect.TypeOf(v)).Interface()) + } + return r + case nil: + return nil + default: + return reflect.New(reflect.TypeOf(expect)).Interface() + } +} + +// verifyResults compares the results to the expected values +func verifyResults(t *testing.T, method string, results interface{}, expect interface{}) { + if expect == nil { + if results != nil { + t.Errorf("%v:Got results %+v where none expeted", method, expect) + } + return + } + val := reflect.Indirect(reflect.ValueOf(results)).Interface() + if !reflect.DeepEqual(val, expect) { + t.Errorf("%v:Results are incorrect, got %+v expect %+v", method, val, expect) + } +} + +func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) { + h := &handler{ + conn: conn, + waitersBox: make(chan map[string]chan struct{}, 1), + calls: make(map[string]*jsonrpc2.AsyncCall), + } + h.waitersBox <- make(map[string]chan struct{}) + if b.runTest != nil { + go b.runTest(h) + } + return jsonrpc2.ConnectionOptions{ + Framer: b.framer, + Preempter: h, + Handler: h, + }, nil +} + +func (h *handler) waiter(name string) chan struct{} { + waiters := <-h.waitersBox + defer func() { h.waitersBox <- waiters }() + waiter, found := waiters[name] + if !found { + waiter = make(chan struct{}) + waiters[name] = waiter + } + return waiter +} + +func (h *handler) Preempt(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + switch req.Method { + case "unblock": + var name string + if err := json.Unmarshal(req.Params, &name); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + close(h.waiter(name)) + return nil, nil + case "peek": + if len(req.Params) > 0 { + return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams) + } + return h.accumulator, nil + case "cancel": + var params cancelParams + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + h.conn.Cancel(jsonrpc2.Int64ID(params.ID)) + return nil, nil + default: + return nil, jsonrpc2.ErrNotHandled + } +} + +func (h *handler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + switch req.Method { + case "no_args": + if len(req.Params) > 0 { + return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams) + } + return true, nil + case "one_string": + var v string + if err := json.Unmarshal(req.Params, &v); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + return "got:" + v, nil + case "one_number": + var v int + if err := json.Unmarshal(req.Params, &v); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + return fmt.Sprintf("got:%d", v), nil + case "set": + var v int + if err := json.Unmarshal(req.Params, &v); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + h.accumulator = v + return nil, nil + case "add": + var v int + if err := json.Unmarshal(req.Params, &v); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + h.accumulator += v + return nil, nil + case "get": + if len(req.Params) > 0 { + return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams) + } + return h.accumulator, nil + case "join": + var v []string + if err := json.Unmarshal(req.Params, &v); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + return path.Join(v...), nil + case "echo": + var v []interface{} + if err := json.Unmarshal(req.Params, &v); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + var result interface{} + err := h.conn.Call(ctx, v[0].(string), v[1]).Await(ctx, &result) + return result, err + case "wait": + var name string + if err := json.Unmarshal(req.Params, &name); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + select { + case <-h.waiter(name): + return true, nil + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(time.Second): + return nil, errors.Errorf("wait for %q timed out", name) + } + case "fork": + var name string + if err := json.Unmarshal(req.Params, &name); err != nil { + return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err) + } + waitFor := h.waiter(name) + go func() { + select { + case <-waitFor: + h.conn.Respond(req.ID, true, nil) + case <-ctx.Done(): + h.conn.Respond(req.ID, nil, ctx.Err()) + case <-time.After(time.Second): + h.conn.Respond(req.ID, nil, errors.Errorf("wait for %q timed out", name)) + } + }() + return nil, jsonrpc2.ErrAsyncResponse + default: + return nil, jsonrpc2.ErrNotHandled + } +} diff --git a/internal/jsonrpc2_v2/messages.go b/internal/jsonrpc2_v2/messages.go new file mode 100644 index 00000000000..652ac817a37 --- /dev/null +++ b/internal/jsonrpc2_v2/messages.go @@ -0,0 +1,181 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "encoding/json" + + errors "golang.org/x/xerrors" +) + +// ID is a Request identifier. +type ID struct { + value interface{} +} + +// Message is the interface to all jsonrpc2 message types. +// They share no common functionality, but are a closed set of concrete types +// that are allowed to implement this interface. The message types are *Request +// and *Response. +type Message interface { + // marshal builds the wire form from the API form. + // It is private, which makes the set of Message implementations closed. + marshal(to *wireCombined) +} + +// Request is a Message sent to a peer to request behavior. +// If it has an ID it is a call, otherwise it is a notification. +type Request struct { + // ID of this request, used to tie the Response back to the request. + // This will be nil for notifications. + ID ID + // Method is a string containing the method name to invoke. + Method string + // Params is either a struct or an array with the parameters of the method. + Params json.RawMessage +} + +// Response is a Message used as a reply to a call Request. +// It will have the same ID as the call it is a response to. +type Response struct { + // result is the content of the response. + Result json.RawMessage + // err is set only if the call failed. + Error error + // id of the request this is a response to. + ID ID +} + +// StringID creates a new string request identifier. +func StringID(s string) ID { return ID{value: s} } + +// Int64ID creates a new integer request identifier. +func Int64ID(i int64) ID { return ID{value: i} } + +// IsValid returns true if the ID is a valid identifier. +// The default value for ID will return false. +func (id ID) IsValid() bool { return id.value != nil } + +// Raw returns the underlying value of the ID. +func (id ID) Raw() interface{} { return id.value } + +// NewNotification constructs a new Notification message for the supplied +// method and parameters. +func NewNotification(method string, params interface{}) (*Request, error) { + p, merr := marshalToRaw(params) + return &Request{Method: method, Params: p}, merr +} + +// NewCall constructs a new Call message for the supplied ID, method and +// parameters. +func NewCall(id ID, method string, params interface{}) (*Request, error) { + p, merr := marshalToRaw(params) + return &Request{ID: id, Method: method, Params: p}, merr +} + +func (msg *Request) IsCall() bool { return msg.ID.IsValid() } + +func (msg *Request) marshal(to *wireCombined) { + to.ID = msg.ID.value + to.Method = msg.Method + to.Params = msg.Params +} + +// NewResponse constructs a new Response message that is a reply to the +// supplied. If err is set result may be ignored. +func NewResponse(id ID, result interface{}, rerr error) (*Response, error) { + r, merr := marshalToRaw(result) + return &Response{ID: id, Result: r, Error: rerr}, merr +} + +func (msg *Response) marshal(to *wireCombined) { + to.ID = msg.ID.value + to.Error = toWireError(msg.Error) + to.Result = msg.Result +} + +func toWireError(err error) *wireError { + if err == nil { + // no error, the response is complete + return nil + } + if err, ok := err.(*wireError); ok { + // already a wire error, just use it + return err + } + result := &wireError{Message: err.Error()} + var wrapped *wireError + if errors.As(err, &wrapped) { + // if we wrapped a wire error, keep the code from the wrapped error + // but the message from the outer error + result.Code = wrapped.Code + } + return result +} + +func EncodeMessage(msg Message) ([]byte, error) { + wire := wireCombined{VersionTag: wireVersion} + msg.marshal(&wire) + data, err := json.Marshal(&wire) + if err != nil { + return data, errors.Errorf("marshaling jsonrpc message: %w", err) + } + return data, nil +} + +func DecodeMessage(data []byte) (Message, error) { + msg := wireCombined{} + if err := json.Unmarshal(data, &msg); err != nil { + return nil, errors.Errorf("unmarshaling jsonrpc message: %w", err) + } + if msg.VersionTag != wireVersion { + return nil, errors.Errorf("invalid message version tag %s expected %s", msg.VersionTag, wireVersion) + } + id := ID{} + switch v := msg.ID.(type) { + case nil: + case float64: + // coerce the id type to int64 if it is float64, the spec does not allow fractional parts + id = Int64ID(int64(v)) + case int64: + id = Int64ID(v) + case string: + id = StringID(v) + default: + return nil, errors.Errorf("invalid message id type <%T>%v", v, v) + } + if msg.Method != "" { + // has a method, must be a call + return &Request{ + Method: msg.Method, + ID: id, + Params: msg.Params, + }, nil + } + // no method, should be a response + if !id.IsValid() { + return nil, ErrInvalidRequest + } + resp := &Response{ + ID: id, + Result: msg.Result, + } + // we have to check if msg.Error is nil to avoid a typed error + if msg.Error != nil { + resp.Error = msg.Error + } + return resp, nil +} + +func marshalToRaw(obj interface{}) (json.RawMessage, error) { + if obj == nil { + return nil, nil + } + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + return json.RawMessage(data), nil +} diff --git a/internal/jsonrpc2_v2/net.go b/internal/jsonrpc2_v2/net.go new file mode 100644 index 00000000000..c8cfaab40ba --- /dev/null +++ b/internal/jsonrpc2_v2/net.go @@ -0,0 +1,129 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "context" + "io" + "net" + "os" + "time" +) + +// This file contains implementations of the transport primitives that use the standard network +// package. + +// NetListenOptions is the optional arguments to the NetListen function. +type NetListenOptions struct { + NetListenConfig net.ListenConfig + NetDialer net.Dialer +} + +// NetListener returns a new Listener that listents on a socket using the net package. +func NetListener(ctx context.Context, network, address string, options NetListenOptions) (Listener, error) { + ln, err := options.NetListenConfig.Listen(ctx, network, address) + if err != nil { + return nil, err + } + return &netListener{net: ln}, nil +} + +// netListener is the implementation of Listener for connections made using the net package. +type netListener struct { + net net.Listener +} + +// Accept blocks waiting for an incoming connection to the listener. +func (l *netListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) { + return l.net.Accept() +} + +// Close will cause the listener to stop listening. It will not close any connections that have +// already been accepted. +func (l *netListener) Close() error { + addr := l.net.Addr() + err := l.net.Close() + if addr.Network() == "unix" { + rerr := os.Remove(addr.String()) + if rerr != nil && err == nil { + err = rerr + } + } + return err +} + +// Dialer returns a dialer that can be used to connect to the listener. +func (l *netListener) Dialer() Dialer { + return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{ + Timeout: 5 * time.Second, + }) +} + +// NetDialer returns a Dialer using the supplied standard network dialer. +func NetDialer(network, address string, nd net.Dialer) Dialer { + return &netDialer{ + network: network, + address: address, + dialer: nd, + } +} + +type netDialer struct { + network string + address string + dialer net.Dialer +} + +func (n *netDialer) Dial(ctx context.Context) (io.ReadWriteCloser, error) { + return n.dialer.DialContext(ctx, n.network, n.address) +} + +// NetPipe returns a new Listener that listens using net.Pipe. +// It is only possibly to connect to it using the Dialier returned by the +// Dialer method, each call to that method will generate a new pipe the other +// side of which will be returnd from the Accept call. +func NetPipe(ctx context.Context) (Listener, error) { + return &netPiper{ + done: make(chan struct{}), + dialed: make(chan io.ReadWriteCloser), + }, nil +} + +// netPiper is the implementation of Listener build on top of net.Pipes. +type netPiper struct { + done chan struct{} + dialed chan io.ReadWriteCloser +} + +// Accept blocks waiting for an incoming connection to the listener. +func (l *netPiper) Accept(ctx context.Context) (io.ReadWriteCloser, error) { + // block until we have a listener, or are closed or cancelled + select { + case rwc := <-l.dialed: + return rwc, nil + case <-l.done: + return nil, io.EOF + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Close will cause the listener to stop listening. It will not close any connections that have +// already been accepted. +func (l *netPiper) Close() error { + // unblock any accept calls that are pending + close(l.done) + return nil +} + +func (l *netPiper) Dialer() Dialer { + return l +} + +func (l *netPiper) Dial(ctx context.Context) (io.ReadWriteCloser, error) { + client, server := net.Pipe() + l.dialed <- server + return client, nil +} diff --git a/internal/jsonrpc2_v2/serve.go b/internal/jsonrpc2_v2/serve.go new file mode 100644 index 00000000000..1bac9740a08 --- /dev/null +++ b/internal/jsonrpc2_v2/serve.go @@ -0,0 +1,208 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "context" + "io" + "time" + + "golang.org/x/tools/internal/event" + errors "golang.org/x/xerrors" +) + +// Listener is implemented by protocols to accept new inbound connections. +type Listener interface { + // Accept an inbound connection to a server. + // It must block until an inbound connection is made, or the listener is + // shut down. + Accept(context.Context) (io.ReadWriteCloser, error) + + // Close is used to ask a listener to stop accepting new connections. + Close() error + + // Dialer returns a dialer that can be used to connect to this listener + // locally. + // If a listener does not implement this it will return a nil. + Dialer() Dialer +} + +// Dialer is used by clients to dial a server. +type Dialer interface { + // Dial returns a new communication byte stream to a listening server. + Dial(ctx context.Context) (io.ReadWriteCloser, error) +} + +// Server is a running server that is accepting incoming connections. +type Server struct { + listener Listener + binder Binder + options ServeOptions // a copy of the config that started this server + async async +} + +// ServeOptions holds the options to the Serve function. +//TODO: kill ServeOptions and push timeout into the listener +type ServeOptions struct { + // IdleTimeout is the maximum amount of time to remain idle and running. + IdleTimeout time.Duration +} + +// Dial uses the dialer to make a new connection, wraps the returned +// reader and writer using the framer to make a stream, and then builds +// a connection on top of that stream using the binder. +func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) { + // dial a server + rwc, err := dialer.Dial(ctx) + if err != nil { + return nil, err + } + return newConnection(ctx, rwc, binder) +} + +// Serve starts a new server listening for incoming connections and returns +// it. +// This returns a fully running and connected server, it does not block on +// the listener. +// You can call Wait to block on the server, or Shutdown to get the sever to +// terminate gracefully. +// To notice incoming connections, use an intercepting Binder. +func Serve(ctx context.Context, listener Listener, binder Binder, options ServeOptions) (*Server, error) { + server := &Server{ + listener: listener, + binder: binder, + options: options, + } + server.async.init() + go server.run(ctx) + return server, nil +} + +// Wait returns only when the server has shut down. +func (s *Server) Wait() error { + return s.async.wait() +} + +// run accepts incoming connections from the listener, +// If IdleTimeout is non-zero, run exits after there are no clients for this +// duration, otherwise it exits only on error. +func (s *Server) run(ctx context.Context) { + defer s.async.done() + // Max duration: ~290 years; surely that's long enough. + const forever = 1<<63 - 1 + idleTimeout := s.options.IdleTimeout + if idleTimeout <= 0 { + idleTimeout = forever + } + idleTimer := time.NewTimer(idleTimeout) + + // run a goroutine that listens for incoming connections and posts them + // back to the worker + newStreams := make(chan io.ReadWriteCloser) + go func() { + for { + // we never close the accepted connection, we rely on the other end + // closing or the socket closing itself naturally + rwc, err := s.listener.Accept(ctx) + if err != nil { + if !isClosingError(err) { + event.Error(ctx, "Accept", err) + } + // signal we are done generating new connections for good + close(newStreams) + return + } + newStreams <- rwc + } + }() + + closedConns := make(chan struct{}) + activeConns := 0 + lnClosed := false + for { + select { + case rwc := <-newStreams: + // whatever happes we are not idle anymore + idleTimer.Stop() + if rwc == nil { + // the net listener has been closed + lnClosed = true + if activeConns == 0 { + // accept is done and there are no active connections, so just stop now + return + } + // replace the channel with one that will never trigger + // this is save because the only writer has already quit + newStreams = nil + // and then wait for all active connections to stop + continue + } + // a new inbound connection, + conn, err := newConnection(ctx, rwc, s.binder) + if err != nil { + if !isClosingError(err) { + event.Error(ctx, "NewConn", err) + } + continue + } + // register the new conn as active + activeConns++ + // wrap the conn in a close monitor + //TODO: we do this to maintain our active count correctly, is there a better way? + go func() { + err := conn.Wait() + if err != nil && !isClosingError(err) { + event.Error(ctx, "closed a connection", err) + } + closedConns <- struct{}{} + }() + case <-closedConns: + activeConns-- + if activeConns == 0 { + // no more active connections, restart the idle timer + if lnClosed { + // we can never get a new connection, so we are done + return + } + // we are idle, but might get a new connection still + idleTimer.Reset(idleTimeout) + } + case <-idleTimer.C: + // no activity for a while, time to stop serving + s.async.setError(ErrIdleTimeout) + return + case <-ctx.Done(): + s.async.setError(ctx.Err()) + return + } + } +} + +// isClosingError reports if the error occurs normally during the process of +// closing a network connection. It uses imperfect heuristics that err on the +// side of false negatives, and should not be used for anything critical. +func isClosingError(err error) bool { + if err == nil { + return false + } + // fully unwrap the error, so the following tests work + for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) { + err = wrapped + } + + // was it based on an EOF error? + if err == io.EOF { + return true + } + + // Per https://github.com/golang/go/issues/4373, this error string should not + // change. This is not ideal, but since the worst that could happen here is + // some superfluous logging, it is acceptable. + if err.Error() == "use of closed network connection" { + return true + } + + return false +} diff --git a/internal/jsonrpc2_v2/serve_test.go b/internal/jsonrpc2_v2/serve_test.go new file mode 100644 index 00000000000..1b6b3b239a6 --- /dev/null +++ b/internal/jsonrpc2_v2/serve_test.go @@ -0,0 +1,144 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2_test + +import ( + "context" + "errors" + "testing" + "time" + + jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2" + "golang.org/x/tools/internal/stack/stacktest" +) + +func TestIdleTimeout(t *testing.T) { + stacktest.NoLeak(t) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + listener, err := jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{}) + if err != nil { + t.Fatal(err) + } + defer listener.Close() + + server, err := jsonrpc2.Serve(ctx, listener, jsonrpc2.ConnectionOptions{}, + jsonrpc2.ServeOptions{ + IdleTimeout: 100 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + + connect := func() *jsonrpc2.Connection { + client, err := jsonrpc2.Dial(ctx, + listener.Dialer(), + jsonrpc2.ConnectionOptions{}) + if err != nil { + t.Fatal(err) + } + return client + } + // Exercise some connection/disconnection patterns, and then assert that when + // our timer fires, the server exits. + conn1 := connect() + conn2 := connect() + if err := conn1.Close(); err != nil { + t.Fatalf("conn1.Close failed with error: %v", err) + } + if err := conn2.Close(); err != nil { + t.Fatalf("conn2.Close failed with error: %v", err) + } + conn3 := connect() + if err := conn3.Close(); err != nil { + t.Fatalf("conn3.Close failed with error: %v", err) + } + + serverError := server.Wait() + + if !errors.Is(serverError, jsonrpc2.ErrIdleTimeout) { + t.Errorf("run() returned error %v, want %v", serverError, jsonrpc2.ErrIdleTimeout) + } +} + +type msg struct { + Msg string +} + +type fakeHandler struct{} + +func (fakeHandler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + switch req.Method { + case "ping": + return &msg{"pong"}, nil + default: + return nil, jsonrpc2.ErrNotHandled + } +} + +func TestServe(t *testing.T) { + stacktest.NoLeak(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + tests := []struct { + name string + factory func(context.Context) (jsonrpc2.Listener, error) + }{ + {"tcp", func(ctx context.Context) (jsonrpc2.Listener, error) { + return jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{}) + }}, + {"pipe", func(ctx context.Context) (jsonrpc2.Listener, error) { + return jsonrpc2.NetPipe(ctx) + }}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fake, err := test.factory(ctx) + if err != nil { + t.Fatal(err) + } + conn, shutdown, err := newFake(ctx, fake) + if err != nil { + t.Fatal(err) + } + defer shutdown(ctx) + var got msg + if err := conn.Call(ctx, "ping", &msg{"ting"}).Await(ctx, &got); err != nil { + t.Fatal(err) + } + if want := "pong"; got.Msg != want { + t.Errorf("conn.Call(...): returned %q, want %q", got, want) + } + }) + } +} + +func newFake(ctx context.Context, l jsonrpc2.Listener) (*jsonrpc2.Connection, func(context.Context), error) { + server, err := jsonrpc2.Serve(ctx, l, jsonrpc2.ConnectionOptions{ + Handler: fakeHandler{}, + }, jsonrpc2.ServeOptions{ + IdleTimeout: 100 * time.Millisecond, + }) + if err != nil { + return nil, nil, err + } + + client, err := jsonrpc2.Dial(ctx, + l.Dialer(), + jsonrpc2.ConnectionOptions{ + Handler: fakeHandler{}, + }) + if err != nil { + return nil, nil, err + } + return client, func(ctx context.Context) { + l.Close() + client.Close() + server.Wait() + }, nil +} diff --git a/internal/jsonrpc2_v2/wire.go b/internal/jsonrpc2_v2/wire.go new file mode 100644 index 00000000000..97b1ae8d621 --- /dev/null +++ b/internal/jsonrpc2_v2/wire.go @@ -0,0 +1,74 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2 + +import ( + "encoding/json" +) + +// This file contains the go forms of the wire specification. +// see http://www.jsonrpc.org/specification for details + +var ( + // ErrUnknown should be used for all non coded errors. + ErrUnknown = NewError(-32001, "JSON RPC unknown error") + // ErrParse is used when invalid JSON was received by the server. + ErrParse = NewError(-32700, "JSON RPC parse error") + // ErrInvalidRequest is used when the JSON sent is not a valid Request object. + ErrInvalidRequest = NewError(-32600, "JSON RPC invalid request") + // ErrMethodNotFound should be returned by the handler when the method does + // not exist / is not available. + ErrMethodNotFound = NewError(-32601, "JSON RPC method not found") + // ErrInvalidParams should be returned by the handler when method + // parameter(s) were invalid. + ErrInvalidParams = NewError(-32602, "JSON RPC invalid params") + // ErrInternal indicates a failure to process a call correctly + ErrInternal = NewError(-32603, "JSON RPC internal error") + + // The following errors are not part of the json specification, but + // compliant extensions specific to this implimentation. + + // ErrServerOverloaded is returned when a message was refused due to a + // server being temporarily unable to accept any new messages. + ErrServerOverloaded = NewError(-32000, "JSON RPC overloaded") +) + +const wireVersion = "2.0" + +// wireCombined has all the fields of both Request and Response. +// We can decode this and then work out which it is. +type wireCombined struct { + VersionTag string `json:"jsonrpc"` + ID interface{} `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *wireError `json:"error,omitempty"` +} + +// wireError represents a structured error in a Response. +type wireError struct { + // Code is an error code indicating the type of failure. + Code int64 `json:"code"` + // Message is a short description of the error. + Message string `json:"message"` + // Data is optional structured data containing additional information about the error. + Data json.RawMessage `json:"data,omitempty"` +} + +// NewError returns an error that will encode on the wire correctly. +// The standard codes are made available from this package, this function should +// only be used to build errors for application specific codes as allowed by the +// specification. +func NewError(code int64, message string) error { + return &wireError{ + Code: code, + Message: message, + } +} + +func (err *wireError) Error() string { + return err.Message +} diff --git a/internal/jsonrpc2_v2/wire_test.go b/internal/jsonrpc2_v2/wire_test.go new file mode 100644 index 00000000000..e9337373239 --- /dev/null +++ b/internal/jsonrpc2_v2/wire_test.go @@ -0,0 +1,118 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package jsonrpc2_test + +import ( + "bytes" + "encoding/json" + "reflect" + "testing" + + jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2" +) + +func TestWireMessage(t *testing.T) { + for _, test := range []struct { + name string + msg jsonrpc2.Message + encoded []byte + }{{ + name: "notification", + msg: newNotification("alive", nil), + encoded: []byte(`{"jsonrpc":"2.0","method":"alive"}`), + }, { + name: "call", + msg: newCall("msg1", "ping", nil), + encoded: []byte(`{"jsonrpc":"2.0","id":"msg1","method":"ping"}`), + }, { + name: "response", + msg: newResponse("msg2", "pong", nil), + encoded: []byte(`{"jsonrpc":"2.0","id":"msg2","result":"pong"}`), + }, { + name: "numerical id", + msg: newCall(1, "poke", nil), + encoded: []byte(`{"jsonrpc":"2.0","id":1,"method":"poke"}`), + }, { + // originally reported in #39719, this checks that result is not present if + // it is an error response + name: "computing fix edits", + msg: newResponse(3, nil, jsonrpc2.NewError(0, "computing fix edits")), + encoded: []byte(`{ + "jsonrpc":"2.0", + "id":3, + "error":{ + "code":0, + "message":"computing fix edits" + } + }`), + }} { + b, err := jsonrpc2.EncodeMessage(test.msg) + if err != nil { + t.Fatal(err) + } + checkJSON(t, b, test.encoded) + msg, err := jsonrpc2.DecodeMessage(test.encoded) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(msg, test.msg) { + t.Errorf("decoded message does not match\nGot:\n%+#v\nWant:\n%+#v", msg, test.msg) + } + } +} + +func newNotification(method string, params interface{}) jsonrpc2.Message { + msg, err := jsonrpc2.NewNotification(method, params) + if err != nil { + panic(err) + } + return msg +} + +func newID(id interface{}) jsonrpc2.ID { + switch v := id.(type) { + case nil: + return jsonrpc2.ID{} + case string: + return jsonrpc2.StringID(v) + case int: + return jsonrpc2.Int64ID(int64(v)) + case int64: + return jsonrpc2.Int64ID(v) + default: + panic("invalid ID type") + } +} + +func newCall(id interface{}, method string, params interface{}) jsonrpc2.Message { + msg, err := jsonrpc2.NewCall(newID(id), method, params) + if err != nil { + panic(err) + } + return msg +} + +func newResponse(id interface{}, result interface{}, rerr error) jsonrpc2.Message { + msg, err := jsonrpc2.NewResponse(newID(id), result, rerr) + if err != nil { + panic(err) + } + return msg +} + +func checkJSON(t *testing.T, got, want []byte) { + // compare the compact form, to allow for formatting differences + g := &bytes.Buffer{} + if err := json.Compact(g, []byte(got)); err != nil { + t.Fatal(err) + } + w := &bytes.Buffer{} + if err := json.Compact(w, []byte(want)); err != nil { + t.Fatal(err) + } + if g.String() != w.String() { + t.Errorf("encoded message does not match\nGot:\n%s\nWant:\n%s", g, w) + } +}