Skip to content

Commit 496cbb0

Browse files
committed
feat: Support configure startMessageIdInclusive for the reader
1 parent 8d77f74 commit 496cbb0

File tree

3 files changed

+44
-2
lines changed

3 files changed

+44
-2
lines changed

pulsar/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,8 @@ def create_reader(self, topic, start_message_id,
966966
reader_name=None,
967967
subscription_role_prefix=None,
968968
is_read_compacted=False,
969-
crypto_key_reader=None
969+
crypto_key_reader=None,
970+
start_message_id_inclusive=False
970971
):
971972
"""
972973
Create a reader on a particular topic
@@ -1025,6 +1026,8 @@ def my_listener(reader, message):
10251026
crypto_key_reader: CryptoKeyReader, optional
10261027
Symmetric encryption class implementation, configuring public key encryption messages for the producer
10271028
and private key decryption messages for the consumer
1029+
start_message_id_inclusive: bool, default=False
1030+
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
10281031
"""
10291032

10301033
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1039,6 +1042,7 @@ def my_listener(reader, message):
10391042
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
10401043
_check_type(bool, is_read_compacted, 'is_read_compacted')
10411044
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
1045+
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
10421046

10431047
conf = _pulsar.ReaderConfiguration()
10441048
if reader_listener:
@@ -1052,6 +1056,7 @@ def my_listener(reader, message):
10521056
conf.read_compacted(is_read_compacted)
10531057
if crypto_key_reader:
10541058
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
1059+
conf.start_message_id_inclusive(start_message_id_inclusive)
10551060

10561061
c = Reader()
10571062
c._reader = self._client.create_reader(topic, start_message_id, conf)

src/config.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,5 +318,7 @@ void export_config(py::module_& m) {
318318
.def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
319319
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
320320
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
321-
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference);
321+
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
322+
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
323+
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
322324
}

tests/pulsar_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,41 @@ def test_reader_is_connected(self):
685685
self.assertFalse(reader.is_connected())
686686
client.close()
687687

688+
def test_reader_seek_for_message_id(self):
689+
client = pulsar.Client(self.serviceUrl)
690+
691+
topic = "test-seek-for-message-id-" + str(int(time.time()))
692+
693+
producer = client.create_producer(topic)
694+
695+
readerExclusive = client.create_reader(topic, MessageId.latest())
696+
readerInclusive = client.create_reader(topic, MessageId.latest(), start_message_id_inclusive=True)
697+
698+
numMessages = 100
699+
seekMessageId = None
700+
701+
r = random.randint(0, numMessages - 2)
702+
for i in range(numMessages):
703+
msg_content = "msg-" + str(i)
704+
id = producer.send(msg_content)
705+
706+
if i == r:
707+
seekMessageId = id
708+
709+
readerExclusive.seek(seekMessageId)
710+
msg0 = readerExclusive.read_next(timeout_millis=3000)
711+
712+
readerInclusive.seek(seekMessageId)
713+
msg1 = readerInclusive.read_next(timeout_millis=3000)
714+
715+
self.assertEqual(msg0.data(), "msg-" + str(r + 1))
716+
self.assertEqual(msg1.data(), "msg-" + str(r))
717+
718+
readerExclusive.close()
719+
readerInclusive.close()
720+
producer.close()
721+
client.close()
722+
688723
def test_producer_sequence_after_reconnection(self):
689724
# Enable deduplication on namespace
690725
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")

0 commit comments

Comments
 (0)