Skip to content

Commit

Permalink
handle running loops in jupyter
Browse files Browse the repository at this point in the history
  • Loading branch information
timkpaine committed May 10, 2023
1 parent 74ad81d commit 917d1be
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
3 changes: 3 additions & 0 deletions tributary/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def __eq__(self, other):
__nonzero__ = all_un_ops
__len__ = all_un_ops

def __repr__(self) -> str:
return self.__class__.__name__


class StreamEnd(BaseNodeValue):
"""Indicates that a stream has nothing left in it"""
Expand Down
14 changes: 11 additions & 3 deletions tributary/streaming/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ..base import StreamEnd, StreamNone, StreamRepeat, TributaryException # noqa: F401


nest_asyncio.apply()
# nest_asyncio.apply()


class StreamingGraph(object):
Expand Down Expand Up @@ -90,18 +90,26 @@ def run(self, blocking=True, newloop=False, start=True):

else:
if newloop:
# create a new loop
loop = asyncio.new_event_loop()

else:
# get the current loop
loop = asyncio.get_event_loop()

asyncio.set_event_loop(loop)

task = loop.create_task(self._run())

if blocking:
# block until done
# if loop is already running, make reentrant
try:
if loop.is_running():

async def wait(task):
await task

return asyncio.run_coroutine_threadsafe(wait(task), loop)
# block until done
return loop.run_until_complete(task)
except KeyboardInterrupt:
return
Expand Down

0 comments on commit 917d1be

Please sign in to comment.