Skip to content

Commit

Permalink
[flaky test] Fix unit tests that occasionally fail (apache#9226)
Browse files Browse the repository at this point in the history
* fix unit test

* fix unit test

* fix unit test
  • Loading branch information
315157973 authored Jan 19, 2021
1 parent 6c8c127 commit 63acd20
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -33,6 +35,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -115,13 +118,17 @@ public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
.create();

admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
//zk update takes some time
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2));

admin.topics().updatePartitionedTopic(partitionedTopicName, 10);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10));

admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
protected void setup() throws Exception {
resetConfig();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setTtlDurationDefaultInSeconds(3600);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
resetConfig();
conf.setLoadBalancerEnabled(true);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;

public class ConsumedLedgersTrimTest extends BrokerTestBase {

Expand All @@ -45,9 +46,10 @@ protected void setup() throws Exception {
//No-op
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
//No-op
super.internalCleanup();
}

@Test
Expand Down Expand Up @@ -101,7 +103,7 @@ public void TestConsumedLedgersTrim() throws Exception {


@Test
public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
conf.setRetentionCheckIntervalInSeconds(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
super.baseSetup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
resetConfig();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ void setup() throws Exception {
@AfterClass(alwaysRun = true, timeOut = 300000)
void shutdown() throws Exception {
super.shutdown();
resetConfig1();
resetConfig2();
resetConfig3();
}

enum DispatchRateType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,7 @@ void setup() throws Exception {
// NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
// completely
// independent config objects instead of referring to the same properties object
config1.setClusterName("r1");
config1.setAdvertisedAddress("localhost");
config1.setWebServicePort(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setDefaultNumberOfNamespaceBundles(1);
config1.setAllowAutoTopicCreationType("non-partitioned");
setConfig1DefaultValue();
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
Expand All @@ -141,23 +125,7 @@ void setup() throws Exception {
bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble2.start();

config2.setClusterName("r2");
config2.setAdvertisedAddress("localhost");
config2.setWebServicePort(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setDefaultNumberOfNamespaceBundles(1);
config2.setAllowAutoTopicCreationType("non-partitioned");
setConfig2DefaultValue();
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
Expand All @@ -172,23 +140,7 @@ void setup() throws Exception {
bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble3.start();

config3.setClusterName("r3");
config3.setAdvertisedAddress("localhost");
config3.setWebServicePort(Optional.of(0));
config3.setWebServicePortTls(Optional.of(0));
config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config3.setBrokerServicePort(Optional.of(0));
config3.setBrokerServicePortTls(Optional.of(0));
config3.setTlsEnabled(true);
config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setDefaultNumberOfNamespaceBundles(1);
config3.setAllowAutoTopicCreationType("non-partitioned");
setConfig3DefaultValue();
pulsar3 = new PulsarService(config3);
pulsar3.start();
ns3 = pulsar3.getBrokerService();
Expand Down Expand Up @@ -227,6 +179,81 @@ void setup() throws Exception {

}

private void setConfig3DefaultValue() {
config3.setClusterName("r3");
config3.setAdvertisedAddress("localhost");
config3.setWebServicePort(Optional.of(0));
config3.setWebServicePortTls(Optional.of(0));
config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config3.setBrokerServicePort(Optional.of(0));
config3.setBrokerServicePortTls(Optional.of(0));
config3.setTlsEnabled(true);
config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setDefaultNumberOfNamespaceBundles(1);
config3.setAllowAutoTopicCreationType("non-partitioned");
}

public void setConfig1DefaultValue(){
config1.setClusterName("r1");
config1.setAdvertisedAddress("localhost");
config1.setWebServicePort(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setDefaultNumberOfNamespaceBundles(1);
config1.setAllowAutoTopicCreationType("non-partitioned");
}

public void setConfig2DefaultValue() {
config2.setClusterName("r2");
config2.setAdvertisedAddress("localhost");
config2.setWebServicePort(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setDefaultNumberOfNamespaceBundles(1);
config2.setAllowAutoTopicCreationType("non-partitioned");
}

public void resetConfig1() {
config1 = new ServiceConfiguration();
setConfig1DefaultValue();
}

public void resetConfig2() {
config2 = new ServiceConfiguration();
setConfig2DefaultValue();
}

public void resetConfig3() {
config3 = new ServiceConfiguration();
setConfig3DefaultValue();
}

private int inSec(int time, TimeUnit unit) {
return (int) TimeUnit.SECONDS.convert(time, unit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public void testDuplicationSnapshotApi() throws Exception {
}

@Test(timeOut = 30000)
private void testTopicPolicyTakeSnapshot() throws Exception {
public void testTopicPolicyTakeSnapshot() throws Exception {
resetConfig();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(5);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
conf.setBrokerDeduplicationEntriesInterval(20000);
super.internalCleanup();
super.internalSetup();
Expand All @@ -124,10 +124,10 @@ private void testTopicPolicyTakeSnapshot() throws Exception {
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
waitCacheInit(topicName);
admin.topics().setDeduplicationSnapshotInterval(topicName, 1);
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 2);
admin.topics().setDeduplicationSnapshotInterval(topicName, 3);
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 5);

int msgNum = 50;
int msgNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 0; i < msgNum; i++) {
producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
Expand All @@ -139,40 +139,40 @@ private void testTopicPolicyTakeSnapshot() throws Exception {
.getManagedLedger().getLastConfirmedEntry();
assertEquals(seqId, msgNum - 1);
assertEquals(position.getEntryId(), msgNum - 1);
//The first time, use topic-leve policies, 1 second delay + 1 second interval
Awaitility.await().atMost(2100, TimeUnit.MILLISECONDS)
//The first time, use topic-leve policies, 1 second delay + 3 second interval
Awaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum - 1);
ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
assertEquals(position, markDeletedPosition);

//remove topic-level policies, namespace-level should be used, interval becomes 2 seconds
//remove topic-level policies, namespace-level should be used, interval becomes 5 seconds
admin.topics().removeDeduplicationSnapshotInterval(topicName);
producer.newMessage().value("msg").send();
//zk update time + interval time
Awaitility.await().atMost( 3000, TimeUnit.MILLISECONDS)
//zk update time + 5 second interval time
Awaitility.await().atMost( 7, TimeUnit.SECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertEquals(msgNum, markDeletedPosition.getEntryId());
assertEquals(position, markDeletedPosition);

//4 remove namespace-level policies, broker-level should be used, interval becomes 2 seconds
//4 remove namespace-level policies, broker-level should be used, interval becomes 3 seconds
admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
Awaitility.await().atMost(2, TimeUnit.SECONDS)
Awaitility.await().atMost(4, TimeUnit.SECONDS)
.until(() -> (admin.namespaces().getDeduplicationSnapshotInterval(myNamespace) == null));
producer.newMessage().value("msg").send();
//ensure that the time exceeds the scheduling interval of ns and topic, but no snapshot is generated
Thread.sleep(3000);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
// broker-level interval is 5 seconds, so 3 seconds will not take a snapshot
// broker-level interval is 7 seconds, so 3 seconds will not take a snapshot
assertNotEquals(msgNum + 1, markDeletedPosition.getEntryId());
assertNotEquals(position, markDeletedPosition);
// wait for scheduler
Awaitility.await().atMost(3, TimeUnit.SECONDS)
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum + 1);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
Expand Down

0 comments on commit 63acd20

Please sign in to comment.