diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java index 845b53f61ad07..524369137d8f6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java @@ -48,11 +48,12 @@ public class OffloaderUtils { * @return the offloader class name * @throws IOException when fail to retrieve the pulsar offloader class */ - static Pair getOffloaderFactory(String narPath) throws IOException { + static Pair getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException { // need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case // LedgerOffloaderFactory is loaded by a classloader that is not the default classloader // as is the case for the pulsar presto plugin - NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader()); + NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), + LedgerOffloaderFactory.class.getClassLoader(), narExtractionDirectory); String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME); OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml() @@ -105,15 +106,15 @@ private static void rethrowIOException(Throwable cause) } } - public static OffloaderDefinition getOffloaderDefinition(String narPath) throws IOException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { + public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory) throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) { String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME); return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class); } } - public static Offloaders searchForOffloaders(String connectorsDirectory) throws IOException { + public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException { Path path = Paths.get(connectorsDirectory).toAbsolutePath(); log.info("Searching for offloaders in {}", path); @@ -127,13 +128,13 @@ public static Offloaders searchForOffloaders(String connectorsDirectory) throws try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { stream.forEach(archive -> { try { - OffloaderDefinition definition = getOffloaderDefinition(archive.toString()); + OffloaderDefinition definition = getOffloaderDefinition(archive.toString(), narExtractionDirectory); log.info("Found offloader {} from {}", definition, archive); if (!StringUtils.isEmpty(definition.getOffloaderFactoryClass())) { // Validate offloader factory class to be present and of the right type Pair offloaderFactoryPair = - getOffloaderFactory(archive.toString()); + getOffloaderFactory(archive.toString(), narExtractionDirectory); if (null != offloaderFactoryPair) { offloaders.getOffloaders().add(offloaderFactoryPair); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1d10b8d52a9cc..34163b489380a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -34,6 +34,7 @@ import lombok.Setter; import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.protocol.Commands; @@ -1563,6 +1564,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerOffloadMaxThreads = 2; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The directory where nar Extraction of offloaders happens" + ) + private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, doc = "Maximum prefetch rounds for ledger reading for offloading" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 30d1dde60312c..21a73a0b6bdc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -838,7 +838,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPolicies.getManagedLedgerOffloadDriver()); - this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory()); LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( offloadPolicies.getManagedLedgerOffloadDriver()); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java index 4a64ebf7e7a43..6ccfff93ac91c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtils.java @@ -49,8 +49,8 @@ class ProtocolHandlerUtils { * @return the protocol handler definition * @throws IOException when fail to load the protocol handler or get the definition */ - public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath) throws IOException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { + public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) { return getProtocolHandlerDefinition(ncl); } } @@ -70,7 +70,8 @@ private static ProtocolHandlerDefinition getProtocolHandlerDefinition(NarClassLo * @return a collection of protocol handlers * @throws IOException when fail to load the available protocol handlers from the provided directory. */ - public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory) throws IOException { + public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory, + String narExtractionDirectory) throws IOException { Path path = Paths.get(handlersDirectory).toAbsolutePath(); log.info("Searching for protocol handlers in {}", path); @@ -84,7 +85,7 @@ public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirect for (Path archive : stream) { try { ProtocolHandlerDefinition phDef = - ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString()); + ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory); log.info("Found protocol handler from {} : {}", archive, phDef); checkArgument(StringUtils.isNotBlank(phDef.getName())); @@ -113,11 +114,12 @@ public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirect * @param metadata the protocol handler definition. * @return */ - static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata) throws IOException { + static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata, + String narExtractionDirectory) throws IOException { NarClassLoader ncl = NarClassLoader.getFromArchive( metadata.getArchivePath().toAbsolutePath().toFile(), Collections.emptySet(), - ProtocolHandler.class.getClassLoader()); + ProtocolHandler.class.getClassLoader(), narExtractionDirectory); ProtocolHandlerDefinition phDef = getProtocolHandlerDefinition(ncl); if (StringUtils.isBlank(phDef.getHandlerClass())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java index b09912b007602..af0d726d66216 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java @@ -46,7 +46,7 @@ public class ProtocolHandlers implements AutoCloseable { */ public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException { ProtocolHandlerDefinitions definitions = - ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory()); + ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory()); ImmutableMap.Builder handlersBuilder = ImmutableMap.builder(); @@ -60,7 +60,7 @@ public static ProtocolHandlers load(ServiceConfiguration conf) throws IOExceptio ProtocolHandlerWithClassLoader handler; try { - handler = ProtocolHandlerUtils.load(definition); + handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory()); } catch (IOException e) { log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e); throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java index e7910dcd83ab7..3fda50845a705 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerUtilsTest.java @@ -75,10 +75,11 @@ public void testLoadProtocolHandler() throws Exception { PowerMockito.when(NarClassLoader.getFromArchive( any(File.class), any(Set.class), - any(ClassLoader.class) + any(ClassLoader.class), + any(String.class) )).thenReturn(mockLoader); - ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata); + ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata, ""); ProtocolHandler returnedPh = returnedPhWithCL.getHandler(); assertSame(mockLoader, returnedPhWithCL.getClassLoader()); @@ -107,11 +108,12 @@ public void testLoadProtocolHandlerBlankHandlerClass() throws Exception { PowerMockito.when(NarClassLoader.getFromArchive( any(File.class), any(Set.class), - any(ClassLoader.class) + any(ClassLoader.class), + any(String.class) )).thenReturn(mockLoader); try { - ProtocolHandlerUtils.load(metadata); + ProtocolHandlerUtils.load(metadata, ""); fail("Should not reach here"); } catch (IOException ioe) { // expected @@ -141,11 +143,12 @@ public void testLoadProtocolHandlerWrongHandlerClass() throws Exception { PowerMockito.when(NarClassLoader.getFromArchive( any(File.class), any(Set.class), - any(ClassLoader.class) + any(ClassLoader.class), + any(String.class) )).thenReturn(mockLoader); try { - ProtocolHandlerUtils.load(metadata); + ProtocolHandlerUtils.load(metadata, ""); fail("Should not reach here"); } catch (IOException ioe) { // expected diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java index 3fdfef0463f79..e33cbfa693062 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java @@ -135,10 +135,11 @@ public class NarClassLoader extends URLClassLoader { private static final String TMP_DIR_PREFIX = "pulsar-nar"; - private static final File NAR_CACHE_DIR = new File(System.getProperty("java.io.tmpdir") + "/" + TMP_DIR_PREFIX); + public static final String DEFAULT_NAR_EXTRACTION_DIR = System.getProperty("java.io.tmpdir"); - public static NarClassLoader getFromArchive(File narPath, Set additionalJars) throws IOException { - File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR); + public static NarClassLoader getFromArchive(File narPath, Set additionalJars, + String narExtractionDirectory) throws IOException { + File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory)); try { return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader()); } catch (ClassNotFoundException | NoClassDefFoundError e) { @@ -146,9 +147,10 @@ public static NarClassLoader getFromArchive(File narPath, Set additional } } - public static NarClassLoader getFromArchive(File narPath, Set additionalJars, ClassLoader parent) + public static NarClassLoader getFromArchive(File narPath, Set additionalJars, ClassLoader parent, + String narExtractionDirectory) throws IOException { - File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR); + File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory)); try { return new NarClassLoader(unpacked, additionalJars, parent); } catch (ClassNotFoundException | NoClassDefFoundError e) { @@ -156,6 +158,10 @@ public static NarClassLoader getFromArchive(File narPath, Set additional } } + private static File getNarExtractionDirectory(String configuredDirectory) { + return new File(configuredDirectory + "/" + TMP_DIR_PREFIX); + } + /** * Construct a nar class loader. * diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java index 0dfa46922e403..59bd652d9804f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SearchBcNarUtils.java @@ -65,7 +65,7 @@ static Provider getBcProvider(String loaderDirectory) throws IOException { NarClassLoader ncl = NarClassLoader.getFromArchive( new File(narPath), Collections.emptySet(), - BCLoader.class.getClassLoader()); + BCLoader.class.getClassLoader(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); String configStr = ncl.getServiceDefinition(BC_DEF_NAME); BcNarDefinition nar = ObjectMapperFactory.getThreadLocalYaml() diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 66cdb60c01bec..0fa4fc6955931 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -133,6 +133,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final ClassLoader instanceClassLoader; private ClassLoader functionClassLoader; + private String narExtractionDirectory; public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, @@ -140,7 +141,8 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, PulsarClient pulsarClient, String stateStorageServiceUrl, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry) { + CollectorRegistry collectorRegistry, + String narExtractionDirectory) { this.instanceConfig = instanceConfig; this.fnCache = fnCache; this.jarFile = jarFile; @@ -148,6 +150,7 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; + this.narExtractionDirectory = narExtractionDirectory; this.metricsLabels = new String[]{ instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), @@ -304,7 +307,7 @@ private ClassLoader loadJars() throws Exception { fnCache.registerFunctionInstanceWithArchive( instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), - jarFile); + jarFile, narExtractionDirectory); } catch (FileNotFoundException e) { // create the function class loader fnCache.registerFunctionInstance( diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index 56d80ae56363b..691b1fe64ed1a 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -59,7 +59,7 @@ private static InstanceConfig createInstanceConfig(String outputSerde) { private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception { InstanceConfig config = createInstanceConfig(outputSerde); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - config, null, null, null, null, null, null); + config, null, null, null, null, null, null, null); return javaInstanceRunnable; } diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 2d8b72f9817cd..57209de82256f 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -67,6 +68,7 @@ public class LocalRunner { private final AtomicBoolean running = new AtomicBoolean(false); private final List spawners = new LinkedList<>(); + private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; public enum RuntimeEnv { THREAD, @@ -256,14 +258,14 @@ public void start(boolean blocking) throws Exception { if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { File file = FunctionCommon.extractFileFromPkgURL(userCodeFile); - functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file)); + functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory)); } else { File file = new File(userCodeFile); if (!file.exists()) { throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist"); } - functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file)); + functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file, narExtractionDirectory)); } } else if (sinkConfig != null) { inferMissingArguments(sinkConfig); @@ -284,13 +286,13 @@ public void start(boolean blocking) throws Exception { if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) { File file = FunctionCommon.extractFileFromPkgURL(userCodeFile); - functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file)); + functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory)); } else { File file = new File(userCodeFile); if (!file.exists()) { throw new RuntimeException("Sink archive does not exist"); } - functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file)); + functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file, narExtractionDirectory)); } } else { throw new IllegalArgumentException("Must specify Function, Source or Sink config"); @@ -346,6 +348,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio null, /* python instance file */ null, /* log directory */ null, /* extra dependencies dir */ + narExtractionDirectory, /* nar extraction dir */ new DefaultSecretsProviderConfigurator(), false, Optional.empty(), Optional.empty())) { for (int i = 0; i < parallelism; ++i) { @@ -407,7 +410,7 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi serviceUrl, stateStorageServiceUrl, authConfig, - new ClearTextSecretsProvider(), null, null); + new ClearTextSecretsProvider(), null, narExtractionDirectory, null); for (int i = 0; i < parallelism; ++i) { InstanceConfig instanceConfig = new InstanceConfig(); instanceConfig.setFunctionDetails(functionDetails); @@ -461,6 +464,6 @@ private Connectors getConnectors() throws IOException { pulsarHome = Paths.get("").toAbsolutePath().toString(); } String connectorsDir = Paths.get(pulsarHome, "connectors").toString(); - return ConnectorUtils.searchForConnectors(connectorsDir); + return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory); } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index 970047fcd7332..f7104df178c28 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -41,6 +41,7 @@ import io.prometheus.jmx.JmxCollector; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -125,6 +126,9 @@ public class JavaInstanceStarter implements AutoCloseable { @Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true) public String clusterName; + @Parameter(names = "--nar_extraction_directory", description = "The directory where extraction of nar packages happen", required = false) + public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false) public int maxPendingAsyncRequests = 1000; @@ -198,7 +202,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection)) .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled)) .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - secretsProvider, collectorRegistry, rootClassLoader); + secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader); runtimeSpawner = new RuntimeSpawner( instanceConfig, jarFile, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 351fb675f402b..ca70b42d06e10 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -70,7 +70,8 @@ public static List composeCmd(InstanceConfig instanceConfig, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, - int metricsPort) throws Exception { + int metricsPort, + String narExtractionDirectory) throws Exception { final List cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir); @@ -79,7 +80,7 @@ public static List composeCmd(InstanceConfig instanceConfig, authConfig, shardId, grpcPort, expectedHealthCheckInterval, logConfigFile, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, - pythonExtraDependencyRepository, metricsPort)); + pythonExtraDependencyRepository, metricsPort, narExtractionDirectory)); return cmd; } @@ -244,7 +245,8 @@ public static List getCmd(InstanceConfig instanceConfig, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, - int metricsPort) throws Exception { + int metricsPort, + String narExtractionDirectory) throws Exception { final List args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) { @@ -386,6 +388,13 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) { args.add("--cluster_name"); args.add(instanceConfig.getClusterName()); + + if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { + if (!StringUtils.isEmpty(narExtractionDirectory)) { + args.add("--nar_extraction_directory"); + args.add(narExtractionDirectory); + } + } return args; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index ca6c3537f5bf3..56f6aafd37cb6 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -145,6 +145,7 @@ public class KubernetesRuntime implements Runtime { private final AuthenticationConfig authConfig; private Integer grpcPort; private Integer metricsPort; + private String narExtractionDirectory; private final Optional manifestCustomizer; KubernetesRuntime(AppsV1Api appsClient, @@ -177,6 +178,7 @@ public class KubernetesRuntime implements Runtime { boolean authenticationEnabled, Integer grpcPort, Integer metricsPort, + String narExtractionDirectory, Optional manifestCustomizer) throws Exception { this.appsClient = appsClient; this.coreClient = coreClient; @@ -219,6 +221,7 @@ public class KubernetesRuntime implements Runtime { this.grpcPort = grpcPort; this.metricsPort = metricsPort; + this.narExtractionDirectory = narExtractionDirectory; this.processArgs = new LinkedList<>(); this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir)); @@ -245,7 +248,8 @@ public class KubernetesRuntime implements Runtime { installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository, - metricsPort)); + metricsPort, + narExtractionDirectory)); doChecks(instanceConfig.getFunctionDetails()); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index 46a3216650b97..6414783e56ca4 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -93,6 +93,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private boolean authenticationEnabled; private Integer grpcPort; private Integer metricsPort; + private String narExtractionDirectory; @ToString.Exclude @EqualsAndHashCode.Exclude @@ -229,6 +230,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic this.grpcPort = factoryConfig.getGrpcPort(); this.metricsPort = factoryConfig.getMetricsPort(); + this.narExtractionDirectory = factoryConfig.getNarExtractionDirectory(); } @Override @@ -292,6 +294,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c authenticationEnabled, grpcPort, metricsPort, + narExtractionDirectory, manifestCustomizer); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java index f0173422d1817..61b1f10b4f71d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java @@ -21,6 +21,7 @@ import lombok.Data; import lombok.experimental.Accessors; import org.apache.pulsar.common.configuration.FieldContext; +import org.apache.pulsar.common.nar.NarClassLoader; import java.util.Map; @@ -139,4 +140,10 @@ public class KubernetesRuntimeFactoryConfig { doc = "The port inside the function pod on which prometheus metrics are exposed" ) private Integer metricsPort = 9094; + + @FieldContext( + doc = "The directory inside the function pod where nar packages will be extracted" + ) + private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java index 91e2e087bac9d..db2d8c18d5b13 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java @@ -76,12 +76,14 @@ class ProcessRuntime implements Runtime { private final Long expectedHealthCheckInterval; private final SecretsProviderConfigurator secretsProviderConfigurator; private final String extraDependenciesDir; + private final String narExtractionDirectory; private static final long GRPC_TIMEOUT_SECS = 5; private final String funcLogDir; ProcessRuntime(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, + String narExtractionDirectory, String logDirectory, String codeFile, String pulsarServiceUrl, @@ -112,6 +114,7 @@ class ProcessRuntime implements Runtime { break; } this.extraDependenciesDir = extraDependenciesDir; + this.narExtractionDirectory = narExtractionDirectory; this.processArgs = RuntimeUtils.composeCmd( instanceConfig, instanceFile, @@ -133,7 +136,7 @@ class ProcessRuntime implements Runtime { false, null, null, - this.metricsPort); + this.metricsPort, narExtractionDirectory); } /** diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java index f0c0db20eae72..6892f8fe02131 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java @@ -58,6 +58,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { private String pythonInstanceFile; private String logDirectory; private String extraDependenciesDir; + private String narExtractionDirectory; @ToString.Exclude @EqualsAndHashCode.Exclude @@ -78,13 +79,14 @@ public ProcessRuntimeFactory(String pulsarServiceUrl, String pythonInstanceFile, String logDirectory, String extraDependenciesDir, + String narExtractionDirectory, SecretsProviderConfigurator secretsProviderConfigurator, boolean authenticationEnabled, Optional functionAuthProvider, Optional runtimeCustomizer) { initialize(pulsarServiceUrl, stateStorageServiceUrl, authConfig, javaInstanceJarFile, - pythonInstanceFile, logDirectory, extraDependenciesDir, + pythonInstanceFile, logDirectory, extraDependenciesDir, narExtractionDirectory, secretsProviderConfigurator, authenticationEnabled, functionAuthProvider, runtimeCustomizer); } @@ -103,6 +105,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic factoryConfig.getPythonInstanceLocation(), factoryConfig.getLogDirectory(), factoryConfig.getExtraFunctionDependenciesDir(), + workerConfig.getNarExtractionDirectory(), secretsProviderConfigurator, workerConfig.isAuthenticationEnabled(), authProvider, @@ -116,6 +119,7 @@ private void initialize(String pulsarServiceUrl, String pythonInstanceFile, String logDirectory, String extraDependenciesDir, + String narExtractionDirectory, SecretsProviderConfigurator secretsProviderConfigurator, boolean authenticationEnabled, Optional functionAuthProvider, @@ -127,6 +131,7 @@ private void initialize(String pulsarServiceUrl, this.javaInstanceJarFile = javaInstanceJarFile; this.pythonInstanceFile = pythonInstanceFile; this.extraDependenciesDir = extraDependenciesDir; + this.narExtractionDirectory = narExtractionDirectory; this.logDirectory = logDirectory; this.authenticationEnabled = authenticationEnabled; @@ -209,6 +214,7 @@ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String code instanceConfig, instanceFile, extraDependenciesDir, + narExtractionDirectory, logDirectory, codeFile, pulsarServiceUrl, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index 3dea62300ff34..ffbed6b43cb02 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -57,6 +57,7 @@ public class ThreadRuntime implements Runtime { private String stateStorageServiceUrl; private SecretsProvider secretsProvider; private CollectorRegistry collectorRegistry; + private String narExtractionDirectory; ThreadRuntime(InstanceConfig instanceConfig, FunctionCacheManager fnCache, ThreadGroup threadGroup, @@ -64,7 +65,8 @@ public class ThreadRuntime implements Runtime { PulsarClient pulsarClient, String stateStorageServiceUrl, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry) { + CollectorRegistry collectorRegistry, + String narExtractionDirectory) { this.instanceConfig = instanceConfig; if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) { throw new RuntimeException("Thread Container only supports Java Runtime"); @@ -77,6 +79,7 @@ public class ThreadRuntime implements Runtime { this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; + this.narExtractionDirectory = narExtractionDirectory; this.javaInstanceRunnable = new JavaInstanceRunnable( instanceConfig, fnCache, @@ -84,7 +87,8 @@ public class ThreadRuntime implements Runtime { pulsarClient, stateStorageServiceUrl, secretsProvider, - collectorRegistry); + collectorRegistry, + narExtractionDirectory); } /** @@ -100,7 +104,8 @@ public void start() { pulsarClient, stateStorageServiceUrl, secretsProvider, - collectorRegistry); + collectorRegistry, + narExtractionDirectory); log.info("ThreadContainer starting function with instance config {}", instanceConfig); this.fnThread = new Thread(threadGroup, javaInstanceRunnable, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index 5ba741be5f5a3..c675418440f68 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -59,22 +59,24 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private String storageServiceUrl; private SecretsProvider secretsProvider; private CollectorRegistry collectorRegistry; + private String narExtractionDirectory; private volatile boolean closed; public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, AuthenticationConfig authConfig, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception { + CollectorRegistry collectorRegistry, String narExtractionDirectory, + ClassLoader rootClassLoader) throws Exception { initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), - storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader); + storageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader); } @VisibleForTesting public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, - ClassLoader rootClassLoader) { + String narExtractionDirectory, ClassLoader rootClassLoader) { initialize(threadGroupName, pulsarClient, storageServiceUrl, - secretsProvider, collectorRegistry, rootClassLoader); + secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader); } private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) @@ -100,7 +102,7 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) { private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, - ClassLoader rootClassLoader) { + String narExtractionDirectory, ClassLoader rootClassLoader) { if (rootClassLoader == null) { rootClassLoader = Thread.currentThread().getContextClassLoader(); } @@ -111,6 +113,7 @@ private void initialize(String threadGroupName, PulsarClient pulsarClient, Strin this.pulsarClient = pulsarClient; this.storageServiceUrl = storageServiceUrl; this.collectorRegistry = collectorRegistry; + this.narExtractionDirectory = narExtractionDirectory; } @Override @@ -124,7 +127,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic initialize(factoryConfig.getThreadGroupName(), createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig), workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(), - null, null); + null, workerConfig.getNarExtractionDirectory(), null); } @Override @@ -139,7 +142,8 @@ public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFi pulsarClient, storageServiceUrl, secretsProvider, - collectorRegistry); + collectorRegistry, + narExtractionDirectory); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 7cd718652104e..4fd21f024aa6e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -46,6 +46,7 @@ import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider; import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig; @@ -134,6 +135,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The path to the location to locate builtin connectors" ) private String connectorsDirectory = "./connectors"; + @FieldContext( + category = CATEGORY_CONNECTORS, + doc = "The directory where nar packages are extractors" + ) + private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; @FieldContext( category = CATEGORY_FUNC_METADATA_MNG, doc = "The pulsar topic used for storing function metadata" diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 7cd3819e814cc..ab6325edcab46 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -71,6 +71,7 @@ public class KubernetesRuntimeTest { private static final Map topicsToSchema = new HashMap<>(); private static final Function.Resources RESOURCES = Function.Resources.newBuilder() .setRam(1000L).setCpu(1).setDisk(10000L).build(); + private static final String narExtractionDirectory = "/tmp/foo"; static { topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", ""); @@ -201,6 +202,7 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int kubernetesRuntimeFactoryConfig.setChangeConfigMap(null); kubernetesRuntimeFactoryConfig.setGrpcPort(4332); kubernetesRuntimeFactoryConfig.setMetricsPort(4331); + kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory); workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName()); workerConfig.setFunctionRuntimeFactoryConfigs( ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class)); @@ -358,14 +360,14 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s if (null != depsDir) { extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - totalArgs = 35; + totalArgs = 37; portArg = 26; metricsPortArg = 28; } else { extraDepsEnv = ""; portArg = 25; metricsPortArg = 27; - totalArgs = 34; + totalArgs = 36; } if (secretsAttached) { totalArgs += 4; @@ -394,7 +396,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider" + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'"; } - expectedArgs += " --cluster_name standalone"; + expectedArgs += " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory; assertEquals(String.join(" ", args), expectedArgs); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index a7b63d85bb480..83477c17ebe14 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -58,6 +58,7 @@ * Unit test of {@link ThreadRuntime}. */ public class ProcessRuntimeTest { + private String narExtractionDirectory = "/tmp/foo"; class TestSecretsProviderConfigurator implements SecretsProviderConfigurator { @@ -148,6 +149,7 @@ private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenci workerConfig.setPulsarServiceUrl(pulsarServiceUrl); workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl); workerConfig.setAuthenticationEnabled(false); + workerConfig.setNarExtractionDirectory(narExtractionDirectory); ProcessRuntimeFactoryConfig processRuntimeFactoryConfig = new ProcessRuntimeFactoryConfig(); processRuntimeFactoryConfig.setJavaInstanceJarLocation(javaInstanceJarFile); @@ -274,13 +276,13 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir) throws Exce int portArg; int metricsPortArg; if (null != depsDir) { - assertEquals(args.size(), 37); + assertEquals(args.size(), 39); extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir.toString(); classpath = classpath + ":" + depsDir + "/*"; portArg = 24; metricsPortArg = 26; } else { - assertEquals(args.size(), 36); + assertEquals(args.size(), 38); extraDepsEnv = ""; portArg = 23; metricsPortArg = 25; @@ -303,7 +305,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir) throws Exce + " --expected_healthcheck_interval 30" + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider" + " --secrets_provider_config '{\"Config\":\"Value\"}'" - + " --cluster_name standalone"; + + " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory; assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 7c22518c1571d..1bd99b2a16bce 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -298,11 +298,12 @@ public static Class loadClass(String className, ClassLoader classLoader) thro return objectClass; } - public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile) { + public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile, + String narExtractionDirectory) { if (archivePath != null) { try { return NarClassLoader.getFromArchive(archivePath.toFile(), - Collections.emptySet()); + Collections.emptySet(), narExtractionDirectory); } catch (IOException e) { throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath)); } @@ -311,7 +312,7 @@ public static NarClassLoader extractNarClassLoader(Path archivePath, File packag if (packageFile != null) { try { return NarClassLoader.getFromArchive(packageFile, - Collections.emptySet()); + Collections.emptySet(), narExtractionDirectory); } catch (IOException e) { throw new IllegalArgumentException(e.getMessage()); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 0c50265cd20cf..57aa8ec8d43c8 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -302,7 +302,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { } public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath, - File sinkPackageFile) { + File sinkPackageFile, String narExtractionDirectory) { if (isEmpty(sinkConfig.getTenant())) { throw new IllegalArgumentException("Sink tenant cannot be null"); } @@ -356,7 +356,7 @@ public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archiveP jarClassLoaderException = e; } try { - narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile); + narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile, narExtractionDirectory); } catch (Exception e) { narClassLoaderException = e; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index c3d6d0ad71e0e..b184ef0393f24 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -208,7 +208,8 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) { return sourceConfig; } - public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath, File sourcePackageFile) { + public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath, + File sourcePackageFile, String narExtractionDirectory) { if (isEmpty(sourceConfig.getTenant())) { throw new IllegalArgumentException("Source tenant cannot be null"); } @@ -249,7 +250,7 @@ public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path ar jarClassLoaderException = e; } try { - narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile); + narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile, narExtractionDirectory); } catch (Exception e) { narClassLoaderException = e; } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java index 5dbb804464266..81d39d956ea96 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java @@ -71,8 +71,10 @@ public class FunctionCacheEntry implements AutoCloseable { this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId)); } - FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException { - this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader); + FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader, + String narExtractionDirectory) throws IOException { + this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), + rootClassLoader, narExtractionDirectory); this.classpaths = Collections.emptySet(); this.jarFiles = Collections.singleton(narArchive); this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId)); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java index 427e263a775ff..58508eccb7984 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManager.java @@ -60,7 +60,9 @@ void registerFunctionInstance(String fid, List requiredClasspaths) throws IOException; - void registerFunctionInstanceWithArchive(String fid, String eid, String narArchive) throws IOException; + void registerFunctionInstanceWithArchive(String fid, String eid, + String narArchive, + String narExtractionDirectory) throws IOException; /** * Unregisters a job from the function cache manager. diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java index b5f7e42b31df6..73b7222186865 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java @@ -110,7 +110,8 @@ public void registerFunctionInstance(String fid, } @Override - public void registerFunctionInstanceWithArchive(String fid, String eid, String narArchive) throws IOException { + public void registerFunctionInstanceWithArchive(String fid, String eid, + String narArchive, String narExtractionDirectory) throws IOException { if (fid == null) { throw new NullPointerException("FunctionID not set"); } @@ -125,7 +126,7 @@ public void registerFunctionInstanceWithArchive(String fid, String eid, String n // Create new cache entry try { - cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader)); + cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader, narExtractionDirectory)); } catch (Throwable cause) { Exceptions.rethrowIOException(cause); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index 2b78bf027b1d9..a78c24ccfde7e 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -98,15 +98,15 @@ public static String getIOSinkClass(ClassLoader classLoader) throws IOException return conf.getSinkClass(); } - public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { + public static ConnectorDefinition getConnectorDefinition(String narPath, String narExtractionDirectory) throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) { String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class); } } - public static Connectors searchForConnectors(String connectorsDirectory) throws IOException { + public static Connectors searchForConnectors(String connectorsDirectory, String narExtractionDirectory) throws IOException { Path path = Paths.get(connectorsDirectory).toAbsolutePath(); log.info("Searching for connectors in {}", path); @@ -120,7 +120,7 @@ public static Connectors searchForConnectors(String connectorsDirectory) throws try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { for (Path archive : stream) { try { - ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toString()); + ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toString(), narExtractionDirectory); log.info("Found connector {} from {}", cntDef, archive); if (!StringUtils.isEmpty(cntDef.getSourceClass())) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java index 02e3fc4f32680..40156eb68f4fe 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java @@ -31,7 +31,7 @@ public class ConnectorsManager { private Connectors connectors; public ConnectorsManager(WorkerConfig workerConfig) throws IOException { - this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory()); + this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory()); } public List getConnectors() { @@ -47,6 +47,6 @@ public Path getSinkArchive(String sinkType) { } public void reloadConnectors(WorkerConfig workerConfig) throws IOException { - this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory()); + this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory()); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index eef95a1a72215..0a9575568a9c8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -398,7 +398,7 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I SourceSpec sourceSpec = functionDetails.getSource(); if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile(); - String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass(); + String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString(), workerConfig.getNarExtractionDirectory()).getSourceClass(); SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource()); builder.setClassName(sourceClass); functionDetails.setSource(builder); @@ -412,7 +412,7 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I SinkSpec sinkSpec = functionDetails.getSink(); if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile(); - String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass(); + String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString(), workerConfig.getNarExtractionDirectory()).getSinkClass(); SinkSpec.Builder builder = SinkSpec.newBuilder(functionDetails.getSink()); builder.setClassName(sinkClass); functionDetails.setSink(builder); @@ -427,7 +427,7 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, File archive, String className) throws IOException, ClassNotFoundException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) { String typeArg = getSourceType(className, ncl).getName(); SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(functionDetails.getSource()); @@ -445,7 +445,7 @@ private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, File a private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className) throws IOException, ClassNotFoundException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) { String typeArg = getSinkType(className, ncl).getName(); SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(functionDetails.getSink()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index dd4b0eea65271..560c8cbe4d2eb 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -688,7 +688,8 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath)); } } - SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile); + SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, + componentPackageFile, worker().getWorkerConfig().getNarExtractionDirectory()); return SinkConfigUtils.convert(sinkConfig, sinkDetails); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 9f74a1ddb2784..4f434fa0bd38e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -684,7 +684,8 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath)); } } - SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile); + SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, + sourcePackageFile, worker().getWorkerConfig().getNarExtractionDirectory()); return SourceConfigUtils.convert(sourceConfig, sourceDetails); } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 6ee14dc2e4854..243f1274c5ead 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -148,7 +148,7 @@ public void testSchedule() throws Exception { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -194,7 +194,7 @@ public void testNothingNewToSchedule() throws Exception { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -241,7 +241,7 @@ public void testAddingFunctions() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -301,7 +301,7 @@ public void testDeletingFunctions() throws Exception { functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -367,7 +367,7 @@ public void testScalingUp() throws Exception { doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider - (), new CollectorRegistry(), null); + (), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -478,7 +478,7 @@ public void testScalingDown() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -605,7 +605,7 @@ public void testHeartbeatFunction() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); Map> currentAssignments = new HashMap<>(); @@ -659,7 +659,7 @@ public void testUpdate() throws Exception { functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments @@ -792,7 +792,7 @@ public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSu functionMetaDataList.add(function2); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null); + ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null, null); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); // set assignments diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index cd55b2aee0396..bd65a0b2bfdcc 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -162,7 +162,7 @@ public void setup() throws Exception { instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null, null); CompletableFuture metricsDataCompletableFuture = new CompletableFuture(); metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class); @@ -208,7 +208,7 @@ public void testMetricsEmpty() { instanceConfig.setMaxBufferedTuples(1024); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( - instanceConfig, null, null, null, null, null, null); + instanceConfig, null, null, null, null, null, null, null); CompletableFuture completableFuture = new CompletableFuture(); completableFuture.complete(javaInstanceRunnable.getMetrics()); Runtime runtime = mock(Runtime.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index b9aeccb58eec8..bc117abb41869 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -817,7 +817,7 @@ private void testUpdateSinkMissingArguments( FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class)); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); - FunctionCommon.extractNarClassLoader(any(), any()); + FunctionCommon.extractNarClassLoader(any(), any(), any()); doReturn(ATLEAST_ONCE).when(FunctionCommon.class); FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); @@ -888,7 +888,7 @@ private void updateDefaultSink() throws Exception { FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class)); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); - FunctionCommon.extractNarClassLoader(any(), any()); + FunctionCommon.extractNarClassLoader(any(), any(), any()); doReturn(ATLEAST_ONCE).when(FunctionCommon.class); FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); @@ -986,7 +986,7 @@ public void testUpdateSinkWithUrl() throws Exception { PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod(); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); - FunctionCommon.extractNarClassLoader(any(), any()); + FunctionCommon.extractNarClassLoader(any(), any(), any()); doReturn(ATLEAST_ONCE).when(FunctionCommon.class); FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 88dcff4fcaf55..d13fd3d021666 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -837,7 +837,7 @@ private void testUpdateSourceMissingArguments( FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class)); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); - FunctionCommon.extractNarClassLoader(any(), any()); + FunctionCommon.extractNarClassLoader(any(), any(), any()); this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); @@ -907,7 +907,7 @@ private void updateDefaultSource() throws Exception { FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class)); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); - FunctionCommon.extractNarClassLoader(any(), any(File.class)); + FunctionCommon.extractNarClassLoader(any(), any(File.class), any()); this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); @@ -1001,7 +1001,7 @@ public void testUpdateSourceWithUrl() throws Exception { PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod(); doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class); - FunctionCommon.extractNarClassLoader(any(), any()); + FunctionCommon.extractNarClassLoader(any(), any(), any()); this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index a29aabd155879..b8708883e9a1f 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -83,7 +83,7 @@ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws OffloadPolicies offloadPolicies = new OffloadPolicies(); BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig); - this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies); + this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig); } public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { @@ -117,7 +117,8 @@ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConf return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig); } - public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies) { + public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies, + PulsarConnectorConfig pulsarConnectorConfig) { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); if (offloadPolicies == null) { managedLedgerConfig.setLedgerOffloader(this.defaultOffloader); @@ -130,7 +131,7 @@ public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, O if (offloader != null) { offloader.close(); } - return initManagedLedgerOffloader(offloadPolicies); + return initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig); } }); managedLedgerConfig.setLedgerOffloader(ledgerOffloader); @@ -147,14 +148,16 @@ private synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offl return this.offloaderScheduler; } - private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies) { + private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies, + PulsarConnectorConfig pulsarConnectorConfig) { try { if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) { checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPolicies.getManagedLedgerOffloadDriver()); - this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(), + pulsarConnectorConfig.getNarExtractionDirectory()); LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( offloadPolicies.getManagedLedgerOffloadDriver()); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index b2b27558eb05b..49d2ae349153d 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.protocol.Commands; /** @@ -74,6 +75,9 @@ public class PulsarConnectorConfig implements AutoCloseable { private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors(); private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors(); + // --- Nar extraction + private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @NotNull public String getBrokerServiceUrl() { return brokerServiceUrl; @@ -358,6 +362,17 @@ public PulsarConnectorConfig setManagedLedgerNumSchedulerThreads(int managedLedg return this; } + // --- Nar extraction config + public String getNarExtractionDirectory() { + return narExtractionDirectory; + } + + @Config("pulsar.nar-extraction-directory") + public PulsarConnectorConfig setNarExtractionDirectory(String narExtractionDirectory) { + this.narExtractionDirectory = narExtractionDirectory; + return this; + } + @NotNull public PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (this.pulsarAdmin == null) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index 003c51f8f4143..d7e2e2fec0432 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -124,7 +124,8 @@ public PulsarRecordCursor(List columnHandles, PulsarSplit pu pulsarConnectorCache.getManagedLedgerFactory(), pulsarConnectorCache.getManagedLedgerConfig( TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), - pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies), + pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies, + pulsarConnectorConfig), new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider())); }