'cannot reuse already awaited coroutine' with timed_window on Python 3.7.5 #284

Open
balajirrao opened this issue Nov 29, 2019 · 0 comments


Using timed_window in Python 3.7.5 can sometimes lead to a coroutine being awaited more than once.

Here's a sample program:

from asyncio import run, sleep

from streamz import Stream


async def my_sink(x):
    print(x)
    await sleep(2)
    # raise Exception("Blah!")


async def main():
    source = Stream(asynchronous=True)
    source.timed_window(interval=1).sink(my_sink)

    for x in range(100):
        await source.emit(x)
        await sleep(0.2)


if __name__ == "__main__":
    run(main(), debug=True)

I narrowed the cause down to the same coroutine possibly being returned more than once at https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L917.
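To show the error in isolation, here is a minimal sketch that uses only asyncio and no streamz. The names `work` and `await_twice` are made up for illustration; the point is just that awaiting one coroutine object a second time raises the RuntimeError from the title.

```python
import asyncio


async def work():
    # Stand-in for an async sink function; returns a value so we can check it.
    return 42


async def await_twice():
    coro = work()          # a single coroutine object
    first = await coro     # the first await runs it to completion
    try:
        await coro         # the second await on the same object fails
    except RuntimeError as exc:
        return first, str(exc)
    return first, None


result = asyncio.run(await_twice())
print(result)
```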

A way to work around this issue is to create a task that runs the coroutine. Modifying the update method of the sink class to return a task fixes the issue. The tests in tests/core.py continue to pass.

diff --git a/streamz/core.py b/streamz/core.py
index fe588ed..6916f19 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -535,7 +535,7 @@ class sink(Stream):
     def update(self, x, who=None):
         result = self.func(x, *self.args, **self.kwargs)
         if gen.isawaitable(result):
-            return result
+            return gen.convert_yielded(result)
         else:
             return []
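The idea behind the patch can be sketched with plain asyncio. This uses `asyncio.ensure_future` as a stand-in, on the assumption that `gen.convert_yielded` wraps a coroutine in a future in a similar way; `work` and `await_future_twice` are hypothetical names. Unlike a bare coroutine, a future can be awaited any number of times and returns the same result each time.

```python
import asyncio


async def work():
    # Stand-in for an async sink function.
    await asyncio.sleep(0)
    return 42


async def await_future_twice():
    # Wrapping the coroutine in a Task schedules it once on the event loop.
    fut = asyncio.ensure_future(work())
    first = await fut    # waits for the task to finish
    second = await fut   # a finished future just hands back its result
    return first, second


results = asyncio.run(await_future_twice())
print(results)
```

One difference worth noting: the wrapped coroutine starts running as soon as the event loop gets control, whether or not anyone awaits the future, which may be part of what could break elsewhere.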

I'm quite sure this can break something else. What am I missing?

Also, how can we go about writing a test case that demonstrates the issue?

@balajirrao changed the title from "'cannot reuse already awaited coroutine' error with timed_window on Python 3.7.5" to "'cannot reuse already awaited coroutine' with timed_window on Python 3.7.5" on Nov 29, 2019