Skip to content

Commit a194432

Browse files
committed
feat: Support configure startMessageIdInclusive for the reader
1 parent 766db9e commit a194432

File tree

3 files changed

+44
-2
lines changed

3 files changed

+44
-2
lines changed

pulsar/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,8 @@ def create_reader(self, topic, start_message_id,
861861
reader_name=None,
862862
subscription_role_prefix=None,
863863
is_read_compacted=False,
864-
crypto_key_reader=None
864+
crypto_key_reader=None,
865+
start_message_id_inclusive=False
865866
):
866867
"""
867868
Create a reader on a particular topic
@@ -920,6 +921,8 @@ def my_listener(reader, message):
920921
crypto_key_reader: CryptoKeyReader, optional
921922
Symmetric encryption class implementation, configuring public key encryption messages for the producer
922923
and private key decryption messages for the consumer
924+
start_message_id_inclusive: bool, default=False
925+
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
923926
"""
924927

925928
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -934,6 +937,7 @@ def my_listener(reader, message):
934937
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
935938
_check_type(bool, is_read_compacted, 'is_read_compacted')
936939
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
940+
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
937941

938942
conf = _pulsar.ReaderConfiguration()
939943
if reader_listener:
@@ -947,6 +951,7 @@ def my_listener(reader, message):
947951
conf.read_compacted(is_read_compacted)
948952
if crypto_key_reader:
949953
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
954+
conf.start_message_id_inclusive(start_message_id_inclusive)
950955

951956
c = Reader()
952957
c._reader = self._client.create_reader(topic, start_message_id, conf)

src/config.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,5 +296,7 @@ void export_config(py::module_& m) {
296296
.def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
297297
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
298298
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
299-
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference);
299+
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
300+
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
301+
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
300302
}

tests/pulsar_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,41 @@ def test_reader_is_connected(self):
581581
self.assertFalse(reader.is_connected())
582582
client.close()
583583

584+
def test_reader_seek_for_message_id(self):
585+
client = pulsar.Client(self.serviceUrl)
586+
587+
topic = "test-seek-for-message-id-" + str(int(time.time()))
588+
589+
producer = client.create_producer(topic)
590+
591+
readerExclusive = client.create_reader(topic, MessageId.latest())
592+
readerInclusive = client.create_reader(topic, MessageId.latest(), start_message_id_inclusive=True)
593+
594+
numMessages = 100
595+
seekMessageId = None
596+
597+
r = random.randint(0, numMessages - 2)
598+
for i in range(numMessages):
599+
msg_content = "msg-" + str(i)
600+
id = producer.send(msg_content)
601+
602+
if i == r:
603+
seekMessageId = id
604+
605+
readerExclusive.seek(seekMessageId)
606+
msg0 = readerExclusive.read_next(timeout_millis=3000)
607+
608+
readerInclusive.seek(seekMessageId)
609+
msg1 = readerInclusive.read_next(timeout_millis=3000)
610+
611+
self.assertEqual(msg0.data(), "msg-" + str(r + 1))
612+
self.assertEqual(msg1.data(), "msg-" + str(r))
613+
614+
readerExclusive.close()
615+
readerInclusive.close()
616+
producer.close()
617+
client.close()
618+
584619
def test_producer_sequence_after_reconnection(self):
585620
# Enable deduplication on namespace
586621
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")

0 commit comments

Comments
 (0)