Description
We have custom logic to acquire OAuth tokens, which need to be sent as a header to the schema registry. Unfortunately, the schema registry client only supports basic auth and SSL auth. We worked around it like this:
from typing import Generator
from urllib.parse import urlsplit

import httpx
from confluent_kafka.schema_registry import (
    SchemaRegistryClient as ConfluentKafkaSchemaRegistryClient,
)

# SchemaRegistryConfig and OauthClient are our own classes (not shown here).


class SchemaRegistryClient(ConfluentKafkaSchemaRegistryClient):
    def __init__(self, config: SchemaRegistryConfig):
        config_dict = {
            "url": config.url
        }
        super().__init__(config_dict)
        if config.use_oauth:
            # override httpx client because we cannot configure a custom auth class
            self._rest_client.session = httpx.Client(
                verify=self._rest_client.verify,
                cert=self._rest_client.cert,
                # This is the property that can't be passed along
                auth=AADAuth(OauthClient(scope=urlsplit(config.url).netloc)),
                proxy=self._rest_client.proxy,
                timeout=self._rest_client.timeout,
            )


class AADAuth(httpx.Auth):
    def __init__(self, oauth_client: OauthClient) -> None:
        self._oauth_client = oauth_client

    def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]:
        # Fetch a token for every request and attach it as a bearer header
        token = self._oauth_client.get_token()
        request.headers["Authorization"] = f"Bearer {token}"
        yield request
Allowing the auth property to be passed to the schema registry client would make it a lot more flexible for anything other than basic auth and certificate auth.
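To illustrate the shape of what is being asked for, here is a minimal, standard-library-only sketch of a REST client that takes a pluggable auth hook in its constructor instead of hardcoding the supported schemes. Everything in it (`RestClientSketch`, `AuthHook`, `basic_auth`, `bearer_auth`, the example URL) is hypothetical and made up for this illustration; it is not the library's actual API or internals, and it only builds requests without sending them.

```python
import base64
from typing import Callable, Dict, Optional
from urllib.request import Request

# Hypothetical hook type: takes the outgoing headers, returns them with credentials added.
AuthHook = Callable[[Dict[str, str]], Dict[str, str]]


def basic_auth(username: str, password: str) -> AuthHook:
    """Hook that produces a standard HTTP Basic Authorization header."""
    encoded = base64.b64encode(f"{username}:{password}".encode()).decode()

    def apply(headers: Dict[str, str]) -> Dict[str, str]:
        return {**headers, "Authorization": f"Basic {encoded}"}

    return apply


def bearer_auth(get_token: Callable[[], str]) -> AuthHook:
    """Hook that asks for a token on every request, like AADAuth in the workaround."""

    def apply(headers: Dict[str, str]) -> Dict[str, str]:
        return {**headers, "Authorization": f"Bearer {get_token()}"}

    return apply


class RestClientSketch:
    """Hypothetical stand-in for a REST client with a pluggable auth hook."""

    def __init__(self, url: str, auth: Optional[AuthHook] = None) -> None:
        self.url = url.rstrip("/")
        self.auth = auth

    def build_request(self, path: str) -> Request:
        # Build (but do not send) a request, applying the auth hook if one was given.
        headers = {"Accept": "application/vnd.schemaregistry.v1+json"}
        if self.auth is not None:
            headers = self.auth(headers)
        return Request(f"{self.url}/{path.lstrip('/')}", headers=headers)


# Usage: the caller supplies any token source; a dummy lambda stands in for OauthClient.
client = RestClientSketch(
    "https://registry.example.com",
    auth=bearer_auth(lambda: "dummy-token"),
)
request = client.build_request("/subjects")
```

With something along these lines, the subclass and the session override above would no longer be needed: the caller would pass its own auth object and the client would apply it to each request.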
How to reproduce
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- Apache Kafka broker version:
- Client configuration: {...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue