Skip to content

Commit c3c763e

Browse files
committed
feat: add get partitioned topic names
1 parent b1c9487 commit c3c763e

File tree

3 files changed

+96
-0
lines changed

3 files changed

+96
-0
lines changed

pulsar/asyncio.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
ConsumerCryptoFailureAction,
4040
)
4141
import pulsar
42+
from pulsar import _check_type
4243

4344
class PulsarException(BaseException):
4445
"""
@@ -775,6 +776,31 @@ async def subscribe(self, topic: Union[str, List[str]],
775776

776777
schema.attach_client(self._client)
777778
return Consumer(await future, schema)
779+
780+
async def get_topic_partitions(self, topic: str) -> List[str]:
781+
"""
782+
Get the list of partitions for a given topic in asynchronous mode.
783+
784+
If the topic is partitioned, this will return a list of partition names. If the topic is not partitioned, the returned list will contain the topic name itself.
785+
786+
This can be used to discover the partitions and create Reader, Consumer or Producer instances directly on a particular partition.
787+
788+
Parameters
789+
----------
790+
791+
topic: str
792+
the topic name to lookup
793+
794+
Returns
795+
-------
796+
list
797+
a list of partition names
798+
"""
799+
_check_type(str, topic, 'topic')
800+
future = asyncio.get_running_loop().create_future()
801+
self._client.get_topic_partitions_async(topic, functools.partial(_set_future, future))
802+
id = await future
803+
return id
778804

779805
async def close(self) -> None:
780806
"""

src/client.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ std::vector<std::string> Client_getTopicPartitions(Client& client, const std::st
6565
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
6666
}
6767

68+
void Client_getTopicPartitionsAsync(Client &client, const std::string& topic, GetPartitionsCallback callback) {
69+
py::gil_scoped_release release;
70+
client.getPartitionsForTopicAsync(topic, callback);
71+
}
72+
6873
SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_t version) {
6974
return waitForAsyncValue<SchemaInfo>([&](std::function<void(Result, const SchemaInfo&)> callback) {
7075
client.getSchemaInfoAsync(topic, version, callback);
@@ -119,6 +124,7 @@ void export_client(py::module_& m) {
119124
.def("get_schema_info", &Client_getSchemaInfo)
120125
.def("close", &Client_close)
121126
.def("close_async", &Client_closeAsync)
127+
.def("get_topic_partitions_async", &Client_getTopicPartitionsAsync)
122128
.def("subscribe_async", &Client_subscribeAsync)
123129
.def("subscribe_async_topics", &Client_subscribeAsync_topics)
124130
.def("subscribe_async_pattern", &Client_subscribeAsync_pattern)

tests/asyncio_test.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,29 @@
4747
String,
4848
)
4949

50+
from urllib.request import urlopen, Request
51+
5052
SERVICE_URL = 'pulsar://localhost:6650'
53+
ADMIN_URL = "http://localhost:8080"
54+
TIMEOUT_MS = 10000 # Do not wait forever in tests
55+
56+
def doHttpPost(url, data):
57+
req = Request(url, data.encode())
58+
req.add_header("Content-Type", "application/json")
59+
urlopen(req)
60+
61+
def doHttpPut(url, data):
62+
try:
63+
req = Request(url, data.encode())
64+
req.add_header("Content-Type", "application/json")
65+
req.get_method = lambda: "PUT"
66+
urlopen(req)
67+
except Exception as ex:
68+
# ignore conflicts exception to have test idempotency
69+
if "409" in str(ex):
70+
pass
71+
else:
72+
raise ex
5173

5274
class AsyncioTest(IsolatedAsyncioTestCase):
5375
"""Test cases for asyncio Pulsar client."""
@@ -133,6 +155,48 @@ async def test_flush(self):
133155
self.assertEqual(msg_id0.batch_index(), 0)
134156
self.assertEqual(msg_id1.batch_index(), 1)
135157

158+
async def test_get_topics_partitions(self):
159+
topic_partitioned = "persistent://public/default/test_get_topics_partitions_async"
160+
topic_non_partitioned = "persistent://public/default/test_get_topics_async_not-partitioned"
161+
162+
url1 = ADMIN_URL + "/admin/v2/persistent/public/default/test_get_topics_partitions_async/partitions"
163+
doHttpPut(url1, "3")
164+
165+
self.assertEqual(
166+
await self._client.get_topic_partitions(topic_partitioned),
167+
[
168+
"persistent://public/default/test_get_topics_partitions_async-partition-0",
169+
"persistent://public/default/test_get_topics_partitions_async-partition-1",
170+
"persistent://public/default/test_get_topics_partitions_async-partition-2",
171+
],
172+
)
173+
self.assertEqual(await self._client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned])
174+
175+
async def test_get_partitioned_topic_name(self):
176+
url1 = ADMIN_URL + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
177+
doHttpPut(url1, "3")
178+
179+
partitions = [
180+
"persistent://public/default/partitioned_topic_name_test-partition-0",
181+
"persistent://public/default/partitioned_topic_name_test-partition-1",
182+
"persistent://public/default/partitioned_topic_name_test-partition-2",
183+
]
184+
self.assertEqual(
185+
await self._client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions
186+
)
187+
188+
consumer = await self._client.subscribe(
189+
"persistent://public/default/partitioned_topic_name_test",
190+
"partitioned_topic_name_test_sub",
191+
consumer_type=pulsar.ConsumerType.Shared,
192+
)
193+
producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test")
194+
await producer.send(b"hello")
195+
196+
async with asyncio.timeout(TIMEOUT_MS / 1000):
197+
msg = await consumer.receive()
198+
self.assertTrue(msg.topic_name() in partitions)
199+
136200
async def test_create_producer_failure(self):
137201
try:
138202
await self._client.create_producer('tenant/ns/asyncio-test-send-failure')

0 commit comments

Comments
 (0)