Skip to content

Commit 960f4b0

Browse files
Added an example for OAUTH OIDC producer with support for confluent cloud
Co-authored-by: Sarwar Bhuiyan <sarwar.bhuiyan@gmail.com>
1 parent dd5e0b6 commit 960f4b0

File tree

2 files changed

+125
-0
lines changed

2 files changed

+125
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ v2.4.1 is a maintenance release with the following fixes and enhancements:
77
- Removed usage of `strcpy` to enhance security of the client (#1745)
88
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
99
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
10+
- Added an example for an OAUTH OIDC producer with support for Confluent Cloud (#1769, @sarwarbhuiyan)
1011

1112
confluent-kafka-python is based on librdkafka v2.4.1, see the
1213
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
# This uses OAuth client credentials grant:
20+
# https://www.oauth.com/oauth2-servers/access-tokens/client-credentials/
21+
# where client_id and client_secret are passed as HTTP Authorization header
22+
23+
import logging
24+
import argparse
25+
from confluent_kafka import Producer
26+
from confluent_kafka.serialization import StringSerializer
27+
28+
29+
def producer_config(args):
    """Build the librdkafka configuration dict for an OAUTH/OIDC producer.

    Args:
        args (argparse.Namespace): Parsed CLI arguments. Must provide
            bootstrap_servers, client_id, client_secret, token_url and
            scopes; logical_cluster and identity_pool_id are optional.

    Returns:
        dict: Producer configuration using the OAUTHBEARER/OIDC mechanism
        (client credentials grant against ``token_url``).
    """
    # NOTE(review): the original created a module logger and set it to
    # DEBUG without ever using it; that dead code has been removed.
    params = {
        'bootstrap.servers': args.bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'sasl.oauthbearer.method': 'oidc',
        'sasl.oauthbearer.client.id': args.client_id,
        'sasl.oauthbearer.client.secret': args.client_secret,
        'sasl.oauthbearer.token.endpoint.url': args.token_url,
        # librdkafka expects a single space-separated scope string.
        'sasl.oauthbearer.scope': ' '.join(args.scopes)
    }
    # These two parameters are only applicable when producing to
    # Confluent Cloud, where some SASL extensions are required.
    if args.logical_cluster and args.identity_pool_id:
        params['sasl.oauthbearer.extensions'] = (
            'logicalCluster=' + args.logical_cluster +
            ',identityPoolId=' + args.identity_pool_id
        )

    return params
def delivery_report(err, msg):
    """Report success or failure of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.

        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to the
        delivery report callback we recommend a bound callback or lambda where
        you pass the objects along.
    """
    if err is None:
        print('User record {} successfully produced to {} [{}] at offset {}'.format(
            msg.key(), msg.topic(), msg.partition(), msg.offset()))
    else:
        print('Delivery failed for User record {}: {}'.format(msg.key(), err))
def main(args):
    """Run an interactive produce loop against the configured topic.

    Each stdin line containing the delimiter is split into key/value;
    otherwise the whole line is produced as the value. Ctrl-C exits the
    loop and flushes any outstanding records.
    """
    topic = args.topic
    sep = args.delimiter
    producer = Producer(producer_config(args))
    encode = StringSerializer('utf_8')

    print('Producing records to topic {}. ^C to exit.'.format(topic))
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        producer.poll(0.0)
        try:
            fields = input(">").split(sep)
            if len(fields) == 2:
                record_key, record_value = fields
                producer.produce(topic=topic,
                                 key=encode(record_key),
                                 value=encode(record_value),
                                 on_delivery=delivery_report)
            else:
                producer.produce(topic=topic,
                                 value=encode(fields[0]),
                                 on_delivery=delivery_report)
        except KeyboardInterrupt:
            break

    print('\nFlushing {} records...'.format(len(producer)))
    producer.flush()
if __name__ == '__main__':
    # CLI entry point: parse connection/OAuth credentials and start the
    # interactive produce loop.
    parser = argparse.ArgumentParser(description="OAUTH example with client credentials grant")
    parser.add_argument('-b', dest="bootstrap_servers", required=True,
                        help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('-t', dest="topic", default="example_producer_oauth",
                        help="Topic name")
    # Fixed: the original line ended with a stray trailing comma, which
    # turned the statement into a useless one-element tuple expression.
    parser.add_argument('-d', dest="delimiter", default="|",
                        help="Key-Value delimiter. Defaults to '|'")
    parser.add_argument('--client', dest="client_id", required=True,
                        help="Client ID for client credentials flow")
    parser.add_argument('--secret', dest="client_secret", required=True,
                        help="Client secret for client credentials flow.")
    parser.add_argument('--token-url', dest="token_url", required=True,
                        help="Token URL.")
    parser.add_argument('--scopes', dest="scopes", required=True, nargs='+',
                        help="Scopes requested from OAuth server.")
    # Optional Confluent Cloud-specific parameters (see producer_config).
    parser.add_argument('--logical-cluster', dest="logical_cluster", required=False,
                        help="Logical Cluster.")
    parser.add_argument('--identity-pool-id', dest="identity_pool_id", required=False,
                        help="Identity Pool ID.")

    main(parser.parse_args())

0 commit comments

Comments
 (0)