Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix submit function via url #3934

Merged
merged 7 commits into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import lombok.ToString;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -61,11 +63,16 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -123,6 +130,9 @@ public class PulsarFunctionE2ETest {
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";

private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
private Thread fileServerThread;
private static final int fileServerPort = PortManager.nextFreePort();
private HttpServer fileServer;

@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
Expand Down Expand Up @@ -213,12 +223,71 @@ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {

System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");

Thread.sleep(100);
// setting up simple web sever to test submitting function via URL
fileServerThread = new Thread(() -> {
try {
fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
fileServer.createContext("/pulsar-io-data-generator.nar", he -> {
try {

Headers headers = he.getResponseHeaders();
headers.add("Content-Type", "application/octet-stream");

File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
byte[] bytes = new byte [(int)file.length()];

FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
bufferedInputStream.read(bytes, 0, bytes.length);

he.sendResponseHeaders(200, file.length());
OutputStream outputStream = he.getResponseBody();
outputStream.write(bytes, 0, bytes.length);
outputStream.close();

} catch (Exception e) {
log.error("Error when downloading: {}", e, e);
}
});
fileServer.createContext("/pulsar-functions-api-examples.jar", he -> {
try {

Headers headers = he.getResponseHeaders();
headers.add("Content-Type", "application/octet-stream");

File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile());
byte[] bytes = new byte [(int)file.length()];

FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
bufferedInputStream.read(bytes, 0, bytes.length);

he.sendResponseHeaders(200, file.length());
OutputStream outputStream = he.getResponseBody();
outputStream.write(bytes, 0, bytes.length);
outputStream.close();

} catch (Exception e) {
log.error("Error when downloading: {}", e, e);
}
});
fileServer.setExecutor(null); // creates a default executor
log.info("Starting file server...");
fileServer.start();
} catch (Exception e) {
log.error("Failed to start file server: ", e);
fileServer.stop(0);
}

});
fileServerThread.start();
}

@AfterMethod
void shutdown() throws Exception {
log.info("--- Shutting down ---");
fileServer.stop(0);
fileServerThread.interrupt();
pulsarClient.close();
admin.close();
functionsWorkerService.stop();
Expand Down Expand Up @@ -309,8 +378,7 @@ private static SinkConfig createSinkConfig(String tenant, String namespace, Stri
*
* @throws Exception
*/
@Test(timeOut = 20000)
public void testE2EPulsarFunction() throws Exception {
private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
Expand All @@ -328,7 +396,6 @@ public void testE2EPulsarFunction() throws Exception {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
Expand Down Expand Up @@ -386,7 +453,18 @@ public void testE2EPulsarFunction() throws Exception {
}

@Test(timeOut = 20000)
public void testPulsarSinkStats() throws Exception {
public void testE2EPulsarFunctionWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
testE2EPulsarFunction(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testE2EPulsarFunctionWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort);
testE2EPulsarFunction(jarFilePathUrl);
}

private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
Expand All @@ -401,7 +479,6 @@ public void testPulsarSinkStats() throws Exception {
// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);

Expand All @@ -413,7 +490,7 @@ public void testPulsarSinkStats() throws Exception {
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
}, 50, 150);
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

Expand Down Expand Up @@ -586,7 +663,18 @@ public void testPulsarSinkStats() throws Exception {
}

@Test(timeOut = 20000)
public void testPulsarSourceStats() throws Exception {
public void testPulsarSinkStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSinkStats(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testPulsarSinkStatsWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
testPulsarSinkStats(jarFilePathUrl);
}

private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
Expand All @@ -595,7 +683,6 @@ public void testPulsarSourceStats() throws Exception {
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);

Expand All @@ -607,7 +694,7 @@ public void testPulsarSourceStats() throws Exception {
} catch (PulsarAdminException e) {
return false;
}
}, 10, 150);
}, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);

String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
Expand Down Expand Up @@ -679,6 +766,18 @@ public void testPulsarSourceStats() throws Exception {
assertTrue(m.value > 0.0);
}

@Test(timeOut = 20000)
public void testPulsarSourceStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSourceStats(jarFilePathUrl);
}

@Test(timeOut = 40000)
public void testPulsarSourceStatsWithUrl() throws Exception {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
testPulsarSourceStats(jarFilePathUrl);
}

@Test(timeOut = 20000)
public void testPulsarFunctionStats() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,11 @@ public static File extractFileFromPkg(String destPkgUrl) throws IOException, URI
URL website = new URL(destPkgUrl);
File tempFile = File.createTempFile("function", ".tmp");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile());
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
fos.getChannel().transferFrom(rbc, 0, 10);
}
if (tempFile.exists()) {
tempFile.delete();
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
}
log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile());
return tempFile;
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
Expand Down Expand Up @@ -318,39 +317,16 @@ public static NarClassLoader extractNarClassLoader(Path archivePath, String pkgU
throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
}
}

if (!isEmpty(pkgUrl)) {
if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
try {
URL url = new URL(pkgUrl);
File file = new File(url.toURI());
if (!file.exists()) {
throw new IOException(pkgUrl + " does not exists locally");
}
return NarClassLoader.getFromArchive(file, Collections.emptySet());
} catch (Exception e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
} else if (pkgUrl.startsWith("http")) {
try {
URL website = new URL(pkgUrl);
File tempFile = File.createTempFile("function", ".tmp");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
fos.getChannel().transferFrom(rbc, 0, 10);
}
if (tempFile.exists()) {
tempFile.delete();
}
return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
} catch (Exception e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
} else {
throw new IllegalArgumentException("Unsupported url protocol "+ pkgUrl +", supported url protocols: [file/http/https]");
try {
return NarClassLoader.getFromArchive(extractFileFromPkg(pkgUrl), Collections.emptySet());
} catch (Exception e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + pkgUrl + " with error " + e.getMessage());
}
}

if (uploadedInputStreamFileName != null) {
try {
return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
Expand Down