Skip to content

Commit 810ded1

Browse files
author
Chris Rossi
authored
fix: fix race condition in remote calls (#329)
The wrapper, `_remote.RemoteCall`, added to help attach debugging into to gRPC calls, inadvertantly introduced a race condition between the gRPC thread and the NDB thread, where the gRPC thread might call `RemoteCall._finish` *during* a call to `RemoteCall.add_done_callback`. The solution, here, is to turn `RemoteCall.add_done_callback` into a direct pass-through to `grpc.Future.add_done_callback` on the wrapped future. The callback which is eventually executed in the gRPC thread, only pushes the finished RPC onto a `queue.Queue` which is eventually consumed by the event loop running in the NDB thread. Fixes #302.
1 parent 30d15da commit 810ded1

File tree

1 file changed

+8
-17
lines changed
  • packages/google-cloud-ndb/google/cloud/ndb

1 file changed

+8
-17
lines changed

packages/google-cloud-ndb/google/cloud/ndb/_remote.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,12 @@ class RemoteCall(object):
3939
def __init__(self, future, info):
4040
self.future = future
4141
self.info = info
42-
self._callbacks = []
43-
44-
future.add_done_callback(self._finish)
4542

4643
def __repr__(self):
4744
return self.info
4845

4946
def exception(self):
50-
"""Calls :meth:`grpc.Future.exception` on attr:`future`."""
47+
"""Calls :meth:`grpc.Future.exception` on :attr:`future`."""
5148
# GRPC will actually raise FutureCancelledError.
5249
# We'll translate that to our own Cancelled exception and *return* it,
5350
# which is far more polite for a method that *returns exceptions*.
@@ -57,7 +54,7 @@ def exception(self):
5754
return exceptions.Cancelled()
5855

5956
def result(self):
60-
"""Calls :meth:`grpc.Future.result` on attr:`future`."""
57+
"""Calls :meth:`grpc.Future.result` on :attr:`future`."""
6158
return self.future.result()
6259

6360
def add_done_callback(self, callback):
@@ -67,19 +64,13 @@ def add_done_callback(self, callback):
6764
Args:
6865
callback (Callable): The function to execute.
6966
"""
70-
if self.future.done():
71-
callback(self)
72-
else:
73-
self._callbacks.append(callback)
67+
remote = self
68+
69+
def wrapper(rpc):
70+
return callback(remote)
71+
72+
self.future.add_done_callback(wrapper)
7473

7574
def cancel(self):
7675
"""Calls :meth:`grpc.Future.cancel` on attr:`cancel`."""
7776
return self.future.cancel()
78-
79-
def _finish(self, rpc):
80-
"""Called when remote future is finished.
81-
82-
Used to call our own done callbacks.
83-
"""
84-
for callback in self._callbacks:
85-
callback(self)

0 commit comments

Comments
 (0)