Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -43,6 +45,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand Down Expand Up @@ -106,7 +109,6 @@ public class PulsarFunctionLocalRunTest {
PulsarAdmin admin;
PulsarClient pulsarClient;
BrokerStats brokerStatsClient;
PulsarWorkerService functionsWorkerService;
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
Expand Down Expand Up @@ -177,9 +179,25 @@ void setup(Method method) throws Exception {
config.setBrokerClientTlsEnabled(true);
config.setAllowAutoTopicCreationType("non-partitioned");

functionsWorkerService = createPulsarFunctionWorker(config);
workerConfig = createWorkerConfig(config);

Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
// populate builtin connectors folder
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
File connectorsDir = new File(workerConfig.getConnectorsDirectory());

if (connectorsDir.exists()) {
FileUtils.deleteDirectory(connectorsDir);
}

if (connectorsDir.mkdir()) {
File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
} else {
throw new RuntimeException("Failed to create builtin connectors directory");
}
}

Optional<WorkerService> functionWorkerService = Optional.empty();
pulsar = new PulsarService(config, workerConfig, functionWorkerService, (exitCode) -> {});
pulsar.start();

Expand All @@ -199,9 +217,9 @@ void setup(Method method) throws Exception {
brokerStatsClient = admin.brokerStats();
primaryHost = pulsar.getWebServiceAddress();

// update cluster metadata
// create cluster metadata
ClusterData clusterData = new ClusterData(urlTls.toString());
admin.clusters().updateCluster(config.getClusterName(), clusterData);
admin.clusters().createCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl());
Expand All @@ -218,7 +236,7 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
TenantInfo propAdmin = new TenantInfo();
propAdmin.getAdminRoles().add("superUser");
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList(CLUSTER)));
admin.tenants().updateTenant(tenant, propAdmin);
admin.tenants().createTenant(tenant, propAdmin);

// setting up simple web sever to test submitting function via URL
fileServer = HttpServer.create(new InetSocketAddress(0), 0);
Expand Down Expand Up @@ -279,17 +297,21 @@ void shutdown() throws Exception {
fileServer.stop(0);
pulsarClient.close();
admin.close();
functionsWorkerService.stop();
pulsar.close();
bkEnsemble.stop();

File connectorsDir = new File(workerConfig.getConnectorsDirectory());
if (connectorsDir.exists()) {
FileUtils.deleteDirectory(connectorsDir);
}
}

private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
private WorkerConfig createWorkerConfig(ServiceConfiguration config) {

System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());

workerConfig = new WorkerConfig();
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
Expand Down Expand Up @@ -321,10 +343,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf

workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
return workerConfig;
}

protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
Expand Down Expand Up @@ -707,13 +726,17 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
}
}

@Test(timeOut = 20000, groups = "builtin")
public void testPulsarSourceStatsBuiltin() throws Exception {
testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
}

@Test
@Test(timeOut = 20000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure we need these timeouts,
if the test breaks then it is not likely that other tests will run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The individual timeout is in place so we don't just block for 1.5-2 hour which is the absolute test timeout and consume resources for that time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We centrally set 300 seconds timeouts for all the tests that don't have a timeout

public void testPulsarSourceLocalRunNoArchive() throws Exception {
testPulsarSourceLocalRun(null);
}

@Test
@Test(timeOut = 20000)
public void testPulsarSourceLocalRunWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSourceLocalRun(jarFilePathUrl);
Expand All @@ -726,7 +749,7 @@ public void testPulsarSourceLocalRunWithUrl() throws Exception {
}


private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
Expand Down Expand Up @@ -813,20 +836,25 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {

}

@Test(timeOut = 20000, groups = "builtin")
public void testPulsarSinkStatsBuiltin() throws Exception {
testPulsarSinkLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
}

@Test(timeOut = 20000)
public void testPulsarSinkStatsNoArchive() throws Exception {
testPulsarSinkStats(null);
testPulsarSinkLocalRun(null);
}

@Test(timeOut = 20000)
public void testPulsarSinkStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSinkStats(jarFilePathUrl);
testPulsarSinkLocalRun(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testPulsarSinkStatsWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServer.getAddress().getPort());
testPulsarSinkStats(jarFilePathUrl);
testPulsarSinkLocalRun(jarFilePathUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -69,6 +70,7 @@

import lombok.ToString;

import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
Expand Down Expand Up @@ -204,6 +206,22 @@ void setup(Method method) throws Exception {
config.setAllowAutoTopicCreationType("non-partitioned");

functionsWorkerService = createPulsarFunctionWorker(config);
// populate builtin connectors folder
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
File connectorsDir = new File(workerConfig.getConnectorsDirectory());

if (connectorsDir.exists()) {
FileUtils.deleteDirectory(connectorsDir);
}

if (connectorsDir.mkdir()) {
File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
} else {
throw new RuntimeException("Failed to create builtin connectors directory");
}
}

Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
pulsar = new PulsarService(config, workerConfig, functionWorkerService, (exitCode) -> {});
pulsar.start();
Expand Down Expand Up @@ -317,9 +335,14 @@ void shutdown() throws Exception {
functionsWorkerService.stop();
pulsar.close();
bkEnsemble.stop();

File connectorsDir = new File(workerConfig.getConnectorsDirectory());
if (connectorsDir.exists()) {
FileUtils.deleteDirectory(connectorsDir);
}
}

private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) throws IOException {

System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
Expand Down Expand Up @@ -354,6 +377,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);

workerConfig.setConnectorsDirectory(Files.createTempDirectory("tempconnectorsdir").toFile().getAbsolutePath());

PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
Expand Down Expand Up @@ -736,11 +761,21 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {

sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));

admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
sinkConfig.setArchive(jarFilePathUrl);
admin.sinks().createSink(sinkConfig, null);
} else {
admin.sinks().createSinkWithUrl(sinkConfig, jarFilePathUrl);
}

sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(523).build()));

admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
sinkConfig.setArchive(jarFilePathUrl);
admin.sinks().updateSink(sinkConfig, null);
} else {
admin.sinks().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
}

retryStrategically((test) -> {
try {
Expand Down Expand Up @@ -935,6 +970,12 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}

@Test(timeOut = 20000, groups = "builtin")
public void testPulsarSinkStatsBuiltin() throws Exception {
String jarFilePathUrl = String.format("%s://data-generator", Utils.BUILTIN);
testPulsarSinkStats(jarFilePathUrl);
}

@Test(timeOut = 20000)
public void testPulsarSinkStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
Expand All @@ -958,8 +999,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);

admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
sourceConfig.setArchive(jarFilePathUrl);
admin.sources().createSource(sourceConfig, null);
} else {
admin.sources().createSourceWithUrl(sourceConfig, jarFilePathUrl);
}

retryStrategically((test) -> {
try {
Expand All @@ -971,7 +1016,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {

final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
sourceConfig.setTopicName(sinkTopic2);
admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);

if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
admin.sources().updateSource(sourceConfig, null);
} else {
admin.sources().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
}

retryStrategically((test) -> {
try {
Expand Down Expand Up @@ -1075,6 +1125,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}

@Test(timeOut = 20000, groups = "builtin")
public void testPulsarSourceStatsBuiltin() throws Exception {
String jarFilePathUrl = String.format("%s://data-generator", Utils.BUILTIN);
testPulsarSourceStats(jarFilePathUrl);
}

@Test(timeOut = 20000)
public void testPulsarSourceStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
*/
package org.apache.pulsar.common.nar;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
Expand All @@ -44,9 +46,6 @@
import java.util.List;
import java.util.Set;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* <p>
* A <tt>ClassLoader</tt> for loading NARs (NiFi archives). NARs are designed to allow isolating bundles of code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;

Expand All @@ -43,10 +42,7 @@ public static ClassLoader loadJar(File jar) throws MalformedURLException {
(PrivilegedAction<URLClassLoader>) () -> new URLClassLoader(new URL[]{url}));
}

public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
if (archivePath != null) {
return loadJar(archivePath.toFile());
}
public static ClassLoader extractClassLoader(File packageFile) throws Exception {
if (packageFile != null) {
return loadJar(packageFile);
}
Expand Down
Loading