Skip to content

Commit

Permalink
Add support for python3.11 (faust-streaming#366)
Browse files Browse the repository at this point in the history
* add support for python3.11

* update to setup-python v4

* add 3.11 to tox.ini

* bump to 0.9.2 in preparation of next release

* fix dumb merge

* lint everything

* set aiohttp minimum to 3.8.3 and mode-streaming minimum to 0.3.0

* add removed test classes from mode into tests.helpers

* fix streams and topics tests

* just add rc0 i stopping caring lol

* add forgotten defs

* fix imports

* fix more dumb imports

* just import AsyncMock from tests.helpers for now

* add more checks for 3.10 and 3.11

* fix typo

* add 3.11 to envlist

* include custom Mock class to fix this absurd test

* fix asyncmock import

* remove unneeded import

* fix import

* fix import

* neverending import issues

* too many conftests

* fix test_replies so it doesnt hang anymore

* fix cache tests

* coro be gone

* add AsyncMock to __all__

* remove call.coro since deprecated behavior

* test_worker.py passes now

* basic fix for test agent

* fix test_agent.py

* update test_base.py

* fix more tests

* keep trying...

* remove loop kwarg due to deprecation in 0.8.0

* more remaining tests as needs fixing

* fix formatting

* fix formatting... again

* fix imports in test_events.py

* fix AsyncMock imports

* please let this be the last import fix

* change echoing function in streams.py for py 3.11 compatibility

* ensure futures for test_replies.py

* ensure table recovery futures

* ensure futures for all echo cors

* set aiokafka 0.8.0 as minimum

* apparently loops need to be passed

* i forgot to fix this lol

* this never ends ugh

* use internal cchardet library from hereon

* update 3.7 basepython

* hardcode flake8 to 5.0.4 for now

* update mode-streaming to >=0.3.0 and ignore rc's

* Start testing docs builds and only deploy on release
  • Loading branch information
wbarnha authored Nov 29, 2022
1 parent ca2e599 commit cf397bc
Show file tree
Hide file tree
Showing 76 changed files with 559 additions and 246 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dist.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- uses: actions/checkout@v3

- name: Build wheels
uses: pypa/cibuildwheel@v2.8.1
uses: pypa/cibuildwheel@v2.10.1
env:
CIBW_MANYLINUX_X86_64_IMAGE: 'manylinux2014'
CIBW_ARCHS: auto64
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/gh-pages.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
name: Pages

on:
# Only run when release is created in the master branch
release:
types: [created]
branches:
- 'master'
push:
branches: ["master"]
pull_request:
branches: ["master"]

jobs:
build:
Expand All @@ -27,6 +26,7 @@ jobs:
path: "./Documentation"
deploy:
name: "Deploy docs"
if: github.event_name == 'release' && github.event.action == 'published'
needs: build
runs-on: ubuntu-latest
# Grant GITHUB_TOKEN the permissions required to make a Pages deployment
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
# for example if a test fails only when Cython is enabled
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
use-cython: ["true", "false"]
env:
USE_CYTHON: ${{ matrix.use-cython }}
Expand All @@ -29,7 +29,7 @@ jobs:
with:
# You need to include this or setuptools_scm in GitHub runners won't detect the version
fetch-depth: 0
- uses: "actions/setup-python@v2"
- uses: "actions/setup-python@v4"
with:
python-version: "${{ matrix.python-version }}"
- name: "Install dependencies"
Expand Down
5 changes: 4 additions & 1 deletion faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,10 @@ def echo(self, *channels: Union[str, ChannelT]) -> StreamT:

async def echoing(value: T) -> T:
await asyncio.wait(
[maybe_forward(value, channel) for channel in _channels],
[
asyncio.ensure_future(maybe_forward(value, channel))
for channel in _channels
],
return_when=asyncio.ALL_COMPLETED,
)
return value
Expand Down
8 changes: 5 additions & 3 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,11 @@ async def on_recovery_completed(self, generation_id: int = 0) -> None:
self._set_recovery_ended()
# This needs to happen if all goes well
callback_coros = [
table.on_recovery_completed(
self.actives_for_table[table],
self.standbys_for_table[table],
asyncio.ensure_future(
table.on_recovery_completed(
self.actives_for_table[table],
self.standbys_for_table[table],
)
)
for table in self.tables.values()
]
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ def __init__(
self.not_waiting_next_records = Event()
self.not_waiting_next_records.set()
self._reset_state()
super().__init__(loop=loop or self.transport.loop, **kwargs)
super().__init__(**kwargs)
self.transactions = self.transport.create_transaction_manager(
consumer=self,
producer=self.app.producer,
Expand Down
11 changes: 4 additions & 7 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ def __init__(
) -> None:
super().__init__(
executor=executor,
loop=loop,
thread_loop=thread_loop,
Worker=Worker,
**kwargs,
Expand Down Expand Up @@ -488,12 +487,12 @@ def _create_consumer(
) -> aiokafka.AIOKafkaConsumer:
transport = cast(Transport, self.transport)
if self.app.client_only:
return self._create_client_consumer(transport, loop=loop)
return self._create_client_consumer(transport)
else:
return self._create_worker_consumer(transport, loop=loop)
return self._create_worker_consumer(transport)

def _create_worker_consumer(
self, transport: "Transport", loop: asyncio.AbstractEventLoop
self, transport: "Transport"
) -> aiokafka.AIOKafkaConsumer:
isolation_level: str = "read_uncommitted"
conf = self.app.conf
Expand Down Expand Up @@ -521,7 +520,6 @@ def _create_worker_consumer(
)

return aiokafka.AIOKafkaConsumer(
loop=loop,
api_version=conf.consumer_api_version,
client_id=conf.broker_client_id,
group_id=conf.id,
Expand Down Expand Up @@ -551,15 +549,14 @@ def _create_worker_consumer(
)

def _create_client_consumer(
self, transport: "Transport", loop: asyncio.AbstractEventLoop
self, transport: "Transport"
) -> aiokafka.AIOKafkaConsumer:
conf = self.app.conf
auth_settings = credentials_to_aiokafka_auth(
conf.broker_credentials, conf.ssl_context
)
max_poll_interval = conf.broker_max_poll_interval or 0
return aiokafka.AIOKafkaConsumer(
loop=loop,
client_id=conf.broker_client_id,
bootstrap_servers=server_list(transport.url, transport.default_port),
request_timeout_ms=int(conf.broker_request_timeout * 1000.0),
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def __init__(
self.partitioner = conf.producer_partitioner
api_version = self._api_version = conf.producer_api_version
assert api_version is not None
super().__init__(loop=loop or self.transport.loop, **kwargs)
super().__init__(**kwargs)
self.buffer = ProducerBuffer(loop=self.loop, beacon=self.beacon)
if conf.producer_threaded:
self.threaded_producer = self.create_threaded_producer()
Expand Down
2 changes: 1 addition & 1 deletion requirements/extras/cchardet.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cchardet>=2.1
faust-cchardet
6 changes: 3 additions & 3 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
aiohttp>=3.5.2,<4.0
aiohttp>=3.8.3,<4.0
aiohttp_cors>=0.7,<2.0
aiokafka>=0.7.1,<0.9.0
aiokafka>=0.8.0,<0.9.0
click>=6.7,<8.2
mode-streaming>=0.2.0,<0.3.0
mode-streaming>=0.3.0
opentracing>=1.3.0,<=2.4.0
terminaltables>=3.1,<4.0
yarl>=1.0,<2.0
Expand Down
3 changes: 2 additions & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
black
isort
autoflake
flake8
flake8==5.0.4 # 5.0.4 can be upgraded to 6.0.0+ when we EOL Python 3.7
flake8-bugbear
flake8-comprehensions
hypothesis>=3.31
freezegun>=0.3.11
mock; python_version<'3.8' # backport that contains AsyncMock class, includes mock library as dependency
pre-commit
pytest-aiofiles>=0.2.0
pytest-aiohttp>=0.3.0
Expand Down
6 changes: 4 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import time
from http import HTTPStatus
from typing import Any, NamedTuple, Optional
from unittest.mock import MagicMock, Mock, patch

import pytest
from _pytest.assertion.util import _compare_eq_dict, _compare_eq_set
from aiohttp.client import ClientError, ClientSession
from aiohttp.web import Response
from mode.utils.futures import all_tasks
from mode.utils.mocks import AsyncContextManagerMock, AsyncMock, MagicMock, Mock, patch

from tests.helpers import AsyncContextManagerMock, AsyncMock

sentinel = object()

Expand Down Expand Up @@ -131,7 +133,7 @@ def raise_for_status():
if 400 <= options.status_code:
raise ClientError()

response = AsyncMock(
response = Mock(
autospec=Response,
text=AsyncMock(return_value=options.text),
read=AsyncMock(return_value=options.text),
Expand Down
3 changes: 2 additions & 1 deletion tests/functional/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import os
from copy import copy
from typing import IO, Dict, NamedTuple, Union
from unittest.mock import Mock

import pytest
from mode.utils.logging import setup_logging
from mode.utils.mocks import AsyncMock, Mock

import faust
from faust.utils.tracing import set_current_span
from faust.web.cache.backends.memory import CacheStorage
from tests.helpers import AsyncMock


class AppMarks(NamedTuple):
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/serializers/test_registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import typing
from decimal import Decimal
from unittest.mock import Mock

import pytest
from mode.utils.mocks import Mock

import faust
from faust.exceptions import KeyDecodeError, ValueDecodeError
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import sys
from pathlib import Path
from typing import Any, Mapping, NamedTuple
from unittest.mock import patch

import mode
import pytest
import pytz
from mode.supervisors import OneForAllSupervisor
from mode.utils.mocks import patch
from yarl import URL

import faust
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_channels.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import asyncio
from unittest.mock import Mock

import pytest
from mode import label
from mode.utils.aiter import aiter, anext
from mode.utils.mocks import Mock
from mode.utils.queues import FlowControlQueue

import faust
Expand Down
3 changes: 2 additions & 1 deletion tests/functional/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
from copy import copy
from unittest.mock import Mock

import pytest
from mode import label
from mode.utils.aiter import aiter, anext
from mode.utils.mocks import AsyncMock, Mock

import faust
from faust.exceptions import ImproperlyConfigured
from faust.streams import maybe_forward
from tests.helpers import AsyncMock

from .helpers import channel_empty, message, put

Expand Down
3 changes: 2 additions & 1 deletion tests/functional/web/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from unittest.mock import Mock

import pytest
from mode.utils.mocks import Mock

from faust.exceptions import SameNode

Expand Down
10 changes: 5 additions & 5 deletions tests/functional/web/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import aredis
import pytest
from mode.utils.mocks import Mock

import faust
from faust.exceptions import ImproperlyConfigured
from faust.web import Blueprint, View
from faust.web.cache import backends
from faust.web.cache.backends import redis
from tests.helpers import Mock

DEFAULT_TIMEOUT = 361.363
VIEW_B_TIMEOUT = 64.3
Expand Down Expand Up @@ -364,7 +364,7 @@ async def test_redis__start_twice_same_client(*, app, mocked_redis):
async def test_redis_get__irrecoverable_errors(*, app, mocked_redis):
from aredis.exceptions import AuthenticationError

mocked_redis.return_value.get.coro.side_effect = AuthenticationError()
mocked_redis.return_value.get.side_effect = AuthenticationError()

with pytest.raises(app.cache.Unavailable):
async with app.cache:
Expand All @@ -386,10 +386,10 @@ async def test_redis_invalidating_error(operation, delete_error, *, app, mocked_
from aredis.exceptions import DataError

mocked_op = getattr(mocked_redis.return_value, operation)
mocked_op.coro.side_effect = DataError()
mocked_op.side_effect = DataError()
if delete_error:
# then the delete fails
mocked_redis.return_value.delete.coro.side_effect = DataError()
mocked_redis.return_value.delete.side_effect = DataError()

with pytest.raises(app.cache.Unavailable):
async with app.cache:
Expand All @@ -416,7 +416,7 @@ async def test_memory_delete(*, app):
async def test_redis_get__operational_error(*, app, mocked_redis):
from aredis.exceptions import TimeoutError

mocked_redis.return_value.get.coro.side_effect = TimeoutError()
mocked_redis.return_value.get.side_effect = TimeoutError()

with pytest.raises(app.cache.Unavailable):
async with app.cache:
Expand Down
Loading

0 comments on commit cf397bc

Please sign in to comment.