Skip to content

Commit

Permalink
[Integration Tests] Improve and fix integration tests (#2356)
Browse files Browse the repository at this point in the history
* [Integration Tests] Improve and fix integration tests

 ### Motivation

Integration test are basically just not working in apache CI. there are multiple reasons:

1. PULSAR_MEM is set to very high (~2G). If we start more than 6 containers, it will quickly go up to ~12GB.
2. We start containers for each tests, which is very inefficient.
3. There is a regression from #2214. #2214 tried to listen on a hostname that is configured in worker config,
  which the hostname is internally to docker environment.

 ### Changes

1. Set PULSAR_MEM to use no more than 128M.
2. Switch to use test suite, trying to start containers only once as possible and use them across test suites.
   so we only start the cluster for a set of tests. hence reorganize those tests and get rid
   of using dataProvider, which doesn't work well with current approach.
3. Remove binding to hostname in WorkerServer, so it binds to all interfaces.

* remove suite xml

* Avoid running testng suites multiple times
  • Loading branch information
sijie authored Aug 10, 2018
1 parent b6feac8 commit d248cee
Show file tree
Hide file tree
Showing 36 changed files with 1,257 additions and 1,049 deletions.
2 changes: 1 addition & 1 deletion conf/pulsar_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
# PULSAR_GLOBAL_ZK_CONF=

# Extra options to be passed to the jvm
PULSAR_MEM=" -Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"
PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}

# Garbage collection options
PULSAR_GC=" -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB"
Expand Down
2 changes: 1 addition & 1 deletion conf/pulsar_tools_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
# PULSAR_GLOBAL_ZK_CONF=

# Extra options to be passed to the jvm
PULSAR_MEM=${PULSAR_MEM:-"-Xmx256m -XX:MaxDirectMemorySize=256m"}
PULSAR_MEM=${PULSAR_TOOLS_MEM:-"-Xmx128m -XX:MaxDirectMemorySize=128m"}

# Garbage collection options
PULSAR_GC=" -client "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.pulsar.broker.web.AuthenticationFilter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
Expand Down Expand Up @@ -90,7 +89,6 @@ private void init() {
List<ServerConnector> connectors = new ArrayList<>();
ServerConnector connector = new ServerConnector(server, 1, 1);
connector.setPort(this.workerConfig.getWorkerPort());
connector.setHost(this.workerConfig.getWorkerHostname());
connectors.add(connector);

List<Handler> handlers = new ArrayList<>(3);
Expand All @@ -114,7 +112,6 @@ private void init() {
this.workerConfig.isTlsRequireTrustedClientCertOnConnect());
ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(this.workerConfig.getWorkerPortTls());
tlsConnector.setHost(this.workerConfig.getWorkerHostname());
connectors.add(tlsConnector);
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
Expand Down
4 changes: 3 additions & 1 deletion tests/docker-images/latest-version-image/conf/bookie.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/bookie.log
directory=/pulsar
environment=PULSAR_MEM=-Xms128M
environment=PULSAR_MEM=-Xmx128M
environment=dbStorage_writeCacheMaxSizeMb=16
environment=dbStorage_readAheadCacheMaxSizeMb=16
command=/pulsar/bin/pulsar bookie
2 changes: 1 addition & 1 deletion tests/docker-images/latest-version-image/conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/broker.log
directory=/pulsar
environment=PULSAR_MEM=-Xms128M
environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar broker

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/global-zk.log
directory=/pulsar
environment=PULSAR_MEM=-Xms128M
environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar configuration-store

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/local-zk.log
directory=/pulsar
environment=PULSAR_MEM=-Xms128M
environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar zookeeper

2 changes: 1 addition & 1 deletion tests/docker-images/latest-version-image/conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/proxy.log
directory=/pulsar
environment=PULSAR_MEM=-Xms128M
environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar proxy

4 changes: 4 additions & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@
-Dio.netty.leakDetectionLevel=advanced
</argLine>
<skipTests>false</skipTests>
<suiteXmlFiles>
<file>src/test/resources/pulsar.xml</file>
</suiteXmlFiles>
<forkCount>1</forkCount>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
*/
package org.apache.pulsar.tests.integration.cli;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;

/**
* Test Pulsar CLI.
*/
public class CLITest extends PulsarClusterTestBase {
public class CLITest extends PulsarTestSuite {

@Test
public void testDeprecatedCommands() throws Exception {
Expand Down Expand Up @@ -68,7 +66,7 @@ public void testCreateSubscriptionCommand() throws Exception {
for (BrokerContainer container : pulsarCluster.getBrokers()) {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"persistent",
"topics",
"create-subscription",
"persistent://public/default/" + topic,
"--subscription",
Expand Down Expand Up @@ -98,7 +96,7 @@ public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
// terminate the topic
result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"persistent",
"topics",
"terminate",
topicName);
assertTrue(result.getStdout().contains("Topic succesfully terminated at"));
Expand Down Expand Up @@ -209,4 +207,5 @@ public void testSetInfiniteRetention() throws Exception {
result.getStdout().contains("\"retentionSizeInMB\" : -1"),
result.getStdout());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
*/
package org.apache.pulsar.tests.integration.compaction;

import static org.testng.Assert.assertEquals;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

public class TestCompaction extends PulsarClusterTestBase {
/**
* Test cases for compaction.
*/
public class TestCompaction extends PulsarTestSuite {

@Test(dataProvider = "ServiceUrls")
public void testPublishCompactAndConsumeCLI(String serviceUrl) throws Exception {
Expand All @@ -45,29 +48,42 @@ public void testPublishCompactAndConsumeCLI(String serviceUrl) throws Exception
try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) {
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
try(Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic).create()) {
producer.newMessage()
.key("key0")
.value("content0")
.send();
producer.newMessage()
.key("key0")
.value("content1")
.send();
}

try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe()) {
Message<byte[]> m = consumer.receive();
try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.readCompacted(true)
.subscriptionName("sub1")
.subscribe()) {
Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
assertEquals(m.getData(), "content0".getBytes());
assertEquals(m.getValue(), "content0");

m = consumer.receive();
assertEquals(m.getKey(), "key0");
assertEquals(m.getData(), "content1".getBytes());
assertEquals(m.getValue(), "content1");
}

pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", "-t", topic);

try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe()) {
Message<byte[]> m = consumer.receive();
try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.readCompacted(true)
.subscriptionName("sub1")
.subscribe()) {
Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
assertEquals(m.getData(), "content1".getBytes());
assertEquals(m.getValue(), "content1");
}
}
}
Expand All @@ -89,54 +105,60 @@ public void testPublishCompactAndConsumeRest(String serviceUrl) throws Exception
try (PulsarClient client = PulsarClient.create(serviceUrl)) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) {
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
try(Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
producer.newMessage()
.key("key0")
.value("content0")
.send();
producer.newMessage()
.key("key0")
.value("content1")
.send();
}

try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe()) {
Message<byte[]> m = consumer.receive();
Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
assertEquals(m.getData(), "content0".getBytes());
assertEquals(m.getValue(), "content0");

m = consumer.receive();
assertEquals(m.getKey(), "key0");
assertEquals(m.getData(), "content1".getBytes());
assertEquals(m.getValue(), "content1");
}
pulsarCluster.runAdminCommandOnAnyBroker("persistent",
pulsarCluster.runAdminCommandOnAnyBroker("topics",
"compact", topic);

pulsarCluster.runAdminCommandOnAnyBroker("persistent",
pulsarCluster.runAdminCommandOnAnyBroker("topics",
"compaction-status", "-w", topic);

try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe()) {
Message<byte[]> m = consumer.receive();
Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
assertEquals(m.getData(), "content1".getBytes());
assertEquals(m.getValue(), "content1");
}
}
}

private static void waitAndVerifyCompacted(PulsarClient client, String topic,
String sub, String expectedKey, String expectedValue) throws Exception {
for (int i = 0; i < 60; i++) {
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName(sub).subscribe()) {
Message<byte[]> m = consumer.receive();
Message<String> m = consumer.receive();
assertEquals(m.getKey(), expectedKey);
if (new String(m.getData()).equals(expectedValue)) {
if (m.getValue() == expectedValue) {
break;
}
}
Thread.sleep(1000);
}
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
try (Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName(sub).subscribe()) {
Message<byte[]> m = consumer.receive();
Message<String> m = consumer.receive();
assertEquals(m.getKey(), expectedKey);
assertEquals(new String(m.getData()), expectedValue);
assertEquals(m.getValue(), expectedValue);
}
}

Expand All @@ -154,18 +176,27 @@ public void testPublishWithAutoCompaction(String serviceUrl) throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-compaction-threshold", "--threshold", "1", namespace);

try (PulsarClient client = PulsarClient.create(serviceUrl)) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) {
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) {
client.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub1").subscribe().close();

try(Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
producer.newMessage()
.key("key0")
.value("content0")
.send();
producer.newMessage()
.key("key0")
.value("content1")
.send();
}

waitAndVerifyCompacted(client, topic, "sub1", "key0", "content1");

try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) {
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
try(Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
producer.newMessage()
.key("key0")
.value("content2")
.send();
}
waitAndVerifyCompacted(client, topic, "sub1", "key0", "content2");
}
Expand All @@ -191,4 +222,5 @@ private ContainerExecResult createNamespace(final String Ns) throws Exception {
return result;
}


}
Loading

0 comments on commit d248cee

Please sign in to comment.