Skip to content

Commit

Permalink
[Issue 14052] Fixes use default KEY_SHARE ``ConsistentHashingStickyKe…
Browse files Browse the repository at this point in the history
…yConsumerSelector`` cause flaky-test (apache#14068)

* Fixes issue 14052

* Fixes issue 14052

* Rollback some changes.

* Revert "Fixes issue 14052"

This reverts commit f06a465.
  • Loading branch information
mattisonchao authored Feb 4, 2022
1 parent 9497305 commit cc4e352
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
6 changes: 6 additions & 0 deletions pulsar-testclient/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

@Slf4j
Expand Down Expand Up @@ -84,12 +84,13 @@ public void testMsgKey() throws Exception {
e.printStackTrace();
}
});
thread.start();
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1")
.subscriptionType(SubscriptionType.Key_Shared).subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1")
.subscriptionType(SubscriptionType.Key_Shared).subscribe();

thread.start();

int count1 = 0;
int count2 = 0;
for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -129,32 +130,32 @@ public void testMsgKey() throws Exception {
e.printStackTrace();
}
});
thread2.start();

Consumer newConsumer1 = pulsarClient.newConsumer().topic(topic2).subscriptionName("sub-2")
Consumer<byte[]> newConsumer1 = pulsarClient.newConsumer().topic(topic2).subscriptionName("sub-2")
.subscriptionType(SubscriptionType.Key_Shared).subscribe();
Consumer newConsumer2 = pulsarClient.newConsumer().topic(topic2).subscriptionName("sub-2")
Consumer<byte[]> newConsumer2 = pulsarClient.newConsumer().topic(topic2).subscriptionName("sub-2")
.subscriptionType(SubscriptionType.Key_Shared).subscribe();
count1 = 0;
count2 = 0;
for (int i = 0; i < 10; i++) {
Message<byte[]> message = newConsumer1.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count1++;
newConsumer1.acknowledge(message);
}
for (int i = 0; i < 10; i++) {
Message<byte[]> message = newConsumer2.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count2++;
newConsumer2.acknowledge(message);
}

Assert.assertTrue(count1 > 0 && count2 > 0);
thread2.start();

Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = newConsumer1.receive(1, TimeUnit.SECONDS);
if (message != null) {
newConsumer1.acknowledge(message);
}
assertNotNull(message);
});

Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = newConsumer2.receive(1, TimeUnit.SECONDS);
if (message != null) {
newConsumer2.acknowledge(message);
}
assertNotNull(message);
});

thread2.interrupt();
newConsumer1.close();
newConsumer2.close();
Expand Down

0 comments on commit cc4e352

Please sign in to comment.