Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flaky test] Fix unit tests that occasionally fail #9226

Merged
merged 3 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -33,6 +35,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -115,13 +118,17 @@ public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
.create();

admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
//zk update takes some time
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2));

admin.topics().updatePartitionedTopic(partitionedTopicName, 10);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10));

admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
protected void setup() throws Exception {
resetConfig();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setTtlDurationDefaultInSeconds(3600);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
resetConfig();
conf.setLoadBalancerEnabled(true);
super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;

public class ConsumedLedgersTrimTest extends BrokerTestBase {

Expand All @@ -45,9 +46,10 @@ protected void setup() throws Exception {
//No-op
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
//No-op
super.internalCleanup();
}

@Test
Expand Down Expand Up @@ -101,7 +103,7 @@ public void TestConsumedLedgersTrim() throws Exception {


@Test
public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
conf.setRetentionCheckIntervalInSeconds(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
super.baseSetup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
resetConfig();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ void setup() throws Exception {
@AfterClass(alwaysRun = true, timeOut = 300000)
void shutdown() throws Exception {
super.shutdown();
resetConfig1();
resetConfig2();
resetConfig3();
}

enum DispatchRateType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,7 @@ void setup() throws Exception {
// NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
// completely
// independent config objects instead of referring to the same properties object
config1.setClusterName("r1");
config1.setAdvertisedAddress("localhost");
config1.setWebServicePort(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setDefaultNumberOfNamespaceBundles(1);
config1.setAllowAutoTopicCreationType("non-partitioned");
setConfig1DefaultValue();
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
Expand All @@ -141,23 +125,7 @@ void setup() throws Exception {
bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble2.start();

config2.setClusterName("r2");
config2.setAdvertisedAddress("localhost");
config2.setWebServicePort(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setDefaultNumberOfNamespaceBundles(1);
config2.setAllowAutoTopicCreationType("non-partitioned");
setConfig2DefaultValue();
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
Expand All @@ -172,23 +140,7 @@ void setup() throws Exception {
bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble3.start();

config3.setClusterName("r3");
config3.setAdvertisedAddress("localhost");
config3.setWebServicePort(Optional.of(0));
config3.setWebServicePortTls(Optional.of(0));
config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config3.setBrokerServicePort(Optional.of(0));
config3.setBrokerServicePortTls(Optional.of(0));
config3.setTlsEnabled(true);
config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setDefaultNumberOfNamespaceBundles(1);
config3.setAllowAutoTopicCreationType("non-partitioned");
setConfig3DefaultValue();
pulsar3 = new PulsarService(config3);
pulsar3.start();
ns3 = pulsar3.getBrokerService();
Expand Down Expand Up @@ -227,6 +179,81 @@ void setup() throws Exception {

}

private void setConfig3DefaultValue() {
config3.setClusterName("r3");
config3.setAdvertisedAddress("localhost");
config3.setWebServicePort(Optional.of(0));
config3.setWebServicePortTls(Optional.of(0));
config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config3.setBrokerServicePort(Optional.of(0));
config3.setBrokerServicePortTls(Optional.of(0));
config3.setTlsEnabled(true);
config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config3.setDefaultNumberOfNamespaceBundles(1);
config3.setAllowAutoTopicCreationType("non-partitioned");
}

public void setConfig1DefaultValue(){
config1.setClusterName("r1");
config1.setAdvertisedAddress("localhost");
config1.setWebServicePort(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setDefaultNumberOfNamespaceBundles(1);
config1.setAllowAutoTopicCreationType("non-partitioned");
}

public void setConfig2DefaultValue() {
config2.setClusterName("r2");
config2.setAdvertisedAddress("localhost");
config2.setWebServicePort(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setDefaultNumberOfNamespaceBundles(1);
config2.setAllowAutoTopicCreationType("non-partitioned");
}

public void resetConfig1() {
config1 = new ServiceConfiguration();
setConfig1DefaultValue();
}

public void resetConfig2() {
config2 = new ServiceConfiguration();
setConfig2DefaultValue();
}

public void resetConfig3() {
config3 = new ServiceConfiguration();
setConfig3DefaultValue();
}

private int inSec(int time, TimeUnit unit) {
return (int) TimeUnit.SECONDS.convert(time, unit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public void testDuplicationSnapshotApi() throws Exception {
}

@Test(timeOut = 30000)
private void testTopicPolicyTakeSnapshot() throws Exception {
public void testTopicPolicyTakeSnapshot() throws Exception {
resetConfig();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(5);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
conf.setBrokerDeduplicationEntriesInterval(20000);
super.internalCleanup();
super.internalSetup();
Expand All @@ -124,10 +124,10 @@ private void testTopicPolicyTakeSnapshot() throws Exception {
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
waitCacheInit(topicName);
admin.topics().setDeduplicationSnapshotInterval(topicName, 1);
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 2);
admin.topics().setDeduplicationSnapshotInterval(topicName, 3);
admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 5);

int msgNum = 50;
int msgNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 0; i < msgNum; i++) {
producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
Expand All @@ -139,40 +139,40 @@ private void testTopicPolicyTakeSnapshot() throws Exception {
.getManagedLedger().getLastConfirmedEntry();
assertEquals(seqId, msgNum - 1);
assertEquals(position.getEntryId(), msgNum - 1);
//The first time, use topic-leve policies, 1 second delay + 1 second interval
Awaitility.await().atMost(2100, TimeUnit.MILLISECONDS)
//The first time, use topic-leve policies, 1 second delay + 3 second interval
Awaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum - 1);
ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
assertEquals(position, markDeletedPosition);

//remove topic-level policies, namespace-level should be used, interval becomes 2 seconds
//remove topic-level policies, namespace-level should be used, interval becomes 5 seconds
admin.topics().removeDeduplicationSnapshotInterval(topicName);
producer.newMessage().value("msg").send();
//zk update time + interval time
Awaitility.await().atMost( 3000, TimeUnit.MILLISECONDS)
//zk update time + 5 second interval time
Awaitility.await().atMost( 7, TimeUnit.SECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
assertEquals(msgNum, markDeletedPosition.getEntryId());
assertEquals(position, markDeletedPosition);

//4 remove namespace-level policies, broker-level should be used, interval becomes 2 seconds
//4 remove namespace-level policies, broker-level should be used, interval becomes 3 seconds
admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
Awaitility.await().atMost(2, TimeUnit.SECONDS)
Awaitility.await().atMost(4, TimeUnit.SECONDS)
.until(() -> (admin.namespaces().getDeduplicationSnapshotInterval(myNamespace) == null));
producer.newMessage().value("msg").send();
//ensure that the time exceeds the scheduling interval of ns and topic, but no snapshot is generated
Thread.sleep(3000);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
// broker-level interval is 5 seconds, so 3 seconds will not take a snapshot
// broker-level interval is 7 seconds, so 3 seconds will not take a snapshot
assertNotEquals(msgNum + 1, markDeletedPosition.getEntryId());
assertNotEquals(position, markDeletedPosition);
// wait for scheduler
Awaitility.await().atMost(3, TimeUnit.SECONDS)
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
.getMarkDeletedPosition()).getEntryId() == msgNum + 1);
markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
Expand Down