Skip to content

Commit

Permalink
Add get_event_loop_policy() to all get_event_loop() calls (faust-stre…
Browse files Browse the repository at this point in the history
…aming#415)

* add support for python3.11

* update to setup-python v4

* add 3.11 to tox.ini

* bump to 0.9.2 in preparation of next release

* fix dumb merge

* lint everything

* set aiohttp minimum to 3.8.3 and mode-streaming minimum to 0.3.0

* add removed test classes from mode into tests.helpers

* fix streams and topics tests

* just add rc0 i stopping caring lol

* add forgotten defs

* fix imports

* fix more dumb imports

* just import AsyncMock from tests.helpers for now

* add more checks for 3.10 and 3.11

* fix typo

* add 3.11 to envlist

* include custom Mock class to fix this absurd test

* fix asyncmock import

* remove unneeded import

* fix import

* fix import

* neverending import issues

* too many conftests

* fix test_replies so it doesnt hang anymore

* fix cache tests

* coro be gone

* add AsyncMock to __all__

* remove call.coro since deprecated behavior

* test_worker.py passes now

* basic fix for test agent

* fix test_agent.py

* update test_base.py

* fix more tests

* keep trying...

* Add get_event_loop_policy() to all get_event_loop() calls

* remove loop kwarg due to deprecation in 0.8.0

* more remaining tests as needs fixing

* fix formatting

* fix formatting... again

* fix imports in test_events.py

* fix AsyncMock imports

* please let this be the last import fix

* change echoing function in streams.py for py 3.11 compatibility

* ensure futures for test_replies.py

* ensure table recovery futures

* ensure futures for all echo cors
  • Loading branch information
wbarnha authored Nov 29, 2022
1 parent cf397bc commit 7c7fa84
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions faust/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> NoReturn:

def run_using_worker(self, *args: Any, **kwargs: Any) -> NoReturn:
"""Execute command using :class:`faust.Worker`."""
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop_policy().get_event_loop()
args = self.args + args
kwargs = {**self.kwargs, **kwargs}
service = self.as_service(loop, *args, **kwargs)
Expand All @@ -640,7 +640,7 @@ def as_service(
return Service.from_awaitable(
self.execute(*args, **kwargs),
name=type(self).__name__,
loop=loop or asyncio.get_event_loop(),
loop=loop or asyncio.get_event_loop_policy().get_event_loop(),
)

def worker_for_service(
Expand All @@ -659,7 +659,7 @@ def worker_for_service(
console_port=self.console_port,
redirect_stdouts=self.redirect_stdouts or False,
redirect_stdouts_level=self.redirect_stdouts_level,
loop=loop or asyncio.get_event_loop(),
loop=loop or asyncio.get_event_loop_policy().get_event_loop(),
daemon=self.daemon,
)

Expand Down
2 changes: 1 addition & 1 deletion faust/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(
) -> None:
self.url = url
self.app = app
self.loop = loop or asyncio.get_event_loop()
self.loop = loop or asyncio.get_event_loop_policy().get_event_loop()

def create_consumer(self, callback: ConsumerCallback, **kwargs: Any) -> ConsumerT:
"""Create new consumer."""
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,7 @@ async def _create_topic(
topic,
partitions,
replication,
loop=asyncio.get_event_loop(),
loop=asyncio.get_event_loop_policy().get_event_loop(),
**kwargs,
)
try:
Expand Down
2 changes: 1 addition & 1 deletion faust/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async def start_worker(worker: Worker) -> None:
await worker.start()
def manage_loop():
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop_policy().get_event_loop()
worker = Worker(app, loop=loop)
try:
loop.run_until_complete(start_worker(worker)
Expand Down

0 comments on commit 7c7fa84

Please sign in to comment.