
Feature/explicit read #427

Closed
39 changes: 13 additions & 26 deletions .travis.yml
@@ -5,34 +5,17 @@ matrix:
language: python
dist: trusty
python: "2.7"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
# Source package verification with Python 3.6 and librdkafka v0.11.5
services: docker
Contributor
All changes to .travis.yml should be removed from this commit

env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=master
# Source package verification with Python 3.6 and librdkafka master

- os: linux
language: python
dist: trusty
python: "3.6"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
# Source package verification with Python 2.7 and librdkafka v0.11.5
- os: osx
python: "2.7"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
before_install:
- brew update && brew upgrade pyenv
- pyenv install -f 2.7.15
- virtualenv -p ~/.pyenv/versions/2.7.15/bin/python ./env
- source env/bin/activate
# Source package verification with Python 3.6 and librdkafka v0.11.5
- os: osx
python: "3.6"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
before_install:
- brew update && brew upgrade pyenv
- pyenv install -f 3.6.5
- virtualenv -p ~/.pyenv/versions/3.6.5/bin/python ./env
- source env/bin/activate
# cibuildwheel for osx
- os: osx
env: CIBW_BEFORE_BUILD="tools/bootstrap-librdkafka.sh --require-ssl v0.11.5 tmp" CFLAGS="-Itmp/include" LDFLAGS="-Ltmp/lib"
services: docker
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=master

# cibuildwheel for manylinux
- os: linux
dist: trusty
@@ -42,6 +25,10 @@ matrix:
python: "2.7"
services: docker

before_install:
- docker pull landoop/fast-data-dev
- docker run -d -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev

install:
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
- pip install -U pip
@@ -54,10 +41,10 @@ script:
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro] ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then py.test -v --timeout 20 --ignore=tmp-build --import-mode append ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then python examples/integration_test.py localhost:9092 confluent-kafka-testing http://localhost:8081; fi
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then cibuildwheel --output-dir wheelhouse ; ls -la wheelhouse/ ; fi
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == linux && -n $CIBW_BEFORE_BUILD ]]; then tools/test-manylinux.sh ; fi


deploy:
provider: s3
access_key_id:
@@ -71,6 +58,6 @@ deploy:
acl: public_read
skip_cleanup: true
on:
repo: confluentinc/confluent-kafka-python
repo: martintaraz/confluent-kafka-python
tags: true
condition: "-n $CIBW_BEFORE_BUILD"
7 changes: 4 additions & 3 deletions confluent_kafka/avro/__init__.py
@@ -98,9 +98,10 @@ class AvroConsumer(Consumer):
Constructor takes below parameters

:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
and the standard Kafka client configuration (``bootstrap.servers`` et.al).
and the standard Kafka client configuration (``bootstrap.servers`` et.al)
:param schema read_schema: optional reader schema for the messages
"""
def __init__(self, config, schema_registry=None):
def __init__(self, config, schema_registry=None, read_schema=None):

schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
@@ -119,7 +120,7 @@ def __init__(self, config, schema_registry=None):
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(config)
self._serializer = MessageSerializer(schema_registry)
self._serializer = MessageSerializer(schema_registry, read_schema)

def poll(self, timeout=None):
"""
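A minimal usage sketch of the new read_schema parameter (not part of the diff); the broker address, group id, registry URL, topic name and the reader schema string are placeholders:

from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer

# Hypothetical reader schema -- any Avro schema string works here.
reader_schema = avro.loads('{"type": "record", "name": "User",'
                           ' "fields": [{"name": "name", "type": "string"}]}')

consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': 'example-group',
                         'schema.registry.url': 'http://localhost:8081'},
                        read_schema=reader_schema)
consumer.subscribe(['example-topic'])

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    print(msg.value())  # decoded against the reader schema, not only the writer schema
consumer.close()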
9 changes: 6 additions & 3 deletions confluent_kafka/avro/serializer/message_serializer.py
@@ -68,10 +68,11 @@ class MessageSerializer(object):
All decode_* methods expect a buffer received from kafka.
"""

def __init__(self, registry_client):
def __init__(self, registry_client, read_schema=None):
self.registry_client = registry_client
self.id_to_decoder_func = {}
self.id_to_writers = {}
self.read_schema = read_schema

'''

@@ -169,6 +170,7 @@ def _get_decoder_func(self, schema_id, payload):
# try to use fast avro
try:
schema_dict = schema.to_json()
reader_schema_dict = schema.to_json()
schemaless_reader(payload, schema_dict)

# If we reach this point, this means we have fastavro and it can
@@ -177,7 +179,8 @@ def _get_decoder_func(self, schema_id, payload):
# normal path.
payload.seek(curr_pos)

self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict)
self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict,
reader_schema=reader_schema_dict)
return self.id_to_decoder_func[schema_id]
except Exception:
# Fast avro failed, fall thru to standard avro below.
@@ -186,7 +189,7 @@ def _get_decoder_func(self, schema_id, payload):
# here means we should just delegate to slow avro
# rewind
payload.seek(curr_pos)
avro_reader = avro.io.DatumReader(schema)
avro_reader = avro.io.DatumReader(writers_schema=schema, readers_schema=self.read_schema)

def decoder(p):
bin_decoder = avro.io.BinaryDecoder(p)
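For context, a standalone sketch (not part of the diff) of the Avro schema resolution that an explicit reader schema enables; the schema strings are illustrative, and the DatumReader keyword names are the same ones used in the change above:

import io
import avro.io
from confluent_kafka import avro as ck_avro  # used here only to parse the schema strings

writer_schema = ck_avro.loads('{"type": "record", "name": "User", "fields": ['
                              '{"name": "name", "type": "string"},'
                              '{"name": "favorite_number", "type": ["null", "int"], "default": null}]}')
reader_schema = ck_avro.loads('{"type": "record", "name": "User", "fields": ['
                              '{"name": "name", "type": "string"}]}')

# Encode a record with the (wider) writer schema.
buf = io.BytesIO()
avro.io.DatumWriter(writer_schema).write({"name": "abc", "favorite_number": 42},
                                         avro.io.BinaryEncoder(buf))

# Decode it with an explicit reader schema, as the serializer's slow path now does;
# writer fields the reader schema does not declare are dropped during resolution.
buf.seek(0)
reader = avro.io.DatumReader(writers_schema=writer_schema, readers_schema=reader_schema)
print(reader.read(avro.io.BinaryDecoder(buf)))  # -> {'name': 'abc'}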
145 changes: 125 additions & 20 deletions examples/integration_test.py
@@ -21,7 +21,6 @@
""" Test script for confluent_kafka module """

import confluent_kafka
from confluent_kafka import admin
Contributor
Please remove any integration test changes not related to the pull request itself; this helps keep the commit history clean. If you wish to submit a separate PR that cleans up formatting, whitespace and imports, I'd be happy to review and merge it :)

import os
import time
import uuid
@@ -34,13 +33,14 @@
try:
# Memory tracker
from pympler import tracker

with_pympler = True
except Exception as e:
with_pympler = False


try:
from progress.bar import Bar

with_progress = True
except ImportError as e:
with_progress = False
@@ -106,7 +106,6 @@ def throttle_cb(throttle_event):


class InMemorySchemaRegistry(object):

schemas = {}
next_idx = 0

@@ -522,9 +521,9 @@ def verify_producer_performance(with_dr_cb=True):
bar.finish()

print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
(msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
msgs_produced / t_produce_spent,
(bytecnt/t_produce_spent) / (1024*1024)))
(bytecnt / t_produce_spent) / (1024 * 1024)))
print('# %d temporary produce() failures due to backpressure (local queue full)' % msgs_backpressure)

print('waiting for %d/%d deliveries' % (len(p), msgs_produced))
@@ -533,9 +532,9 @@ def verify_producer_performance(with_dr_cb=True):
t_delivery_spent = time.time() - t_produce_start

print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
(msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
msgs_produced / t_produce_spent,
(bytecnt/t_produce_spent) / (1024*1024)))
(bytecnt / t_produce_spent) / (1024 * 1024)))

# Fake numbers if not using a dr_cb
if not with_dr_cb:
@@ -544,9 +543,9 @@ def verify_producer_performance(with_dr_cb=True):
dr.bytes_delivered = bytecnt

print('# delivering %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(dr.msgs_delivered, dr.bytes_delivered / (1024*1024), t_delivery_spent,
(dr.msgs_delivered, dr.bytes_delivered / (1024 * 1024), t_delivery_spent,
dr.msgs_delivered / t_delivery_spent,
(dr.bytes_delivered/t_delivery_spent) / (1024*1024)))
(dr.bytes_delivered / t_delivery_spent) / (1024 * 1024)))
print('# post-produce delivery wait took %.3fs' %
(t_delivery_spent - t_produce_spent))

@@ -671,7 +670,7 @@ def print_wmark(consumer, parts):
elif (msg.offset() % 4) == 0:
offsets = c.commit(msg, asynchronous=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
assert offsets[0].offset == msg.offset() + 1, \
'expected offset %d to be committed, not %s' % \
(msg.offset(), offsets)
print('Sync committed offset: %s' % offsets)
@@ -789,8 +788,8 @@ def my_on_revoke(consumer, partitions):
if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))
(msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024 * 1024)))

print('closing consumer')
c.close()
@@ -844,7 +843,7 @@ def verify_batch_consumer():
elif (msg.offset() % 4) == 0:
offsets = c.commit(msg, asynchronous=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
assert offsets[0].offset == msg.offset() + 1, \
'expected offset %d to be committed, not %s' % \
(msg.offset(), offsets)
print('Sync committed offset: %s' % offsets)
@@ -945,8 +944,8 @@ def my_on_revoke(consumer, partitions):
if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))
(msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024 * 1024)))

print('closing consumer')
c.close()
@@ -1075,8 +1074,8 @@ def stats_cb(stats_json_str):
if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))
(msgcnt, bytecnt / (1024 * 1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024 * 1024)))

print('closing consumer')
c.close()
@@ -1121,8 +1120,9 @@ def verify_topic_metadata(client, exp_topics):


def verify_admin():
""" Verify Admin API """

""" Verify Admin API """
from confluent_kafka import admin
a = admin.AdminClient({'bootstrap.servers': bootstrap_servers})
our_topic = topic + '_admin_' + str(uuid.uuid4())
num_partitions = 2
@@ -1216,9 +1216,110 @@ def verify_config(expconfig, configs):
print("Topic {} marked for deletion".format(our_topic))


def verify_explicit_read():
""" verify that the explicit reading schema works"""
from confluent_kafka import avro
avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'tests', 'avro')

# Producer config
conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb,
'api.version.request': api_version_request,
'default.topic.config': {'produce.offset.report': True}}

# Create producer
if schema_registry_url:
conf['schema.registry.url'] = schema_registry_url
p = avro.AvroProducer(conf)
else:
p = avro.AvroProducer(conf, schema_registry=InMemorySchemaRegistry())

key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
schema1 = avro.load(os.path.join(avsc_dir, "read_test_schema.avsc"))
schema2 = avro.load(os.path.join(avsc_dir, "incremented_read_test_schema.avsc"))
float_value = 32.
val = {
"name": "abc",
"favorite_number": 42,
"favorite_colo": "orange"
}
val1 = {
"name": "abc"
}

combinations = [
dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
read_schema=schema1),
dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
read_schema=schema2),
]

# Consumer config
cons_conf = {'bootstrap.servers': bootstrap_servers,
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': api_version_request,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}

for (i, combo) in enumerate(combinations):
read_schema = combo.pop("read_schema")
combo['topic'] = str(uuid.uuid4())
p.produce(**combo)
p.poll(0)
p.flush()

# Create consumer
conf = copy(cons_conf)
if schema_registry_url:
conf['schema.registry.url'] = schema_registry_url
c = avro.AvroConsumer(conf, read_schema=read_schema)
else:
c = avro.AvroConsumer(conf, schema_registry=InMemorySchemaRegistry(), read_schema=read_schema)
c.subscribe([combo['topic']])

while True:
msg = c.poll(0)
if msg is None:
continue

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
break
else:
continue

tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), tstype, timestamp))

# omit empty Avro fields from payload for comparison
record_key = msg.key()
record_value = msg.value()
if isinstance(msg.key(), dict):
record_key = {k: v for k, v in msg.key().items() if v is not None}

if isinstance(msg.value(), dict):
record_value = {k: v for k, v in msg.value().items() if v is not None}

assert combo.get('key') == record_key
assert combo.get('value')['name'] == record_value['name']
c.commit(msg, asynchronous=False)
# Close consumer
c.close()
pass


# Exclude throttle since from default list
default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
all_modes = default_modes + ['throttle', 'avro-https', 'none']
# default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin', 'explicit-read']
default_modes = ['consumer', 'producer', 'performance', 'explicit-read']
all_modes = default_modes + ['throttle', 'none']

"""All test modes"""


@@ -1337,6 +1438,10 @@ def resolve_envs(_conf):
print('=' * 30, 'Verifying Admin API', '=' * 30)
verify_admin()

if 'explicit-read' in modes:
print('=' * 30, 'Verifying Explicit Reading Schema', '=' * 30)
verify_explicit_read()

print('=' * 30, 'Done', '=' * 30)

if with_pympler:
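Since 'explicit-read' is added to default_modes, the new check runs on any normal invocation of the integration test, for example the command the previous .travis.yml used (broker, topic and schema-registry URL depend on the local setup):

python examples/integration_test.py localhost:9092 confluent-kafka-testing http://localhost:8081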
9 changes: 9 additions & 0 deletions tests/avro/incremented_read_test_schema.avsc
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["null", "int"], "default": null},
{"name": "favorite_color", "type": ["null", "string"], "default": null}
]
}
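For illustration only, a minimal fastavro round trip pairing this schema (as the writer schema) with a reduced, hypothetical reader schema; it mirrors the serializer's fast path, which now forwards reader_schema to schemaless_reader:

import io
from fastavro import schemaless_reader, schemaless_writer

writer_schema = {
    "type": "record", "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["null", "int"], "default": None},
        {"name": "favorite_color", "type": ["null", "string"], "default": None},
    ],
}
# Hypothetical reduced reader schema -- not a file from the repository.
reader_schema = {"type": "record", "name": "User",
                 "fields": [{"name": "name", "type": "string"}]}

buf = io.BytesIO()
schemaless_writer(buf, writer_schema, {"name": "abc", "favorite_number": 42,
                                       "favorite_color": "orange"})
buf.seek(0)
print(schemaless_reader(buf, writer_schema, reader_schema=reader_schema))  # -> {'name': 'abc'}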