Feature/explicit read #469

Closed
wants to merge 28 commits
Changes from all commits
28 commits
20e7690
adding support for explicit read schema
Jun 22, 2018
166fa4f
changes in travis config
Jun 26, 2018
dc31b40
reformat code for flake8 acceptance
Jun 26, 2018
e2bc9a6
started integration tests with landoop docker
Jun 29, 2018
23f611c
edit docker run command
Jun 29, 2018
405676e
edit travis.yml for execution order
Jun 29, 2018
83be110
add service docker for all travis env's
Jun 29, 2018
7666925
remove osx build
Jun 29, 2018
71067de
work on integration test for explicit read
Jul 6, 2018
6a6b71d
work on integration test for explicit read
Jul 6, 2018
44c0019
work on integration test for explicit read
Jul 6, 2018
bb5655f
work on integration test for explicit read
Jul 6, 2018
d667ee2
work on integration test for explicit read
Jul 13, 2018
67edebe
work on integration test for explicit read
Jul 13, 2018
6b44d7a
work on integration test for explicit read
Jul 13, 2018
f0a8f51
work on integration test for explicit read
Jul 13, 2018
388e61e
work on integration test for explicit read
Jul 13, 2018
39a3fba
Merge branch 'master' into feature/explicit-read
martintaraz Jul 20, 2018
f78481b
adding support for explicit read schema
Jun 22, 2018
eb1fe22
changes in travis config
Jun 26, 2018
e7bc7e5
reformat code for flake8 acceptance
Jun 26, 2018
c76a773
work on integration test for explicit read
Jul 6, 2018
e5011c8
work on integration test for explicit read
Jul 6, 2018
7eb34f4
work on integration test for explicit read
Jul 6, 2018
79f49b7
work on integration test for explicit read
Jul 13, 2018
5aaadde
work on integration test for explicit read
Jul 13, 2018
1755d12
Merge branch 'master' of https://github.com/confluentinc/confluent-ka…
flxfbrx Oct 12, 2018
409fd34
Rebase travis.yml
fkaufer Oct 12, 2018
2 changes: 1 addition & 1 deletion .travis.yml
@@ -52,7 +52,7 @@ matrix:
install:
- pip install -U pip && pip install virtualenv
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then rvm get stable; fi
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then rvm get stable; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel; fi
10 changes: 7 additions & 3 deletions confluent_kafka/avro/__init__.py
@@ -98,10 +98,14 @@ class AvroConsumer(Consumer):
Constructor takes the following parameters

:param dict config: Config parameters containing the schema registry URL (``schema.registry.url``)
    and the standard Kafka client configuration (``bootstrap.servers`` et al.)
:param schema read_schema: optional reader schema used when decoding messages;
    records written under a different (compatible) schema are projected onto it
"""
def __init__(self, config, schema_registry=None):
def __init__(self, config, schema_registry=None, read_schema=None):
schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
@@ -119,7 +123,7 @@ def __init__(self, config, schema_registry=None):
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(config)
self._serializer = MessageSerializer(schema_registry)
self._serializer = MessageSerializer(schema_registry, read_schema)

def poll(self, timeout=None):
"""
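A minimal usage sketch of the new parameter (broker address, registry URL, and topic name are hypothetical placeholders; assumes the `read_schema` keyword introduced in this PR):

# Hypothetical example: decode messages with an explicit reader schema.
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer

read_schema = avro.load("tests/avro/incremented_read_test_schema.avsc")

c = AvroConsumer({'bootstrap.servers': 'localhost:9092',
                  'group.id': 'explicit-read-demo',
                  'schema.registry.url': 'http://localhost:8081'},
                 read_schema=read_schema)
c.subscribe(['my_topic'])

msg = c.poll(10)
if msg is not None and not msg.error():
    # Fields absent from the writer's schema are filled from the
    # reader schema's defaults (e.g. favorite_color=None).
    print(msg.value())
c.close()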
9 changes: 6 additions & 3 deletions confluent_kafka/avro/serializer/message_serializer.py
@@ -68,10 +68,11 @@ class MessageSerializer(object):
All decode_* methods expect a buffer received from kafka.
"""

def __init__(self, registry_client):
def __init__(self, registry_client, read_schema=None):
self.registry_client = registry_client
self.id_to_decoder_func = {}
self.id_to_writers = {}
self.read_schema = read_schema

'''

@@ -169,6 +170,7 @@ def _get_decoder_func(self, schema_id, payload):
# try to use fast avro
try:
schema_dict = schema.to_json()
reader_schema_dict = self.read_schema.to_json() if self.read_schema else None
schemaless_reader(payload, schema_dict)

# If we reach this point, this means we have fastavro and it can
@@ -177,7 +179,8 @@ def _get_decoder_func(self, schema_id, payload):
# normal path.
payload.seek(curr_pos)

self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict)
self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict,
reader_schema=reader_schema_dict)
return self.id_to_decoder_func[schema_id]
except Exception:
# Fast avro failed, fall thru to standard avro below.
@@ -186,7 +189,7 @@ def _get_decoder_func(self, schema_id, payload):
# here means we should just delegate to slow avro
# rewind
payload.seek(curr_pos)
avro_reader = avro.io.DatumReader(schema)
avro_reader = avro.io.DatumReader(writers_schema=schema, readers_schema=self.read_schema)

def decoder(p):
bin_decoder = avro.io.BinaryDecoder(p)
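The fast path above relies on fastavro's schema resolution; a self-contained sketch of that behavior (inline schemas, independent of this repo):

# Standalone sketch of fastavro schema resolution, mirroring the fast
# path above: write with a v1 writer schema, decode with a v2 reader
# schema so the new field is filled from its declared default.
from io import BytesIO
from fastavro import schemaless_reader, schemaless_writer

writer_schema = {"type": "record", "name": "User",
                 "fields": [{"name": "name", "type": "string"}]}
reader_schema = {"type": "record", "name": "User",
                 "fields": [{"name": "name", "type": "string"},
                            {"name": "favorite_number",
                             "type": ["null", "int"], "default": None}]}

buf = BytesIO()
schemaless_writer(buf, writer_schema, {"name": "abc"})
buf.seek(0)
record = schemaless_reader(buf, writer_schema, reader_schema=reader_schema)
assert record == {"name": "abc", "favorite_number": None}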
104 changes: 102 additions & 2 deletions examples/integration_test.py
@@ -1202,9 +1202,105 @@ def verify_config(expconfig, configs):
print("Topic {} marked for deletion".format(our_topic))


# Exclude throttle from the default list
default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
def verify_explicit_read():
    """Verify that an explicit reader schema works."""
    from confluent_kafka import avro
    avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'tests', 'avro')

    # Producer config
    conf = {'bootstrap.servers': bootstrap_servers,
            'error_cb': error_cb,
            'api.version.request': api_version_request,
            'default.topic.config': {'produce.offset.report': True}}

    # Create producer
    if schema_registry_url:
        conf['schema.registry.url'] = schema_registry_url
        p = avro.AvroProducer(conf)
    else:
        p = avro.AvroProducer(conf, schema_registry=InMemorySchemaRegistry())

    key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
    schema1 = avro.load(os.path.join(avsc_dir, "read_test_schema.avsc"))
    schema2 = avro.load(os.path.join(avsc_dir, "incremented_read_test_schema.avsc"))
    float_value = 32.0
    val = {
        "name": "abc",
        "favorite_number": 42,
        "favorite_color": "orange"
    }
    val1 = {
        "name": "abc"
    }

    combinations = [
        dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
             read_schema=schema1),
        dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
             read_schema=schema2),
    ]

    # Consumer config
    cons_conf = {'bootstrap.servers': bootstrap_servers,
                 'group.id': 'test.py',
                 'session.timeout.ms': 6000,
                 'enable.auto.commit': False,
                 'api.version.request': api_version_request,
                 'on_commit': print_commit_result,
                 'error_cb': error_cb,
                 'default.topic.config': {
                     'auto.offset.reset': 'earliest'
                 }}

    for combo in combinations:
        read_schema = combo.pop('read_schema')
        combo['topic'] = str(uuid.uuid4())
        p.produce(**combo)
        p.poll(0)
        p.flush()

        # Create consumer
        conf = copy(cons_conf)
        if schema_registry_url:
            conf['schema.registry.url'] = schema_registry_url
            c = avro.AvroConsumer(conf, read_schema=read_schema)
        else:
            c = avro.AvroConsumer(conf, schema_registry=InMemorySchemaRegistry(), read_schema=read_schema)

        c.subscribe([combo['topic']])

        while True:
            msg = c.poll(0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                    break
                else:
                    continue

            tstype, timestamp = msg.timestamp()
            print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
                  (msg.topic(), msg.partition(), msg.offset(),
                   msg.key(), msg.value(), tstype, timestamp))

            # omit empty Avro fields from payload for comparison
            record_key = msg.key()
            record_value = msg.value()
            if isinstance(msg.key(), dict):
                record_key = {k: v for k, v in msg.key().items() if v is not None}

            if isinstance(msg.value(), dict):
                record_value = {k: v for k, v in msg.value().items() if v is not None}

            assert combo.get('key') == record_key
            c.commit(msg, asynchronous=False)

        # Close consumer
        c.close()

default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin', 'explicit-read']
all_modes = default_modes + ['throttle', 'avro-https', 'none']

"""All test modes"""


@@ -1323,6 +1419,10 @@ def resolve_envs(_conf):
print('=' * 30, 'Verifying Admin API', '=' * 30)
verify_admin()

if 'explicit-read' in modes:
print('=' * 30, 'Verifying Explicit Reading Schema', '=' * 30)
verify_explicit_read()

print('=' * 30, 'Done', '=' * 30)

if with_pympler:
9 changes: 9 additions & 0 deletions tests/avro/incremented_read_test_schema.avsc
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["null", "int"], "default": null},
{"name": "favorite_color", "type": ["null", "string"], "default": null}
]
}
7 changes: 7 additions & 0 deletions tests/avro/read_test_schema.avsc
@@ -0,0 +1,7 @@
{
"type": "record",
"name": "UserKey",
"fields": [
{"name": "name", "type": "string"}
]
}
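
How the two schemas above resolve against each other on the slow path (a minimal sketch using the standard avro package; Python 3 API — use avro.schema.parse on Python 2):

# Sketch: a record written with read_test_schema.avsc is decoded with
# incremented_read_test_schema.avsc as the reader schema; the fields
# absent on the wire resolve to their declared defaults.
import io
import avro.io
import avro.schema

with open("tests/avro/read_test_schema.avsc") as f:
    writer_schema = avro.schema.Parse(f.read())
with open("tests/avro/incremented_read_test_schema.avsc") as f:
    reader_schema = avro.schema.Parse(f.read())

buf = io.BytesIO()
avro.io.DatumWriter(writer_schema).write({"name": "abc"}, avro.io.BinaryEncoder(buf))

buf.seek(0)
reader = avro.io.DatumReader(writers_schema=writer_schema, readers_schema=reader_schema)
print(reader.read(avro.io.BinaryDecoder(buf)))
# -> {'name': 'abc', 'favorite_number': None, 'favorite_color': None}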