
Commit df649a4

rnpridgeon (Ryan P) authored and committed
Add avro-cli example, rethrow deserialization exception with message context.
1 parent: 4308df4

File tree: 4 files changed, +205 −189 lines


confluent_kafka/avro/__init__.py

Lines changed: 15 additions & 8 deletions
@@ -103,6 +103,7 @@ class AvroConsumer(Consumer):
     :param schema reader_value_schema: a reader schema for the message value
     :raises ValueError: For invalid configurations
     """
+
     def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
 
         sr_conf = {key.replace("schema.registry.", ""): value
@@ -138,13 +139,19 @@ def poll(self, timeout=None):
         message = super(AvroConsumer, self).poll(timeout)
         if message is None:
             return None
-        if not message.value() and not message.key():
-            return message
+
         if not message.error():
-            if message.value() is not None:
-                decoded_value = self._serializer.decode_message(message.value(), is_key=False)
-                message.set_value(decoded_value)
-            if message.key() is not None:
-                decoded_key = self._serializer.decode_message(message.key(), is_key=True)
-                message.set_key(decoded_key)
+            try:
+                if message.value() is not None:
+                    decoded_value = self._serializer.decode_message(message.value(), is_key=False)
+                    message.set_value(decoded_value)
+                if message.key() is not None:
+                    decoded_key = self._serializer.decode_message(message.key(), is_key=True)
+                    message.set_key(decoded_key)
+            except SerializerError as e:
+                raise SerializerError("Message deserialization failed for message at {} [{}] offset {}: {}".format(
+                    message.topic(),
+                    message.partition(),
+                    message.offset(),
+                    e))
         return message
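
With this change, a SerializerError raised during decoding now identifies the failing message, so a consumer loop can log and skip bad records instead of dying on the first one. A minimal sketch, assuming a local broker and Schema Registry (the addresses, topic, and group below are illustrative, not part of the commit):

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',           # assumed broker address
    'schema.registry.url': 'http://localhost:8081',  # assumed registry address
    'group.id': 'example_avro'})
consumer.subscribe(['example_avro'])

while True:
    try:
        msg = consumer.poll(1)
    except SerializerError as e:
        # The rethrown error now carries the topic, partition, and offset
        # of the undecodable message, so it can be logged and skipped.
        print("Skipping undecodable message: {}".format(e))
        continue
    if msg is None or msg.error():
        continue
    print(msg.value())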

confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 1 addition & 1 deletion
@@ -214,7 +214,7 @@ def decode_message(self, message, is_key=False):
         """
         Decode a message from kafka that has been encoded for use with
         the schema registry.
-        :param str|bytes or None message: message key or value to be decoded
+        :param str|bytes or None message: message key or value to be decoded
         :returns: Decoded message contents.
         :rtype dict:
         """

(The change to the :param line is whitespace-only; the visible text is unchanged.)

examples/avro-cli.py

Lines changed: 189 additions & 0 deletions
@@ -0,0 +1,189 @@
#!/usr/bin/env python
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
from uuid import uuid4

from six.moves import input

from confluent_kafka import avro

# Parse Schema used for serializing User class
record_schema = avro.loads("""
    {
        "namespace": "confluent.io.examples.serialization.avro",
        "name": "User",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": "int"},
            {"name": "favorite_color", "type": "string"}
        ]
    }
""")


class User(object):
    """
    User stores the deserialized user Avro record.
    """

    # Use __slots__ to explicitly declare all data members.
    __slots__ = ["name", "favorite_number", "favorite_color", "id"]

    def __init__(self, name=None, favorite_number=None, favorite_color=None):
        self.name = name
        self.favorite_number = favorite_number
        self.favorite_color = favorite_color
        # Unique id used to track produce request success/failures.
        # Do *not* include in the serialized object.
        self.id = uuid4()

    def to_dict(self):
        """
        The Avro Python library does not support code generation.
        For this reason we must provide a dict representation of our class for serialization.
        """
        return {
            "name": self.name,
            "favorite_number": self.favorite_number,
            "favorite_color": self.favorite_color
        }


def on_delivery(err, msg, obj):
    """
    Handle delivery reports served from producer.poll.
    This callback takes an extra argument, obj.
    This allows the original contents to be included for debugging purposes.
    """
    if err is not None:
        print('Message {} delivery failed for user {} with error {}'.format(
            obj.id, obj.name, err))
    else:
        print('Message {} successfully produced to {} [{}] at offset {}'.format(
            obj.id, msg.topic(), msg.partition(), msg.offset()))


def produce(topic, conf):
    """
    Produce User records
    """
    from confluent_kafka.avro import AvroProducer

    producer = AvroProducer(conf, default_value_schema=record_schema)

    print("Producing user records to topic {}. ^C to exit.".format(topic))
    while True:
        # Instantiate new User, populate fields, produce record, execute callbacks.
        record = User()
        try:
            record.name = input("Enter name: ")
            record.favorite_number = int(input("Enter favorite number: "))
            record.favorite_color = input("Enter favorite color: ")

            # The message passed to the delivery callback will already be serialized.
            # To aid in debugging we provide the original object to the delivery callback.
            producer.produce(topic=topic, value=record.to_dict(),
                             callback=lambda err, msg, obj=record: on_delivery(err, msg, obj))
            # Serve on_delivery callbacks from previous asynchronous produce() calls.
            producer.poll(0)
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue

    print("\nFlushing records...")
    producer.flush()


def consume(topic, conf):
    """
    Consume User records
    """
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError

    print("Consuming user records from topic {} with group {}. ^C to exit.".format(topic, conf["group.id"]))

    c = AvroConsumer(conf, reader_value_schema=record_schema)
    c.subscribe([topic])

    while True:
        try:
            msg = c.poll(1)

            # There were no messages on the queue, continue polling.
            if msg is None:
                continue

            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            # Unpack the decoded dict into a User instance.
            record = User(**msg.value())
            print("name: {}\n\tfavorite_number: {}\n\tfavorite_color: {}\n".format(
                record.name, record.favorite_number, record.favorite_color))
        except SerializerError as e:
            # Report malformed record, discard results, continue polling.
            print("Message deserialization failed {}".format(e))
            continue
        except KeyboardInterrupt:
            break

    print("Shutting down consumer...")
    c.close()


def main(args):
    # Handle configs common to both modes.
    conf = {'bootstrap.servers': args.bootstrap_servers,
            'schema.registry.url': args.schema_registry}

    if args.userinfo:
        conf['schema.registry.basic.auth.credentials.source'] = 'USER_INFO'
        conf['schema.registry.basic.auth.user.info'] = args.userinfo

    if args.mode == "produce":
        produce(args.topic, conf)
    else:
        # Fall back to earliest to ensure all messages are consumed.
        conf['group.id'] = args.group
        conf['auto.offset.reset'] = "earliest"
        consume(args.topic, conf)


if __name__ == '__main__':
    # To use the provided cluster execute <source root>/tests/docker/bin/cluster_up.sh.
    # Defaults assume the use of the provided test cluster.
    parser = argparse.ArgumentParser(description="Example client for handling Avro data")
    parser.add_argument('-b', dest="bootstrap_servers",
                        default="localhost:29092", help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('-s', dest="schema_registry",
                        default="http://localhost:8083", help="Schema Registry (http(s)://host[:port])")
    parser.add_argument('-t', dest="topic", default="example_avro",
                        help="Topic name")
    parser.add_argument('-u', dest="userinfo", default="ckp_tester:test_secret",
                        help="Userinfo (username:password); requires Schema Registry with HTTP basic auth enabled")
    parser.add_argument('mode', choices=['produce', 'consume'],
                        help="Execution mode (produce | consume)")
    parser.add_argument('-g', dest="group", default="example_avro",
                        help="Consumer group; used only in 'consume' mode")

    main(parser.parse_args())
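
Assuming the docker test cluster referenced in the script's comments is running (or that -b and -s point at an equivalent broker and Schema Registry), the example can be exercised with invocations along these lines:

python examples/avro-cli.py produce
python examples/avro-cli.py consume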
