Closed
Description
I'm using Azure Event Hubs with the confluent-kafka client and OAuth authentication. The following snippet worked with confluent-kafka 2.4.0 and 2.5.0, but it stopped working with 2.6.0 (librdkafka 2.6.0), failing with this error:
%6|1730980586.653|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap]: sasl_ssl://xxx.servicebus.windows.net:9093/0: Disconnected (after 3891ms in state UP)
Maybe I'm missing something. I would appreciate any hints!
How to reproduce
from functools import partial

from azure.identity import DefaultAzureCredential
from confluent_kafka import Consumer


def get_token(namespace: str, config: str) -> tuple[str, float]:
    # oauth_cb receives the sasl.oauthbearer.config string and must
    # return (token, expiry as seconds since the epoch).
    token = DefaultAzureCredential().get_token(f"https://{namespace}/.default")
    return token.token, token.expires_on


def main():
    namespace = "xxx.servicebus.windows.net"
    topic = "test-hub"
    conf = {
        "bootstrap.servers": f"{namespace}:9093",
        "group.id": "quix-consumer",
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms": "OAUTHBEARER",
        "oauth_cb": partial(get_token, namespace),
        "auto.offset.reset": "earliest",
    }
    consumer = Consumer(conf)
    consumer.subscribe([topic])
    while True:
        try:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            print(msg.value())
        except KeyboardInterrupt:
            break
    consumer.close()


if __name__ == "__main__":
    main()
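To narrow down whether the client still invokes the OAuth callback on 2.6.0, it may help to log each invocation. The following is only a diagnostic sketch, not a confirmed fix: `make_oauth_cb`, `StubToken`, and `StubCredential` are hypothetical names, and the credential is assumed to expose `get_token(scope)` returning an object with `token` and `expires_on`, as in the snippet above.

```python
import logging
import time
from typing import Callable, NamedTuple

log = logging.getLogger("oauth_cb")


class StubToken(NamedTuple):
    token: str
    expires_on: int


class StubCredential:
    """Hypothetical stand-in for DefaultAzureCredential so the sketch runs offline."""

    def get_token(self, scope: str) -> StubToken:
        return StubToken(token=f"token-for-{scope}", expires_on=int(time.time()) + 3600)


def make_oauth_cb(credential, namespace: str) -> Callable[[str], tuple[str, float]]:
    """Build an oauth_cb that reuses one credential and logs every call."""
    scope = f"https://{namespace}/.default"

    def oauth_cb(config: str) -> tuple[str, float]:
        # config is the sasl.oauthbearer.config string passed by the client.
        token = credential.get_token(scope)
        log.info("oauth_cb invoked; token expires at %s", token.expires_on)
        return token.token, float(token.expires_on)

    return oauth_cb


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    cb = make_oauth_cb(StubCredential(), "xxx.servicebus.windows.net")
    print(cb(""))
```

In the reproduction, this would be wired in as `"oauth_cb": make_oauth_cb(DefaultAzureCredential(), namespace)`. If no "oauth_cb invoked" line appears before the disconnect, that would suggest the callback is never reached.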
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): 2.6.0
- Apache Kafka broker version: Azure Event Hubs
- Client configuration: `{...}`
- Operating system: Microsoft Windows 11 Business - 10.0.26100 Build 26100
- Provide client logs (with `'debug': '..'` as necessary)
- Provide broker log excerpts
- Critical issue
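For the client logs item, a minimal sketch of turning on librdkafka debug output through the `debug` configuration property. The helper name `with_debug` is hypothetical, and the chosen contexts (`security`, `broker`, `protocol`) are an assumption about what is most relevant to a SASL handshake failure.

```python
def with_debug(conf: dict, contexts=("security", "broker", "protocol")) -> dict:
    """Return a copy of conf with librdkafka's `debug` property set."""
    debug_conf = dict(conf)  # leave the caller's dict untouched
    debug_conf["debug"] = ",".join(contexts)
    return debug_conf


if __name__ == "__main__":
    base = {
        "bootstrap.servers": "xxx.servicebus.windows.net:9093",
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms": "OAUTHBEARER",
    }
    print(with_debug(base)["debug"])  # security,broker,protocol
```

The resulting dict can be passed to `Consumer(...)` in place of the original `conf`.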
Full debug log:
%7|1730998906.887|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 1ms in state AUTH_HANDSHAKE) (_TRANSPORT)
%7|1730998906.887|TERM|rdkafka#consumer-1| [thrd:main]: Setting state to TERMINATED and signalling
%7|1730998906.887|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state AUTH_HANDSHAKE -> DOWN
%7|1730998906.887|TERM|rdkafka#consumer-1| [thrd:app]: Ended waiting for termination of telemetry.
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:app]: Interrupting timers
%7|1730998906.887|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1730998906.889|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 1 buffers
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1730998906.889|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: SASL OAUTHBEARER mechanism handshake failed: Local: Broker transport failure: broker's supported mechanisms: (n/a) (after 0ms in state DOWN) (_AUTHENTICATION)
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:app]: Joining internal main thread
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread terminating
%3|1730998906.889|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: SASL OAUTHBEARER mechanism handshake failed: Local: Broker transport failure: broker's supported mechanisms: (n/a) (after 0ms in state DOWN)
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1730998906.890|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1730998906.890|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1730998906.890|BRKTERM|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 4 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1730998906.890|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1730998906.891|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to sasl_ssl://xxx.servicebus.windows.net:9093/0
%7|1730998906.891|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1730998906.891|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to GroupCoordinator
%7|1730998906.891|TERM|rdkafka#consumer-1| [thrd:sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap]: sasl_ssl://xxx.servicebus.windows.net:9093/0: Received TERMINATE op in state UP: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1730998906.892|BRKTERM|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 3 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1730998906.892|TERMINATE|rdkafka#consumer-1| [thrd:main]: Purging reply queue
%7|1730998906.892|TERMINATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 2 refcnts (000001834E99DE58), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1730998906.892|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap]: sasl_ssl://xxx.servicebus.windows.net:9093/0: Client is terminating (after 143ms in state UP) (_DESTROY)
%7|1730998906.892|TERMINATE|rdkafka#consumer-1| [thrd:main]: Decommissioning internal broker
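The log above interleaves several threads, so it can be easier to read after filtering for the `FAIL` entries. A small sketch, assuming every line follows the `%level|timestamp|facility|client| [thrd:name]: message` layout seen in this log; `parse_line` and `failures` are hypothetical helper names.

```python
import re
from typing import NamedTuple, Optional

LINE_RE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<fac>[A-Z]+)\|(?P<client>[^|]+)\| "
    r"\[thrd:(?P<thread>[^\]]+)\]: (?P<message>.*)$"
)


class LogLine(NamedTuple):
    level: int
    timestamp: float
    facility: str
    client: str
    thread: str
    message: str


def parse_line(line: str) -> Optional[LogLine]:
    """Parse one log line; return None if it does not match the layout."""
    m = LINE_RE.match(line.strip())
    if m is None:
        return None
    return LogLine(int(m["level"]), float(m["ts"]), m["fac"],
                   m["client"], m["thread"], m["message"])


def failures(lines) -> list:
    """Keep only the entries whose facility is FAIL."""
    return [p for p in map(parse_line, lines) if p and p.facility == "FAIL"]


SAMPLE = [
    "%7|1730998906.887|TERM|rdkafka#consumer-1| [thrd:main]: Setting state to TERMINATED and signalling",
    "%3|1730998906.889|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: SASL OAUTHBEARER mechanism handshake failed: Local: Broker transport failure: broker's supported mechanisms: (n/a) (after 0ms in state DOWN)",
]

if __name__ == "__main__":
    for entry in failures(SAMPLE):
        print(entry.level, entry.thread, entry.message)
```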