11package org .testcontainers .containers ;
22
3+ import org .apache .pulsar .client .admin .ListTopicsOptions ;
34import org .apache .pulsar .client .admin .PulsarAdmin ;
45import org .apache .pulsar .client .admin .PulsarAdminException ;
56import org .apache .pulsar .client .api .Consumer ;
1314import org .testcontainers .utility .DockerImageName ;
1415
1516import java .time .Duration ;
17+ import java .util .List ;
1618import java .util .concurrent .CompletableFuture ;
1719import java .util .concurrent .TimeUnit ;
1820
@@ -23,14 +25,14 @@ public class PulsarContainerTest {
2325
2426 public static final String TEST_TOPIC = "test_topic" ;
2527
26- private static final DockerImageName PULSAR_IMAGE = DockerImageName .parse ("apachepulsar/pulsar:2.10 .0" );
28+ private static final DockerImageName PULSAR_IMAGE = DockerImageName .parse ("apachepulsar/pulsar:3.0 .0" );
2729
2830 @ Test
2931 public void testUsage () throws Exception {
3032 try (
3133 // do not use PULSAR_IMAGE to make the doc looks easier
3234 // constructorWithVersion {
33- PulsarContainer pulsar = new PulsarContainer (DockerImageName .parse ("apachepulsar/pulsar:2.10 .0" ));
35+ PulsarContainer pulsar = new PulsarContainer (DockerImageName .parse ("apachepulsar/pulsar:3.0 .0" ));
3436 // }
3537 ) {
3638 pulsar .start ();
@@ -103,31 +105,26 @@ public void testTransactions() throws Exception {
103105 pulsar .start ();
104106
105107 try (PulsarAdmin pulsarAdmin = PulsarAdmin .builder ().serviceHttpUrl (pulsar .getHttpServiceUrl ()).build ()) {
106- assertThat (
107- pulsarAdmin
108- .topics ()
109- .getList ("pulsar/system" )
110- .contains ("persistent://pulsar/system/transaction_coordinator_assign-partition-0" )
111- )
112- .isTrue ();
108+ assertTransactionsTopicCreated (pulsarAdmin );
113109 }
114110 testTransactionFunctionality (pulsar .getPulsarBrokerUrl ());
115111 }
116112 }
117113
114+ private void assertTransactionsTopicCreated (PulsarAdmin pulsarAdmin ) throws PulsarAdminException {
115+ final List <String > topics = pulsarAdmin
116+ .topics ()
117+ .getPartitionedTopicList ("pulsar/system" , ListTopicsOptions .builder ().includeSystemTopic (true ).build ());
118+ assertThat (topics ).contains ("persistent://pulsar/system/transaction_coordinator_assign" );
119+ }
120+
118121 @ Test
119122 public void testTransactionsAndFunctionsWorker () throws Exception {
120123 try (PulsarContainer pulsar = new PulsarContainer (PULSAR_IMAGE ).withTransactions ().withFunctionsWorker ()) {
121124 pulsar .start ();
122125
123126 try (PulsarAdmin pulsarAdmin = PulsarAdmin .builder ().serviceHttpUrl (pulsar .getHttpServiceUrl ()).build ();) {
124- assertThat (
125- pulsarAdmin
126- .topics ()
127- .getList ("pulsar/system" )
128- .contains ("persistent://pulsar/system/transaction_coordinator_assign-partition-0" )
129- )
130- .isTrue ();
127+ assertTransactionsTopicCreated (pulsarAdmin );
131128 assertThat (pulsarAdmin .functions ().getFunctions ("public" , "default" )).hasSize (0 );
132129 }
133130 testTransactionFunctionality (pulsar .getPulsarBrokerUrl ());
0 commit comments