Skip to content

Commit 0b2cce3

Browse files
rnpridgeonedenhill
authored andcommitted
Intercept and set debug context before interceptor
1 parent efd9e62 commit 0b2cce3

File tree

1 file changed

+20
-82
lines changed

1 file changed

+20
-82
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 20 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1506,7 +1506,6 @@ static int common_conf_set_special(PyObject *confdict, rd_kafka_conf_t *conf,
15061506
!= RD_KAFKA_CONF_OK) {
15071507
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
15081508
"%s", errstr);
1509-
15101509
Py_DECREF(vs);
15111510
Py_XDECREF(vs8);
15121511
return 0;
@@ -1585,8 +1584,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15851584
/*
15861585
* Set debug contexts first to capture all events including plugin loading
15871586
*/
1588-
if ((vo = PyDict_GetItemString(confdict, "debug")))
1589-
if (!common_conf_set_special(confdict, conf, "debug", vo))
1587+
if ((vo = PyDict_GetItemString(confdict, "debug")) &&
1588+
!common_conf_set_special(confdict, conf, "debug", vo))
15901589
goto outer_err;
15911590

15921591
/*
@@ -1598,40 +1597,15 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15981597
* correct order.
15991598
*/
16001599
if ((vo = PyDict_GetItemString(confdict, "plugin.library.paths"))) {
1601-
const char *v;
1602-
char errstr[256];
1603-
PyObject *resolved;
1604-
PyObject *vs = NULL, *vs8 = NULL;
1605-
16061600
/* Resolve plugin paths */
1601+
PyObject *resolved;
16071602
resolved = resolve_plugins(vo);
16081603
if (!resolved)
16091604
goto outer_err;
16101605

16111606
if (!common_conf_set_special(confdict, conf, "plugin.library.paths", vo)) {
16121607
Py_DECREF(resolved);
1613-
rd_kafka_conf_destroy(conf);
1614-
Py_DECREF(confdict);
1615-
1616-
return NULL;
1617-
}
1618-
1619-
Py_DECREF(resolved);
1620-
1621-
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
1622-
1623-
if (rd_kafka_conf_set(conf, "plugin.library.paths", v, errstr, sizeof(errstr))
1624-
!= RD_KAFKA_CONF_OK) {
1625-
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1626-
"%s", errstr);
1627-
1628-
rd_kafka_conf_destroy(conf);
1629-
Py_DECREF(confdict);
1630-
1631-
Py_XDECREF(vs8);
1632-
Py_XDECREF(vs);
1633-
1634-
return NULL;
1608+
goto outer_err;
16351609
}
16361610
Py_DECREF(resolved);
16371611
}
@@ -1643,16 +1617,15 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16431617
"set default topic configuration values in the global dict");
16441618
*/
16451619
if (PyDict_Update(confdict, vo) == -1) {
1646-
rd_kafka_conf_destroy(conf);
1647-
Py_DECREF(confdict);
1648-
return NULL;
1620+
goto outer_err;
16491621
}
16501622
PyDict_DelItemString(confdict, "default.topic.config");
16511623
}
16521624

16531625
/* Convert config dict to config key-value pairs. */
16541626
while (PyDict_Next(confdict, &pos, &ko, &vo)) {
1655-
PyObject *ks, *ks8;
1627+
PyObject *ks;
1628+
PyObject *ks8 = NULL;
16561629
PyObject *vs = NULL, *vs8 = NULL;
16571630
const char *k;
16581631
const char *v;
@@ -1663,10 +1636,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16631636
PyErr_SetString(PyExc_TypeError,
16641637
"expected configuration property name "
16651638
"as type unicode string");
1666-
rd_kafka_conf_destroy(conf);
1667-
Py_DECREF(confdict);
1668-
1669-
return NULL;
1639+
goto inner_err;
16701640
}
16711641

16721642
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
@@ -1675,13 +1645,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16751645
PyErr_SetString(PyExc_TypeError,
16761646
"expected error_cb property "
16771647
"as a callable function");
1678-
rd_kafka_conf_destroy(conf);
1679-
Py_DECREF(confdict);
1680-
1681-
Py_XDECREF(ks8);
1682-
Py_DECREF(ks);
1683-
1684-
return NULL;
1648+
goto inner_err;
16851649
}
16861650
if (h->error_cb) {
16871651
Py_DECREF(h->error_cb);
@@ -1699,13 +1663,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16991663
PyErr_SetString(PyExc_ValueError,
17001664
"expected throttle_cb property "
17011665
"as a callable function");
1702-
rd_kafka_conf_destroy(conf);
1703-
Py_DECREF(confdict);
1704-
1705-
Py_XDECREF(ks8);
1706-
Py_DECREF(ks);
1707-
1708-
return NULL;
1666+
goto inner_err;
17091667
}
17101668
if (h->throttle_cb) {
17111669
Py_DECREF(h->throttle_cb);
@@ -1723,13 +1681,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17231681
PyErr_SetString(PyExc_TypeError,
17241682
"expected stats_cb property "
17251683
"as a callable function");
1726-
rd_kafka_conf_destroy(conf);
1727-
Py_DECREF(confdict);
1728-
1729-
Py_XDECREF(ks8);
1730-
Py_DECREF(ks);
1731-
1732-
return NULL;
1684+
goto inner_err;
17331685
}
17341686

17351687
if (h->stats_cb) {
@@ -1765,13 +1717,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17651717
r = consumer_conf_set_special(h, conf, k, vo);
17661718
if (r == -1) {
17671719
/* Error */
1768-
Py_XDECREF(ks8);
1769-
Py_DECREF(ks);
1770-
rd_kafka_conf_destroy(conf);
1771-
Py_DECREF(confdict);
1772-
1773-
return NULL;
1774-
1720+
goto inner_err;
17751721
} else if (r == 1) {
17761722
/* Handled */
17771723
continue;
@@ -1789,13 +1735,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17891735
"expected configuration "
17901736
"property value as type "
17911737
"unicode string");
1792-
rd_kafka_conf_destroy(conf);
1793-
Py_DECREF(confdict);
1794-
1795-
Py_XDECREF(ks8);
1796-
Py_DECREF(ks);
1797-
1798-
return NULL;
1738+
goto inner_err;
17991739
}
18001740
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
18011741
}
@@ -1804,15 +1744,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
18041744
RD_KAFKA_CONF_OK) {
18051745
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
18061746
"%s", errstr);
1807-
rd_kafka_conf_destroy(conf);
1808-
Py_DECREF(confdict);
1809-
1810-
Py_XDECREF(vs8);
1811-
Py_XDECREF(vs);
1812-
Py_XDECREF(ks8);
1813-
Py_DECREF(ks);
1814-
1815-
return NULL;
1747+
goto inner_err;
18161748
}
18171749

18181750
Py_XDECREF(vs8);
@@ -1863,6 +1795,12 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
18631795
h->initiated = 1;
18641796

18651797
return conf;
1798+
1799+
outer_err:
1800+
Py_DECREF(confdict);
1801+
rd_kafka_conf_destroy(conf);
1802+
1803+
return NULL;
18661804
}
18671805

18681806

0 commit comments

Comments
 (0)