Skip to content

Commit 824fc62

Browse files
authored
Use ConcurrentDictionary for storing reply packets and callbacks, to relax explicit lock usage (#416)
A PerfWatson issue shows a high amount of hits caused by the excessive locking when sending and receiving packets, that causes perf issues especially if a packet takes time to be received: https://devdiv.visualstudio.com/DevDiv/_workitems/edit/2400681 Using ConcurrentDictionary to store the packets and callbacks, let the dictionary implementation to handle the different access locks and ensure performance and efficiency. We still maintain the lock object but only for disconnection checks.
1 parent c035307 commit 824fc62

File tree

1 file changed

+76
-64
lines changed

1 file changed

+76
-64
lines changed

Mono.Debugger.Soft/Mono.Debugger.Soft/Connection.cs

Lines changed: 76 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Text;
88
using System.Diagnostics;
99
using Microsoft.FileFormats.PE;
10+
using System.Collections.Concurrent;
1011

1112
namespace Mono.Debugger.Soft
1213
{
@@ -1376,18 +1377,18 @@ public int Offset {
13761377

13771378
bool closed;
13781379
Thread receiver_thread;
1379-
Dictionary<int, byte[]> reply_packets;
1380-
Dictionary<int, ReplyCallback> reply_cbs;
1381-
Dictionary<int, int> reply_cb_counts;
1380+
ConcurrentDictionary<int, byte[]> reply_packets;
1381+
ConcurrentDictionary<int, ReplyCallback> reply_cbs;
1382+
ConcurrentDictionary<int, int> reply_cb_counts;
13821383
object reply_packets_monitor;
13831384

13841385
internal event EventHandler<ErrorHandlerEventArgs> ErrorHandler;
13851386

13861387
protected Connection () {
13871388
closed = false;
1388-
reply_packets = new Dictionary<int, byte[]> ();
1389-
reply_cbs = new Dictionary<int, ReplyCallback> ();
1390-
reply_cb_counts = new Dictionary<int, int> ();
1389+
reply_packets = new ConcurrentDictionary<int, byte[]> ();
1390+
reply_cbs = new ConcurrentDictionary<int, ReplyCallback> ();
1391+
reply_cb_counts = new ConcurrentDictionary<int, int> ();
13911392
reply_packets_monitor = new Object ();
13921393
if (EnableConnectionLogging) {
13931394
var path = Environment.GetEnvironmentVariable ("MONO_SDB_LOG");
@@ -1574,12 +1575,14 @@ void receiver_thread_main () {
15741575
}
15751576

15761577
void disconnected_check () {
1577-
if (!disconnected)
1578-
return;
1579-
else if (crashed != null)
1580-
throw crashed;
1581-
else
1582-
throw new VMDisconnectedException ();
1578+
lock (reply_packets_monitor) {
1579+
if (!disconnected)
1580+
return;
1581+
else if (crashed != null)
1582+
throw crashed;
1583+
else
1584+
throw new VMDisconnectedException ();
1585+
}
15831586
}
15841587

15851588
bool ReceivePacket () {
@@ -1591,24 +1594,25 @@ bool ReceivePacket () {
15911594

15921595
if (IsReplyPacket (packet)) {
15931596
int id = GetPacketId (packet);
1594-
ReplyCallback cb = null;
1595-
lock (reply_packets_monitor) {
1596-
reply_cbs.TryGetValue (id, out cb);
1597-
if (cb == null) {
1598-
reply_packets [id] = packet;
1599-
Monitor.PulseAll (reply_packets_monitor);
1600-
} else {
1601-
int c = reply_cb_counts [id];
1602-
c --;
1603-
if (c == 0) {
1604-
reply_cbs.Remove (id);
1605-
reply_cb_counts.Remove (id);
1606-
}
1597+
1598+
if (reply_cbs.TryGetValue (id, out var cb) && cb != null) {
1599+
reply_cb_counts.TryGetValue (id, out var c);
1600+
c--;
1601+
1602+
if (c == 0) {
1603+
reply_cbs.TryRemove (id, out _);
1604+
reply_cb_counts.TryRemove (id, out _);
16071605
}
1608-
}
16091606

1610-
if (cb != null)
16111607
cb.Invoke (id, packet);
1608+
} else {
1609+
//Better to do this way than AddOrUpdate, since the latter expects a value factory and value factories are not protected by the concurrent dictionary locks
1610+
if (reply_packets.TryGetValue (id, out var existingPacket)) {
1611+
reply_packets.TryUpdate (id, packet, comparisonValue: existingPacket);
1612+
} else {
1613+
reply_packets.TryAdd (id, packet);
1614+
}
1615+
}
16121616
} else {
16131617
PacketReader r = new PacketReader (this, packet);
16141618

@@ -1820,37 +1824,47 @@ public void StopBuffering () {
18201824
/* Send a request and call cb when a result is received */
18211825
int Send (CommandSet command_set, int command, PacketWriter packet, Action<PacketReader> cb, int count) {
18221826
int id = IdGenerator;
1823-
18241827
Stopwatch watch = null;
1828+
18251829
if (EnableConnectionLogging)
18261830
watch = Stopwatch.StartNew ();
18271831

18281832
byte[] encoded_packet;
1833+
18291834
if (packet == null)
18301835
encoded_packet = EncodePacket (id, (int)command_set, command, null, 0);
18311836
else
18321837
encoded_packet = EncodePacket (id, (int)command_set, command, packet.Data, packet.Offset);
18331838

18341839
if (cb != null) {
1835-
lock (reply_packets_monitor) {
1836-
reply_cbs [id] = delegate (int packet_id, byte[] p) {
1837-
if (EnableConnectionLogging)
1838-
LogPacket (packet_id, encoded_packet, p, command_set, command, watch);
1839-
/* Run the callback on a tp thread to avoid blocking the receive thread */
1840-
PacketReader r = new PacketReader (this, p);
1841-
ThreadPool.QueueUserWorkItem (s => {
1842-
// Catch all exceptions to revert to
1843-
// BeginInvoke behavior
1844-
try
1845-
{
1846-
cb (r);
1847-
}
1848-
catch
1849-
{
1850-
}
1851-
});
1852-
};
1853-
reply_cb_counts [id] = count;
1840+
ReplyCallback replyCb = delegate (int packet_id, byte[] p) {
1841+
if (EnableConnectionLogging)
1842+
LogPacket (packet_id, encoded_packet, p, command_set, command, watch);
1843+
/* Run the callback on a tp thread to avoid blocking the receive thread */
1844+
PacketReader r = new PacketReader (this, p);
1845+
1846+
ThreadPool.QueueUserWorkItem (s => {
1847+
// Catch all exceptions to revert to
1848+
// BeginInvoke behavior
1849+
try {
1850+
cb (r);
1851+
} catch {
1852+
}
1853+
});
1854+
};
1855+
1856+
//Better to do this way than AddOrUpdate, since the latter expects a value factory and value factories are not protected by the concurrent dictionary locks
1857+
1858+
if (reply_cbs.TryGetValue (id, out var currentCb)) {
1859+
reply_cbs.TryUpdate (id, replyCb, currentCb);
1860+
} else {
1861+
reply_cbs.TryAdd (id, replyCb);
1862+
}
1863+
1864+
if (reply_cb_counts.TryGetValue (id, out var currentCount)) {
1865+
reply_cb_counts.TryUpdate (id, count, currentCount);
1866+
} else {
1867+
reply_cb_counts.TryAdd (id, count);
18541868
}
18551869
}
18561870

@@ -1889,25 +1903,23 @@ PacketReader SendReceive (CommandSet command_set, int command, PacketWriter pack
18891903

18901904
/* Wait for the reply packet */
18911905
while (true) {
1892-
lock (reply_packets_monitor) {
1893-
byte[] reply;
1894-
if (reply_packets.TryGetValue (packetId, out reply)) {
1895-
reply_packets.Remove (packetId);
1896-
PacketReader r = new PacketReader (this, reply);
1897-
1898-
if (EnableConnectionLogging)
1899-
LogPacket (packetId, encoded_packet, reply, command_set, command, watch);
1900-
if (r.ErrorCode != 0) {
1901-
if (ErrorHandler != null)
1902-
ErrorHandler (this, new ErrorHandlerEventArgs () { ErrorCode = (ErrorCode)r.ErrorCode, ErrorMessage = r.ErrorMsg});
1903-
throw new NotImplementedException ("No error handler set.");
1904-
} else {
1905-
return r;
1906-
}
1906+
if (reply_packets.TryGetValue (packetId, out var reply)) {
1907+
reply_packets.TryRemove (packetId, out _);
1908+
1909+
PacketReader r = new PacketReader (this, reply);
1910+
1911+
if (EnableConnectionLogging)
1912+
LogPacket (packetId, encoded_packet, reply, command_set, command, watch);
1913+
if (r.ErrorCode != 0) {
1914+
if (ErrorHandler != null)
1915+
ErrorHandler (this, new ErrorHandlerEventArgs () { ErrorCode = (ErrorCode)r.ErrorCode, ErrorMessage = r.ErrorMsg });
1916+
1917+
throw new NotImplementedException ("No error handler set.");
19071918
} else {
1908-
disconnected_check ();
1909-
Monitor.Wait (reply_packets_monitor);
1919+
return r;
19101920
}
1921+
} else {
1922+
disconnected_check ();
19111923
}
19121924
}
19131925
}

0 commit comments

Comments
 (0)