Skip to content

Commit 34baea6

Browse files
author
Ryan P
authored
deprecate default.topic.configuration (confluentinc#446)
1 parent c71e075 commit 34baea6

File tree

10 files changed

+81
-140
lines changed

10 files changed

+81
-140
lines changed

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ matrix:
1717
python: "2.7"
1818
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
1919
before_install:
20+
- pip install -U pip && pip install virtualenv
2021
- brew update && brew upgrade pyenv
2122
- pyenv install -f 2.7.15
2223
- pip install virtualenv
@@ -27,6 +28,7 @@ matrix:
2728
python: "3.6"
2829
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
2930
before_install:
31+
- pip install -U pip && pip install virtualenv
3032
- brew update && brew upgrade pyenv
3133
- pyenv install -f 3.6.5
3234
- pip install virtualenv
@@ -48,8 +50,8 @@ matrix:
4850
services: docker
4951

5052
install:
53+
- pip install -U pip && pip install virtualenv
5154
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
52-
- pip install -U pip
5355
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
5456
- if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
5557
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel; fi

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ from confluent_kafka import Consumer, KafkaError
7474
c = Consumer({
7575
'bootstrap.servers': 'mybroker',
7676
'group.id': 'mygroup',
77-
'default.topic.config': {
78-
'auto.offset.reset': 'smallest'
79-
}
77+
'auto.offset.reset': 'earliest'
8078
})
8179

8280
c.subscribe(['mytopic'])

confluent_kafka/src/confluent_kafka.c

Lines changed: 21 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,79 +1369,12 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
13691369
return 0;
13701370
}
13711371

1372-
1373-
1374-
/**
1375-
* Populate topic conf from provided dict.
1376-
*
1377-
* Will raise an exception on error and return -1, or returns 0 on success.
1378-
*/
1379-
static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what,
1380-
PyObject *dict) {
1381-
Py_ssize_t pos = 0;
1382-
PyObject *ko, *vo;
1383-
1384-
if (!PyDict_Check(dict)) {
1385-
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1386-
"%s: requires a dict", what);
1387-
return -1;
1388-
}
1389-
1390-
while (PyDict_Next(dict, &pos, &ko, &vo)) {
1391-
PyObject *ks, *ks8;
1392-
PyObject *vs, *vs8;
1393-
const char *k;
1394-
const char *v;
1395-
char errstr[256];
1396-
1397-
if (!(ks = cfl_PyObject_Unistr(ko))) {
1398-
PyErr_SetString(PyExc_TypeError,
1399-
"expected configuration property "
1400-
"value as type unicode string");
1401-
return -1;
1402-
}
1403-
1404-
if (!(vs = cfl_PyObject_Unistr(vo))) {
1405-
PyErr_SetString(PyExc_TypeError,
1406-
"expected configuration property "
1407-
"value as type unicode string");
1408-
Py_DECREF(ks);
1409-
return -1;
1410-
}
1411-
1412-
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1413-
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
1414-
1415-
if (rd_kafka_topic_conf_set(tconf, k, v,
1416-
errstr, sizeof(errstr)) !=
1417-
RD_KAFKA_CONF_OK) {
1418-
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1419-
"%s: %s", what, errstr);
1420-
Py_XDECREF(ks8);
1421-
Py_XDECREF(vs8);
1422-
Py_DECREF(ks);
1423-
Py_DECREF(vs);
1424-
return -1;
1425-
}
1426-
1427-
Py_XDECREF(ks8);
1428-
Py_XDECREF(vs8);
1429-
Py_DECREF(ks);
1430-
Py_DECREF(vs);
1431-
}
1432-
1433-
return 0;
1434-
}
1435-
1436-
1437-
14381372
/**
14391373
* @brief Set single special producer config value.
14401374
*
14411375
* @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
14421376
*/
14431377
static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
1444-
rd_kafka_topic_conf_t *tconf,
14451378
const char *name, PyObject *valobj) {
14461379

14471380
if (!strcmp(name, "on_delivery")) {
@@ -1483,7 +1416,6 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
14831416
* @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
14841417
*/
14851418
static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
1486-
rd_kafka_topic_conf_t *tconf,
14871419
const char *name, PyObject *valobj) {
14881420

14891421
if (!strcmp(name, "on_commit")) {
@@ -1516,7 +1448,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15161448
PyObject *args,
15171449
PyObject *kwargs) {
15181450
rd_kafka_conf_t *conf;
1519-
rd_kafka_topic_conf_t *tconf;
15201451
Py_ssize_t pos = 0;
15211452
PyObject *ko, *vo;
15221453
PyObject *confdict = NULL;
@@ -1569,14 +1500,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15691500
}
15701501

15711502
conf = rd_kafka_conf_new();
1572-
tconf = rd_kafka_topic_conf_new();
15731503

15741504
/*
15751505
* Default config (overridable by user)
15761506
*/
15771507

15781508
/* Enable valid offsets in delivery reports */
1579-
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);
1509+
rd_kafka_conf_set(conf, "produce.offset.report", "true", NULL, 0);
15801510

15811511
/*
15821512
* Plugins must be configured prior to handling any of their configuration properties.
@@ -1592,7 +1522,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15921522
PyErr_SetString(PyExc_TypeError,
15931523
"expected configuration property name "
15941524
"as type unicode string");
1595-
rd_kafka_topic_conf_destroy(tconf);
15961525
rd_kafka_conf_destroy(conf);
15971526
Py_DECREF(confdict);
15981527

@@ -1606,7 +1535,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16061535
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
16071536
"%s", errstr);
16081537

1609-
rd_kafka_topic_conf_destroy(tconf);
16101538
rd_kafka_conf_destroy(conf);
16111539
Py_DECREF(confdict);
16121540

@@ -1622,6 +1550,20 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16221550
PyDict_DelItemString(confdict, "plugin.library.paths");
16231551
}
16241552

1553+
if ((vo = PyDict_GetItemString(confdict, "default.topic.config"))) {
1554+
/* TODO: uncomment for 1.0 release
1555+
PyErr_Warn(PyExc_DeprecationWarning,
1556+
"default.topic.config has being deprecated, "
1557+
"set default topic configuration values in the global dict");
1558+
*/
1559+
if (PyDict_Update(confdict, vo) == -1) {
1560+
rd_kafka_conf_destroy(conf);
1561+
Py_DECREF(confdict);
1562+
return NULL;
1563+
}
1564+
PyDict_DelItemString(confdict, "default.topic.config");
1565+
}
1566+
16251567
/* Convert config dict to config key-value pairs. */
16261568
while (PyDict_Next(confdict, &pos, &ko, &vo)) {
16271569
PyObject *ks, *ks8;
@@ -1632,35 +1574,21 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16321574
int r = 0;
16331575

16341576
if (!(ks = cfl_PyObject_Unistr(ko))) {
1635-
PyErr_SetString(PyExc_TypeError,
1636-
"expected configuration property name "
1637-
"as type unicode string");
1638-
rd_kafka_topic_conf_destroy(tconf);
1577+
PyErr_SetString(PyExc_TypeError,
1578+
"expected configuration property name "
1579+
"as type unicode string");
16391580
rd_kafka_conf_destroy(conf);
16401581
Py_DECREF(confdict);
16411582

16421583
return NULL;
16431584
}
16441585

16451586
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1646-
if (!strcmp(k, "default.topic.config")) {
1647-
if (populate_topic_conf(tconf, k, vo) == -1) {
1648-
Py_DECREF(ks);
1649-
rd_kafka_topic_conf_destroy(tconf);
1650-
rd_kafka_conf_destroy(conf);
1651-
Py_DECREF(confdict);
1652-
return NULL;
1653-
}
1654-
Py_XDECREF(ks8);
1655-
Py_DECREF(ks);
1656-
continue;
1657-
1658-
} else if (!strcmp(k, "error_cb")) {
1587+
if (!strcmp(k, "error_cb")) {
16591588
if (!PyCallable_Check(vo)) {
16601589
PyErr_SetString(PyExc_TypeError,
16611590
"expected error_cb property "
16621591
"as a callable function");
1663-
rd_kafka_topic_conf_destroy(tconf);
16641592
rd_kafka_conf_destroy(conf);
16651593
Py_DECREF(confdict);
16661594

@@ -1685,7 +1613,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16851613
PyErr_SetString(PyExc_ValueError,
16861614
"expected throttle_cb property "
16871615
"as a callable function");
1688-
rd_kafka_topic_conf_destroy(tconf);
16891616
rd_kafka_conf_destroy(conf);
16901617
Py_DECREF(confdict);
16911618

@@ -1710,7 +1637,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17101637
PyErr_SetString(PyExc_TypeError,
17111638
"expected stats_cb property "
17121639
"as a callable function");
1713-
rd_kafka_topic_conf_destroy(tconf);
17141640
rd_kafka_conf_destroy(conf);
17151641
Py_DECREF(confdict);
17161642

@@ -1748,14 +1674,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17481674

17491675
/* Special handling for certain config keys. */
17501676
if (ktype == RD_KAFKA_PRODUCER)
1751-
r = producer_conf_set_special(h, conf, tconf, k, vo);
1677+
r = producer_conf_set_special(h, conf, k, vo);
17521678
else if (ktype == RD_KAFKA_CONSUMER)
1753-
r = consumer_conf_set_special(h, conf, tconf, k, vo);
1679+
r = consumer_conf_set_special(h, conf, k, vo);
17541680
if (r == -1) {
17551681
/* Error */
17561682
Py_XDECREF(ks8);
17571683
Py_DECREF(ks);
1758-
rd_kafka_topic_conf_destroy(tconf);
17591684
rd_kafka_conf_destroy(conf);
17601685
Py_DECREF(confdict);
17611686

@@ -1778,7 +1703,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17781703
"expected configuration "
17791704
"property value as type "
17801705
"unicode string");
1781-
rd_kafka_topic_conf_destroy(tconf);
17821706
rd_kafka_conf_destroy(conf);
17831707
Py_DECREF(confdict);
17841708

@@ -1794,7 +1718,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17941718
RD_KAFKA_CONF_OK) {
17951719
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
17961720
"%s", errstr);
1797-
rd_kafka_topic_conf_destroy(tconf);
17981721
rd_kafka_conf_destroy(conf);
17991722
Py_DECREF(confdict);
18001723

@@ -1830,9 +1753,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
18301753
rd_kafka_conf_set_log_cb(conf, log_cb);
18311754
}
18321755

1833-
rd_kafka_topic_conf_set_opaque(tconf, h);
1834-
rd_kafka_conf_set_default_topic_conf(conf, tconf);
1835-
18361756
rd_kafka_conf_set_opaque(conf, h);
18371757

18381758
#ifdef WITH_PY_TSS

docs/index.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ providing a dict of configuration properties to the instance constructor, e.g.::
100100
conf = {'bootstrap.servers': 'mybroker.com',
101101
'group.id': 'mygroup', 'session.timeout.ms': 6000,
102102
'on_commit': my_commit_callback,
103-
'default.topic.config': {'auto.offset.reset': 'smallest'}}
103+
'auto.offset.reset': 'earliest'}
104104
consumer = confluent_kafka.Consumer(conf)
105105

106106
The supported configuration values are dictated by the underlying
@@ -111,7 +111,8 @@ https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
111111
The Python bindings also provide some additional configuration properties:
112112

113113
* ``default.topic.config``: value is a dict of client topic-level configuration
114-
properties that are applied to all used topics for the instance.
114+
properties that are applied to all used topics for the instance. **DEPRECATED: **
115+
topic configuration should now be specified in the global top-level configuration.
115116
116117
* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served upon calling
117118
``client.poll()`` or ``producer.flush()``.

examples/confluent_cloud.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def acked(err, msg):
8787
'sasl.username': '<ccloud key>',
8888
'sasl.password': '<ccloud secret>',
8989
'group.id': str(uuid.uuid1()), # this will create a new consumer group on each invocation.
90-
'default.topic.config': {'auto.offset.reset': 'smallest'}
90+
'auto.offset.reset': 'earliest'
9191
})
9292

9393
c.subscribe(['python-test-topic'])

examples/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def print_usage_and_exit(program_name):
5252
# Consumer configuration
5353
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
5454
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
55-
'default.topic.config': {'auto.offset.reset': 'smallest'}}
55+
'auto.offset.reset': 'earliest'}
5656

5757
# Check to see if -T option exists
5858
for opt in optlist:

0 commit comments

Comments
 (0)