Skip to content

Commit 65ab14c

Browse files
Added a delete records api (#1710)
Added a delete records api --------- Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
1 parent 858347c commit 65ab14c

File tree

9 files changed

+487
-35
lines changed

9 files changed

+487
-35
lines changed

CHANGELOG.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
# Confluent's Python client for Apache Kafka
22

3-
## v2.4.1
3+
## v2.5.0
44

5-
v2.4.1 is a maintenance release with the following fixes and enhancements:
5+
v2.5.0 is a feature release with the following features, fixes and enhancements:
66

7+
- [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710)
78
- Added an example to show the usage of the custom logger with `AdminClient`
89
- Removed usage of `strcpy` to enhance security of the client (#1745)
910
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
11+
- Fixed documentation for default value of `operation_timeout` and `request_timeout` in various Admin apis (#1710)
1012
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
1113
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
1214
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property
15+
- Fixed some memory leaks related to `PyDict_SetItem`.
1316

14-
confluent-kafka-python is based on librdkafka v2.4.1, see the
15-
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
17+
confluent-kafka-python is based on librdkafka v2.5.0, see the
18+
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0)
1619
for a complete list of changes, enhancements, fixes and upgrade considerations.
1720

1821

docs/index.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ Supporting classes
7777
- :ref:`MemberAssignment <pythonclient_member_assignment>`
7878
- :ref:`MemberDescription <pythonclient_member_description>`
7979
- :ref:`ConsumerGroupDescription <pythonclient_consumer_group_description>`
80+
- :ref:`DeletedRecords <pythonclient_deleted_records>`
8081

8182
Experimental
8283
These classes are experimental and are likely to be removed, or subject to incompatible
@@ -387,6 +388,15 @@ ConsumerGroupDescription
387388
.. autoclass:: confluent_kafka.admin.ConsumerGroupDescription
388389
:members:
389390

391+
.. _pythonclient_deleted_records:
392+
393+
**************
394+
DeletedRecords
395+
**************
396+
397+
.. autoclass:: confluent_kafka.admin.DeletedRecords
398+
:members:
399+
390400
.. _pythonclient_member_assignment:
391401

392402
****************

examples/adminapi.py

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -808,24 +808,24 @@ def example_list_offsets(a, args):
808808
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 3," +
809809
f" got {len(args) - i}")
810810
topic = args[i]
811-
partition = int(args[i+1])
811+
partition = int(args[i + 1])
812812
topic_partition = TopicPartition(topic, partition)
813813

814-
if "EARLIEST" == args[i+2]:
814+
if "EARLIEST" == args[i + 2]:
815815
offset_spec = OffsetSpec.earliest()
816816

817-
elif "LATEST" == args[i+2]:
817+
elif "LATEST" == args[i + 2]:
818818
offset_spec = OffsetSpec.latest()
819819

820-
elif "MAX_TIMESTAMP" == args[i+2]:
820+
elif "MAX_TIMESTAMP" == args[i + 2]:
821821
offset_spec = OffsetSpec.max_timestamp()
822822

823-
elif "TIMESTAMP" == args[i+2]:
823+
elif "TIMESTAMP" == args[i + 2]:
824824
if i + 4 > len(args):
825825
raise ValueError(
826826
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 4" +
827827
f", got {len(args) - i}")
828-
offset_spec = OffsetSpec.for_timestamp(int(args[i+3]))
828+
offset_spec = OffsetSpec.for_timestamp(int(args[i + 3]))
829829
i += 1
830830
else:
831831
raise ValueError("Invalid OffsetSpec, must be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP")
@@ -845,6 +845,39 @@ def example_list_offsets(a, args):
845845
.format(partition.topic, partition.partition, e))
846846

847847

848+
def example_delete_records(a, args):
849+
if len(args) == 0:
850+
raise ValueError(
851+
"Invalid number of arguments for delete_records, expected at least 3 " +
852+
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")
853+
if len(args) % 3 != 0:
854+
raise ValueError(
855+
"Invalid number of arguments for delete_records " +
856+
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")
857+
858+
topic_partition_offsets = [
859+
TopicPartition(topic, int(partition), int(offset))
860+
for topic, partition, offset in zip(args[::3], args[1::3], args[2::3])
861+
]
862+
863+
futmap = a.delete_records(topic_partition_offsets)
864+
for partition, fut in futmap.items():
865+
try:
866+
result = fut.result()
867+
if partition.offset == -1:
868+
print(f"All records deleted in topic {partition.topic} partition {partition.partition}." +
869+
f"The minimum offset in this partition is now {result.low_watermark}")
870+
else:
871+
print(
872+
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
873+
f" partition {partition.partition}. The minimum offset in this partition" +
874+
f" is now {result.low_watermark}")
875+
except KafkaException as e:
876+
print(
877+
f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
878+
f" before offset {partition.offset}: {e}")
879+
880+
848881
if __name__ == '__main__':
849882
if len(sys.argv) < 3:
850883
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
@@ -883,7 +916,7 @@ def example_list_offsets(a, args):
883916
' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
884917
sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
885918
'[<topic2> <partition2> <offset_spec2> ..]\n')
886-
919+
sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
887920
sys.exit(1)
888921

889922
broker = sys.argv[1]
@@ -913,7 +946,8 @@ def example_list_offsets(a, args):
913946
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
914947
'describe_user_scram_credentials': example_describe_user_scram_credentials,
915948
'alter_user_scram_credentials': example_alter_user_scram_credentials,
916-
'list_offsets': example_list_offsets}
949+
'list_offsets': example_list_offsets,
950+
'delete_records': example_delete_records}
917951

918952
if operation not in opsmap:
919953
sys.stderr.write('Unknown operation: %s\n' % operation)

0 commit comments

Comments
 (0)