V1.0 deprecations #498

Merged: 5 commits, Dec 7, 2018
14 changes: 4 additions & 10 deletions README.md
@@ -85,11 +85,8 @@ while True:
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print("Consumer error: {}".format(msg.error()))
continue

print('Received message: {}'.format(msg.value().decode('utf-8')))

@@ -172,11 +169,8 @@ while True:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print("AvroConsumer error: {}".format(msg.error()))
continue

print(msg.value())

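The README change above reflects the v1.0 behavior where partition EOF events are no longer delivered by default. A minimal sketch of a consumer that opts back in to EOF notifications via `enable.partition.eof` (broker address and topic name are placeholders):

```python
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'eof-demo',
    'auto.offset.reset': 'earliest',
    'enable.partition.eof': True,   # EOF events are opt-in from v1.0 onwards
})
c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Only delivered because enable.partition.eof is set
                print('End of partition {} [{}]'.format(msg.topic(), msg.partition()))
                continue
            print('Consumer error: {}'.format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    c.close()
```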
5 changes: 1 addition & 4 deletions confluent_kafka/kafkatest/verifiable_client.py
@@ -72,10 +72,7 @@ def set_config(conf, args):
continue

if n.startswith('topicconf_'):
# Apply topicconf_ properties on default.topic.config
if 'default.topic.config' not in conf:
conf['default.topic.config'] = dict()
conf['default.topic.config'][n[10:]] = v
conf[n[10:]] = v
continue

if not n.startswith('conf_'):
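Since `default.topic.config` is deprecated, the `topicconf_` arguments now map straight onto top-level configuration keys. A simplified sketch of just that branch of the mapping (the helper name and sample argument are hypothetical):

```python
def apply_topic_conf(conf, args):
    """Copy 'topicconf_*' arguments onto top-level config keys (post-1.0 style)."""
    for name, value in args.items():
        if name.startswith('topicconf_'):
            # Pre-1.0 this went into conf['default.topic.config'][name[10:]]
            conf[name[10:]] = value

conf = {}
apply_topic_conf(conf, {'topicconf_message.timeout.ms': '10000'})
assert conf == {'message.timeout.ms': '10000'}
```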
6 changes: 1 addition & 5 deletions confluent_kafka/kafkatest/verifiable_consumer.py
@@ -188,11 +188,7 @@ def do_commit(self, immediate=False, asynchronous=None):
def msg_consume(self, msg):
""" Handle consumed message (or error event) """
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# ignore EOF
pass
else:
self.err('Consume failed: %s' % msg.error(), term=False)
self.err('Consume failed: %s' % msg.error(), term=False)
return

if False:
20 changes: 10 additions & 10 deletions examples/confluent_cloud.py
@@ -48,7 +48,7 @@

import uuid

from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka import Producer, Consumer


p = Producer({
@@ -65,9 +65,9 @@
def acked(err, msg):
"""Delivery report callback called (from flush()) on successful or failed delivery of the message."""
if err is not None:
print("failed to deliver message: {0}".format(err.str()))
print("failed to deliver message: {}".format(err.str()))
else:
print("produced to: {0} [{1}] @ {2}".format(msg.topic(), msg.partition(), msg.offset()))
print("produced to: {} [{}] @ {}".format(msg.topic(), msg.partition(), msg.offset()))


p.produce('python-test-topic', value='python test value', callback=acked)
@@ -98,14 +98,14 @@ def acked(err, msg):
if msg is None:
# No message available within timeout.
# Initial message consumption may take up to `session.timeout.ms` for
# the group to rebalance and start consuming
# the group to rebalance and start consuming.
continue
elif not msg.error():
print('consumed: {0}'.format(msg.value()))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print('end of partition: {0} [{1}] @ {2}'.format(msg.topic(), msg.partition(), msg.offset()))
else:
print('error: {0}'.format(msg.error().str()))
if msg.error():
# Errors are typically temporary, print error and continue.
print("Consumer error: {}".format(msg.error()))
continue

print('consumed: {}'.format(msg.value()))

except KeyboardInterrupt:
pass
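The delivery-report change in this file only touches the format strings. For context, a minimal sketch of the producer path that drives the `acked` callback shown above (broker address and topic are placeholders):

```python
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def acked(err, msg):
    """Delivery report callback, served from poll()/flush()."""
    if err is not None:
        print("failed to deliver message: {}".format(err.str()))
    else:
        print("produced to: {} [{}] @ {}".format(msg.topic(), msg.partition(), msg.offset()))

p.produce('python-test-topic', value='python test value', callback=acked)
p.poll(0)     # serve queued delivery callbacks
p.flush(10)   # wait up to 10s for outstanding deliveries
```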
11 changes: 2 additions & 9 deletions examples/consumer.py
@@ -18,7 +18,7 @@
#
# Example high-level Kafka 0.9 balanced Consumer
#
from confluent_kafka import Consumer, KafkaException, KafkaError
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
@@ -95,14 +95,7 @@ def print_assignment(consumer, partitions):
if msg is None:
continue
if msg.error():
# Error or event
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
else:
# Error
raise KafkaException(msg.error())
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
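With the EOF branch gone, the example now treats every consumer error as fatal by raising `KafkaException`. A hedged sketch of the resulting loop with the exception handled at the call site (broker and topic are placeholders):

```python
import sys
from confluent_kafka import Consumer, KafkaException

c = Consumer({'bootstrap.servers': 'localhost:9092',
              'group.id': 'example-group',
              'auto.offset.reset': 'earliest'})
c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # No _PARTITION_EOF special case: any error becomes an exception.
            raise KafkaException(msg.error())
        sys.stderr.write('%% %s [%d] at offset %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))
except KafkaException as e:
    sys.stderr.write('%% Consume failed: %s\n' % e)
finally:
    c.close()
```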
4 changes: 2 additions & 2 deletions tests/avro/user_v2.avsc
@@ -4,7 +4,7 @@
"aliases": ["UserKey"],
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["null", "int"], "default": null},
{"name": "favorite_color", "type": ["null", "string"], "default": null}
{"name": "favorite_number", "type": ["null", "int"], "default": 42},
{"name": "favorite_color", "type": ["null", "string"], "default": "purple"}
]
}
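With non-null defaults on `favorite_number` and `favorite_color`, a record written with the v1 schema (only `name`) can be projected onto the v2 reader schema and the missing fields are filled from the defaults. A sketch of that projection using fastavro directly (an assumption for illustration; the integration test exercises it through AvroProducer/AvroConsumer instead):

```python
import io
from fastavro import schemaless_writer, schemaless_reader

user_v1 = {
    "type": "record", "name": "User",
    "fields": [{"name": "name", "type": "string"}],
}
user_v2 = {
    "type": "record", "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["null", "int"], "default": 42},
        {"name": "favorite_color", "type": ["null", "string"], "default": "purple"},
    ],
}

buf = io.BytesIO()
schemaless_writer(buf, user_v1, {"name": "Kenny Loggins"})   # written with v1
buf.seek(0)
record = schemaless_reader(buf, user_v1, user_v2)            # projected onto v2
assert record["favorite_number"] == 42
assert record["favorite_color"] == "purple"
```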
149 changes: 52 additions & 97 deletions tests/integration/integration_test.py
@@ -29,7 +29,6 @@
import json
import gc
import struct
from copy import copy

try:
# Memory tracker
@@ -368,7 +367,8 @@ def verify_consumer():
'enable.auto.commit': False,
'on_commit': print_commit_result,
'error_cb': error_cb,
'auto.offset.reset': 'earliest'}
'auto.offset.reset': 'earliest',
'enable.partition.eof': True}

# Create consumer
c = confluent_kafka.Consumer(conf)
@@ -535,11 +535,7 @@ def my_on_revoke(consumer, partitions):
(msgcnt, max_msgcnt))

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
# Reached EOF for a partition, ignore.
continue
else:
raise confluent_kafka.KafkaException(msg.error())
raise confluent_kafka.KafkaException(msg.error())

bytecnt += len(msg)
msgcnt += 1
@@ -714,11 +710,7 @@ def my_on_revoke(consumer, partitions):

for msg in msglist:
if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
# Reached EOF for a partition, ignore.
continue
else:
raise confluent_kafka.KafkaException(msg.error())
raise confluent_kafka.KafkaException(msg.error())

bytecnt += len(msg)
msgcnt += 1
@@ -1008,11 +1000,7 @@ def stats_cb(stats_json_str):
(msgcnt, max_msgcnt))

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
# Reached EOF for a partition, ignore.
continue
else:
raise confluent_kafka.KafkaException(msg.error())
raise confluent_kafka.KafkaException(msg.error())

bytecnt += len(msg)
msgcnt += 1
@@ -1173,100 +1161,67 @@ def verify_config(expconfig, configs):


def verify_avro_explicit_read_schema():
""" verify that reading Avro with explicit reader schema works"""
from confluent_kafka import avro
avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'avro')

# Producer config
conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb}
""" verify that reading Avro with explicit reader schema works"""
base_conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb,
'schema.registry.url': schema_registry_url}

# Create producer
if schema_registry_url:
conf['schema.registry.url'] = schema_registry_url
p = avro.AvroProducer(conf)

key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
schema1 = avro.load(os.path.join(avsc_dir, "user_v1.avsc"))
schema2 = avro.load(os.path.join(avsc_dir, "user_v2.avsc"))
float_value = 32.
val = {
"name": "abc",
"favorite_number": 42,
"favorite_colo": "orange"
consumer_conf = dict(base_conf, **{
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'on_commit': print_commit_result,
'auto.offset.reset': 'earliest',
'schema.registry.url': schema_registry_url})

avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'avro')
writer_schema = avro.load(os.path.join(avsc_dir, "user_v1.avsc"))
reader_schema = avro.load(os.path.join(avsc_dir, "user_v2.avsc"))

user_value1 = {
"name": " Rogers Nelson"
}
val1 = {
"name": "abc"

user_value2 = {
"name": "Kenny Loggins"
}

combinations = [
dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
reader_value_schema=schema1, reader_key_schema=key_schema),
dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
reader_value_schema=schema2, reader_key_schema=key_schema),
dict(key=user_value1, key_schema=writer_schema, value=user_value2, value_schema=writer_schema),
dict(key=user_value2, key_schema=writer_schema, value=user_value1, value_schema=writer_schema)
]
avro_topic = topic + str(uuid.uuid4())

# Consumer config
cons_conf = {'bootstrap.servers': bootstrap_servers,
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'on_commit': print_commit_result,
'error_cb': error_cb,
'auto.offset.reset': 'earliest'}

p = avro.AvroProducer(base_conf)
for i, combo in enumerate(combinations):
reader_key_schema = combo.pop("reader_key_schema")
reader_value_schema = combo.pop("reader_value_schema")
combo['topic'] = str(uuid.uuid4())
p.produce(**combo)
p.poll(0)
p.flush()

# Create consumer
conf = copy(cons_conf)
if schema_registry_url:
conf['schema.registry.url'] = schema_registry_url
c = avro.AvroConsumer(
conf,
reader_key_schema=reader_key_schema,
reader_value_schema=reader_value_schema)
else:
raise ValueError("Property schema.registry.url must be set to run this test")
p.produce(topic=avro_topic, **combo)
p.flush()

c.subscribe([combo['topic']])
c = avro.AvroConsumer(consumer_conf, reader_key_schema=reader_schema, reader_value_schema=reader_schema)
c.subscribe([avro_topic])

while True:
msg = c.poll(0)
if msg is None:
continue

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
break
else:
continue
msgcount = 0
while msgcount < len(combinations):
msg = c.poll(1)

tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), tstype, timestamp))
if msg is None:
continue
if msg.error():
print("Consumer error {}".format(msg.error()))
continue

# omit empty Avro fields from payload for comparison
record_key = msg.key()
record_value = msg.value()
if isinstance(msg.key(), dict):
record_key = {k: v for k, v in msg.key().items() if v is not None}

if isinstance(msg.value(), dict):
record_value = {k: v for k, v in msg.value().items() if v is not None}

assert combo.get('key') == record_key
assert combo.get('value')['name'] == record_value['name']
c.commit(msg, asynchronous=False)
# Close consumer
c.close()
pass
msgcount += 1
# Avro schema projection should return the two fields not present in the writer schema
try:
assert(msg.key().get('favorite_number') == 42)
assert(msg.key().get('favorite_color') == "purple")
assert(msg.value().get('favorite_number') == 42)
assert(msg.value().get('favorite_color') == "purple")
print("success: schema projection worked for explicit reader schema")
except KeyError:
raise confluent_kafka.avro.SerializerError("Schema projection failed when setting reader schema.")


default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
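A condensed sketch of the consumer side of the reworked test: an `AvroConsumer` constructed with explicit reader schemas, so projection onto the `user_v2.avsc` defaults happens on read (broker address, registry URL and topic are placeholders):

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer

reader_schema = avro.loads("""
{"type": "record", "name": "User",
 "fields": [
   {"name": "name", "type": "string"},
   {"name": "favorite_number", "type": ["null", "int"], "default": 42},
   {"name": "favorite_color", "type": ["null", "string"], "default": "purple"}
 ]}
""")

c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'projection-demo',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest',
}, reader_key_schema=reader_schema, reader_value_schema=reader_schema)
c.subscribe(['avro-user-topic'])

msg = c.poll(10)
if msg is not None and not msg.error():
    # Fields absent from the writer schema are filled from reader-schema defaults.
    print(msg.value().get('favorite_number'), msg.value().get('favorite_color'))
c.close()
```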
2 changes: 2 additions & 0 deletions tests/test_misc.py
@@ -169,6 +169,8 @@ def test_unordered_dict(init_func):


def test_topic_config_update():
# *NOTE* default.topic.config has been deprecated.
# This example remains to ensure backward-compatibility until its removal.
confs = [{"message.timeout.ms": 600000, "default.topic.config": {"message.timeout.ms": 1000}},
{"message.timeout.ms": 1000},
{"default.topic.config": {"message.timeout.ms": 1000}}]
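Until `default.topic.config` is removed entirely, both spellings should yield the same effective configuration. A minimal sketch of the equivalence (constructing the clients does not require a reachable broker):

```python
from confluent_kafka import Producer

# Deprecated nested form, still accepted for backward compatibility:
p_old = Producer({'bootstrap.servers': 'localhost:9092',
                  'default.topic.config': {'message.timeout.ms': 1000}})

# Preferred flat form after the v1.0 deprecations:
p_new = Producer({'bootstrap.servers': 'localhost:9092',
                  'message.timeout.ms': 1000})
```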