Skip to content

Commit

Permalink
dap: handle reverse request flow (e.g. RunInTerminal)
Browse files Browse the repository at this point in the history
This PR adds support for reverse requests that are
sent from the debug adapter to the client. Currently,
RunInTerminal is the only such reverse request.

This is a pre-work for the 'console' support
(golang/vscode-go#124) - we plan to use RunInTerminal
to ask the editor to run a command in the integrated or
external terminal while handling a launch request
with the console attribute.

Launch request was classified as a synchronous request
and was blocking ServeDAPCodec loop, which means the
response message from the client couldn't be read until
onLaunchRequest returns. This PR adds two goroutines -
one to handle requests from the client (loopHandleRequests),
and another to handle responses (loopHandleResponses).
serveDAPCodec will keep read DAP messages from the net.Conn,
but delegate message handling to the goroutines through
buffered channels.

Alternatively, I tried to avoid introducing goroutines
by making onLaunchRequest asynchronously complete the launch
if integrated or external console mode is chosen. I.e.,
make onLaunchRequest return without sending LaunchResponse,
free the ServeDAPCodec loop, and return LaunchResponse (or
ErrorResponse) when RunInTerminal response is received.
But it was hard to follow, didn't look like a typical
go program, and wasn't extensible when DAP adds more
reverse requests or we ever want to utilize RunInTerminal
while handling other requests.

For reverse requests, we maintain a pendingReverseReq map for
requests sent to the client. When response messages arrive
we look up the map, and notify the waiters using a buffered
channel.

onLaunchRequest uses RunInTerminal if the launch request
has "integrated" or "external" console attribute and asks
the client to run a bogus command - this is for testing.
The follow up PRs will implement the real command that
starts a delve and use the command from the integrated
or external terminal.
  • Loading branch information
hyangah committed Jul 2, 2021
1 parent 9dfd164 commit 27c67e2
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 16 deletions.
37 changes: 37 additions & 0 deletions service/dap/daptest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,40 @@ func (c *Client) newRequest(command string) *dap.Request {
c.seq++
return request
}

func (c *Client) newResponse(command string, reqSeq int, err error) *dap.Response {
response := &dap.Response{}
response.Type = "response"
response.Command = command
response.RequestSeq = reqSeq
response.Success = err == nil
response.Message = err.Error()
return response
}

// ExpectRunInTerminalRequest reads a protocol message from the connection
// and fails the test if the read message is not *RunInTerminalRequest.
func (c *Client) ExpectRunInTerminalRequest(t *testing.T) *dap.RunInTerminalRequest {
t.Helper()
m := c.ExpectMessage(t)
return c.CheckRunInTerminalRequest(t, m)
}

// CheckRunInTerminalResponse fails the test if m is not *RunInTerminalResponse.
func (c *Client) CheckRunInTerminalRequest(t *testing.T, m dap.Message) *dap.RunInTerminalRequest {
t.Helper()
r, ok := m.(*dap.RunInTerminalRequest)
if !ok {
t.Fatalf("got %#v, want *dap.RunInTerminalRequest", m)
}
return r
}

func (c *Client) RunInTerminalResponse(seq, processId int, err error) {
c.send(&dap.RunInTerminalResponse{
Response: *c.newResponse("runInTerminal", seq, err),
Body: dap.RunInTerminalResponseBody{
ProcessId: processId,
},
})
}
163 changes: 151 additions & 12 deletions service/dap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package dap
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -27,6 +28,8 @@ import (
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/go-delve/delve/pkg/gobuild"
"github.com/go-delve/delve/pkg/goversion"
Expand Down Expand Up @@ -125,9 +128,15 @@ type Server struct {
// noDebugProcess is set for the noDebug launch process.
noDebugProcess *exec.Cmd

// sendingMu synchronizes writing to net.Conn
// sendingMu synchronizes writing to net.Conn and pendingReverseReq
// to ensure that messages do not get interleaved
sendingMu sync.Mutex

// map sequence number to the pending reverse request notification channel
pendingReverseReq map[int]chan<- dap.ResponseMessage

// sequence number of the last sent request. Incremented atomically.
reverseReqSeq int32
}

// launchAttachArgs captures arguments from launch/attach request that
Expand Down Expand Up @@ -209,6 +218,7 @@ func NewServer(config *service.Config) *Server {
variableHandles: newVariablesHandlesMap(),
args: defaultArgs,
exceptionErr: nil,
pendingReverseReq: make(map[int]chan<- dap.ResponseMessage),
}
}

Expand Down Expand Up @@ -351,13 +361,41 @@ func (s *Server) Run() {
}()
}

func (s *Server) loopHandleRequests(requests <-chan dap.RequestMessage) {
for {
select {
case req := <-requests:
s.handleRequest(req)
case <-s.stopTriggered:
return
}
}
}

func (s *Server) loopHandleResponses(responses <-chan dap.ResponseMessage) {
for {
select {
case res := <-responses:
s.handleResponse(res)
case <-s.stopTriggered:
return
}
}
}

// serveDAPCodec reads and decodes requests from the client
// until it encounters an error or EOF, when it sends
// a disconnect signal and returns.
func (s *Server) serveDAPCodec() {
requests := make(chan dap.RequestMessage, 10)
responses := make(chan dap.ResponseMessage, 10)

go s.loopHandleRequests(requests)
go s.loopHandleResponses(responses)

s.reader = bufio.NewReader(s.conn)
for {
request, err := dap.ReadProtocolMessage(s.reader)
message, err := dap.ReadProtocolMessage(s.reader)
// TODO(polina): Differentiate between errors and handle them
// gracefully. For example,
// -- "Request command 'foo' is not supported" means we
Expand All @@ -376,7 +414,20 @@ func (s *Server) serveDAPCodec() {
}
return
}
s.handleRequest(request)

jsonmsg, _ := json.Marshal(message)
s.log.Debug("[<- from client]", string(jsonmsg))

switch m := message.(type) {
case dap.RequestMessage:
requests <- m
case dap.ResponseMessage:
responses <- m
case dap.EventMessage:
s.sendInternalErrorResponse(message.GetSeq(), fmt.Sprintf("Unable to process event message %#v\n", m))
default:
s.sendInternalErrorResponse(message.GetSeq(), fmt.Sprintf("Unable to process message (type %T) %#v", m, m))
}
}
}

Expand All @@ -390,16 +441,19 @@ func (s *Server) recoverPanic(request dap.Message) {
}
}

func (s *Server) handleRequest(request dap.Message) {
defer s.recoverPanic(request)

jsonmsg, _ := json.Marshal(request)
s.log.Debug("[<- from client]", string(jsonmsg))
func (s *Server) handleResponse(response dap.ResponseMessage) {
defer s.recoverPanic(response)

if _, ok := request.(dap.RequestMessage); !ok {
s.sendInternalErrorResponse(request.GetSeq(), fmt.Sprintf("Unable to process non-request %#v\n", request))
resCh, ok := s.lookupPendingRequest(response)
if !ok {
s.log.Errorf("Dropping unexpected response message %#v\n", response)
return
}
resCh <- response
}

func (s *Server) handleRequest(request dap.RequestMessage) {
defer s.recoverPanic(request)

// These requests, can be handled regardless of whether the targret is running
switch request := request.(type) {
Expand Down Expand Up @@ -485,8 +539,8 @@ func (s *Server) handleRequest(request dap.Message) {
}
s.onSetFunctionBreakpointsRequest(request)
default:
r := request.(dap.RequestMessage).GetRequest()
s.sendErrorResponse(*r, DebuggeeIsRunning, fmt.Sprintf("Unable to process `%s`", r.Command), "debuggee is running")
r := request.GetRequest()
s.sendErrorResponse(*r, DebuggeeIsRunning, fmt.Sprintf("Unable to process %q", r.Command), "debuggee is running")
}
return
}
Expand Down Expand Up @@ -665,6 +719,34 @@ func (s *Server) send(message dap.Message) {
_ = dap.WriteProtocolMessage(s.conn, message)
}

// sendRequest sends a request to the client. The caller can wait on the
// returned buffered channel for the response.
func (s *Server) sendRequest(req dap.RequestMessage) <-chan dap.ResponseMessage {
res := make(chan dap.ResponseMessage, 1)
seq := int(atomic.AddInt32(&s.reverseReqSeq, 1))
req.GetRequest().Seq = seq

jsonmsg, _ := json.Marshal(req)
s.log.Debug("[-> to client]", string(jsonmsg))

s.sendingMu.Lock()
defer s.sendingMu.Unlock()
s.pendingReverseReq[seq] = res
_ = dap.WriteProtocolMessage(s.conn, req)
return res
}

func (s *Server) lookupPendingRequest(res dap.ResponseMessage) (chan<- dap.ResponseMessage, bool) {
seq := res.GetResponse().RequestSeq
s.sendingMu.Lock()
defer s.sendingMu.Unlock()
ch, ok := s.pendingReverseReq[seq]
if ok {
delete(s.pendingReverseReq, seq)
}
return ch, ok
}

func (s *Server) logToConsole(msg string) {
s.send(&dap.OutputEvent{
Event: *newEvent("output"),
Expand Down Expand Up @@ -734,7 +816,35 @@ func cleanExeName(name string) string {
return name
}

func (s *Server) runInTerminal(ctx context.Context, args dap.RunInTerminalRequestArguments) (*dap.RunInTerminalResponse, error) {
respCh := s.sendRequest(&dap.RunInTerminalRequest{
Request: dap.Request{
ProtocolMessage: dap.ProtocolMessage{Type: "request"},
Command: "runInTerminal",
},
Arguments: args,
})
select {
case resp := <-respCh:
switch r := resp.(type) {
case *dap.RunInTerminalResponse: // Success == true
return r, nil
case *dap.ErrorResponse: // Success == false
return nil, fmt.Errorf("run in terminal command failed: %v (%v)", r.Message, r.Body.Error.Format)
default:
return nil, fmt.Errorf("got an unexpected response: %#v", resp)
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (s *Server) onLaunchRequest(request *dap.LaunchRequest) {
name, ok := request.Arguments["name"].(string)
if !ok || name == "" {
name = "Go Debug"
}

// Validate launch request mode
mode, ok := request.Arguments["mode"]
if !ok || mode == "" {
Expand All @@ -746,6 +856,35 @@ func (s *Server) onLaunchRequest(request *dap.LaunchRequest) {
fmt.Sprintf("Unsupported 'mode' value %q in debug configuration.", mode))
return
}
// If 'console' is set to integrated or external, we will launch
// the debugger&debugee using RunInTerminal.
if console, ok := request.Arguments["console"].(string); ok && console != "" {
if console != "integrated" && console != "external" {
s.sendErrorResponse(request.Request,
FailedToLaunch, "Failed to launch",
fmt.Sprintf("invalid value for console attribute: %q", console))
return
}
if !s.clientCapabilities.supportsRunInTerminalRequest {
s.sendErrorResponse(request.Request,
FailedToLaunch, "Failed to launch",
fmt.Sprintf("client does not support RunInTerminal feature necessary for console=%q", console))
return
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
// TODO(hyangah): finish implementing the 'console' mode.
_, err := s.runInTerminal(ctx, dap.RunInTerminalRequestArguments{
Kind: console,
Title: name,
Args: []string{"console_attribute_is_not_supported"},
})
if err != nil {
s.sendErrorResponse(request.Request, FailedToLaunch, "Failed to launch",
fmt.Sprintf("could not set up debugging using 'console' attribute: %v", err))
return
}
}

// TODO(polina): Respond with an error if debug session is in progress?
program, ok := request.Arguments["program"].(string)
Expand Down
Loading

0 comments on commit 27c67e2

Please sign in to comment.