diff --git a/extra/tools/verify_doc_defaults.py b/extra/tools/verify_doc_defaults.py index 84f87266a..4385ed1f5 100644 --- a/extra/tools/verify_doc_defaults.py +++ b/extra/tools/verify_doc_defaults.py @@ -15,36 +15,36 @@ from yarl import URL -SETTINGS: Path = Path('docs/userguide/settings.rst') +SETTINGS: Path = Path("docs/userguide/settings.rst") -app = faust.App('verify_defaults') +app = faust.App("verify_defaults") ignore_settings: Set[str] = { - 'id', - 'tabledir', - 'reply_to', - 'broker_consumer', - 'broker_producer', + "id", + "tabledir", + "reply_to", + "broker_consumer", + "broker_producer", } builtin_locals: Dict[str, Any] = { - 'aiohttp': aiohttp, - 'app': app, - 'datetime': datetime, - 'datadir': app.conf.datadir, - 'faust': faust, - 'logging': logging, - 'mode': mode, - 'socket': socket, - 'timedelta': timedelta, - 'web_host': socket.gethostname(), - 'web_port': 6066, - 'VERSION': faust.__version__, - 'uuid': uuid, - 'URL': URL, + "aiohttp": aiohttp, + "app": app, + "datetime": datetime, + "datadir": app.conf.datadir, + "faust": faust, + "logging": logging, + "mode": mode, + "socket": socket, + "timedelta": timedelta, + "web_host": socket.gethostname(), + "web_port": 6066, + "VERSION": faust.__version__, + "uuid": uuid, + "URL": URL, } -RE_REF = re.compile(r'^:(\w+):`') +RE_REF = re.compile(r"^:(\w+):`") class Error(NamedTuple): @@ -63,7 +63,7 @@ def verify_settings(rst_path: Path) -> Iterator[Error]: actual = actual.value if actual != default: yield Error( - reason='mismatch', + reason="mismatch", setting=setting_name, default=default, actual=actual, @@ -74,14 +74,14 @@ def report_errors(errors: Iterator[Error]) -> int: num_errors: int = 0 for num_errors, e in enumerate(errors, start=1): if num_errors == 1: - carp(f'{sys.argv[0]}: Errors in docs/userguide/settings.rst:') - carp(f' + Setting {e.reason} {e.setting}:') - carp(f' documentation: {e.default!r}') - carp(f' actual: {e.actual!r}') + carp(f"{sys.argv[0]}: Errors in docs/userguide/settings.rst:") + carp(f" + Setting {e.reason} {e.setting}:") + carp(f" documentation: {e.default!r}") + carp(f" actual: {e.actual!r}") if num_errors: - carp(f'Found {num_errors} error(s).', file=sys.stderr) + carp(f"Found {num_errors} error(s).", file=sys.stderr) else: - print(f'{sys.argv[0]}: All OK :-)', file=sys.stdout) + print(f"{sys.argv[0]}: All OK :-)", file=sys.stdout) return num_errors @@ -89,38 +89,39 @@ def carp(msg, *, file: IO = sys.stderr, **kwargs: Any) -> None: print(msg, file=file, **kwargs) -def find_settings_in_rst(rst_path: Path, - locals: Dict[str, Any] = None, - builtin_locals: Dict[str, Any] = builtin_locals, - ignore_settings: Set[str] = ignore_settings): +def find_settings_in_rst( + rst_path: Path, + locals: Dict[str, Any] = None, + builtin_locals: Dict[str, Any] = builtin_locals, + ignore_settings: Set[str] = ignore_settings, +): setting: str = None default: Any = None - app = faust.App('_verify_doc_defaults') + app = faust.App("_verify_doc_defaults") _globals = dict(globals()) # Add setting default to globals # so that defaults referencing another setting work. # E.g.: # :default: :setting:`broker_api_version` - _globals.update({ - name: getattr(app.conf, name) - for name in app.conf.setting_names() - }) + _globals.update( + {name: getattr(app.conf, name) for name in app.conf.setting_names()} + ) local_ns: Dict[str, Any] = {**builtin_locals, **(locals or {})} for line in rst_path.read_text().splitlines(): - if line.startswith('.. setting::'): + if line.startswith(".. setting::"): if setting and not default and setting not in ignore_settings: - raise Exception(f'No default value for {setting}') - setting = line.split('::')[-1].strip() - elif ':default:' in line: - if '``' in line: - line, sep, rest = line.rpartition('``') - default = line.split(':default:')[-1].strip() - default = default.strip('`') - default = RE_REF.sub('', default) + raise Exception(f"No default value for {setting}") + setting = line.split("::")[-1].strip() + elif ":default:" in line: + if "``" in line: + line, sep, rest = line.rpartition("``") + default = line.split(":default:")[-1].strip() + default = default.strip("`") + default = RE_REF.sub("", default) default_value = eval(default, _globals, local_ns) if setting not in ignore_settings: yield setting, default_value -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(report_errors(verify_settings(SETTINGS))) diff --git a/faust/agents/agent.py b/faust/agents/agent.py index ba9ef5255..41bebfe6b 100644 --- a/faust/agents/agent.py +++ b/faust/agents/agent.py @@ -226,7 +226,7 @@ def __init__( "Agent concurrency must be 1 when using isolated partitions" ) self.use_reply_headers = use_reply_headers - Service.__init__(self) + Service.__init__(self, loop=app.loop) def on_init_dependencies(self) -> Iterable[ServiceT]: """Return list of services dependencies required to start agent.""" @@ -1090,7 +1090,6 @@ def shortlabel(self) -> str: class AgentTestWrapper(Agent, AgentTestWrapperT): # pragma: no cover - _stream: StreamT def __init__( diff --git a/faust/cli/base.py b/faust/cli/base.py index 4e27f2d50..cc5ef4f8b 100644 --- a/faust/cli/base.py +++ b/faust/cli/base.py @@ -527,6 +527,7 @@ class Command(abc.ABC): # noqa: B024 @classmethod def as_click_command(cls) -> Callable: # pragma: no cover """Convert command into :pypi:`click` command.""" + # This is what actually registers the commands into the # :pypi:`click` command-line interface (the ``def cli`` main above). # __init_subclass__ calls this for the side effect of being diff --git a/faust/livecheck/case.py b/faust/livecheck/case.py index 23056bff3..38061a729 100644 --- a/faust/livecheck/case.py +++ b/faust/livecheck/case.py @@ -176,7 +176,7 @@ def __init__( # signal attributes have the correct signal instance. self.__dict__.update(self.signals) - Service.__init__(self, **kwargs) + Service.__init__(self, loop=app.loop, **kwargs) @Service.timer(10.0) async def _sampler(self) -> None: diff --git a/faust/models/fields.py b/faust/models/fields.py index a51db66e0..e34b22a7d 100644 --- a/faust/models/fields.py +++ b/faust/models/fields.py @@ -331,7 +331,6 @@ def prepare_value( class NumberField(FieldDescriptor[T]): - max_value: Optional[int] min_value: Optional[int] @@ -439,7 +438,6 @@ def validate(self, value: Decimal) -> Iterable[ValidationError]: class CharField(FieldDescriptor[CharacterType]): - max_length: Optional[int] min_length: Optional[int] trim_whitespace: bool diff --git a/faust/models/tags.py b/faust/models/tags.py index ac87b0466..ed0a3cc7c 100644 --- a/faust/models/tags.py +++ b/faust/models/tags.py @@ -79,7 +79,6 @@ def __format__(self, format_spec: str) -> str: class _FrameLocal(UserString, Generic[T]): - _field_name: str _tag_type: str _frame: str diff --git a/faust/streams.py b/faust/streams.py index 482bffe6c..2b3f6f8a1 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -866,7 +866,6 @@ def get_key(withdrawal): if topic is not None: channel = topic else: - prefix = "" if self.prefix and not cast(TopicT, self.channel).has_prefix: prefix = self.prefix + "-" diff --git a/faust/tables/base.py b/faust/tables/base.py index 45f7f63f2..7f90672fa 100644 --- a/faust/tables/base.py +++ b/faust/tables/base.py @@ -125,7 +125,7 @@ def __init__( synchronize_all_active_partitions: bool = False, **kwargs: Any, ) -> None: - Service.__init__(self, **kwargs) + Service.__init__(self, loop=app.loop, **kwargs) self.app = app self.name = cast(str, name) # set lazily so CAN BE NONE! self.default = default diff --git a/faust/tables/objects.py b/faust/tables/objects.py index 33b2ecfa8..66a80d4ad 100644 --- a/faust/tables/objects.py +++ b/faust/tables/objects.py @@ -72,7 +72,7 @@ def __init__(self, table: Table, **kwargs: Any) -> None: self.table_name = self.table.name self.data = {} self._dirty = set() - Service.__init__(self, **kwargs) + Service.__init__(self, loop=table.loop, **kwargs) def send_changelog_event(self, key: Any, operation: int, value: Any) -> None: """Send changelog event to the tables changelog topic.""" diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index b4f13e8b4..0feba69c0 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -1363,7 +1363,6 @@ def verify_recovery_event_path(self, now: float, tp: TP) -> None: class ThreadDelegateConsumer(Consumer): - _thread: ConsumerThread #: Main thread method queue. diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 9c94d7c35..a0642a281 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -1146,7 +1146,6 @@ async def commit_transaction(self, transactional_id: str) -> None: """Commit transaction by id.""" try: async with self._trn_locks[transactional_id]: - transaction_producer = self._transaction_producers.get(transactional_id) if transaction_producer: await transaction_producer.commit_transaction() @@ -1170,7 +1169,6 @@ async def abort_transaction(self, transactional_id: str) -> None: """Abort and rollback transaction by id.""" try: async with self._trn_locks[transactional_id]: - transaction_producer = self._transaction_producers.get(transactional_id) if transaction_producer: await transaction_producer.abort_transaction() @@ -1210,7 +1208,6 @@ async def commit_transactions( for transactional_id, offsets in tid_to_offset_map.items(): # get the producer async with self._trn_locks[transactional_id]: - transaction_producer = self._transaction_producers.get(transactional_id) if transaction_producer: logger.debug( @@ -1346,7 +1343,6 @@ async def send( try: if transactional_id: async with self._trn_locks[transactional_id]: - return cast( Awaitable[RecordMetadata], await transaction_producer.send( diff --git a/faust/types/agents.py b/faust/types/agents.py index 0e59c4a5d..102e05919 100644 --- a/faust/types/agents.py +++ b/faust/types/agents.py @@ -72,7 +72,6 @@ class _AppT: class ActorT(ServiceT, Generic[_T]): - agent: "AgentT" stream: StreamT it: _T @@ -122,7 +121,6 @@ class AwaitableActorT(ActorT[Awaitable], Awaitable): class AgentT(ServiceT, Generic[_T]): - name: str app: _AppT concurrency: int @@ -325,7 +323,6 @@ def human_tracebacks(self) -> str: class AgentTestWrapperT(AgentT, AsyncIterable): - new_value_processed: asyncio.Condition original_channel: ChannelT results: MutableMapping[int, Any] diff --git a/faust/types/assignor.py b/faust/types/assignor.py index c5fe66d93..d00ffd412 100644 --- a/faust/types/assignor.py +++ b/faust/types/assignor.py @@ -27,7 +27,6 @@ class _AppT: class PartitionAssignorT(abc.ABC): - replicas: int app: _AppT @@ -69,7 +68,6 @@ def tables_metadata(self) -> HostToPartitionMap: class LeaderAssignorT(ServiceT): - app: _AppT @abc.abstractmethod diff --git a/faust/types/events.py b/faust/types/events.py index 6894137dc..96f566ba7 100644 --- a/faust/types/events.py +++ b/faust/types/events.py @@ -28,7 +28,6 @@ class _SchemaT: class EventT(Generic[T], AsyncContextManager): - app: _AppT key: K value: V diff --git a/faust/types/fixups.py b/faust/types/fixups.py index a7855980f..1cbb5e3a2 100644 --- a/faust/types/fixups.py +++ b/faust/types/fixups.py @@ -14,7 +14,6 @@ class _AppT: class FixupT(abc.ABC): - app: _AppT @abc.abstractmethod diff --git a/faust/types/models.py b/faust/types/models.py index 0d1a2fd78..db9ab3f44 100644 --- a/faust/types/models.py +++ b/faust/types/models.py @@ -194,7 +194,6 @@ def validation_errors(self) -> List[ValidationError]: class FieldDescriptorT(Generic[T]): - field: str input_name: str output_name: str diff --git a/faust/types/sensors.py b/faust/types/sensors.py index 5132e1621..50975c99c 100644 --- a/faust/types/sensors.py +++ b/faust/types/sensors.py @@ -147,7 +147,6 @@ class SensorT(SensorInterfaceT, ServiceT): class SensorDelegateT(SensorInterfaceT, Iterable): - # Delegate calls to many sensors. @abc.abstractmethod diff --git a/faust/types/serializers.py b/faust/types/serializers.py index 4b434f31c..849a6a387 100644 --- a/faust/types/serializers.py +++ b/faust/types/serializers.py @@ -28,7 +28,6 @@ class _Message: class RegistryT(abc.ABC): - key_serializer: CodecArg value_serializer: CodecArg @@ -72,7 +71,6 @@ def dumps_value( class SchemaT(Generic[KT, VT]): - key_type: Optional[_ModelArg] = None value_type: Optional[_ModelArg] = None diff --git a/faust/types/stores.py b/faust/types/stores.py index 98c963001..051203111 100644 --- a/faust/types/stores.py +++ b/faust/types/stores.py @@ -33,7 +33,6 @@ class _CollectionT: class StoreT(ServiceT, FastUserDict[KT, VT]): - url: URL app: _AppT table: _CollectionT diff --git a/faust/types/streams.py b/faust/types/streams.py index 48875292d..7fef2704d 100644 --- a/faust/types/streams.py +++ b/faust/types/streams.py @@ -104,7 +104,6 @@ def _human_channel(self) -> str: class StreamT(AsyncIterable[T_co], JoinableT, ServiceT): - app: _AppT channel: AsyncIterator[T_co] outbox: Optional[asyncio.Queue] = None diff --git a/faust/types/topics.py b/faust/types/topics.py index 9d9e390df..2f5a67206 100644 --- a/faust/types/topics.py +++ b/faust/types/topics.py @@ -30,7 +30,6 @@ class _SchemaT: class TopicT(ChannelT): - #: Iterable/Sequence of topic names to subscribe to. topics: Sequence[str] diff --git a/faust/types/transports.py b/faust/types/transports.py index 9911b8b3a..68a2e8111 100644 --- a/faust/types/transports.py +++ b/faust/types/transports.py @@ -78,7 +78,6 @@ class _AppT: class ProducerBufferT(ServiceT): - max_messages: int pending: asyncio.Queue @@ -106,7 +105,6 @@ def size(self) -> int: class ProducerT(ServiceT): - #: The transport that created this Producer. transport: "TransportT" @@ -289,7 +287,6 @@ def iterate(self, records: Mapping[TP, List]) -> Iterator[Tuple[TP, Any]]: class ConsumerT(ServiceT): - #: The transport that created this Consumer. transport: "TransportT" @@ -451,7 +448,6 @@ def on_buffer_drop(self, tp: TP) -> None: class ConductorT(ServiceT, MutableSet[TopicT]): - # The topic conductor delegates messages from the Consumer # to the various Topic instances subscribed to a topic. @@ -492,7 +488,6 @@ def acking_topics(self) -> Set[str]: class TransportT(abc.ABC): - #: The Consumer class used for this type of transport. Consumer: ClassVar[Type[ConsumerT]] diff --git a/faust/types/tuples.py b/faust/types/tuples.py index bbd46c481..73142370f 100644 --- a/faust/types/tuples.py +++ b/faust/types/tuples.py @@ -113,7 +113,6 @@ def _get_len(s: Optional[bytes]) -> int: class Message: - __slots__ = ( "topic", "partition", diff --git a/faust/types/web.py b/faust/types/web.py index f42e7bd2e..31e998a09 100644 --- a/faust/types/web.py +++ b/faust/types/web.py @@ -96,7 +96,6 @@ class ResourceOptions(NamedTuple): class CacheBackendT(ServiceT): - Unavailable: Type[BaseException] @abc.abstractmethod @@ -121,7 +120,6 @@ async def delete(self, key: str) -> None: class CacheT(abc.ABC): - timeout: Optional[Seconds] include_headers: bool key_prefix: str diff --git a/faust/web/base.py b/faust/web/base.py index 79b21453d..809b1a065 100644 --- a/faust/web/base.py +++ b/faust/web/base.py @@ -186,7 +186,7 @@ def __init__(self, app: AppT, **kwargs: Any) -> None: else: blueprints.extend(self.production_blueprints) self.blueprints = BlueprintManager(blueprints) - Service.__init__(self, **kwargs) + Service.__init__(self, loop=app.loop, **kwargs) @abc.abstractmethod def text( diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index c5ce7792c..2c71f4a89 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -939,7 +939,6 @@ def __post_init__(self) -> None: self.data_store = None class AdjustData(Record): - activity_kind: str network_name: str adid: str @@ -998,7 +997,6 @@ def __post_init__(self) -> None: def test_overwrite_asdict(): - with pytest.raises(RuntimeError): class R(Record): @@ -1115,7 +1113,6 @@ class MyBase(faust.Record, abstract=True): def test_raises_when_defaults_in_wrong_order(): - with pytest.raises(TypeError): class X(Record): diff --git a/tests/stress/killer.py b/tests/stress/killer.py index 200bcfcd2..9d9a69f92 100644 --- a/tests/stress/killer.py +++ b/tests/stress/killer.py @@ -19,7 +19,6 @@ def seconds_to_sleep(self): class Chaos(Service): - schedule = [ # Signal -TERM/-INT between every 1 and 30 seconds. # This period lasts for at least half a minute, but never for more diff --git a/tests/stress/reports/logging.py b/tests/stress/reports/logging.py index b009a3d39..d0564cbe1 100644 --- a/tests/stress/reports/logging.py +++ b/tests/stress/reports/logging.py @@ -12,7 +12,6 @@ class LogPusher(Service): - app: faust.App queue: asyncio.Queue @@ -36,7 +35,6 @@ async def _flush(self) -> None: class LogHandler(logging.Handler): - app: faust.App def __init__(self, app: faust.App, *args: Any, **kwargs: Any) -> None: diff --git a/tests/stress/reports/models.py b/tests/stress/reports/models.py index d04482695..f6db46446 100644 --- a/tests/stress/reports/models.py +++ b/tests/stress/reports/models.py @@ -6,7 +6,6 @@ class Error(faust.Record): - #: Message (the actual formatted log message). message: str @@ -39,7 +38,6 @@ class Error(faust.Record): class Status(faust.Record): - #: The id of the app that is sending this. app_id: str diff --git a/tests/unit/agents/test_actor.py b/tests/unit/agents/test_actor.py index a71a505c7..9527b9b34 100644 --- a/tests/unit/agents/test_actor.py +++ b/tests/unit/agents/test_actor.py @@ -18,7 +18,6 @@ def traceback(self) -> str: class Test_Actor: - ActorType = FakeActor @pytest.fixture() @@ -94,7 +93,6 @@ def test_repr(self, *, actor): class Test_AsyncIterableActor(Test_Actor): - ActorType = AsyncIterableActor def test_aiter(self, *, actor, it): @@ -104,7 +102,6 @@ def test_aiter(self, *, actor, it): class Test_AwaitableActor(Test_Actor): - ActorType = AwaitableActor def test_await(self, *, actor, it): diff --git a/tests/unit/app/test_base.py b/tests/unit/app/test_base.py index fa4ffd3ba..057668f55 100644 --- a/tests/unit/app/test_base.py +++ b/tests/unit/app/test_base.py @@ -828,7 +828,6 @@ def test_SetGlobalTable(self, *, app): assert isinstance(app.tables.data["name"], GlobalTableT) def test_page(self, *, app): - with patch("faust.app.base.venusian") as venusian: @app.page("/foo") @@ -840,7 +839,6 @@ async def view(self, request): venusian.attach.assert_called_once_with(view, category=SCAN_PAGE) def test_page__with_cors_options(self, *, app): - with patch("faust.app.base.venusian") as venusian: @app.page( diff --git a/tests/unit/livecheck/patches/test_aiohttp.py b/tests/unit/livecheck/patches/test_aiohttp.py index 73c56bd7d..c73737a5a 100644 --- a/tests/unit/livecheck/patches/test_aiohttp.py +++ b/tests/unit/livecheck/patches/test_aiohttp.py @@ -9,7 +9,6 @@ @pytest.mark.asyncio async def test_patch_aiohttp_session(*, execution): - patch_aiohttp_session() from aiohttp.client import ClientSession diff --git a/tests/unit/livecheck/test_app.py b/tests/unit/livecheck/test_app.py index 1ecbbb745..2fe40b035 100644 --- a/tests/unit/livecheck/test_app.py +++ b/tests/unit/livecheck/test_app.py @@ -113,7 +113,6 @@ class SignalWithNoneOrigin(livecheck.Signal): @livecheck.case() class Test_foo: - signal1: livecheck.Signal signal2: SignalWithNoneOrigin signal3: livecheck.Signal = livecheck.Signal() diff --git a/tests/unit/stores/test_aerospike.py b/tests/unit/stores/test_aerospike.py index 55cdfb036..28e5d96da 100644 --- a/tests/unit/stores/test_aerospike.py +++ b/tests/unit/stores/test_aerospike.py @@ -100,7 +100,6 @@ def test_set_success( store, ): with patch("faust.stores.aerospike.aerospike", MagicMock()) as aero: - store.client.put = MagicMock() key = b"key" value = b"value" @@ -198,7 +197,6 @@ def test_iteritems_error(self, store): def test_iteritems_success(self, store): with patch("faust.stores.aerospike.aerospike", MagicMock()): - scan = MagicMock() store.client.scan = MagicMock(return_value=scan) scan_result = [ diff --git a/tests/unit/stores/test_rocksdb.py b/tests/unit/stores/test_rocksdb.py index 3977bb936..da7888a31 100644 --- a/tests/unit/stores/test_rocksdb.py +++ b/tests/unit/stores/test_rocksdb.py @@ -365,7 +365,6 @@ def new_db(self, name, exists=False): return db def test_get_bucket_for_key__not_in_index(self, *, store): - dbs = { 1: self.new_db(name="db1"), 2: self.new_db(name="db2"), diff --git a/tests/unit/tables/test_base.py b/tests/unit/tables/test_base.py index 588283a61..84e800083 100644 --- a/tests/unit/tables/test_base.py +++ b/tests/unit/tables/test_base.py @@ -188,7 +188,6 @@ def test_on_changelog_sent__transactions(self, *, table): @pytest.mark.asyncio async def test_last_closed_window(self, *, table): - assert table.last_closed_window == 0.0 table.window = Mock(name="window") diff --git a/tests/unit/transport/drivers/test_aiokafka.py b/tests/unit/transport/drivers/test_aiokafka.py index b7f6bd330..ff6109b8e 100644 --- a/tests/unit/transport/drivers/test_aiokafka.py +++ b/tests/unit/transport/drivers/test_aiokafka.py @@ -529,7 +529,6 @@ def test_timed_out(self, *, cthread, now, tp, logger): @pytest.mark.skip("Needs fixing") class Test_VEP_stream_idle_no_highwater(Test_verify_event_path_base): - highwater = 10 committed_offset = 10 diff --git a/tests/unit/transport/test_producer.py b/tests/unit/transport/test_producer.py index 368c6b0f6..0d1b6c3a3 100644 --- a/tests/unit/transport/test_producer.py +++ b/tests/unit/transport/test_producer.py @@ -121,7 +121,6 @@ def _put(self, buf, items): @pytest.mark.asyncio async def test_flush_atmost(self, *, buf): - sent_messages = 0 def create_send_pending_mock(max_messages):