Skip to content

Commit 8e791ca

Browse files
author
Chris Rossi
authored
EventLoop and tasklets refactor. (#73)
It was found, while working on query iterators, that when a TaskletFuture calls ``_advance_tasklet`` directly from its done callback, the call stack can grow deep enough to eventually hit the maximum recursion limit. This refactors that to add the ``_advance_tasklet`` call to the eventloop to be run soon, rather than calling it directly, which fixes this issue by keeping the call stack shallow. The legacy NDB code was actually already using this indirection, and I had changed it to a direct call because I couldn't figure out why the legacy code didn't just call it directly already. I knew there was a chance that the reason would reveal itself through trial and error, and it eventually did. I also performed a minor refactor on the eventloop itself, adding the ``call_soon`` method to handle the case that was previously handled by passing ``None`` for the ``delay`` argument to ``queue_call``. This just makes that API a little bit cleaner.
1 parent 23c46cf commit 8e791ca

File tree

5 files changed

+61
-42
lines changed

5 files changed

+61
-42
lines changed

packages/google-cloud-ndb/src/google/cloud/ndb/_eventloop.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
__all__ = [
2727
"add_idle",
28+
"call_soon",
2829
"EventLoop",
2930
"get_event_loop",
3031
"queue_call",
@@ -80,25 +81,14 @@ class EventLoop:
8081
other futures were waiting on those results and results derived from those
8182
results.
8283
83-
This is somewhat of a work in progress. Initially this was ported (cargo
84-
culted) from legacy NDB without a clear understanding of how all the pieces
85-
would fit together or what all the different features were actually for. As
86-
we've been forced to do some things a little differently with the rewrite,
87-
it's not entirely clear that all of the features here have a purpose in the
88-
rewrite, but it's still early to say definitively.
89-
9084
Currently, these are the separate queues used by the event loop in the
9185
order they are checked by :meth:`~EventLoop.run1`. For each call to
9286
:meth:`~EventLoop.run1`, the first thing it finds is called:
9387
94-
current: These callbacks are called first, if there are any. In legacy
95-
NDB, these were used by tasklets to queue calls to
96-
``_help_tasklet_along`` when a result from a yielded future was
97-
ready. With the rewrite, I haven't seen any reason not to just go
98-
ahead and call :meth:`~tasklets.TaskletFuture._advance_tasklet`
99-
immediately when a result is available. If a good reason becomes
100-
apparent in the course of the rewrite, this is subject to change.
101-
Currently, nothing uses this.
88+
current: These callbacks are called first, if there are any. Currently
89+
this is used to schedule calls to
90+
:meth:`tasklets.TaskletFuture._advance_tasklet` when it's time to
91+
send a tasklet a value that it was previously waiting on.
10292
10393
idlers: Effectively, these are the same as ``current``, but just get
10494
called afterwards. These currently are used for batching certain
@@ -113,8 +103,7 @@ class EventLoop:
113103
time.
114104
115105
queue: These are callbacks that are supposed to be run at (or after) a
116-
certain time. Nothing uses these currently. It's not clear, yet,
117-
what the use case was in legacy NDB.
106+
certain time. This is used by :func:`tasklets.sleep`.
118107
119108
rpcs: If all other queues are empty, and we are waiting on results of a
120109
gRPC call, then we'll call :meth:`queue.Queue.get` on the
@@ -125,7 +114,8 @@ class EventLoop:
125114
126115
Attributes:
127116
current (deque): a FIFO list of (callback, args, kwds). These callbacks
128-
run immediately when the eventloop runs. Not currently used.
117+
run immediately when the eventloop runs. Used by tasklets to
118+
schedule calls to :meth:`tasklets.TaskletFuture._advance_tasklet`.
129119
idlers (deque): a FIFO list of (callback, args, kwds). These callbacks
130120
run only when no other RPCs need to be fired first. Used for
131121
batching calls to the Datastore back end.
@@ -205,6 +195,16 @@ def insort_event_right(self, event):
205195
low = mid + 1
206196
queue.insert(low, event)
207197

198+
def call_soon(self, callback, *args, **kwargs):
199+
"""Schedule a function to be called soon, without a delay.
200+
201+
Arguments:
202+
callback (callable): The function to eventually call.
203+
*args: Positional arguments to be passed to callback.
204+
**kwargs: Keyword arguments to be passed to callback.
205+
"""
206+
self.current.append((callback, args, kwargs))
207+
208208
def queue_call(self, delay, callback, *args, **kwargs):
209209
"""Schedule a function call at a specific time in the future.
210210
@@ -216,10 +216,6 @@ def queue_call(self, delay, callback, *args, **kwargs):
216216
*args: Positional arguments to be passed to callback.
217217
**kwargs: Keyword arguments to be passed to callback.
218218
"""
219-
if delay is None:
220-
self.current.append((callback, args, kwargs))
221-
return
222-
223219
when = time.time() + delay if delay < 1e9 else delay
224220
event = _Event(when, callback, args, kwargs)
225221
self.insort_event_right(event)
@@ -379,6 +375,12 @@ def add_idle(callback, *args, **kwargs):
379375
loop.add_idle(callback, *args, **kwargs)
380376

381377

378+
def call_soon(callback, *args, **kwargs):
379+
"""Calls :meth:`EventLoop.call_soon` on current event loop."""
380+
loop = get_event_loop()
381+
loop.call_soon(callback, *args, **kwargs)
382+
383+
382384
def queue_call(delay, callback, *args, **kwargs):
383385
"""Calls :meth:`EventLoop.queue_call` on current event loop."""
384386
loop = get_event_loop()

packages/google-cloud-ndb/src/google/cloud/ndb/tasklets.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ class _TaskletFuture(Future):
234234
completed and either returned a value or raised an exception.
235235
236236
Args:
237-
typing.Generator[Union[tasklets.Future, _remote.RemoteCall], Any, Any]: The
238-
generator.
237+
typing.Generator[Union[tasklets.Future, _remote.RemoteCall], Any, Any]:
238+
The generator.
239239
"""
240240

241241
def __init__(self, generator, context, info="Unknown"):
@@ -276,17 +276,15 @@ def done_callback(yielded):
276276
# To be called when a future dependency has completed. Advance the
277277
# tasklet with the yielded value or error.
278278
#
279-
# It might be worth noting that legacy NDB added a callback to the
280-
# event loop which, in turn, called _help_tasklet_along. I don't
281-
# see a compelling reason not to go ahead and call _advance_tasklet
282-
# immediately here, rather than queue it up to be called soon by
283-
# the event loop. This is subject to change if the reason for the
284-
# indirection in the original implementation becomes apparent.
279+
# It was tempting to call `_advance_tasklet` (`_help_tasklet_along`
280+
# in Legacy) directly. Doing so, it has been found, can lead to
281+
# exceeding the maximum recursion depth. Queuing it up to run on the
282+
# event loop avoids this issue by keeping the call stack shallow.
285283
error = yielded.exception()
286284
if error:
287-
self._advance_tasklet(error=error)
285+
_eventloop.call_soon(self._advance_tasklet, error=error)
288286
else:
289-
self._advance_tasklet(yielded.result())
287+
_eventloop.call_soon(self._advance_tasklet, yielded.result())
290288

291289
if isinstance(yielded, Future):
292290
yielded.add_done_callback(done_callback)

packages/google-cloud-ndb/tests/conftest.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,18 @@
2424

2525
from google.cloud import environment_vars
2626
from google.cloud.ndb import context as context_module
27+
from google.cloud.ndb import _eventloop
2728
from google.cloud.ndb import model
2829

2930
import pytest
3031

3132

33+
class TestingEventLoop(_eventloop.EventLoop):
34+
def call_soon(self, callback, *args, **kwargs):
35+
"""For testing, call the callback immediately."""
36+
callback(*args, **kwargs)
37+
38+
3239
@pytest.fixture(autouse=True)
3340
def reset_state(environ):
3441
"""Reset module and class level runtime state.
@@ -76,7 +83,9 @@ def context():
7683
client = mock.Mock(
7784
project="testing", namespace=None, spec=("project", "namespace")
7885
)
79-
context = context_module.Context(client, stub=mock.Mock(spec=()))
86+
context = context_module.Context(
87+
client, stub=mock.Mock(spec=()), eventloop=TestingEventLoop()
88+
)
8089
return context
8190

8291

packages/google-cloud-ndb/tests/unit/test__datastore_api.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,8 @@ def test_commit(datastore_commit, process_commit, in_context):
764764
rpc = tasklets.Future("_datastore_commit")
765765
datastore_commit.return_value = rpc
766766

767-
eventloop = mock.Mock(spec=("queue_rpc", "run"))
767+
eventloop = mock.Mock(spec=("queue_rpc", "run", "call_soon"))
768+
eventloop.call_soon = lambda f, *args, **kwargs: f(*args, **kwargs)
768769
with in_context.new(eventloop=eventloop).use():
769770
future = batch.commit()
770771

@@ -788,7 +789,8 @@ def test_commit_error(datastore_commit, process_commit, in_context):
788789
rpc = tasklets.Future("_datastore_commit")
789790
datastore_commit.return_value = rpc
790791

791-
eventloop = mock.Mock(spec=("queue_rpc", "run"))
792+
eventloop = mock.Mock(spec=("queue_rpc", "run", "call_soon"))
793+
eventloop.call_soon = lambda f, *args, **kwargs: f(*args, **kwargs)
792794
with in_context.new(eventloop=eventloop).use():
793795
future = batch.commit()
794796

@@ -824,7 +826,8 @@ def test_commit_allocating_ids(
824826
rpc = tasklets.Future("_datastore_commit")
825827
datastore_commit.return_value = rpc
826828

827-
eventloop = mock.Mock(spec=("queue_rpc", "run"))
829+
eventloop = mock.Mock(spec=("queue_rpc", "run", "call_soon"))
830+
eventloop.call_soon = lambda f, *args, **kwargs: f(*args, **kwargs)
828831
with in_context.new(eventloop=eventloop).use():
829832
future = batch.commit()
830833

packages/google-cloud-ndb/tests/unit/test__eventloop.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,14 @@ def test_insert_event_right_collision(self):
122122
_Event(2, "baz"),
123123
]
124124

125-
def test_queue_call_now(self):
125+
def test_call_soon(self):
126126
loop = self._make_one()
127-
loop.queue_call(None, "foo", "bar", baz="qux")
127+
loop.call_soon("foo", "bar", baz="qux")
128128
assert list(loop.current) == [("foo", ("bar",), {"baz": "qux"})]
129129
assert not loop.queue
130130

131131
@unittest.mock.patch("google.cloud.ndb._eventloop.time")
132-
def test_queue_call_soon(self, time):
132+
def test_queue_call_delay(self, time):
133133
loop = self._make_one()
134134
time.time.return_value = 5
135135
loop.queue_call(5, "foo", "bar", baz="qux")
@@ -214,7 +214,7 @@ def test_run0_nothing_to_do(self):
214214
def test_run0_current(self):
215215
callback = unittest.mock.Mock(__name__="callback")
216216
loop = self._make_one()
217-
loop.queue_call(None, callback, "foo", bar="baz")
217+
loop.call_soon(callback, "foo", bar="baz")
218218
loop.inactive = 88
219219
assert loop.run0() == 0
220220
callback.assert_called_once_with("foo", bar="baz")
@@ -275,7 +275,7 @@ def test_run1_nothing_to_do(self):
275275
def test_run1_has_work_now(self, time):
276276
callback = unittest.mock.Mock(__name__="callback")
277277
loop = self._make_one()
278-
loop.queue_call(None, callback)
278+
loop.call_soon(callback)
279279
assert loop.run1() is True
280280
time.sleep.assert_not_called()
281281
callback.assert_called_once_with()
@@ -304,7 +304,7 @@ def mock_sleep(seconds):
304304
runlater = unittest.mock.Mock(__name__="runlater")
305305
loop = self._make_one()
306306
loop.add_idle(idler)
307-
loop.queue_call(None, runnow)
307+
loop.call_soon(runnow)
308308
loop.queue_call(5, runlater)
309309
loop.run()
310310
idler.assert_called_once_with()
@@ -328,6 +328,13 @@ def test_add_idle(context):
328328
loop.add_idle.assert_called_once_with("foo", "bar", baz="qux")
329329

330330

331+
def test_call_soon(context):
332+
loop = unittest.mock.Mock(spec=("run", "call_soon"))
333+
with context.new(eventloop=loop).use():
334+
_eventloop.call_soon("foo", "bar", baz="qux")
335+
loop.call_soon.assert_called_once_with("foo", "bar", baz="qux")
336+
337+
331338
def test_queue_call(context):
332339
loop = unittest.mock.Mock(spec=("run", "queue_call"))
333340
with context.new(eventloop=loop).use():

0 commit comments

Comments
 (0)