Skip to content

[release/6.0] [wasm][debugger] Fixing the race condition while modifying pending_ops #80190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
Expand All @@ -24,14 +25,19 @@ internal class DevToolsProxy
private ClientWebSocket browser;
private WebSocket ide;
private int next_cmd_id;
private List<Task> pending_ops = new List<Task>();
private readonly ChannelWriter<Task> _channelWriter;
private readonly ChannelReader<Task> _channelReader;
private List<DevToolsQueue> queues = new List<DevToolsQueue>();

protected readonly ILogger logger;

public DevToolsProxy(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<DevToolsProxy>();

var channel = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions { SingleReader = true });
_channelWriter = channel.Writer;
_channelReader = channel.Reader;
}

protected virtual Task<bool> AcceptEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
Expand Down Expand Up @@ -82,7 +88,7 @@ private DevToolsQueue GetQueueForTask(Task task)
return queues.FirstOrDefault(q => q.CurrentSend == task);
}

private void Send(WebSocket to, JObject o, CancellationToken token)
private async Task Send(WebSocket to, JObject o, CancellationToken token)
{
string sender = browser == to ? "Send-browser" : "Send-ide";

Expand All @@ -95,7 +101,7 @@ private void Send(WebSocket to, JObject o, CancellationToken token)

Task task = queue.Send(bytes, token);
if (task != null)
pending_ops.Add(task);
await _channelWriter.WriteAsync(task, token);
}

private async Task OnEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
Expand All @@ -105,7 +111,7 @@ private async Task OnEvent(SessionId sessionId, string method, JObject args, Can
if (!await AcceptEvent(sessionId, method, args, token))
{
//logger.LogDebug ("proxy browser: {0}::{1}",method, args);
SendEventInternal(sessionId, method, args, token);
await SendEventInternal(sessionId, method, args, token);
}
}
catch (Exception e)
Expand All @@ -121,7 +127,7 @@ private async Task OnCommand(MessageId id, string method, JObject args, Cancella
if (!await AcceptCommand(id, method, args, token))
{
Result res = await SendCommandInternal(id, method, args, token);
SendResponseInternal(id, res, token);
await SendResponseInternal(id, res, token);
}
}
catch (Exception e)
Expand All @@ -142,7 +148,7 @@ private void OnResponse(MessageId id, Result result)
logger.LogError("Cannot respond to command: {id} with result: {result} - command is not pending", id, result);
}

private void ProcessBrowserMessage(string msg, CancellationToken token)
private Task ProcessBrowserMessage(string msg, CancellationToken token)
{
var res = JObject.Parse(msg);

Expand All @@ -151,23 +157,30 @@ private void ProcessBrowserMessage(string msg, CancellationToken token)
Log("protocol", $"browser: {msg}");

if (res["id"] == null)
pending_ops.Add(OnEvent(res.ToObject<SessionId>(), res["method"].Value<string>(), res["params"] as JObject, token));
{
return OnEvent(res.ToObject<SessionId>(), res["method"].Value<string>(), res["params"] as JObject, token);
}
else
{
OnResponse(res.ToObject<MessageId>(), Result.FromJson(res));
return null;
}
}

private void ProcessIdeMessage(string msg, CancellationToken token)
private Task ProcessIdeMessage(string msg, CancellationToken token)
{
Log("protocol", $"ide: {msg}");
if (!string.IsNullOrEmpty(msg))
{
var res = JObject.Parse(msg);
var id = res.ToObject<MessageId>();
pending_ops.Add(OnCommand(
return OnCommand(
id,
res["method"].Value<string>(),
res["params"] as JObject, token));
res["params"] as JObject, token);
}

return null;
}

internal async Task<Result> SendCommand(SessionId id, string method, JObject args, CancellationToken token)
Expand All @@ -176,7 +189,7 @@ internal async Task<Result> SendCommand(SessionId id, string method, JObject arg
return await SendCommandInternal(id, method, args, token);
}

private Task<Result> SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
private async Task<Result> SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
{
int id = Interlocked.Increment(ref next_cmd_id);

Expand All @@ -194,17 +207,17 @@ private Task<Result> SendCommandInternal(SessionId sessionId, string method, JOb
//Log ("verbose", $"add cmd id {sessionId}-{id}");
pending_cmds[msgId] = tcs;

Send(this.browser, o, token);
return tcs.Task;
await Send(browser, o, token);
return await tcs.Task;
}

public void SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
public Task SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
{
//Log ("verbose", $"sending event {method}: {args}");
SendEventInternal(sessionId, method, args, token);
return SendEventInternal(sessionId, method, args, token);
}

private void SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
private Task SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
{
var o = JObject.FromObject(new
{
Expand All @@ -214,21 +227,21 @@ private void SendEventInternal(SessionId sessionId, string method, JObject args,
if (sessionId.sessionId != null)
o["sessionId"] = sessionId.sessionId;

Send(this.ide, o, token);
return Send(ide, o, token);
}

internal void SendResponse(MessageId id, Result result, CancellationToken token)
{
SendResponseInternal(id, result, token);
}

private void SendResponseInternal(MessageId id, Result result, CancellationToken token)
private Task SendResponseInternal(MessageId id, Result result, CancellationToken token)
{
JObject o = result.ToJObject(id);
if (result.IsErr)
logger.LogError($"sending error response for id: {id} -> {result}");

Send(this.ide, o, token);
return Send(this.ide, o, token);
}

// , HttpContext context)
Expand All @@ -248,10 +261,14 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
Log("verbose", $"DevToolsProxy: Client connected on {browserUri}");
var x = new CancellationTokenSource();

List<Task> pending_ops = new();

pending_ops.Add(ReadOne(browser, x.Token));
pending_ops.Add(ReadOne(ide, x.Token));
pending_ops.Add(side_exception.Task);
pending_ops.Add(client_initiated_close.Task);
Task<bool> readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask();
pending_ops.Add(readerTask);

try
{
Expand All @@ -268,14 +285,26 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
break;
}

if (readerTask.IsCompleted)
{
while (_channelReader.TryRead(out Task newTask))
{
pending_ops.Add(newTask);
}

pending_ops[4] = _channelReader.WaitToReadAsync(x.Token).AsTask();
}

//logger.LogTrace ("pump {0} {1}", task, pending_ops.IndexOf (task));
if (task == pending_ops[0])
{
string msg = ((Task<string>)task).Result;
if (msg != null)
{
pending_ops[0] = ReadOne(browser, x.Token); //queue next read
ProcessBrowserMessage(msg, x.Token);
Task newTask = ProcessBrowserMessage(msg, x.Token);
if (newTask != null)
pending_ops.Add(newTask);
}
}
else if (task == pending_ops[1])
Expand All @@ -284,7 +313,9 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
if (msg != null)
{
pending_ops[1] = ReadOne(ide, x.Token); //queue next read
ProcessIdeMessage(msg, x.Token);
Task newTask = ProcessIdeMessage(msg, x.Token);
if (newTask != null)
pending_ops.Add(newTask);
}
}
else if (task == pending_ops[2])
Expand All @@ -304,10 +335,13 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
}
}
}

_channelWriter.Complete();
}
catch (Exception e)
{
Log("error", $"DevToolsProxy::Run: Exception {e}");
_channelWriter.Complete(e);
//throw;
}
finally
Expand Down
12 changes: 6 additions & 6 deletions src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected override async Task<bool> AcceptEvent(SessionId sessionId, string meth

case "Runtime.executionContextCreated":
{
SendEvent(sessionId, method, args, token);
await SendEvent(sessionId, method, args, token);
JToken ctx = args?["context"];
var aux_data = ctx?["auxData"] as JObject;
int id = ctx["id"].Value<int>();
Expand Down Expand Up @@ -831,7 +831,7 @@ private async Task<bool> SendCallStack(SessionId sessionId, ExecutionContext con
await SendCommand(sessionId, "Debugger.resume", new JObject(), token);
return true;
}
SendEvent(sessionId, "Debugger.paused", o, token);
await SendEvent(sessionId, "Debugger.paused", o, token);

return true;
}
Expand Down Expand Up @@ -940,7 +940,7 @@ internal async Task<MethodInfo> LoadSymbolsOnDemand(AssemblyInfo asm, int method
foreach (SourceFile source in asm.Sources)
{
var scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData));
SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
}
return asm.GetMethodByToken(method_token);
}
Expand Down Expand Up @@ -1171,7 +1171,7 @@ private async Task OnSourceFileAdded(SessionId sessionId, SourceFile source, Exe
{
JObject scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData));
Log("debug", $"sending {source.Url} {context.Id} {sessionId.sessionId}");
SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);

foreach (var req in context.BreakpointRequests.Values)
{
Expand Down Expand Up @@ -1253,7 +1253,7 @@ private async Task<DebugStore> RuntimeReady(SessionId sessionId, CancellationTok

DebugStore store = await LoadStore(sessionId, token);
context.ready.SetResult(store);
SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token);
await SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token);
SdbHelper.ResetStore(store);
return store;
}
Expand Down Expand Up @@ -1340,7 +1340,7 @@ private async Task SetBreakpoint(SessionId sessionId, DebugStore store, Breakpoi
};

if (sendResolvedEvent)
SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token);
await SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token);
}

req.Locations.AddRange(breakpoints);
Expand Down