From 80ee72bf9ab720383e73597184aacc5ba36821dc Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Thu, 5 Sep 2024 14:59:47 +0300 Subject: [PATCH] fix (#1759): correct ConfluentConfig with enums (#1762) * docs: remove confusing reply_to result * fix (#1759): cast Enums to str in ConfluentConfig * tests: add check for enum->str confluent config * lint: fix typo * docs: generate API References --------- Co-authored-by: Lancetnik --- docs/docs/SUMMARY.md | 1 + .../confluent/config/ConfluentFastConfig.md | 11 +++ docs/docs/en/nats/rpc.md | 2 +- docs/docs/en/rabbit/rpc.md | 2 +- faststream/confluent/broker/broker.py | 3 +- faststream/confluent/client.py | 39 +++++----- faststream/confluent/config.py | 78 +++++++++++++------ tests/brokers/confluent/test_connect.py | 34 +++++++- 8 files changed, 122 insertions(+), 48 deletions(-) create mode 100644 docs/docs/en/api/faststream/confluent/config/ConfluentFastConfig.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 74d02113ee..a5e41f0749 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -492,6 +492,7 @@ search: - [CompressionCodec](api/faststream/confluent/config/CompressionCodec.md) - [CompressionType](api/faststream/confluent/config/CompressionType.md) - [ConfluentConfig](api/faststream/confluent/config/ConfluentConfig.md) + - [ConfluentFastConfig](api/faststream/confluent/config/ConfluentFastConfig.md) - [Debug](api/faststream/confluent/config/Debug.md) - [GroupProtocol](api/faststream/confluent/config/GroupProtocol.md) - [IsolationLevel](api/faststream/confluent/config/IsolationLevel.md) diff --git a/docs/docs/en/api/faststream/confluent/config/ConfluentFastConfig.md b/docs/docs/en/api/faststream/confluent/config/ConfluentFastConfig.md new file mode 100644 index 0000000000..27861ffd5b --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/config/ConfluentFastConfig.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.config.ConfluentFastConfig diff --git a/docs/docs/en/nats/rpc.md b/docs/docs/en/nats/rpc.md index ff390750fd..97da003118 100644 --- a/docs/docs/en/nats/rpc.md +++ b/docs/docs/en/nats/rpc.md @@ -43,7 +43,7 @@ So, if you have such one, you can specify it with the `reply_to` argument. This async def consume_responses(msg): ... -msg = await broker.publish( +await broker.publish( "Hi!", subject="test", reply_to="response-subject", diff --git a/docs/docs/en/rabbit/rpc.md b/docs/docs/en/rabbit/rpc.md index 2a132a7705..31acd67b4b 100644 --- a/docs/docs/en/rabbit/rpc.md +++ b/docs/docs/en/rabbit/rpc.md @@ -40,7 +40,7 @@ So, if you have such one, you can specify it with the `reply_to` argument. This async def consume_responses(msg): ... -msg = await broker.publish( +await broker.publish( "Hi!", queue="test", reply_to="response-queue", diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index e5c9754e00..df97025e33 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -26,6 +26,7 @@ AsyncConfluentConsumer, AsyncConfluentProducer, ) +from faststream.confluent.config import ConfluentFastConfig from faststream.confluent.publisher.producer import AsyncConfluentFastProducer from faststream.confluent.schemas.params import ConsumerConnectionParams from faststream.confluent.security import parse_security @@ -394,7 +395,7 @@ def __init__( ) self.client_id = client_id self._producer = None - self.config = config + self.config = ConfluentFastConfig(config) async def _close( self, diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 7b412a2e17..9cab290fb9 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -16,7 +16,7 @@ from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer from confluent_kafka.admin import AdminClient, NewTopic -from faststream.confluent.config import ConfluentConfig +from faststream.confluent import config as config_module from faststream.confluent.schemas import TopicPartition from faststream.exceptions import SetupError from faststream.log import logger as faststream_logger @@ -34,6 +34,7 @@ def __init__( self, *, logger: Optional["LoggerProto"], + config: config_module.ConfluentFastConfig, bootstrap_servers: Union[str, List[str]] = "localhost", client_id: Optional[str] = None, metadata_max_age_ms: int = 300000, @@ -53,12 +54,9 @@ def __init__( sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, - config: Optional[ConfluentConfig] = None, ) -> None: self.logger = logger - self.config: Dict[str, Any] = {} if config is None else dict(config) - if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str ): @@ -89,10 +87,11 @@ def __init__( "connections.max.idle.ms": connections_max_idle_ms, "allow.auto.create.topics": allow_auto_create_topics, } - self.config = {**self.config, **config_from_params} + + final_config = {**config.as_config_dict(), **config_from_params} if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]: - self.config.update( + final_config.update( { "sasl.mechanism": sasl_mechanism, "sasl.username": sasl_plain_username, @@ -100,7 +99,7 @@ def __init__( } ) - self.producer = Producer(self.config, logger=self.logger) + self.producer = Producer(final_config, logger=self.logger) async def stop(self) -> None: """Stop the Kafka producer and flush remaining messages.""" @@ -180,6 +179,7 @@ def __init__( *topics: str, partitions: Sequence["TopicPartition"], logger: Optional["LoggerProto"], + config: config_module.ConfluentFastConfig, bootstrap_servers: Union[str, List[str]] = "localhost", client_id: Optional[str] = "confluent-kafka-consumer", group_id: Optional[str] = None, @@ -205,18 +205,9 @@ def __init__( sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, - config: Optional[ConfluentConfig] = None, ) -> None: self.logger = logger - self.config: Dict[str, Any] = {} if config is None else dict(config) - - if group_id is None: - group_id = self.config.get("group.id", "faststream-consumer-group") - - if group_instance_id is None: - group_instance_id = self.config.get("group.instance.id", None) - if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str ): @@ -232,13 +223,18 @@ def __init__( for x in partition_assignment_strategy ] ) + + final_config = config.as_config_dict() + config_from_params = { "allow.auto.create.topics": allow_auto_create_topics, "topic.metadata.refresh.interval.ms": 1000, "bootstrap.servers": bootstrap_servers, "client.id": client_id, - "group.id": group_id, - "group.instance.id": group_instance_id, + "group.id": group_id + or final_config.get("group.id", "faststream-consumer-group"), + "group.instance.id": group_instance_id + or final_config.get("group.instance.id", None), "fetch.wait.max.ms": fetch_max_wait_ms, "fetch.max.bytes": fetch_max_bytes, "fetch.min.bytes": fetch_min_bytes, @@ -259,10 +255,10 @@ def __init__( "isolation.level": isolation_level, } self.allow_auto_create_topics = allow_auto_create_topics - self.config = {**self.config, **config_from_params} + final_config.update(config_from_params) if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]: - self.config.update( + final_config.update( { "sasl.mechanism": sasl_mechanism, "sasl.username": sasl_plain_username, @@ -270,7 +266,8 @@ def __init__( } ) - self.consumer = Consumer(self.config, logger=self.logger) + self.config = final_config + self.consumer = Consumer(final_config, logger=self.logger) @property def topics_to_create(self) -> List[str]: diff --git a/faststream/confluent/config.py b/faststream/confluent/config.py index 218c2b9c5a..16de28b3f1 100644 --- a/faststream/confluent/config.py +++ b/faststream/confluent/config.py @@ -1,10 +1,13 @@ from enum import Enum -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable, Optional, Union from typing_extensions import TypedDict +if TYPE_CHECKING: + from faststream.types import AnyDict -class BuiltinFeatures(Enum): + +class BuiltinFeatures(str, Enum): gzip = "gzip" snappy = "snappy" ssl = "ssl" @@ -21,7 +24,7 @@ class BuiltinFeatures(Enum): oidc = "oidc" -class Debug(Enum): +class Debug(str, Enum): generic = "generic" broker = "broker" topic = "topic" @@ -44,41 +47,41 @@ class Debug(Enum): all = "all" -class BrokerAddressFamily(Enum): +class BrokerAddressFamily(str, Enum): any = "any" v4 = "v4" v6 = "v6" -class SecurityProtocol(Enum): +class SecurityProtocol(str, Enum): plaintext = "plaintext" ssl = "ssl" sasl_plaintext = "sasl_plaintext" sasl_ssl = "sasl_ssl" -class SASLOAUTHBearerMethod(Enum): +class SASLOAUTHBearerMethod(str, Enum): default = "default" oidc = "oidc" -class GroupProtocol(Enum): +class GroupProtocol(str, Enum): classic = "classic" consumer = "consumer" -class OffsetStoreMethod(Enum): +class OffsetStoreMethod(str, Enum): none = "none" file = "file" broker = "broker" -class IsolationLevel(Enum): +class IsolationLevel(str, Enum): read_uncommitted = "read_uncommitted" read_committed = "read_committed" -class CompressionCodec(Enum): +class CompressionCodec(str, Enum): none = "none" gzip = "gzip" snappy = "snappy" @@ -86,7 +89,7 @@ class CompressionCodec(Enum): zstd = "zstd" -class CompressionType(Enum): +class CompressionType(str, Enum): none = "none" gzip = "gzip" snappy = "snappy" @@ -94,7 +97,7 @@ class CompressionType(Enum): zstd = "zstd" -class ClientDNSLookup(Enum): +class ClientDNSLookup(str, Enum): use_all_dns_ips = "use_all_dns_ips" resolve_canonical_bootstrap_servers_only = ( "resolve_canonical_bootstrap_servers_only" @@ -104,7 +107,17 @@ class ClientDNSLookup(Enum): ConfluentConfig = TypedDict( "ConfluentConfig", { - "builtin.features": BuiltinFeatures, + "compression.codec": Union[CompressionCodec, str], + "compression.type": Union[CompressionType, str], + "client.dns.lookup": Union[ClientDNSLookup, str], + "offset.store.method": Union[OffsetStoreMethod, str], + "isolation.level": Union[IsolationLevel, str], + "sasl.oauthbearer.method": Union[SASLOAUTHBearerMethod, str], + "security.protocol": Union[SecurityProtocol, str], + "broker.address.family": Union[BrokerAddressFamily, str], + "builtin.features": Union[BuiltinFeatures, str], + "debug": Union[Debug, str], + "group.protocol": Union[GroupProtocol, str], "client.id": str, "metadata.broker.list": str, "bootstrap.servers": str, @@ -120,7 +133,6 @@ class ClientDNSLookup(Enum): "topic.metadata.refresh.sparse": bool, "topic.metadata.propagation.max.ms": int, "topic.blacklist": str, - "debug": Debug, "socket.timeout.ms": int, "socket.blocking.max.ms": int, "socket.send.buffer.bytes": int, @@ -129,7 +141,6 @@ class ClientDNSLookup(Enum): "socket.nagle.disable": bool, "socket.max.fails": int, "broker.address.ttl": int, - "broker.address.family": BrokerAddressFamily, "socket.connection.setup.timeout.ms": int, "connections.max.idle.ms": int, "reconnect.backoff.jitter.ms": int, @@ -160,7 +171,6 @@ class ClientDNSLookup(Enum): "api.version.fallback.ms": int, "broker.version.fallback": str, "allow.auto.create.topics": bool, - "security.protocol": SecurityProtocol, "ssl.cipher.suites": str, "ssl.curves.list": str, "ssl.sigalgs.list": str, @@ -197,7 +207,6 @@ class ClientDNSLookup(Enum): "sasl.oauthbearer.config": str, "enable.sasl.oauthbearer.unsecure.jwt": bool, "oauthbearer_token_refresh_cb": Callable[..., Any], - "sasl.oauthbearer.method": SASLOAUTHBearerMethod, "sasl.oauthbearer.client.id": str, "sasl.oauthbearer.client.secret": str, "sasl.oauthbearer.scope": str, @@ -211,7 +220,6 @@ class ClientDNSLookup(Enum): "session.timeout.ms": str, "heartbeat.interval.ms": str, "group.protocol.type": str, - "group.protocol": GroupProtocol, "group.remote.assignor": str, "coordinator.query.interval.ms": int, "max.poll.interval.ms": int, @@ -227,8 +235,6 @@ class ClientDNSLookup(Enum): "fetch.max.bytes": int, "fetch.min.bytes": int, "fetch.error.backoff.ms": int, - "offset.store.method": OffsetStoreMethod, - "isolation.level": IsolationLevel, "consume_cb": Callable[..., Any], "rebalance_cb": Callable[..., Any], "offset_commit_cb": Callable[..., Any], @@ -248,15 +254,41 @@ class ClientDNSLookup(Enum): "retry.backoff.ms": int, "retry.backoff.max.ms": int, "queue.buffering.backpressure.threshold": int, - "compression.codec": CompressionCodec, - "compression.type": CompressionType, "batch.num.messages": int, "batch.size": int, "delivery.report.only.error": bool, "dr_cb": Callable[..., Any], "dr_msg_cb": Callable[..., Any], "sticky.partitioning.linger.ms": int, - "client.dns.lookup": ClientDNSLookup, }, total=False, ) + + +class ConfluentFastConfig: + def __init__(self, config: Optional[ConfluentConfig]) -> None: + self.config = config + + def as_config_dict(self) -> "AnyDict": + if not self.config: + return {} + + data = dict(self.config) + + for key, enum in ( + ("compression.codec", CompressionCodec), + ("compression.type", CompressionType), + ("client.dns.lookup", ClientDNSLookup), + ("offset.store.method", OffsetStoreMethod), + ("isolation.level", IsolationLevel), + ("sasl.oauthbearer.method", SASLOAUTHBearerMethod), + ("security.protocol", SecurityProtocol), + ("broker.address.family", BrokerAddressFamily), + ("builtin.features", BuiltinFeatures), + ("debug", Debug), + ("group.protocol", GroupProtocol), + ): + if key in data: + data[key] = enum(data[key]).value + + return data diff --git a/tests/brokers/confluent/test_connect.py b/tests/brokers/confluent/test_connect.py index bcfcc71ea4..0861c8f9d5 100644 --- a/tests/brokers/confluent/test_connect.py +++ b/tests/brokers/confluent/test_connect.py @@ -1,9 +1,41 @@ import pytest -from faststream.confluent import KafkaBroker +from faststream.confluent import KafkaBroker, config from tests.brokers.base.connection import BrokerConnectionTestcase +def test_correct_config(): + broker = KafkaBroker( + config={ + "compression.codec": config.CompressionCodec.none, + "compression.type": config.CompressionType.none, + "client.dns.lookup": config.ClientDNSLookup.use_all_dns_ips, + "offset.store.method": config.OffsetStoreMethod.broker, + "isolation.level": config.IsolationLevel.read_uncommitted, + "sasl.oauthbearer.method": config.SASLOAUTHBearerMethod.default, + "security.protocol": config.SecurityProtocol.ssl, + "broker.address.family": config.BrokerAddressFamily.any, + "builtin.features": config.BuiltinFeatures.gzip, + "debug": config.Debug.broker, + "group.protocol": config.GroupProtocol.classic, + } + ) + + assert broker.config.as_config_dict() == { + "compression.codec": config.CompressionCodec.none.value, + "compression.type": config.CompressionType.none.value, + "client.dns.lookup": config.ClientDNSLookup.use_all_dns_ips.value, + "offset.store.method": config.OffsetStoreMethod.broker.value, + "isolation.level": config.IsolationLevel.read_uncommitted.value, + "sasl.oauthbearer.method": config.SASLOAUTHBearerMethod.default.value, + "security.protocol": config.SecurityProtocol.ssl.value, + "broker.address.family": config.BrokerAddressFamily.any.value, + "builtin.features": config.BuiltinFeatures.gzip.value, + "debug": config.Debug.broker.value, + "group.protocol": config.GroupProtocol.classic.value, + } + + @pytest.mark.confluent class TestConnection(BrokerConnectionTestcase): broker = KafkaBroker