Skip to content

Commit 2ebf874

Browse files
venthuredenhill
authored andcommitted
Added logging parameter for Avro(Consumer|Producer).
The patch introduces **kwargs and just passes them to the underlying super calls. Tests are included. Closes: #698
1 parent 67d4fb4 commit 2ebf874

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

src/confluent_kafka/avro/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class AvroProducer(Producer):
4444
"""
4545

4646
def __init__(self, config, default_key_schema=None,
47-
default_value_schema=None, schema_registry=None):
47+
default_value_schema=None, schema_registry=None, **kwargs):
4848

4949
sr_conf = {key.replace("schema.registry.", ""): value
5050
for key, value in config.items() if key.startswith("schema.registry")}
@@ -64,7 +64,7 @@ def __init__(self, config, default_key_schema=None,
6464
elif sr_conf.get("url", None) is not None:
6565
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
6666

67-
super(AvroProducer, self).__init__(ap_conf)
67+
super(AvroProducer, self).__init__(ap_conf, **kwargs)
6868
self._serializer = MessageSerializer(schema_registry)
6969
self._key_schema = default_key_schema
7070
self._value_schema = default_value_schema
@@ -123,7 +123,7 @@ class AvroConsumer(Consumer):
123123
:raises ValueError: For invalid configurations
124124
"""
125125

126-
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
126+
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs):
127127

128128
sr_conf = {key.replace("schema.registry.", ""): value
129129
for key, value in config.items() if key.startswith("schema.registry")}
@@ -142,7 +142,7 @@ def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_
142142
elif sr_conf.get("url", None) is not None:
143143
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
144144

145-
super(AvroConsumer, self).__init__(ap_conf)
145+
super(AvroConsumer, self).__init__(ap_conf, **kwargs)
146146
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)
147147

148148
def poll(self, timeout=None):

tests/test_log.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python
22

33
import confluent_kafka
4+
import confluent_kafka.avro
45
import logging
56

67

@@ -34,6 +35,26 @@ def test_logging_consumer():
3435
kc.close()
3536

3637

38+
def test_logging_avro_consumer():
39+
""" Tests that logging works """
40+
41+
logger = logging.getLogger('avroconsumer')
42+
logger.setLevel(logging.DEBUG)
43+
f = CountingFilter('avroconsumer')
44+
logger.addFilter(f)
45+
46+
kc = confluent_kafka.avro.AvroConsumer({'schema.registry.url': 'http://example.com',
47+
'group.id': 'test',
48+
'debug': 'all'},
49+
logger=logger)
50+
while f.cnt == 0:
51+
kc.poll(timeout=0.5)
52+
53+
print('%s: %d log messages seen' % (f.name, f.cnt))
54+
55+
kc.close()
56+
57+
3758
def test_logging_producer():
3859
""" Tests that logging works """
3960

@@ -50,6 +71,24 @@ def test_logging_producer():
5071
print('%s: %d log messages seen' % (f.name, f.cnt))
5172

5273

74+
def test_logging_avro_producer():
75+
""" Tests that logging works """
76+
77+
logger = logging.getLogger('avroproducer')
78+
logger.setLevel(logging.DEBUG)
79+
f = CountingFilter('avroproducer')
80+
logger.addFilter(f)
81+
82+
p = confluent_kafka.avro.AvroProducer({'schema.registry.url': 'http://example.com',
83+
'debug': 'all'},
84+
logger=logger)
85+
86+
while f.cnt == 0:
87+
p.poll(timeout=0.5)
88+
89+
print('%s: %d log messages seen' % (f.name, f.cnt))
90+
91+
5392
def test_logging_constructor():
5493
""" Verify different forms of constructors """
5594

0 commit comments

Comments
 (0)