Skip to content

Commit 86ae277

Browse files
committed
Requested Changes
1 parent 654e069 commit 86ae277

File tree

6 files changed

+75
-35
lines changed

6 files changed

+75
-35
lines changed

examples/adminapi.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -862,19 +862,13 @@ def example_delete_records(a, args):
862862
for partition, fut in futmap.items():
863863
try:
864864
result = fut.result()
865-
if result.error:
866-
print(f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
867-
f" before offset {partition.offset}: {result.error.str()}")
868-
else:
869-
print(
870-
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
871-
f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
865+
print(
866+
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
867+
f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
872868
except KafkaException as e:
873869
print(
874870
f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
875871
f" before offset {partition.offset}: {e}")
876-
except Exception:
877-
raise
878872

879873

880874
if __name__ == '__main__':

src/confluent_kafka/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
ConsumerGroupState,
2525
TopicCollection,
2626
TopicPartitionInfo,
27-
IsolationLevel)
27+
IsolationLevel,
28+
DeleteRecordsResult )
2829

2930
from .cimpl import (Producer,
3031
Consumer,
@@ -49,7 +50,7 @@
4950
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
5051
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
5152
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
52-
'IsolationLevel']
53+
'IsolationLevel', 'DeleteRecordsResult']
5354

5455
__version__ = version()[0]
5556

src/confluent_kafka/_model/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,16 @@ def __lt__(self, other):
149149
if self.__class__ != other.__class__:
150150
return NotImplemented
151151
return self.value < other.value
152+
153+
class DeleteRecordsResult:
154+
"""
155+
DeleteRecordsResult
156+
Result of an `AdminClient.delete_records` call associated to a partition.
157+
158+
Parameters
159+
----------
160+
offset: int
161+
The offset returned by the delete_records call.
162+
"""
163+
def __init__(self, offset):
164+
self.offset = offset

src/confluent_kafka/admin/__init__.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -543,10 +543,8 @@ def _check_delete_records(request):
543543
for req in request:
544544
if not isinstance(req, _TopicPartition):
545545
raise TypeError("Element of the request list must be of type 'TopicPartition' ")
546-
if req is None:
547-
raise ValueError("Individual request in the request list cannot be 'None'")
548546
if req.partition < 0:
549-
raise ValueError("Elements of the list must not have negative value for 'partition' field")
547+
raise ValueError(" 'partition' cannot be negative")
550548

551549
def create_topics(self, new_topics, **kwargs):
552550
"""
@@ -1235,10 +1233,8 @@ def delete_records(self, topic_partition_offsets_list, **kwargs):
12351233
in the cluster. A value of 0 returns immediately. Default: 0
12361234
12371235
:returns: A dict of futures keyed by the TopicPartition.
1238-
The future result() method returns a TopicPartition list indicating that
1239-
deletion operation have been performed till the specified Topic Partition
1240-
and error if any has occurred. User has to check if any error has occurred
1241-
during deletion in each partition.
1236+
The future result() method returns DeleteRecordsResult
1237+
or raises KafkaException
12421238
12431239
:rtype: dict[TopicPartition, future]
12441240
@@ -1249,7 +1245,7 @@ def delete_records(self, topic_partition_offsets_list, **kwargs):
12491245
AdminClient._check_delete_records(topic_partition_offsets_list)
12501246

12511247
f, futmap = AdminClient._make_futures_v2(
1252-
topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list)
1248+
topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result)
12531249

12541250
super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs)
12551251
return futmap

src/confluent_kafka/src/Admin.c

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4470,6 +4470,50 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset
44704470
return NULL;
44714471
}
44724472

4473+
static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) {
4474+
4475+
size_t c_topic_partition_cnt = c_topic_partitions->cnt;
4476+
PyObject *result = NULL;
4477+
PyObject *DeleteRecordsResult_type = NULL;
4478+
size_t i;
4479+
4480+
DeleteRecordsResult_type = cfl_PyObject_lookup("confluent_kafka", "DeleteRecordsResult");
4481+
4482+
if(!DeleteRecordsResult_type){
4483+
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecordsResult type");
4484+
return NULL;
4485+
}
4486+
4487+
result = PyDict_New();
4488+
for(i=0; i<c_topic_partition_cnt; i++){
4489+
PyObject *value = NULL;
4490+
rd_kafka_topic_partition_t *c_topic_partition = &c_topic_partitions->elems[i];
4491+
if (c_topic_partitions->elems[i].err) {
4492+
value = KafkaError_new_or_None(c_topic_partitions->elems[i].err, rd_kafka_err2str(c_topic_partitions->elems[i].err));
4493+
} else {
4494+
PyObject *args = NULL;
4495+
PyObject *kwargs = NULL;
4496+
kwargs = PyDict_New();
4497+
cfl_PyDict_SetLong(kwargs, "offset", c_topic_partitions->elems[i].offset);
4498+
args = PyTuple_New(0);
4499+
value = PyObject_Call(DeleteRecordsResult_type, args, kwargs);
4500+
Py_DECREF(args);
4501+
Py_DECREF(kwargs);
4502+
if (value == NULL)
4503+
goto raise;
4504+
}
4505+
PyDict_SetItem(result, c_part_to_py(c_topic_partition), value);
4506+
Py_DECREF(value);
4507+
}
4508+
4509+
Py_DECREF(DeleteRecordsResult_type);
4510+
return result;
4511+
raise:
4512+
Py_DECREF(result);
4513+
Py_DECREF(DeleteRecordsResult_type);
4514+
return NULL;
4515+
}
4516+
44734517
/**
44744518
* @brief Event callback triggered from librdkafka's background thread
44754519
* when Admin API results are ready.
@@ -4818,7 +4862,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
48184862
const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev);
48194863
const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res);
48204864

4821-
result = c_parts_to_py(c_delete_records_res_list);
4865+
result = Admin_c_DeleteRecordsResult_to_py(c_delete_records_res_list);
48224866
break;
48234867
}
48244868

tests/integration/admin/test_delete_records.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
17-
from confluent_kafka import TopicPartition
16+
from confluent_kafka.admin import OffsetSpec
17+
from confluent_kafka import TopicPartition, DeleteRecordsResult
1818

1919

2020
def test_delete_records(kafka_cluster):
@@ -43,29 +43,21 @@ def test_delete_records(kafka_cluster):
4343

4444
# Check if the earliest available offset for this topic partition is 0
4545
fs = admin_client.list_offsets(requests)
46-
for _, fut in fs.items():
47-
result = fut.result()
48-
assert isinstance(result, ListOffsetsResultInfo)
49-
assert (result.offset == 0)
46+
result = list(fs.values())[0].result()
47+
assert (result.offset == 0)
5048

5149
topic_partition_offset = TopicPartition(topic, 0, 2)
5250

5351
# Delete the records
5452
fs1 = admin_client.delete_records([topic_partition_offset])
55-
earliest_offset_available = 0
5653

5754
# Find the earliest available offset for that specific topic partition after deletion has been done
5855
fs2 = admin_client.list_offsets(requests)
59-
for _, fut in fs2.items():
60-
result = fut.result()
61-
assert isinstance(result, ListOffsetsResultInfo)
62-
earliest_offset_available = result.offset
6356

6457
# Check if the earliest available offset is equal to the offset passed to the delete records function
65-
for _, fut in fs1.items():
66-
result = fut.result()
67-
assert isinstance(result, TopicPartition)
68-
assert (result.offset == earliest_offset_available)
58+
res = list(fs1.values())[0].result()
59+
assert isinstance(res, DeleteRecordsResult)
60+
assert (res.offset == list(fs2.values())[0].result().offset)
6961

7062
# Delete created topic
7163
fs = admin_client.delete_topics([topic])

0 commit comments

Comments
 (0)