Skip to content

Commit c391879

Browse files
authored
Fix calls with empty lists (#1662)
* Empty topic collection allowed * Empty topic names disallowed * Test with different options * New _make_futmap_result_from_list method
1 parent aaeca9a commit c391879

File tree

3 files changed

+97
-59
lines changed

3 files changed

+97
-59
lines changed

src/confluent_kafka/admin/__init__.py

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -199,31 +199,6 @@ def _make_consumer_groups_result(f, futmap):
199199
for _, fut in futmap.items():
200200
fut.set_exception(e)
201201

202-
@staticmethod
203-
def _make_describe_topics_result(f, futmap):
204-
"""
205-
Map per-topic results to per-topic futures in futmap.
206-
"""
207-
try:
208-
209-
results = f.result()
210-
futmap_values = list(futmap.values())
211-
len_results = len(results)
212-
len_futures = len(futmap_values)
213-
if len_results != len_futures:
214-
raise RuntimeError(
215-
"Results length {} is different from future-map length {}".format(len_results, len_futures))
216-
for i, result in enumerate(results):
217-
fut = futmap_values[i]
218-
if isinstance(result, KafkaError):
219-
fut.set_exception(KafkaException(result))
220-
else:
221-
fut.set_result(result)
222-
except Exception as e:
223-
# Request-level exception, raise the same for all topics
224-
for _, fut in futmap.items():
225-
fut.set_exception(e)
226-
227202
@staticmethod
228203
def _make_consumer_group_offsets_result(f, futmap):
229204
"""
@@ -298,6 +273,28 @@ def _make_user_scram_credentials_result(f, futmap):
298273
for _, fut in futmap.items():
299274
fut.set_exception(e)
300275

276+
@staticmethod
277+
def _make_futmap_result_from_list(f, futmap):
278+
try:
279+
280+
results = f.result()
281+
futmap_values = list(futmap.values())
282+
len_results = len(results)
283+
len_futures = len(futmap_values)
284+
if len_results != len_futures:
285+
raise RuntimeError(
286+
"Results length {} is different from future-map length {}".format(len_results, len_futures))
287+
for i, result in enumerate(results):
288+
fut = futmap_values[i]
289+
if isinstance(result, KafkaError):
290+
fut.set_exception(KafkaException(result))
291+
else:
292+
fut.set_result(result)
293+
except Exception as e:
294+
# Request-level exception, raise the same for all topics
295+
for _, fut in futmap.items():
296+
fut.set_exception(e)
297+
301298
@staticmethod
302299
def _make_futmap_result(f, futmap):
303300
try:
@@ -948,11 +945,8 @@ def describe_topics(self, topics, **kwargs):
948945
if not isinstance(topic_names, list):
949946
raise TypeError("Expected list of topic names to be described")
950947

951-
if len(topic_names) == 0:
952-
raise ValueError("Expected at least one topic to be described")
953-
954-
f, futmap = AdminClient._make_futures(topic_names, None,
955-
AdminClient._make_describe_topics_result)
948+
f, futmap = AdminClient._make_futures_v2(topic_names, None,
949+
AdminClient._make_futmap_result_from_list)
956950

957951
super(AdminClient, self).describe_topics(topic_names, f, **kwargs)
958952

src/confluent_kafka/src/Admin.c

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2326,36 +2326,43 @@ PyObject *Admin_describe_topics (Handle *self, PyObject *args, PyObject *kwargs)
23262326
&options.include_authorized_operations))
23272327
goto err;
23282328

2329-
if (!PyList_Check(topics) || (topics_cnt = (int)PyList_Size(topics)) < 1) {
2330-
PyErr_SetString(PyExc_ValueError,
2331-
"Expected non-empty list of topics");
2329+
if (!PyList_Check(topics)) {
2330+
PyErr_SetString(PyExc_TypeError,
2331+
"Expected a list of topics");
23322332
goto err;
23332333
}
23342334

2335-
c_topics = malloc(sizeof(char *) * topics_cnt);
2335+
topics_cnt = PyList_Size(topics);
23362336

2337-
for (i = 0 ; i < topics_cnt ; i++) {
2338-
PyObject *topic = PyList_GET_ITEM(topics, i);
2339-
PyObject *utopic;
2340-
PyObject *uotopic = NULL;
2337+
if (topics_cnt) {
2338+
c_topics = malloc(sizeof(char *) * topics_cnt);
2339+
for (i = 0 ; i < topics_cnt ; i++) {
2340+
PyObject *topic = PyList_GET_ITEM(topics, i);
2341+
PyObject *uotopic = NULL;
23412342

2342-
if (topic == Py_None ||
2343-
!(utopic = cfl_PyObject_Unistr(topic))) {
2344-
PyErr_Format(PyExc_ValueError,
2345-
"Expected list of topics strings, "
2346-
"not %s",
2347-
((PyTypeObject *)PyObject_Type(topic))->
2348-
tp_name);
2349-
goto err;
2350-
}
2343+
if (topic == Py_None ||
2344+
!PyUnicode_Check(topic)) {
2345+
PyErr_Format(PyExc_TypeError,
2346+
"Expected list of topics strings, "
2347+
"not %s",
2348+
((PyTypeObject *)PyObject_Type(topic))->
2349+
tp_name);
2350+
goto err;
2351+
}
23512352

2352-
c_topics[i] = cfl_PyUnistr_AsUTF8(utopic, &uotopic);
2353+
c_topics[i] = cfl_PyUnistr_AsUTF8(topic, &uotopic);
2354+
Py_XDECREF(uotopic);
23532355

2354-
Py_XDECREF(utopic);
2355-
Py_XDECREF(uotopic);
2356+
if (!c_topics[i][0]) {
2357+
PyErr_Format(PyExc_ValueError,
2358+
"Empty topic name at index %d isn't "
2359+
"allowed", i);
2360+
goto err;
2361+
}
2362+
}
23562363
}
2357-
c_topic_collection = rd_kafka_TopicCollection_of_topic_names(c_topics, topics_cnt);
23582364

2365+
c_topic_collection = rd_kafka_TopicCollection_of_topic_names(c_topics, topics_cnt);
23592366
c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS,
23602367
&options, future);
23612368
if (!c_options) {

tests/test_Admin.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -637,18 +637,55 @@ def test_describe_consumer_groups_api():
637637
def test_describe_topics_api():
638638
a = AdminClient({"socket.timeout.ms": 10})
639639

640-
topic_names = ["test-topic-1", "test-topic-2"]
640+
# Wrong option types
641+
for kwargs in [{"include_authorized_operations": "wrong_type"},
642+
{"request_timeout": "wrong_type"}]:
643+
with pytest.raises(TypeError):
644+
a.describe_topics(TopicCollection([]), **kwargs)
641645

642-
a.describe_topics(TopicCollection(topic_names))
646+
# Wrong option values
647+
for kwargs in [{"request_timeout": -1}]:
648+
with pytest.raises(ValueError):
649+
a.describe_topics(TopicCollection([]), **kwargs)
643650

644-
with pytest.raises(TypeError):
645-
a.describe_topics(topic_names)
651+
# Test with different options
652+
for kwargs in [{},
653+
{"include_authorized_operations": True},
654+
{"request_timeout": 0.01},
655+
{"include_authorized_operations": False,
656+
"request_timeout": 0.01}]:
646657

647-
with pytest.raises(TypeError):
648-
a.describe_topics("test-topic-1")
658+
topic_names = ["test-topic-1", "test-topic-2"]
649659

650-
with pytest.raises(ValueError):
651-
a.describe_topics(TopicCollection([]))
660+
# Empty TopicCollection returns empty futures
661+
fs = a.describe_topics(TopicCollection([]), **kwargs)
662+
assert len(fs) == 0
663+
664+
# Normal call
665+
fs = a.describe_topics(TopicCollection(topic_names), **kwargs)
666+
for f in concurrent.futures.as_completed(iter(fs.values())):
667+
e = f.exception(timeout=1)
668+
assert isinstance(e, KafkaException)
669+
assert e.args[0].code() == KafkaError._TIMED_OUT
670+
671+
# Wrong argument type
672+
for args in [
673+
[topic_names],
674+
["test-topic-1"],
675+
[TopicCollection([3])],
676+
[TopicCollection(["correct", 3])],
677+
[TopicCollection([None])]
678+
]:
679+
with pytest.raises(TypeError):
680+
a.describe_topics(*args, **kwargs)
681+
682+
# Wrong argument value
683+
for args in [
684+
[TopicCollection([""])],
685+
[TopicCollection(["correct", ""])]
686+
]:
687+
with pytest.raises(ValueError):
688+
a.describe_topics(*args, **kwargs)
652689

653690

654691
def test_describe_cluster():

0 commit comments

Comments
 (0)