|
47 | 47 | String, |
48 | 48 | ) |
49 | 49 |
|
| 50 | +from urllib.request import urlopen, Request |
| 51 | + |
50 | 52 | SERVICE_URL = 'pulsar://localhost:6650' |
| 53 | +ADMIN_URL = "http://localhost:8080" |
| 54 | +TIMEOUT_MS = 10000 # Do not wait forever in tests |
| 55 | + |
| 56 | +def doHttpPost(url, data): |
| 57 | + req = Request(url, data.encode()) |
| 58 | + req.add_header("Content-Type", "application/json") |
| 59 | + urlopen(req) |
| 60 | + |
| 61 | +def doHttpPut(url, data): |
| 62 | + try: |
| 63 | + req = Request(url, data.encode()) |
| 64 | + req.add_header("Content-Type", "application/json") |
| 65 | + req.get_method = lambda: "PUT" |
| 66 | + urlopen(req) |
| 67 | + except Exception as ex: |
| 68 | + # ignore conflicts exception to have test idempotency |
| 69 | + if "409" in str(ex): |
| 70 | + pass |
| 71 | + else: |
| 72 | + raise ex |
51 | 73 |
|
52 | 74 | class AsyncioTest(IsolatedAsyncioTestCase): |
53 | 75 | """Test cases for asyncio Pulsar client.""" |
@@ -133,6 +155,47 @@ async def test_flush(self): |
133 | 155 | self.assertEqual(msg_id0.batch_index(), 0) |
134 | 156 | self.assertEqual(msg_id1.batch_index(), 1) |
135 | 157 |
|
| 158 | + async def test_get_topics_partitions(self): |
| 159 | + topic_partitioned = "persistent://public/default/test_get_topics_partitions_async" |
| 160 | + topic_non_partitioned = "persistent://public/default/test_get_topics_async_not-partitioned" |
| 161 | + |
| 162 | + url1 = ADMIN_URL + "/admin/v2/persistent/public/default/test_get_topics_partitions_async/partitions" |
| 163 | + doHttpPut(url1, "3") |
| 164 | + |
| 165 | + self.assertEqual( |
| 166 | + await self._client.get_topic_partitions(topic_partitioned), |
| 167 | + [ |
| 168 | + "persistent://public/default/test_get_topics_partitions_async-partition-0", |
| 169 | + "persistent://public/default/test_get_topics_partitions_async-partition-1", |
| 170 | + "persistent://public/default/test_get_topics_partitions_async-partition-2", |
| 171 | + ], |
| 172 | + ) |
| 173 | + self.assertEqual(await self._client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned]) |
| 174 | + |
| 175 | + async def test_get_partitioned_topic_name(self): |
| 176 | + url1 = ADMIN_URL + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions" |
| 177 | + doHttpPut(url1, "3") |
| 178 | + |
| 179 | + partitions = [ |
| 180 | + "persistent://public/default/partitioned_topic_name_test-partition-0", |
| 181 | + "persistent://public/default/partitioned_topic_name_test-partition-1", |
| 182 | + "persistent://public/default/partitioned_topic_name_test-partition-2", |
| 183 | + ] |
| 184 | + self.assertEqual( |
| 185 | + await self._client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions |
| 186 | + ) |
| 187 | + |
| 188 | + consumer = await self._client.subscribe( |
| 189 | + "persistent://public/default/partitioned_topic_name_test", |
| 190 | + "partitioned_topic_name_test_sub", |
| 191 | + consumer_type=pulsar.ConsumerType.Shared, |
| 192 | + ) |
| 193 | + producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test") |
| 194 | + await producer.send(b"hello") |
| 195 | + |
| 196 | + msg = await asyncio.wait_for(consumer.receive(), TIMEOUT_MS / 1000) |
| 197 | + self.assertTrue(msg.topic_name() in partitions) |
| 198 | + |
136 | 199 | async def test_create_producer_failure(self): |
137 | 200 | try: |
138 | 201 | await self._client.create_producer('tenant/ns/asyncio-test-send-failure') |
|
0 commit comments