Skip to content

Commit ef0558e

Browse files
committed
Remove locking and get rid of deadlocks!
1 parent 0bc36f4 commit ef0558e

File tree

2 files changed

+58
-84
lines changed

2 files changed

+58
-84
lines changed

pyjdb/pyjdwp.py

Lines changed: 37 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import pkg_resources
33
import pyparsing
4+
import Queue
45
import re
56
import socket
67
import struct
@@ -80,29 +81,23 @@ def __init__(self, host="localhost", port=5005, timeout=10):
8081
self.__request_id_generator = RequestIdGenerator()
8182
self.__event_cbs = []
8283
self.__conn = JdwpConnection(host, port, self.handle_packet)
83-
self.__event_cv = threading.Condition()
84-
self.__reply_cv = threading.Condition()
8584
self.__replies = {}
86-
self.__events = []
85+
self.__events = Queue.Queue()
8786
# background thread for calling self.__event_cbs as new events come in.
8887
# we use a separate thread for this so that JdwpConnection's
8988
# __reader_thread need not block while we handle events.
9089
self.__notifier_thread = threading.Thread(
9190
target = self.__event_notify_loop, name = "jdwp_event_notifier")
9291
self.__notifier_thread.setDaemon(True)
93-
# lock for synchronizing access to self.__notifier_running
94-
self.__running_lock = threading.Lock()
9592
logging.info("Jdwp object created")
9693

9794
def register_event_callback(self, event_cb):
9895
logging.info("Register event callback")
99-
with self.__event_cv:
100-
self.__event_cbs.append(event_cb)
96+
self.__event_cbs.append(event_cb)
10197

10298
def unregister_event_callback(self, event_cb):
10399
logging.info("Unregister event callback")
104-
with self.__event_cv:
105-
self.__event_cbs.remove(event_cb)
100+
self.__event_cbs.remove(event_cb)
106101

107102
def initialize(self):
108103
logging.info("Unregister event callback")
@@ -132,91 +127,58 @@ def command_request(self, command_set_name, command_name, data):
132127
return command.decode(reply_payload)
133128

134129
def disconnect(self):
130+
self.__notifier_running = False;
135131
self.__conn.disconnect()
136132

137133
def handle_packet(self, req_id, flags, err, payload):
138-
print("PACKET: %d, %d, %d, %d" % (req_id, flags, err, len(payload)))
139134
if err == 0x4064:
140-
with self.__event_cv:
141-
self.__events.append((req_id, payload))
142-
self.__event_cv.notify()
135+
self.__events.put((req_id, payload))
143136
return
144-
with self.__reply_cv:
145-
if req_id in self.__replies:
146-
raise Error("More than one reply packet for req_id %d" % req_id)
147-
self.__replies[req_id] = (err, payload)
148-
self.__reply_cv.notify()
149-
150-
def await_event(self, matcher_fn):
151-
cv = threading.Condition()
152-
found_events = []
153-
def callback(event, found=found_events):
154-
with cv:
155-
if matcher_fn(event):
156-
found.append(event)
157-
cv.notify()
158-
self.register_event_callback(callback)
159-
with cv:
160-
while not found_events:
161-
cv.wait(.1)
162-
self.unregister_event_callback(callback)
163-
return found_events[0]
137+
if req_id in self.__replies:
138+
raise Error("More than one reply packet for req_id %d" % req_id)
139+
self.__replies[req_id] = (err, payload)
164140

165141
def __event_notify_loop(self):
166142
while True:
167-
with self.__running_lock:
168-
if not self.__notifier_running:
169-
return
170-
with self.__event_cv:
171-
while not self.__events:
172-
self.__event_cv.wait(.1)
143+
if not self.__notifier_running:
144+
return
145+
if not self.__events.empty():
173146
self.__event_notify()
174147

175-
# this must only ever be called with a lock on self.__event_cv already held.
176148
def __event_notify(self):
177-
notified = []
178-
command = self.jdwp_spec.lookup_command("Event", "Composite")
179-
for jvm_req_id, event_payload in self.__events:
180-
event = command.decode(event_payload)
149+
while not self.__events.empty():
150+
jvm_req_id, event_payload = self.__events.get()
151+
event = self.__decode_event(event_payload)
181152
for event_cb in self.__event_cbs:
182-
try:
183-
event_cb(event)
184-
except Exception as e:
185-
print("Event notification failed for %s " % event_cb)
186-
print(e)
187-
# TODO(cgs): should we remove event_cb? think about this
188-
continue
189-
notified.append((jvm_req_id, event_payload))
190-
for entry in notified:
191-
self.__events.remove(entry)
153+
event_cb(event)
154+
155+
def __decode_event(self, event_payload):
156+
command = self.jdwp_spec.lookup_command("Event", "Composite")
157+
return command.decode(event_payload)
192158

193159
def __await_reply(self, req_id):
194160
"""Blocks until a reply is received for "req_id"; raises pyjdwp.Error
195161
if err != 0, returns reply otherwise"""
196162
start_time = time.time()
197-
with self.__reply_cv:
198-
while req_id not in self.__replies:
199-
if time.time() >= start_time + self.__timeout:
200-
raise Timeout("Timed out")
201-
self.__reply_cv.wait(.1)
202-
err, reply = self.__replies[req_id]
203-
del self.__replies[req_id]
163+
while req_id not in self.__replies:
164+
if time.time() >= start_time + self.__timeout:
165+
raise Timeout("Timed out")
166+
err, reply = self.__replies[req_id]
167+
del self.__replies[req_id]
204168
if err != 0:
205169
raise Error("JDWP error: %s" % err)
206170
return reply
207171

208172
def __await_vm_start(self):
209173
found_event = False
210-
with self.__event_cv:
211-
while not found_event:
212-
for jvm_req_id, payload in self.__events:
213-
if len(payload) < 6:
214-
continue
215-
_, _, event_kind = struct.unpack(">BIB", payload[0 : 6])
216-
if event_kind == 90: # vm_start
217-
found_event = True
218-
break
219-
self.__event_cv.wait(.1)
174+
while not found_event:
175+
jvm_req_id, payload = self.__events.get()
176+
if len(payload) < 6:
177+
raise Error("Unexpected event before jvm start: %s" % payload)
178+
_, _, event_kind = struct.unpack(">BIB", payload[0 : 6])
179+
if event_kind == 90: # vm_start
180+
found_event = True
181+
break
220182

221183
def __hardcoded_version_request(self):
222184
req_id = self.__request_id_generator.next_id
@@ -261,8 +223,6 @@ def __init__(self, host, port, packet_callback=None):
261223
self.__reader_thread = threading.Thread(
262224
target = self.__listen, name = "jdwp_listener")
263225
self.__reader_thread.setDaemon(True)
264-
# lock for synchronizing access to self.__listening
265-
self.__listening_lock = threading.Lock()
266226

267227
def initialize(self):
268228
logging.info("Initializing socket connection to jdwp host")
@@ -304,15 +264,13 @@ def send(self, req_id, cmd_set_id, cmd_id, payload=None):
304264

305265
def disconnect(self):
306266
self.__socket.close();
307-
with self.__listening_lock:
308-
self.__listening = False
267+
self.__listening = False
309268
self.__reader_thread.join(1.0)
310269

311270
def __listen(self):
312271
while True:
313-
with self.__listening_lock:
314-
if not self.__listening:
315-
return
272+
if not self.__listening:
273+
return
316274
try:
317275
header = self.__socket.recv(JDWP_PACKET_HEADER_LENGTH);
318276
except socket.error as e:
@@ -329,7 +287,7 @@ def __listen(self):
329287
# TODO(cgs): why do we need to do this string voodoo?
330288
payload = "".join([chr(x) for x in msg])
331289
self.__packet_callback(req_id, flags, err, payload)
332-
290+
333291

334292
class JdwpSpec(object):
335293
def __init__(self, version, id_sizes):

pyjdb/pyjdwp_test.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import tempfile
99
import time
1010
import unittest
11+
import Queue
1112
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)
1213

1314

@@ -119,15 +120,22 @@ def resume_and_await_class_load(self, class_name, suspend_policy = None):
119120
"modifiers": [{
120121
"modKind": 5,
121122
"classPattern": class_name}]})
122-
self.jdwp.VirtualMachine.Resume()
123123
def matcher(event_data):
124124
for event in event_data["events"]:
125125
if event["eventKind"] == self.jdwp.EventKind.CLASS_PREPARE:
126126
if event["ClassPrepare"]["signature"] == signature:
127127
return True
128128
return False
129-
test_class_prepare_event = self.jdwp.await_event(matcher)
130-
return test_class_prepare_event["events"][0]["ClassPrepare"]
129+
130+
found_events = Queue.Queue()
131+
def callback(event, found=found_events):
132+
if matcher(event):
133+
found.put(event)
134+
self.jdwp.register_event_callback(callback)
135+
self.jdwp.VirtualMachine.Resume()
136+
event = found_events.get()
137+
self.jdwp.unregister_event_callback(callback)
138+
return event["events"][0]["ClassPrepare"]
131139

132140
def set_breakpoint_in_method(self, class_name, method_name):
133141
self.resume_and_await_class_load(class_name, self.jdwp.SuspendPolicy.ALL)
@@ -157,9 +165,17 @@ def matcher(event_raw):
157165
for event in event["events"]:
158166
if event["eventKind"] == self.jdwp.EventKind.BREAKPOINT:
159167
return True
168+
found_events = Queue.Queue()
169+
def callback(event, found=found_events):
170+
if matcher(event):
171+
found.put(event)
172+
self.jdwp.register_event_callback(callback)
173+
160174
self.jdwp.VirtualMachine.Resume()
161-
breakpoint_events = self.jdwp.await_event(matcher)
162-
return breakpoint_events["events"][0]["Breakpoint"]
175+
event = found_events.get()
176+
177+
self.jdwp.unregister_event_callback(callback)
178+
return event["events"][0]["Breakpoint"]
163179

164180
def set_breakpoint_in_main(self, main_class_name):
165181
return self.set_breakpoint_in_method(main_class_name, "main")

0 commit comments

Comments
 (0)