Skip to content

Commit

Permalink
[improve][fn] Optimize Function Worker startup by lazy loading and di…
Browse files Browse the repository at this point in the history
…rect zip/bytecode access (apache#22122)

(cherry picked from commit bbc6224)
(cherry picked from commit 3d3606b)
lhotari authored and mukesh-ctds committed Mar 6, 2024
1 parent 25a0a7f commit 4946e62
Showing 43 changed files with 3,642 additions and 4,809 deletions.
15 changes: 14 additions & 1 deletion conf/functions_worker.yml
Original file line number Diff line number Diff line change
@@ -43,6 +43,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300

# Specifies if the function worker should use classloading for validating submissions for built-in
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfBuiltinFiles: false

# Specifies if the function worker should use classloading for validating submissions for external
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfExternalFiles: false

################################
# Function package management
################################
@@ -400,7 +410,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions

# Should connector config be validated during submission
# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
# Default is false.
validateConnectorConfig: false

# Whether to initialize distributed log metadata by runtime.
4 changes: 4 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
@@ -445,6 +445,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
* Byte Buddy
- net.bytebuddy-byte-buddy-1.14.12.jar
* zt-zip
- org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.11.3.jar
- org.apache.avro-avro-protobuf-1.11.3.jar
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -162,6 +162,8 @@ flexible messaging model and an intuitive client API.</description>
<docker-maven.version>0.43.3</docker-maven.version>
<docker.verbose>true</docker.verbose>
<typetools.version>0.5.0</typetools.version>
<byte-buddy.version>1.14.12</byte-buddy.version>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.19.6</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.55.3</grpc.version>
@@ -1063,6 +1065,18 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte-buddy.version}</version>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>${zt-zip.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
Original file line number Diff line number Diff line change
@@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}

public static List<File> getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
return getClassPathEntries(unpacked);
}

private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
@@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
* @throws IllegalArgumentException
* if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
* @throws ClassNotFoundException
* if any of the <tt>FlowFileProcessor</tt> implementations defined by the Java Services API cannot be
* loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
throws ClassNotFoundException, IOException {
throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;

@@ -238,22 +238,31 @@ public List<String> getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
getClassPathEntries(root).forEach(f -> {
try {
addURL(f.toURI().toURL());
} catch (IOException e) {
log.error("Failed to add entry to classpath: {}", f, e);
}
});
}

static List<File> getClassPathEntries(File root) {
List<File> classPathEntries = new ArrayList<>();
classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
addURL(dependencies.toURI().toURL());
classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
for (File libJar : jarFiles) {
addURL(libJar.toURI().toURL());
}
classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
return classPathEntries;
}

@Override
Original file line number Diff line number Diff line change
@@ -32,13 +32,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;

/**
@@ -113,18 +114,24 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
try (JarFile jarFile = new JarFile(nar)) {
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String name = jarEntry.getName();
File f = new File(workingDirectory, name);
if (jarEntry.isDirectory()) {
Path workingDirectoryPath = workingDirectory.toPath().normalize();
try (ZipFile zipFile = new ZipFile(nar)) {
Enumeration<? extends ZipEntry> zipEntries = zipFile.entries();
while (zipEntries.hasMoreElements()) {
ZipEntry zipEntry = zipEntries.nextElement();
String name = zipEntry.getName();
Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
if (!targetFilePath.startsWith(workingDirectoryPath)) {
log.error("Invalid zip file with entry '{}'", name);
throw new IOException("Invalid zip file. Aborting unpacking.");
}
File f = targetFilePath.toFile();
if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
// The directory entry might appear after the file entry
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f.getParentFile());
makeFile(jarFile.getInputStream(jarEntry), f);
makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
Original file line number Diff line number Diff line change
@@ -52,7 +52,9 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
@@ -75,8 +77,11 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@@ -357,9 +362,12 @@ public void start(boolean blocking) throws Exception {
userCodeFile = functionConfig.getJar();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(
functionConfig,
FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
FunctionConfigUtils.validateJavaFunction(functionConfig, validatableFunctionPackage));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
@@ -369,24 +377,30 @@ public void start(boolean blocking) throws Exception {
}

if (functionDetails == null) {
functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(functionConfig, validatableFunctionPackage);
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
parallelism = sourceConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
functionDetails = SourceConfigUtils.convert(
sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
functionDetails = SourceConfigUtils.convert(sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, validatableFunctionPackage, true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
transformFunctionFile = sinkConfig.getTransformFunction();
parallelism = sinkConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
if (isNotEmpty(sinkConfig.getTransformFunction())) {
transformFunctionCodeClassLoader = extractClassLoader(
sinkConfig.getTransformFunction(),
@@ -395,16 +409,19 @@ public void start(boolean blocking) throws Exception {
}

ClassLoader functionClassLoader = null;
ValidatableFunctionPackage validatableTransformFunction = null;
if (transformFunctionCodeClassLoader != null) {
functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
? Thread.currentThread().getContextClassLoader()
: transformFunctionCodeClassLoader.getClassLoader();
validatableTransformFunction =
new LoadedFunctionPackage(functionClassLoader, FunctionDefinition.class);
}

functionDetails = SinkConfigUtils.convert(
sinkConfig,
SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
functionClassLoader, true));
SinkConfigUtils.validateAndExtractDetails(sinkConfig, validatableFunctionPackage,
validatableTransformFunction, true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
@@ -472,7 +489,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
if (classLoader == null) {
if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionCommon.getClassLoaderFromPackage(
classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else if (userCodeFile != null) {
@@ -494,7 +511,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
}
throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
}
classLoader = FunctionCommon.getClassLoaderFromPackage(
classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else {
@@ -713,7 +730,7 @@ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
FunctionArchive function = functions.get(functionName);
if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
// Function type is a valid built-in type.
return function.getClassLoader();
return function.getFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -727,7 +744,7 @@ private ClassLoader isBuiltInSource(String sourceType) throws IOException {
Connector connector = connectors.get(source);
if (connector != null && connector.getConnectorDefinition().getSourceClass() != null) {
// Source type is a valid built-in connector type.
return connector.getClassLoader();
return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -741,18 +758,18 @@ private ClassLoader isBuiltInSink(String sinkType) throws IOException {
Connector connector = connectors.get(sink);
if (connector != null && connector.getConnectorDefinition().getSinkClass() != null) {
// Sink type is a valid built-in connector type
return connector.getClassLoader();
return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
}

private TreeMap<String, FunctionArchive> getFunctions() throws IOException {
return FunctionUtils.searchForFunctions(functionsDir);
return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true);
}

private TreeMap<String, Connector> getConnectors() throws IOException {
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true);
}

private SecretsProviderConfigurator getSecretsProviderConfigurator() {
Original file line number Diff line number Diff line change
@@ -38,6 +38,9 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.dynamic.ClassFileLocator;
import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -325,7 +328,8 @@ public void close() {
}

private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder,
ClassLoader classLoader) throws ClassNotFoundException {
ClassLoader classLoader) {
TypePool typePool = TypePool.Default.of(ClassFileLocator.ForClassLoader.of(classLoader));
switch (functionDetailsBuilder.getComponentType()) {
case FUNCTION:
if ((functionDetailsBuilder.hasSource()
@@ -344,22 +348,21 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
WindowConfig.class);
className = windowConfig.getActualWindowFunctionClassName();
}

Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(typePool.describe(className).resolve(),
isWindowConfigPresent);
if (functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
&& typeArgs[0] != null) {
Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
sourceBuilder.setTypeClassName(typeArgs[0].getName());
sourceBuilder.setTypeClassName(typeArgs[0].asErasure().getTypeName());
functionDetailsBuilder.setSource(sourceBuilder.build());
}

if (functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
&& typeArgs[1] != null) {
Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
sinkBuilder.setTypeClassName(typeArgs[1].getName());
sinkBuilder.setTypeClassName(typeArgs[1].asErasure().getTypeName());
functionDetailsBuilder.setSink(sinkBuilder.build());
}
}
@@ -368,7 +371,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
String typeArg =
getSinkType(functionDetailsBuilder.getSink().getClassName(), classLoader).getName();
getSinkType(functionDetailsBuilder.getSink().getClassName(), typePool).asErasure()
.getTypeName();

Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -387,7 +391,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
String typeArg =
getSourceType(functionDetailsBuilder.getSource().getClassName(), classLoader).getName();
getSourceType(functionDetailsBuilder.getSource().getClassName(), typePool).asErasure()
.getTypeName();

Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
Original file line number Diff line number Diff line change
@@ -124,17 +124,17 @@ private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && functionsManager.isPresent()) {
return functionsManager.get()
.getFunction(instanceConfig.getFunctionDetails().getBuiltin())
.getClassLoader();
.getFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SOURCE && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSource().getBuiltin())
.getClassLoader();
.getConnectorFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SINK && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSink().getBuiltin())
.getClassLoader();
.getConnectorFunctionPackage().getClassLoader();
}
}
return loadJars(jarFile, instanceConfig, functionId, instanceConfig.getFunctionDetails().getName(),
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@@ -27,18 +28,35 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;

@Slf4j
public class ConnectorsManager {
public class ConnectorsManager implements AutoCloseable {

@Getter
private volatile TreeMap<String, Connector> connectors;

@VisibleForTesting
public ConnectorsManager() {
this.connectors = new TreeMap<>();
}

public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
this.connectors = ConnectorUtils
.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
this.connectors = createConnectors(workerConfig);
}

private static TreeMap<String, Connector> createConnectors(WorkerConfig workerConfig) throws IOException {
boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
|| ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
return ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
workerConfig.getNarExtractionDirectory(), enableClassloading);
}

@VisibleForTesting
public void addConnector(String connectorType, Connector connector) {
connectors.put(connectorType, connector);
}

public Connector getConnector(String connectorType) {
@@ -71,7 +89,25 @@ public Path getSinkArchive(String sinkType) {
}

public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
connectors = ConnectorUtils
.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
TreeMap<String, Connector> oldConnectors = connectors;
this.connectors = createConnectors(workerConfig);
closeConnectors(oldConnectors);
}

@Override
public void close() {
closeConnectors(connectors);
}

private void closeConnectors(TreeMap<String, Connector> connectorMap) {
connectorMap.values().forEach(connector -> {
try {
connector.close();
} catch (Exception e) {
log.warn("Failed to close connector", e);
}
});
connectorMap.clear();
}

}
Original file line number Diff line number Diff line change
@@ -18,23 +18,33 @@
*/
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;

@Slf4j
public class FunctionsManager {

public class FunctionsManager implements AutoCloseable {
private TreeMap<String, FunctionArchive> functions;

@VisibleForTesting
public FunctionsManager() {
this.functions = new TreeMap<>();
}

public FunctionsManager(WorkerConfig workerConfig) throws IOException {
this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
this.functions = createFunctions(workerConfig);
}

public void addFunction(String functionType, FunctionArchive functionArchive) {
functions.put(functionType, functionArchive);
}

public FunctionArchive getFunction(String functionType) {
@@ -51,6 +61,32 @@ public List<FunctionDefinition> getFunctionDefinitions() {
}

public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
TreeMap<String, FunctionArchive> oldFunctions = functions;
this.functions = createFunctions(workerConfig);
closeFunctions(oldFunctions);
}

private static TreeMap<String, FunctionArchive> createFunctions(WorkerConfig workerConfig) throws IOException {
boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
|| ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(),
workerConfig.getNarExtractionDirectory(),
enableClassloading);
}

@Override
public void close() {
closeFunctions(functions);
}

private void closeFunctions(TreeMap<String, FunctionArchive> functionMap) {
functionMap.values().forEach(functionArchive -> {
try {
functionArchive.close();
} catch (Exception e) {
log.warn("Failed to close function archive", e);
}
});
functionMap.clear();
}
}
Original file line number Diff line number Diff line change
@@ -238,6 +238,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private boolean zooKeeperAllowReadOnlyOperations;

@FieldContext(
category = CATEGORY_WORKER,
doc = "Specifies if the function worker should use classloading for validating submissions for built-in "
+ "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ "Default is false."
)
private Boolean enableClassloadingOfBuiltinFiles = false;

@FieldContext(
category = CATEGORY_WORKER,
doc = "Specifies if the function worker should use classloading for validating submissions for external "
+ "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ "Default is false."
)
private Boolean enableClassloadingOfExternalFiles = false;

@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
@@ -250,7 +266,10 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Should we validate connector config during submission"
doc = "Enables extended validation for connector config with fine-grain annotation based validation "
+ "during submission. Classloading with either enableClassloadingOfExternalFiles or "
+ "enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect. "
+ "Default is false."
)
private Boolean validateConnectorConfig = false;
@FieldContext(
11 changes: 11 additions & 0 deletions pulsar-functions/utils/pom.xml
Original file line number Diff line number Diff line change
@@ -87,6 +87,17 @@
<artifactId>typetools</artifactId>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>1.17</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -18,12 +18,11 @@
*/
package org.apache.pulsar.functions.utils;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromCompressionType;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsCompressionType;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
@@ -32,9 +31,7 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -44,18 +41,20 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;

@Slf4j
public class FunctionConfigUtils {
@@ -74,26 +73,21 @@ public static class ExtractedFunctionDetails {

private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();

public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {
public static FunctionDetails convert(FunctionConfig functionConfig) {
return convert(functionConfig, (ValidatableFunctionPackage) null);
}

if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
if (classLoader != null) {
try {
Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
return convert(
functionConfig,
new ExtractedFunctionDetails(
functionConfig.getClassName(),
typeArgs[0].getName(),
typeArgs[1].getName()));
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
}
}
public static FunctionDetails convert(FunctionConfig functionConfig,
ValidatableFunctionPackage validatableFunctionPackage)
throws IllegalArgumentException {
if (functionConfig == null) {
throw new IllegalArgumentException("Function config is not provided");
}
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && validatableFunctionPackage != null) {
return convert(functionConfig, doJavaChecks(functionConfig, validatableFunctionPackage));
} else {
return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}

public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
@@ -593,48 +587,49 @@ public static void inferMissingArguments(FunctionConfig functionConfig,
}
}

public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig,
ValidatableFunctionPackage validatableFunctionPackage) {

String functionClassName = functionConfig.getClassName();
Class functionClass;
String functionClassName = StringUtils.trimToNull(functionConfig.getClassName());
TypeDefinition functionClass;
try {
// if class name in function config is not set, this should be a built-in function
// thus we should try to find its class name in the NAR service definition
if (functionClassName == null) {
try {
functionClassName = FunctionUtils.getFunctionClass(clsLoader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract source class from archive", e);
FunctionDefinition functionDefinition =
validatableFunctionPackage.getFunctionMetaData(FunctionDefinition.class);
if (functionDefinition == null) {
throw new IllegalArgumentException("Function class name is not provided.");
}
functionClassName = functionDefinition.getFunctionClass();
if (functionClassName == null) {
throw new IllegalArgumentException("Function class name is not provided.");
}
}
functionClass = clsLoader.loadClass(functionClassName);
functionClass = validatableFunctionPackage.resolveType(functionClassName);

if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
&& !java.util.function.Function.class.isAssignableFrom(functionClass)
&& !org.apache.pulsar.functions.api.WindowFunction.class.isAssignableFrom(functionClass)) {
if (!functionClass.asErasure().isAssignableTo(org.apache.pulsar.functions.api.Function.class)
&& !functionClass.asErasure().isAssignableTo(java.util.function.Function.class)
&& !functionClass.asErasure()
.isAssignableTo(org.apache.pulsar.functions.api.WindowFunction.class)) {
throw new IllegalArgumentException(
String.format("Function class %s does not implement the correct interface",
functionClass.getName()));
functionClassName));
}
} catch (ClassNotFoundException | NoClassDefFoundError e) {
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
String.format("Function class %s must be in class path", functionClassName), e);
}

Class<?>[] typeArgs;
try {
typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
}
TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
// inputs use default schema, so there is no check needed there

// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], clsLoader, true);
ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], validatableFunctionPackage.getTypePool(),
true);
});
}

@@ -649,8 +644,8 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
throw new IllegalArgumentException(
String.format("Topic %s has an incorrect schema Info", topicName));
}
ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0], clsLoader, true);

ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0],
validatableFunctionPackage.getTypePool(), true);
});
}

@@ -665,22 +660,25 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
"Only one of schemaType or serdeClassName should be set in inputSpec");
}
if (!isEmpty(conf.getSerdeClassName())) {
ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0],
validatableFunctionPackage.getTypePool(), true);
}
if (!isEmpty(conf.getSchemaType())) {
ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0], clsLoader, true);
ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0],
validatableFunctionPackage.getTypePool(), true);
}
if (conf.getCryptoConfig() != null) {
ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(), clsLoader, false);
ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(),
validatableFunctionPackage.getTypePool(), false);
}
});
}

if (Void.class.equals(typeArgs[1])) {
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
typeArgs[0].getName(),
typeArgs[1].getName());
typeArgs[0].asErasure().getTypeName(),
typeArgs[1].asErasure().getTypeName());
}

// One and only one of outputSchemaType and outputSerdeClassName should be set
@@ -690,22 +688,25 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
}

if (!isEmpty(functionConfig.getOutputSchemaType())) {
ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], clsLoader, false);
ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1],
validatableFunctionPackage.getTypePool(), false);
}

if (!isEmpty(functionConfig.getOutputSerdeClassName())) {
ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], clsLoader, false);
ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1],
validatableFunctionPackage.getTypePool(), false);
}

if (functionConfig.getProducerConfig() != null
&& functionConfig.getProducerConfig().getCryptoConfig() != null) {
ValidatorUtils
.validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
.validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(),
validatableFunctionPackage.getTypePool(), true);
}
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
typeArgs[0].getName(),
typeArgs[1].getName());
typeArgs[0].asErasure().getTypeName(),
typeArgs[1].asErasure().getTypeName());
}

private static void doPythonChecks(FunctionConfig functionConfig) {
@@ -912,47 +913,21 @@ public static Collection<String> collectAllInputTopics(FunctionConfig functionCo
return retval;
}

public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
public static void validateNonJavaFunction(FunctionConfig functionConfig) {
doCommonChecks(functionConfig);
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
ClassLoader classLoader;
if (functionPackageFile != null) {
try {
classLoader = loadJar(functionPackageFile);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Corrupted Jar File", e);
}
} else if (!isEmpty(functionConfig.getJar())) {
File jarFile = new File(functionConfig.getJar());
if (!jarFile.exists()) {
throw new IllegalArgumentException("Jar file does not exist");
}
try {
classLoader = loadJar(jarFile);
} catch (Exception e) {
throw new IllegalArgumentException("Corrupted Jar File", e);
}
} else {
throw new IllegalArgumentException("Function Package is not provided");
}

doJavaChecks(functionConfig, classLoader);
return classLoader;
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
doGolangChecks(functionConfig);
return null;
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
doPythonChecks(functionConfig);
return null;
} else {
throw new IllegalArgumentException("Function language runtime is either not set or cannot be determined");
}
}

public static ExtractedFunctionDetails validateJavaFunction(FunctionConfig functionConfig,
ClassLoader classLoader) {
ValidatableFunctionPackage validatableFunctionPackage) {
doCommonChecks(functionConfig);
return doJavaChecks(functionConfig, classLoader);
return doJavaChecks(functionConfig, validatableFunctionPackage);
}

public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.ClassFileLocator;
import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.zeroturnaround.zip.ZipUtil;

/**
* FunctionFilePackage is a class that represents a function package and
* implements the ValidatableFunctionPackage interface which decouples the
* function package from classloading.
*/
public class FunctionFilePackage implements AutoCloseable, ValidatableFunctionPackage {
private final File file;
private final ClassFileLocator.Compound classFileLocator;
private final TypePool typePool;
private final boolean isNar;
private final String narExtractionDirectory;
private final boolean enableClassloading;

private ClassLoader classLoader;

private final Object configMetadata;

public FunctionFilePackage(File file, String narExtractionDirectory, boolean enableClassloading,
Class<?> configClass) {
this.file = file;
boolean nonZeroFile = file.isFile() && file.length() > 0;
this.isNar = nonZeroFile ? ZipUtil.containsAnyEntry(file,
new String[] {"META-INF/services/pulsar-io.yaml", "META-INF/bundled-dependencies"}) : false;
this.narExtractionDirectory = narExtractionDirectory;
this.enableClassloading = enableClassloading;
if (isNar) {
List<File> classpathFromArchive = null;
try {
classpathFromArchive = NarClassLoader.getClasspathFromArchive(file, narExtractionDirectory);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
List<ClassFileLocator> classFileLocators = new ArrayList<>();
classFileLocators.add(ClassFileLocator.ForClassLoader.ofSystemLoader());
for (File classpath : classpathFromArchive) {
if (classpath.exists()) {
try {
ClassFileLocator locator;
if (classpath.isDirectory()) {
locator = new ClassFileLocator.ForFolder(classpath);
} else {
locator = ClassFileLocator.ForJarFile.of(classpath);
}
classFileLocators.add(locator);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
this.classFileLocator = new ClassFileLocator.Compound(classFileLocators);
this.typePool = TypePool.Default.of(classFileLocator);
try {
this.configMetadata = FunctionUtils.getPulsarIOServiceConfig(file, configClass);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
try {
this.classFileLocator = nonZeroFile
? new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader(),
ClassFileLocator.ForJarFile.of(file)) :
new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
this.typePool =
TypePool.Default.of(classFileLocator);
this.configMetadata = null;
}
}

public TypeDescription resolveType(String className) {
return typePool.describe(className).resolve();
}

public boolean isNar() {
return isNar;
}

public File getFile() {
return file;
}

public TypePool getTypePool() {
return typePool;
}

@Override
public <T> T getFunctionMetaData(Class<T> clazz) {
return configMetadata != null ? clazz.cast(configMetadata) : null;
}

@Override
public synchronized void close() throws IOException {
classFileLocator.close();
if (classLoader instanceof Closeable) {
((Closeable) classLoader).close();
}
}

public boolean isEnableClassloading() {
return enableClassloading;
}

public synchronized ClassLoader getClassLoader() {
if (classLoader == null) {
classLoader = createClassLoader();
}
return classLoader;
}

private ClassLoader createClassLoader() {
if (enableClassloading) {
if (isNar) {
try {
return NarClassLoaderBuilder.builder()
.narFile(file)
.extractionDirectory(narExtractionDirectory)
.build();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
try {
return new URLClassLoader(new java.net.URL[] {file.toURI().toURL()},
NarClassLoader.class.getClassLoader());
} catch (MalformedURLException e) {
throw new UncheckedIOException(e);
}
}
} else {
throw new IllegalStateException("Classloading is not enabled");
}
}

@Override
public String toString() {
return "FunctionFilePackage{"
+ "file=" + file
+ ", isNar=" + isNar
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import java.io.File;
import java.io.IOException;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;

public class FunctionRuntimeCommon {
public static NarClassLoader extractNarClassLoader(File packageFile,
String narExtractionDirectory) {
if (packageFile != null) {
try {
return NarClassLoaderBuilder.builder()
.narFile(packageFile)
.extractionDirectory(narExtractionDirectory)
.build();
} catch (IOException e) {
throw new IllegalArgumentException(e.getMessage());
}
}
return null;
}

public static ClassLoader getClassLoaderFromPackage(
Function.FunctionDetails.ComponentType componentType,
String className,
File packageFile,
String narExtractionDirectory) {
String connectorClassName = className;
ClassLoader jarClassLoader = null;
boolean keepJarClassLoader = false;
NarClassLoader narClassLoader = null;
boolean keepNarClassLoader = false;

Exception jarClassLoaderException = null;
Exception narClassLoaderException = null;

try {
try {
jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}
try {
narClassLoader = extractNarClassLoader(packageFile, narExtractionDirectory);
} catch (Exception e) {
narClassLoaderException = e;
}

// if connector class name is not provided, we can only try to load archive as a NAR
if (isEmpty(connectorClassName)) {
if (narClassLoader == null) {
throw new IllegalArgumentException(String.format("%s package does not have the correct format. "
+ "Pulsar cannot determine if the package is a NAR package or JAR package. "
+ "%s classname is not provided and attempts to load it as a NAR package produced "
+ "the following error.",
FunctionCommon.capFirstLetter(componentType), FunctionCommon.capFirstLetter(componentType)),
narClassLoaderException);
}
try {
if (componentType == Function.FunctionDetails.ComponentType.FUNCTION) {
connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
} else if (componentType == Function.FunctionDetails.ComponentType.SOURCE) {
connectorClassName = ConnectorUtils.getIOSourceClass(narClassLoader);
} else {
connectorClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
}
} catch (IOException e) {
throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
componentType.toString().toLowerCase()), e);
}

try {
narClassLoader.loadClass(connectorClassName);
keepNarClassLoader = true;
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(String.format("%s class %s must be in class path",
FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
}

} else {
// if connector class name is provided, we need to try to load it as a JAR and as a NAR.
if (jarClassLoader != null) {
try {
jarClassLoader.loadClass(connectorClassName);
keepJarClassLoader = true;
return jarClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// class not found in JAR try loading as a NAR and searching for the class
if (narClassLoader != null) {

try {
narClassLoader.loadClass(connectorClassName);
keepNarClassLoader = true;
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path",
FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
}
} else {
throw new IllegalArgumentException(String.format("%s class %s must be in class path",
FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
}
}
} else if (narClassLoader != null) {
try {
narClassLoader.loadClass(connectorClassName);
keepNarClassLoader = true;
return narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
String.format("%s class %s must be in class path",
FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
}
} else {
StringBuilder errorMsg = new StringBuilder(FunctionCommon.capFirstLetter(componentType)
+ " package does not have the correct format."
+ " Pulsar cannot determine if the package is a NAR package or JAR package.");

if (jarClassLoaderException != null) {
errorMsg.append(
" Attempts to load it as a JAR package produced error: " + jarClassLoaderException
.getMessage());
}

if (narClassLoaderException != null) {
errorMsg.append(
" Attempts to load it as a NAR package produced error: " + narClassLoaderException
.getMessage());
}

throw new IllegalArgumentException(errorMsg.toString());
}
}
} finally {
if (!keepJarClassLoader) {
ClassLoaderUtils.closeClassLoader(jarClassLoader);
}
if (!keepNarClassLoader) {
ClassLoaderUtils.closeClassLoader(narClassLoader);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

import java.io.IOException;
import java.io.UncheckedIOException;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.ClassFileLocator;
import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;

/**
* LoadedFunctionPackage is a class that represents a function package and
* implements the ValidatableFunctionPackage interface which decouples the
* function package from classloading. This implementation is backed by
* a ClassLoader, and it is used when the function package is already loaded
* by a ClassLoader. This is the case in the LocalRunner and in some of
* the unit tests.
*/
public class LoadedFunctionPackage implements ValidatableFunctionPackage {
private final ClassLoader classLoader;
private final Object configMetadata;
private final TypePool typePool;

public <T> LoadedFunctionPackage(ClassLoader classLoader, Class<T> configMetadataClass, T configMetadata) {
this.classLoader = classLoader;
this.configMetadata = configMetadata;
typePool = TypePool.Default.of(
ClassFileLocator.ForClassLoader.of(classLoader));
}

public LoadedFunctionPackage(ClassLoader classLoader, Class<?> configMetadataClass) {
this.classLoader = classLoader;
if (classLoader instanceof NarClassLoader) {
try {
configMetadata = FunctionUtils.getPulsarIOServiceConfig((NarClassLoader) classLoader,
configMetadataClass);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
configMetadata = null;
}
typePool = TypePool.Default.of(
ClassFileLocator.ForClassLoader.of(classLoader));
}

@Override
public TypeDescription resolveType(String className) {
return typePool.describe(className).resolve();
}

@Override
public TypePool getTypePool() {
return typePool;
}

@Override
public <T> T getFunctionMetaData(Class<T> clazz) {
return configMetadata != null ? clazz.cast(configMetadata) : null;
}

@Override
public boolean isEnableClassloading() {
return true;
}

@Override
public ClassLoader getClassLoader() {
return classLoader;
}
}
Original file line number Diff line number Diff line change
@@ -41,23 +41,23 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;

@Slf4j
public class SinkConfigUtils {
@@ -402,8 +402,8 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
}

public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
ClassLoader sinkClassLoader,
ClassLoader functionClassLoader,
ValidatableFunctionPackage sinkFunction,
ValidatableFunctionPackage transformFunction,
boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
@@ -443,63 +443,72 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
// if class name in sink config is not set, this should be a built-in sink
// thus we should try to find it class name in the NAR service definition
if (sinkClassName == null) {
try {
sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract sink class from archive", e);
ConnectorDefinition connectorDefinition = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
if (connectorDefinition == null) {
throw new IllegalArgumentException(
"Sink package doesn't contain the META-INF/services/pulsar-io.yaml file.");
}
sinkClassName = connectorDefinition.getSinkClass();
if (sinkClassName == null) {
throw new IllegalArgumentException("Failed to extract sink class from archive");
}
}

// check if sink implements the correct interfaces
Class sinkClass;
TypeDefinition sinkClass;
try {
sinkClass = sinkClassLoader.loadClass(sinkClassName);
} catch (ClassNotFoundException e) {
sinkClass = sinkFunction.resolveType(sinkClassName);
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("Sink class %s not found in class loader", sinkClassName), e);
String.format("Sink class %s not found", sinkClassName), e);
}

String functionClassName = sinkConfig.getTransformFunctionClassName();
Class<?> typeArg;
ClassLoader inputClassLoader;
if (functionClassLoader != null) {
TypeDefinition typeArg;
ValidatableFunctionPackage inputFunction;
if (transformFunction != null) {
// if function class name in sink config is not set, this should be a built-in function
// thus we should try to find it class name in the NAR service definition
if (functionClassName == null) {
try {
functionClassName = FunctionUtils.getFunctionClass(functionClassLoader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract function class from archive", e);
FunctionDefinition functionDefinition =
transformFunction.getFunctionMetaData(FunctionDefinition.class);
if (functionDefinition == null) {
throw new IllegalArgumentException(
"Function package doesn't contain the META-INF/services/pulsar-io.yaml file.");
}
functionClassName = functionDefinition.getFunctionClass();
if (functionClassName == null) {
throw new IllegalArgumentException("Transform function class name must be set");
}
}
Class functionClass;
TypeDefinition functionClass;
try {
functionClass = functionClassLoader.loadClass(functionClassName);
} catch (ClassNotFoundException e) {
functionClass = transformFunction.resolveType(functionClassName);
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("Function class %s not found in class loader", functionClassName), e);
String.format("Function class %s not found", functionClassName), e);
}
// extract type from transform function class
if (!getRawFunctionTypes(functionClass, false)[1].equals(Record.class)) {
if (!getRawFunctionTypes(functionClass, false)[1].asErasure().isAssignableTo(Record.class)) {
throw new IllegalArgumentException("Sink transform function output must be of type Record");
}
typeArg = getFunctionTypes(functionClass, false)[0];
inputClassLoader = functionClassLoader;
inputFunction = transformFunction;
} else {
// extract type from sink class
typeArg = getSinkType(sinkClass);
inputClassLoader = sinkClassLoader;
inputFunction = sinkFunction;
}

if (sinkConfig.getTopicToSerdeClassName() != null) {
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
ValidatorUtils.validateSerde(serdeClassName, typeArg, inputClassLoader, true);
ValidatorUtils.validateSerde(serdeClassName, typeArg, inputFunction.getTypePool(), true);
}
}

if (sinkConfig.getTopicToSchemaType() != null) {
for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
ValidatorUtils.validateSchema(schemaType, typeArg, inputClassLoader, true);
ValidatorUtils.validateSchema(schemaType, typeArg, inputFunction.getTypePool(), true);
}
}

@@ -512,23 +521,43 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
}
if (!isEmpty(consumerSpec.getSerdeClassName())) {
ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, inputClassLoader, true);
ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg,
inputFunction.getTypePool(), true);
}
if (!isEmpty(consumerSpec.getSchemaType())) {
ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, inputClassLoader, true);
ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg,
inputFunction.getTypePool(), true);
}
if (consumerSpec.getCryptoConfig() != null) {
ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), inputClassLoader, false);
ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(),
inputFunction.getTypePool(), false);
}
}
}

// validate user defined config if enabled and sink is loaded from NAR
if (validateConnectorConfig && sinkClassLoader instanceof NarClassLoader) {
validateSinkConfig(sinkConfig, (NarClassLoader) sinkClassLoader);
if (sinkConfig.getRetainKeyOrdering() != null
&& sinkConfig.getRetainKeyOrdering()
&& sinkConfig.getProcessingGuarantees() != null
&& sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new IllegalArgumentException(
"When effectively once processing guarantee is specified, retain Key ordering cannot be set");
}

if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()
&& sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) {
throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}

return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName);
// validate user defined config if enabled and classloading is enabled
if (validateConnectorConfig) {
if (sinkFunction.isEnableClassloading()) {
validateSinkConfig(sinkConfig, sinkFunction);
} else {
log.warn("Skipping annotation based validation of sink config as classloading is disabled");
}
}

return new ExtractedSinkDetails(sinkClassName, typeArg.asErasure().getTypeName(), functionClassName);
}

public static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
@@ -684,29 +713,13 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
return mergedConfig;
}

public static void validateSinkConfig(SinkConfig sinkConfig, NarClassLoader narClassLoader) {

if (sinkConfig.getRetainKeyOrdering() != null
&& sinkConfig.getRetainKeyOrdering()
&& sinkConfig.getProcessingGuarantees() != null
&& sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new IllegalArgumentException(
"When effectively once processing guarantee is specified, retain Key ordering cannot be set");
}

if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()
&& sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) {
throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}

public static void validateSinkConfig(SinkConfig sinkConfig, ValidatableFunctionPackage sinkFunction) {
try {
ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
if (defn.getSinkConfigClass() != null) {
Class configClass = Class.forName(defn.getSinkConfigClass(), true, narClassLoader);
ConnectorDefinition defn = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
if (defn != null && defn.getSinkConfigClass() != null) {
Class configClass = Class.forName(defn.getSinkConfigClass(), true, sinkFunction.getClassLoader());
validateSinkConfig(sinkConfig, configClass);
}
} catch (IOException e) {
throw new IllegalArgumentException("Error validating sink config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find sink config class", e);
}
Original file line number Diff line number Diff line change
@@ -35,7 +35,9 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
@@ -44,13 +46,11 @@
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Source;

@@ -294,7 +294,7 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
}

public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sourceConfig,
ClassLoader sourceClassLoader,
ValidatableFunctionPackage sourceFunction,
boolean validateConnectorConfig) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
@@ -319,29 +319,34 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
// if class name in source config is not set, this should be a built-in source
// thus we should try to find it class name in the NAR service definition
if (sourceClassName == null) {
try {
sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) sourceClassLoader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract source class from archive", e);
ConnectorDefinition connectorDefinition = sourceFunction.getFunctionMetaData(ConnectorDefinition.class);
if (connectorDefinition == null) {
throw new IllegalArgumentException(
"Source package doesn't contain the META-INF/services/pulsar-io.yaml file.");
}
sourceClassName = connectorDefinition.getSourceClass();
if (sourceClassName == null) {
throw new IllegalArgumentException("Failed to extract source class from archive");
}
}

// check if source implements the correct interfaces
Class sourceClass;
TypeDescription sourceClass;
try {
sourceClass = sourceClassLoader.loadClass(sourceClassName);
} catch (ClassNotFoundException e) {
sourceClass = sourceFunction.resolveType(sourceClassName);
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("Source class %s not found in class loader", sourceClassName), e);
}

if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
if (!(sourceClass.asErasure().isAssignableTo(Source.class) || sourceClass.asErasure()
.isAssignableTo(BatchSource.class))) {
throw new IllegalArgumentException(
String.format("Source class %s does not implement the correct interface",
sourceClass.getName()));
String.format("Source class %s does not implement the correct interface",
sourceClass.getName()));
}

if (BatchSource.class.isAssignableFrom(sourceClass)) {
if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
if (sourceConfig.getBatchSourceConfig() != null) {
validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
} else {
@@ -352,7 +357,14 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
}

// extract type from source class
Class<?> typeArg = getSourceType(sourceClass);
TypeDefinition typeArg;

try {
typeArg = getSourceType(sourceClass);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Failed to resolve type for Source class %s", sourceClassName), e);
}

// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils
@@ -361,29 +373,30 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
}

if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceClassLoader, false);
ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceFunction.getTypePool(),
false);
}
if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceClassLoader, false);
ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceFunction.getTypePool(),
false);
}

if (sourceConfig.getProducerConfig() != null && sourceConfig.getProducerConfig().getCryptoConfig() != null) {
ValidatorUtils
.validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(), sourceClassLoader,
true);
.validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(),
sourceFunction.getTypePool(), true);
}

if (typeArg.equals(TypeResolver.Unknown.class)) {
throw new IllegalArgumentException(
String.format("Failed to resolve type for Source class %s", sourceClassName));
}

// validate user defined config if enabled and source is loaded from NAR
if (validateConnectorConfig && sourceClassLoader instanceof NarClassLoader) {
validateSourceConfig(sourceConfig, (NarClassLoader) sourceClassLoader);
// validate user defined config if enabled and classloading is enabled
if (validateConnectorConfig) {
if (sourceFunction.isEnableClassloading()) {
validateSourceConfig(sourceConfig, sourceFunction);
} else {
log.warn("Skipping annotation based validation of sink config as classloading is disabled");
}
}

return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
return new ExtractedSourceDetails(sourceClassName, typeArg.asErasure().getTypeName());
}

@SneakyThrows
@@ -524,15 +537,14 @@ public static void validateBatchSourceConfigUpdate(BatchSourceConfig existingCon
}
}

public static void validateSourceConfig(SourceConfig sourceConfig, NarClassLoader narClassLoader) {
public static void validateSourceConfig(SourceConfig sourceConfig, ValidatableFunctionPackage sourceFunction) {
try {
ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
if (defn.getSourceConfigClass() != null) {
Class configClass = Class.forName(defn.getSourceConfigClass(), true, narClassLoader);
ConnectorDefinition defn = sourceFunction.getFunctionMetaData(ConnectorDefinition.class);
if (defn != null && defn.getSourceConfigClass() != null) {
Class configClass =
Class.forName(defn.getSourceConfigClass(), true, sourceFunction.getClassLoader());
validateSourceConfig(sourceConfig, configClass);
}
} catch (IOException e) {
throw new IllegalArgumentException("Error validating source config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find source config class");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.pool.TypePool;

/**
* This abstraction separates the function and connector definition from classloading,
* enabling validation without the need for classloading. It utilizes Byte Buddy for
* type and annotation resolution.
*
* The function or connector definition is directly extracted from the archive file,
* eliminating the need for classloader initialization.
*
* The getClassLoader method should only be invoked when classloading is enabled.
* Classloading is required in the LocalRunner and in the Functions worker when the
* worker is configured with the 'validateConnectorConfig' set to true.
*/
public interface ValidatableFunctionPackage {
/**
* Resolves the type description for the given class name within the function package.
*/
TypeDescription resolveType(String className);
/**
* Returns the Byte Buddy TypePool instance for the function package.
*/
TypePool getTypePool();
/**
* Returns the function or connector definition metadata.
* Supports FunctionDefinition and ConnectorDefinition as the metadata type.
*/
<T> T getFunctionMetaData(Class<T> clazz);
/**
* Returns if classloading is enabled for the function package.
*/
boolean isEnableClassloading();
/**
* Returns the classloader for the function package. The classloader is
* lazily initialized when classloading is enabled.
*/
ClassLoader getClassLoader();
}
Original file line number Diff line number Diff line change
@@ -18,35 +18,40 @@
*/
package org.apache.pulsar.functions.utils;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;

@Slf4j
public class ValidatorUtils {
private static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";

public static void validateSchema(String schemaType, Class<?> typeArg, ClassLoader clsLoader,
public static void validateSchema(String schemaType, TypeDefinition typeArg, TypePool typePool,
boolean input) {
if (isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
// If it's empty, we use the default schema and no need to validate
// If it's built-in, no need to validate
} else {
ClassLoaderUtils.implementsClass(schemaType, Schema.class, clsLoader);
validateSchemaType(schemaType, typeArg, clsLoader, input);
TypeDescription schemaClass = null;
try {
schemaClass = typePool.describe(schemaType).resolve();
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("The schema class %s does not exist", schemaType));
}
if (!schemaClass.asErasure().isAssignableTo(Schema.class)) {
throw new IllegalArgumentException(
String.format("%s does not implement %s", schemaType, Schema.class.getName()));
}
validateSchemaType(schemaClass, typeArg, typePool, input);
}
}

@@ -60,192 +65,94 @@ private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) {
}


public static void validateCryptoKeyReader(CryptoConfig conf, ClassLoader classLoader, boolean isProducer) {
public static void validateCryptoKeyReader(CryptoConfig conf, TypePool typePool, boolean isProducer) {
if (isEmpty(conf.getCryptoKeyReaderClassName())) {
return;
}

Class<?> cryptoClass;
String cryptoClassName = conf.getCryptoKeyReaderClassName();
TypeDescription cryptoClass = null;
try {
cryptoClass = ClassLoaderUtils.loadClass(conf.getCryptoKeyReaderClassName(), classLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
cryptoClass = typePool.describe(cryptoClassName).resolve();
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("The crypto key reader class %s does not exist", conf.getCryptoKeyReaderClassName()));
String.format("The crypto key reader class %s does not exist", cryptoClassName));
}
if (!cryptoClass.asErasure().isAssignableTo(CryptoKeyReader.class)) {
throw new IllegalArgumentException(
String.format("%s does not implement %s", cryptoClassName, CryptoKeyReader.class.getName()));
}
ClassLoaderUtils.implementsClass(conf.getCryptoKeyReaderClassName(), CryptoKeyReader.class, classLoader);

try {
cryptoClass.getConstructor(Map.class);
} catch (NoSuchMethodException ex) {
boolean hasConstructor = cryptoClass.getDeclaredMethods().stream()
.anyMatch(method -> method.isConstructor() && method.getParameters().size() == 1
&& method.getParameters().get(0).getType().asErasure().represents(Map.class));

if (!hasConstructor) {
throw new IllegalArgumentException(
String.format("The crypto key reader class %s does not implement the desired constructor.",
conf.getCryptoKeyReaderClassName()));

} catch (SecurityException e) {
throw new IllegalArgumentException("Failed to access crypto key reader class", e);
}

if (isProducer && (conf.getEncryptionKeys() == null || conf.getEncryptionKeys().length == 0)) {
throw new IllegalArgumentException("Missing encryption key name for producer crypto key reader");
}
}

public static void validateSerde(String inputSerializer, Class<?> typeArg, ClassLoader clsLoader,
public static void validateSerde(String inputSerializer, TypeDefinition typeArg, TypePool typePool,
boolean deser) {
if (isEmpty(inputSerializer)) {
return;
}
if (inputSerializer.equals(DEFAULT_SERDE)) {
return;
}
TypeDescription serdeClass;
try {
Class<?> serdeClass = ClassLoaderUtils.loadClass(inputSerializer, clsLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
serdeClass = typePool.describe(inputSerializer).resolve();
} catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("The input serialization/deserialization class %s does not exist",
inputSerializer));
}
ClassLoaderUtils.implementsClass(inputSerializer, SerDe.class, clsLoader);

SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
if (serDe == null) {
throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
inputSerializer));
}
Class<?>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());

// type inheritance information seems to be lost in generic type
// load the actual type class for verification
Class<?> fnInputClass;
Class<?> serdeInputClass;
try {
fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException("Failed to load type class", e);
}
TypeDescription.Generic serDeTypeArg = serdeClass.getInterfaces().stream()
.filter(i -> i.asErasure().isAssignableTo(SerDe.class))
.findFirst()
.map(i -> i.getTypeArguments().get(0))
.orElseThrow(() -> new IllegalArgumentException(
String.format("%s does not implement %s", inputSerializer, SerDe.class.getName())));

if (deser) {
if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
if (!serDeTypeArg.asErasure().isAssignableTo(typeArg.asErasure())) {
throw new IllegalArgumentException("Serializer type mismatch " + typeArg.getActualName() + " vs "
+ serDeTypeArg.getActualName());
}
} else {
if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
if (!serDeTypeArg.asErasure().isAssignableFrom(typeArg.asErasure())) {
throw new IllegalArgumentException("Serializer type mismatch " + typeArg.getActualName() + " vs "
+ serDeTypeArg.getActualName());
}
}
}

private static void validateSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader,
private static void validateSchemaType(TypeDefinition schema, TypeDefinition typeArg, TypePool typePool,
boolean input) {
Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader);
if (schema == null) {
throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
schemaClassName));
}
Class<?>[] schemaTypes = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());

// type inheritance information seems to be lost in generic type
// load the actual type class for verification
Class<?> fnInputClass;
Class<?> schemaInputClass;
try {
fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
schemaInputClass = Class.forName(schemaTypes[0].getName(), true, clsLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException("Failed to load type class", e);
}
TypeDescription.Generic schemaTypeArg = schema.getInterfaces().stream()
.filter(i -> i.asErasure().isAssignableTo(Schema.class))
.findFirst()
.map(i -> i.getTypeArguments().get(0))
.orElse(null);

if (input) {
if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
if (!schemaTypeArg.asErasure().isAssignableTo(typeArg.asErasure())) {
throw new IllegalArgumentException(
"Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
"Schema type mismatch " + typeArg.getActualName() + " vs " + schemaTypeArg.getActualName());
}
} else {
if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
if (!schemaTypeArg.asErasure().isAssignableFrom(typeArg.asErasure())) {
throw new IllegalArgumentException(
"Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
}
}
}


public static void validateFunctionClassTypes(ClassLoader classLoader,
Function.FunctionDetails.Builder functionDetailsBuilder) {

// validate only if classLoader is provided
if (classLoader == null) {
return;
}

if (isBlank(functionDetailsBuilder.getClassName())) {
throw new IllegalArgumentException("Function class-name can't be empty");
}

// validate function class-type
Class functionClass;
try {
functionClass = classLoader.loadClass(functionDetailsBuilder.getClassName());
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionDetailsBuilder.getClassName()), e);
}
Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionClass, false);

if (!(org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass))
&& !(java.util.function.Function.class.isAssignableFrom(functionClass))) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}

if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null
&& isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
try {
String sourceClassName = functionDetailsBuilder.getSource().getClassName();
String argClassName = FunctionCommon.getTypeArg(sourceClassName, Source.class, classLoader).getName();
functionDetailsBuilder
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));

// if sink-class not present then set same arg as source
if (!functionDetailsBuilder.hasSink() || isBlank(functionDetailsBuilder.getSink().getClassName())) {
functionDetailsBuilder
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
}

} catch (IllegalArgumentException ie) {
throw ie;
} catch (Exception e) {
log.error("Failed to validate source class", e);
throw new IllegalArgumentException("Failed to validate source class-name", e);
}
} else if (isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
// if function-src-class is not present then set function-src type-class according to function class
functionDetailsBuilder
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
}

if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null
&& isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
try {
String sinkClassName = functionDetailsBuilder.getSink().getClassName();
String argClassName = FunctionCommon.getTypeArg(sinkClassName, Sink.class, classLoader).getName();
functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));

// if source-class not present then set same arg as sink
if (!functionDetailsBuilder.hasSource() || isBlank(functionDetailsBuilder.getSource().getClassName())) {
functionDetailsBuilder
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
}

} catch (IllegalArgumentException ie) {
throw ie;
} catch (Exception e) {
log.error("Failed to validate sink class", e);
throw new IllegalArgumentException("Failed to validate sink class-name", e);
"Schema type mismatch " + typeArg.getActualName() + " vs " + schemaTypeArg.getActualName());
}
} else if (isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
// if function-sink-class is not present then set function-sink type-class according to function class
functionDetailsBuilder
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
}
}
}
Original file line number Diff line number Diff line change
@@ -19,14 +19,50 @@
package org.apache.pulsar.functions.utils.functions;

import java.nio.file.Path;
import lombok.Builder;
import lombok.Data;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.functions.utils.FunctionFilePackage;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;

@Builder
@Data
public class FunctionArchive {
private Path archivePath;
private ClassLoader classLoader;
private FunctionDefinition functionDefinition;
public class FunctionArchive implements AutoCloseable {
private final Path archivePath;
private final FunctionDefinition functionDefinition;
private final String narExtractionDirectory;
private final boolean enableClassloading;
private ValidatableFunctionPackage functionPackage;
private boolean closed;

public FunctionArchive(Path archivePath, FunctionDefinition functionDefinition, String narExtractionDirectory,
boolean enableClassloading) {
this.archivePath = archivePath;
this.functionDefinition = functionDefinition;
this.narExtractionDirectory = narExtractionDirectory;
this.enableClassloading = enableClassloading;
}

public Path getArchivePath() {
return archivePath;
}

public synchronized ValidatableFunctionPackage getFunctionPackage() {
if (closed) {
throw new IllegalStateException("FunctionArchive is already closed");
}
if (functionPackage == null) {
functionPackage = new FunctionFilePackage(archivePath.toFile(), narExtractionDirectory, enableClassloading,
FunctionDefinition.class);
}
return functionPackage;
}

public FunctionDefinition getFunctionDefinition() {
return functionDefinition;
}

@Override
public synchronized void close() throws Exception {
closed = true;
if (functionPackage instanceof AutoCloseable) {
((AutoCloseable) functionPackage).close();
}
}
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.utils.functions;

import java.io.File;
@@ -30,10 +31,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.utils.Exceptions;
import org.zeroturnaround.zip.ZipUtil;


@UtilityClass
@@ -45,43 +44,40 @@ public class FunctionUtils {
/**
* Extract the Pulsar Function class from a function or archive.
*/
public static String getFunctionClass(ClassLoader classLoader) throws IOException {
NarClassLoader ncl = (NarClassLoader) classLoader;
String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);

FunctionDefinition conf = ObjectMapperFactory.getYamlMapper().reader().readValue(configStr,
FunctionDefinition.class);
if (StringUtils.isEmpty(conf.getFunctionClass())) {
throw new IOException(
String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
}
public static String getFunctionClass(File narFile) throws IOException {
return getFunctionDefinition(narFile).getFunctionClass();
}

try {
// Try to load source class and check it implements Function interface
Class functionClass = ncl.loadClass(conf.getFunctionClass());
if (!(Function.class.isAssignableFrom(functionClass))) {
throw new IOException(
"Class " + conf.getFunctionClass() + " does not implement interface " + Function.class
.getName());
}
} catch (Throwable t) {
Exceptions.rethrowIOException(t);
public static FunctionDefinition getFunctionDefinition(File narFile) throws IOException {
return getPulsarIOServiceConfig(narFile, FunctionDefinition.class);
}

public static <T> T getPulsarIOServiceConfig(File narFile, Class<T> valueType) throws IOException {
String filename = "META-INF/services/" + PULSAR_IO_SERVICE_NAME;
byte[] configEntry = ZipUtil.unpackEntry(narFile, filename);
if (configEntry != null) {
return ObjectMapperFactory.getYamlMapper().reader().readValue(configEntry, valueType);
} else {
return null;
}
}

return conf.getFunctionClass();
public static String getFunctionClass(NarClassLoader narClassLoader) throws IOException {
return getFunctionDefinition(narClassLoader).getFunctionClass();
}

public static FunctionDefinition getFunctionDefinition(NarClassLoader narClassLoader) throws IOException {
String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
return ObjectMapperFactory.getYamlMapper().reader().readValue(configStr, FunctionDefinition.class);
return getPulsarIOServiceConfig(narClassLoader, FunctionDefinition.class);
}

public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory) throws IOException {
return searchForFunctions(functionsDirectory, false);
public static <T> T getPulsarIOServiceConfig(NarClassLoader narClassLoader, Class<T> valueType) throws IOException {
return ObjectMapperFactory.getYamlMapper().reader()
.readValue(narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME), valueType);
}

public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory,
boolean alwaysPopulatePath) throws IOException {
String narExtractionDirectory,
boolean enableClassloading) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);

@@ -95,22 +91,12 @@ public static TreeMap<String, FunctionArchive> searchForFunctions(String functio
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {

NarClassLoader ncl = NarClassLoaderBuilder.builder()
.narFile(new File(archive.toString()))
.build();

FunctionArchive.FunctionArchiveBuilder functionArchiveBuilder = FunctionArchive.builder();
FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(ncl);
FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toFile());
log.info("Found function {} from {}", cntDef, archive);

functionArchiveBuilder.archivePath(archive);

functionArchiveBuilder.classLoader(ncl);
functionArchiveBuilder.functionDefinition(cntDef);

if (alwaysPopulatePath || !StringUtils.isEmpty(cntDef.getFunctionClass())) {
functions.put(cntDef.getName(), functionArchiveBuilder.build());
if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
FunctionArchive functionArchive =
new FunctionArchive(archive, cntDef, narExtractionDirectory, enableClassloading);
functions.put(cntDef.getName(), functionArchive);
}
} catch (Throwable t) {
log.warn("Failed to load function from {}", archive, t);
Original file line number Diff line number Diff line change
@@ -20,17 +20,79 @@

import java.nio.file.Path;
import java.util.List;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.utils.FunctionFilePackage;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;

@Builder
@Data
public class Connector {
private Path archivePath;
public class Connector implements AutoCloseable {
private final Path archivePath;
private final String narExtractionDirectory;
private final boolean enableClassloading;
private ValidatableFunctionPackage connectorFunctionPackage;
private List<ConfigFieldDefinition> sourceConfigFieldDefinitions;
private List<ConfigFieldDefinition> sinkConfigFieldDefinitions;
private ClassLoader classLoader;
private ConnectorDefinition connectorDefinition;
private boolean closed;

public Connector(Path archivePath, ConnectorDefinition connectorDefinition, String narExtractionDirectory,
boolean enableClassloading) {
this.archivePath = archivePath;
this.connectorDefinition = connectorDefinition;
this.narExtractionDirectory = narExtractionDirectory;
this.enableClassloading = enableClassloading;
}

public Path getArchivePath() {
return archivePath;
}

public synchronized ValidatableFunctionPackage getConnectorFunctionPackage() {
checkState();
if (connectorFunctionPackage == null) {
connectorFunctionPackage =
new FunctionFilePackage(archivePath.toFile(), narExtractionDirectory, enableClassloading,
ConnectorDefinition.class);
}
return connectorFunctionPackage;
}

private void checkState() {
if (closed) {
throw new IllegalStateException("Connector is already closed");
}
}

public synchronized List<ConfigFieldDefinition> getSourceConfigFieldDefinitions() {
checkState();
if (sourceConfigFieldDefinitions == null && !StringUtils.isEmpty(connectorDefinition.getSourceClass())
&& !StringUtils.isEmpty(connectorDefinition.getSourceConfigClass())) {
sourceConfigFieldDefinitions = ConnectorUtils.getConnectorConfigDefinition(getConnectorFunctionPackage(),
connectorDefinition.getSourceConfigClass());
}
return sourceConfigFieldDefinitions;
}

public synchronized List<ConfigFieldDefinition> getSinkConfigFieldDefinitions() {
checkState();
if (sinkConfigFieldDefinitions == null && !StringUtils.isEmpty(connectorDefinition.getSinkClass())
&& !StringUtils.isEmpty(connectorDefinition.getSinkConfigClass())) {
sinkConfigFieldDefinitions = ConnectorUtils.getConnectorConfigDefinition(getConnectorFunctionPackage(),
connectorDefinition.getSinkConfigClass());
}
return sinkConfigFieldDefinitions;
}

public ConnectorDefinition getConnectorDefinition() {
return connectorDefinition;
}

@Override
public synchronized void close() throws Exception {
closed = true;
if (connectorFunctionPackage instanceof AutoCloseable) {
((AutoCloseable) connectorFunctionPackage).close();
}
}
}
Loading

0 comments on commit 4946e62

Please sign in to comment.