Skip to content

Commit 1ff1fcd

Browse files
jerrypengJerry Peng
andauthored
Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation (#9413)
Co-authored-by: Jerry Peng <jerryp@splunk.com>
1 parent d7e68c0 commit 1ff1fcd

File tree

19 files changed

+634
-486
lines changed

19 files changed

+634
-486
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.lang.reflect.Method;
3636
import java.net.InetSocketAddress;
3737
import java.net.URL;
38+
import java.nio.file.Files;
39+
import java.util.Arrays;
3840
import java.util.Collections;
3941
import java.util.HashMap;
4042
import java.util.HashSet;
@@ -43,6 +45,7 @@
4345
import java.util.Set;
4446
import java.util.concurrent.TimeUnit;
4547

48+
import org.apache.commons.io.FileUtils;
4649
import org.apache.pulsar.broker.PulsarService;
4750
import org.apache.pulsar.broker.ServiceConfiguration;
4851
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -106,7 +109,6 @@ public class PulsarFunctionLocalRunTest {
106109
PulsarAdmin admin;
107110
PulsarClient pulsarClient;
108111
BrokerStats brokerStatsClient;
109-
PulsarWorkerService functionsWorkerService;
110112
final String tenant = "external-repl-prop";
111113
String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
112114
String primaryHost;
@@ -177,9 +179,25 @@ void setup(Method method) throws Exception {
177179
config.setBrokerClientTlsEnabled(true);
178180
config.setAllowAutoTopicCreationType("non-partitioned");
179181

180-
functionsWorkerService = createPulsarFunctionWorker(config);
182+
workerConfig = createWorkerConfig(config);
181183

182-
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
184+
// populate builtin connectors folder
185+
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
186+
File connectorsDir = new File(workerConfig.getConnectorsDirectory());
187+
188+
if (connectorsDir.exists()) {
189+
FileUtils.deleteDirectory(connectorsDir);
190+
}
191+
192+
if (connectorsDir.mkdir()) {
193+
File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
194+
Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
195+
} else {
196+
throw new RuntimeException("Failed to create builtin connectors directory");
197+
}
198+
}
199+
200+
Optional<WorkerService> functionWorkerService = Optional.empty();
183201
pulsar = new PulsarService(config, workerConfig, functionWorkerService, (exitCode) -> {});
184202
pulsar.start();
185203

@@ -199,9 +217,9 @@ void setup(Method method) throws Exception {
199217
brokerStatsClient = admin.brokerStats();
200218
primaryHost = pulsar.getWebServiceAddress();
201219

202-
// update cluster metadata
220+
// create cluster metadata
203221
ClusterData clusterData = new ClusterData(urlTls.toString());
204-
admin.clusters().updateCluster(config.getClusterName(), clusterData);
222+
admin.clusters().createCluster(config.getClusterName(), clusterData);
205223

206224
ClientBuilder clientBuilder = PulsarClient.builder()
207225
.serviceUrl(pulsar.getBrokerServiceUrl());
@@ -218,7 +236,7 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
218236
TenantInfo propAdmin = new TenantInfo();
219237
propAdmin.getAdminRoles().add("superUser");
220238
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList(CLUSTER)));
221-
admin.tenants().updateTenant(tenant, propAdmin);
239+
admin.tenants().createTenant(tenant, propAdmin);
222240

223241
// setting up simple web sever to test submitting function via URL
224242
fileServer = HttpServer.create(new InetSocketAddress(0), 0);
@@ -279,17 +297,21 @@ void shutdown() throws Exception {
279297
fileServer.stop(0);
280298
pulsarClient.close();
281299
admin.close();
282-
functionsWorkerService.stop();
283300
pulsar.close();
284301
bkEnsemble.stop();
302+
303+
File connectorsDir = new File(workerConfig.getConnectorsDirectory());
304+
if (connectorsDir.exists()) {
305+
FileUtils.deleteDirectory(connectorsDir);
306+
}
285307
}
286308

287-
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
309+
private WorkerConfig createWorkerConfig(ServiceConfiguration config) {
288310

289311
System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
290312
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
291313

292-
workerConfig = new WorkerConfig();
314+
WorkerConfig workerConfig = new WorkerConfig();
293315
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
294316
workerConfig.setSchedulerClassName(
295317
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -321,10 +343,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
321343

322344
workerConfig.setAuthenticationEnabled(true);
323345
workerConfig.setAuthorizationEnabled(true);
324-
325-
PulsarWorkerService workerService = new PulsarWorkerService();
326-
workerService.init(workerConfig, null, false);
327-
return workerService;
346+
return workerConfig;
328347
}
329348

330349
protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
@@ -707,13 +726,17 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
707726
}
708727
}
709728

729+
@Test(timeOut = 20000, groups = "builtin")
730+
public void testPulsarSourceStatsBuiltin() throws Exception {
731+
testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
732+
}
710733

711-
@Test
734+
@Test(timeOut = 20000)
712735
public void testPulsarSourceLocalRunNoArchive() throws Exception {
713736
testPulsarSourceLocalRun(null);
714737
}
715738

716-
@Test
739+
@Test(timeOut = 20000)
717740
public void testPulsarSourceLocalRunWithFile() throws Exception {
718741
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
719742
testPulsarSourceLocalRun(jarFilePathUrl);
@@ -726,7 +749,7 @@ public void testPulsarSourceLocalRunWithUrl() throws Exception {
726749
}
727750

728751

729-
private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
752+
private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
730753
final String namespacePortion = "io";
731754
final String replNamespace = tenant + "/" + namespacePortion;
732755
final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -813,20 +836,25 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
813836

814837
}
815838

839+
@Test(timeOut = 20000, groups = "builtin")
840+
public void testPulsarSinkStatsBuiltin() throws Exception {
841+
testPulsarSinkLocalRun(String.format("%s://data-generator", Utils.BUILTIN));
842+
}
843+
816844
@Test(timeOut = 20000)
817845
public void testPulsarSinkStatsNoArchive() throws Exception {
818-
testPulsarSinkStats(null);
846+
testPulsarSinkLocalRun(null);
819847
}
820848

821849
@Test(timeOut = 20000)
822850
public void testPulsarSinkStatsWithFile() throws Exception {
823851
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
824-
testPulsarSinkStats(jarFilePathUrl);
852+
testPulsarSinkLocalRun(jarFilePathUrl);
825853
}
826854

827855
@Test(timeOut = 40000)
828856
public void testPulsarSinkStatsWithUrl() throws Exception {
829857
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServer.getAddress().getPort());
830-
testPulsarSinkStats(jarFilePathUrl);
858+
testPulsarSinkLocalRun(jarFilePathUrl);
831859
}
832860
}

pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.net.HttpURLConnection;
5252
import java.net.InetSocketAddress;
5353
import java.net.URL;
54+
import java.nio.file.Files;
5455
import java.util.Arrays;
5556
import java.util.Collections;
5657
import java.util.HashMap;
@@ -69,6 +70,7 @@
6970

7071
import lombok.ToString;
7172

73+
import org.apache.commons.io.FileUtils;
7274
import org.apache.pulsar.broker.PulsarService;
7375
import org.apache.pulsar.broker.ServiceConfiguration;
7476
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -204,6 +206,22 @@ void setup(Method method) throws Exception {
204206
config.setAllowAutoTopicCreationType("non-partitioned");
205207

206208
functionsWorkerService = createPulsarFunctionWorker(config);
209+
// populate builtin connectors folder
210+
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
211+
File connectorsDir = new File(workerConfig.getConnectorsDirectory());
212+
213+
if (connectorsDir.exists()) {
214+
FileUtils.deleteDirectory(connectorsDir);
215+
}
216+
217+
if (connectorsDir.mkdir()) {
218+
File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
219+
Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
220+
} else {
221+
throw new RuntimeException("Failed to create builtin connectors directory");
222+
}
223+
}
224+
207225
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
208226
pulsar = new PulsarService(config, workerConfig, functionWorkerService, (exitCode) -> {});
209227
pulsar.start();
@@ -317,9 +335,14 @@ void shutdown() throws Exception {
317335
functionsWorkerService.stop();
318336
pulsar.close();
319337
bkEnsemble.stop();
338+
339+
File connectorsDir = new File(workerConfig.getConnectorsDirectory());
340+
if (connectorsDir.exists()) {
341+
FileUtils.deleteDirectory(connectorsDir);
342+
}
320343
}
321344

322-
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
345+
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) throws IOException {
323346

324347
System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
325348
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
@@ -354,6 +377,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
354377
workerConfig.setAuthenticationEnabled(true);
355378
workerConfig.setAuthorizationEnabled(true);
356379

380+
workerConfig.setConnectorsDirectory(Files.createTempDirectory("tempconnectorsdir").toFile().getAbsolutePath());
381+
357382
PulsarWorkerService workerService = new PulsarWorkerService();
358383
workerService.init(workerConfig, null, false);
359384
return workerService;
@@ -736,11 +761,21 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
736761

737762
sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
738763

739-
admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
764+
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
765+
sinkConfig.setArchive(jarFilePathUrl);
766+
admin.sinks().createSink(sinkConfig, null);
767+
} else {
768+
admin.sinks().createSinkWithUrl(sinkConfig, jarFilePathUrl);
769+
}
740770

741771
sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(523).build()));
742772

743-
admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
773+
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
774+
sinkConfig.setArchive(jarFilePathUrl);
775+
admin.sinks().updateSink(sinkConfig, null);
776+
} else {
777+
admin.sinks().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
778+
}
744779

745780
retryStrategically((test) -> {
746781
try {
@@ -935,6 +970,12 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
935970
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
936971
}
937972

973+
@Test(timeOut = 20000, groups = "builtin")
974+
public void testPulsarSinkStatsBuiltin() throws Exception {
975+
String jarFilePathUrl = String.format("%s://data-generator", Utils.BUILTIN);
976+
testPulsarSinkStats(jarFilePathUrl);
977+
}
978+
938979
@Test(timeOut = 20000)
939980
public void testPulsarSinkStatsWithFile() throws Exception {
940981
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
@@ -958,8 +999,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
958999
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
9591000

9601001
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
961-
962-
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
1002+
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
1003+
sourceConfig.setArchive(jarFilePathUrl);
1004+
admin.sources().createSource(sourceConfig, null);
1005+
} else {
1006+
admin.sources().createSourceWithUrl(sourceConfig, jarFilePathUrl);
1007+
}
9631008

9641009
retryStrategically((test) -> {
9651010
try {
@@ -971,7 +1016,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
9711016

9721017
final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
9731018
sourceConfig.setTopicName(sinkTopic2);
974-
admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
1019+
1020+
if (jarFilePathUrl.startsWith(Utils.BUILTIN)) {
1021+
admin.sources().updateSource(sourceConfig, null);
1022+
} else {
1023+
admin.sources().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
1024+
}
9751025

9761026
retryStrategically((test) -> {
9771027
try {
@@ -1075,6 +1125,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
10751125
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
10761126
}
10771127

1128+
@Test(timeOut = 20000, groups = "builtin")
1129+
public void testPulsarSourceStatsBuiltin() throws Exception {
1130+
String jarFilePathUrl = String.format("%s://data-generator", Utils.BUILTIN);
1131+
testPulsarSourceStats(jarFilePathUrl);
1132+
}
1133+
10781134
@Test(timeOut = 20000)
10791135
public void testPulsarSourceStatsWithFile() throws Exception {
10801136
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();

pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
*/
2424
package org.apache.pulsar.common.nar;
2525

26+
import lombok.SneakyThrows;
27+
import lombok.extern.slf4j.Slf4j;
28+
2629
import java.io.BufferedReader;
2730
import java.io.File;
2831
import java.io.FileFilter;
2932
import java.io.FileInputStream;
30-
import java.io.FileReader;
3133
import java.io.IOException;
3234
import java.io.InputStreamReader;
3335
import java.net.URL;
@@ -44,9 +46,6 @@
4446
import java.util.List;
4547
import java.util.Set;
4648

47-
import lombok.SneakyThrows;
48-
import lombok.extern.slf4j.Slf4j;
49-
5049
/**
5150
* <p>
5251
* A <tt>ClassLoader</tt> for loading NARs (NiFi archives). NARs are designed to allow isolating bundles of code

pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.net.MalformedURLException;
2323
import java.net.URL;
2424
import java.net.URLClassLoader;
25-
import java.nio.file.Path;
2625
import java.security.AccessController;
2726
import java.security.PrivilegedAction;
2827

@@ -43,10 +42,7 @@ public static ClassLoader loadJar(File jar) throws MalformedURLException {
4342
(PrivilegedAction<URLClassLoader>) () -> new URLClassLoader(new URL[]{url}));
4443
}
4544

46-
public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
47-
if (archivePath != null) {
48-
return loadJar(archivePath.toFile());
49-
}
45+
public static ClassLoader extractClassLoader(File packageFile) throws Exception {
5046
if (packageFile != null) {
5147
return loadJar(packageFile);
5248
}

0 commit comments

Comments
 (0)