Skip to content

Commit

Permalink
Make Nar Extraction Directory configurable (apache#6933)
Browse files Browse the repository at this point in the history
* Make Nar Extraction Directory configurable

* Fixed unittests

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
  • Loading branch information
2 people authored and huangdx0726 committed Aug 24, 2020
1 parent 4fd986e commit a28da84
Show file tree
Hide file tree
Showing 41 changed files with 215 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ public class OffloaderUtils {
* @return the offloader class name
* @throws IOException when fail to retrieve the pulsar offloader class
*/
static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath) throws IOException {
static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath, String narExtractionDirectory) throws IOException {
// need to load offloader NAR to the classloader that also loaded LedgerOffloaderFactory in case
// LedgerOffloaderFactory is loaded by a classloader that is not the default classloader
// as is the case for the pulsar presto plugin
NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), LedgerOffloaderFactory.class.getClassLoader());
NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
LedgerOffloaderFactory.class.getClassLoader(), narExtractionDirectory);
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);

OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
Expand Down Expand Up @@ -105,15 +106,15 @@ private static void rethrowIOException(Throwable cause)
}
}

public static OffloaderDefinition getOffloaderDefinition(String narPath) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
public static OffloaderDefinition getOffloaderDefinition(String narPath, String narExtractionDirectory) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);

return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class);
}
}

public static Offloaders searchForOffloaders(String connectorsDirectory) throws IOException {
public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for offloaders in {}", path);

Expand All @@ -127,13 +128,13 @@ public static Offloaders searchForOffloaders(String connectorsDirectory) throws
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
stream.forEach(archive -> {
try {
OffloaderDefinition definition = getOffloaderDefinition(archive.toString());
OffloaderDefinition definition = getOffloaderDefinition(archive.toString(), narExtractionDirectory);
log.info("Found offloader {} from {}", definition, archive);

if (!StringUtils.isEmpty(definition.getOffloaderFactoryClass())) {
// Validate offloader factory class to be present and of the right type
Pair<NarClassLoader, LedgerOffloaderFactory> offloaderFactoryPair =
getOffloaderFactory(archive.toString());
getOffloaderFactory(archive.toString(), narExtractionDirectory);
if (null != offloaderFactoryPair) {
offloaders.getOffloaders().add(offloaderFactoryPair);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -1563,6 +1564,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerOffloadMaxThreads = 2;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The directory where nar Extraction of offloaders happens"
)
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Maximum prefetch rounds for ledger reading for offloading"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
checkNotNull(offloadPolicies.getOffloadersDirectory(),
"Offloader driver is configured to be '%s' but no offloaders directory is configured.",
offloadPolicies.getManagedLedgerOffloadDriver());
this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory());
this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class ProtocolHandlerUtils {
* @return the protocol handler definition
* @throws IOException when fail to load the protocol handler or get the definition
*/
public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) throws IOException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
return getProtocolHandlerDefinition(ncl);
}
}
Expand All @@ -70,7 +70,8 @@ private static ProtocolHandlerDefinition getProtocolHandlerDefinition(NarClassLo
* @return a collection of protocol handlers
* @throws IOException when fail to load the available protocol handlers from the provided directory.
*/
public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory) throws IOException {
public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory,
String narExtractionDirectory) throws IOException {
Path path = Paths.get(handlersDirectory).toAbsolutePath();
log.info("Searching for protocol handlers in {}", path);

Expand All @@ -84,7 +85,7 @@ public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirect
for (Path archive : stream) {
try {
ProtocolHandlerDefinition phDef =
ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString());
ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory);
log.info("Found protocol handler from {} : {}", archive, phDef);

checkArgument(StringUtils.isNotBlank(phDef.getName()));
Expand Down Expand Up @@ -113,11 +114,12 @@ public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirect
* @param metadata the protocol handler definition.
* @return
*/
static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata) throws IOException {
static ProtocolHandlerWithClassLoader load(ProtocolHandlerMetadata metadata,
String narExtractionDirectory) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(
metadata.getArchivePath().toAbsolutePath().toFile(),
Collections.emptySet(),
ProtocolHandler.class.getClassLoader());
ProtocolHandler.class.getClassLoader(), narExtractionDirectory);

ProtocolHandlerDefinition phDef = getProtocolHandlerDefinition(ncl);
if (StringUtils.isBlank(phDef.getHandlerClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ProtocolHandlers implements AutoCloseable {
*/
public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException {
ProtocolHandlerDefinitions definitions =
ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory());
ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory());

ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder();

Expand All @@ -60,7 +60,7 @@ public static ProtocolHandlers load(ServiceConfiguration conf) throws IOExceptio

ProtocolHandlerWithClassLoader handler;
try {
handler = ProtocolHandlerUtils.load(definition);
handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory());
} catch (IOException e) {
log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e);
throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ public void testLoadProtocolHandler() throws Exception {
PowerMockito.when(NarClassLoader.getFromArchive(
any(File.class),
any(Set.class),
any(ClassLoader.class)
any(ClassLoader.class),
any(String.class)
)).thenReturn(mockLoader);

ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata);
ProtocolHandlerWithClassLoader returnedPhWithCL = ProtocolHandlerUtils.load(metadata, "");
ProtocolHandler returnedPh = returnedPhWithCL.getHandler();

assertSame(mockLoader, returnedPhWithCL.getClassLoader());
Expand Down Expand Up @@ -107,11 +108,12 @@ public void testLoadProtocolHandlerBlankHandlerClass() throws Exception {
PowerMockito.when(NarClassLoader.getFromArchive(
any(File.class),
any(Set.class),
any(ClassLoader.class)
any(ClassLoader.class),
any(String.class)
)).thenReturn(mockLoader);

try {
ProtocolHandlerUtils.load(metadata);
ProtocolHandlerUtils.load(metadata, "");
fail("Should not reach here");
} catch (IOException ioe) {
// expected
Expand Down Expand Up @@ -141,11 +143,12 @@ public void testLoadProtocolHandlerWrongHandlerClass() throws Exception {
PowerMockito.when(NarClassLoader.getFromArchive(
any(File.class),
any(Set.class),
any(ClassLoader.class)
any(ClassLoader.class),
any(String.class)
)).thenReturn(mockLoader);

try {
ProtocolHandlerUtils.load(metadata);
ProtocolHandlerUtils.load(metadata, "");
fail("Should not reach here");
} catch (IOException ioe) {
// expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,33 @@ public class NarClassLoader extends URLClassLoader {

private static final String TMP_DIR_PREFIX = "pulsar-nar";

private static final File NAR_CACHE_DIR = new File(System.getProperty("java.io.tmpdir") + "/" + TMP_DIR_PREFIX);
public static final String DEFAULT_NAR_EXTRACTION_DIR = System.getProperty("java.io.tmpdir");

public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars,
String narExtractionDirectory) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
try {
return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IOException(e);
}
}

public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent)
public static NarClassLoader getFromArchive(File narPath, Set<String> additionalJars, ClassLoader parent,
String narExtractionDirectory)
throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, NAR_CACHE_DIR);
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
try {
return new NarClassLoader(unpacked, additionalJars, parent);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IOException(e);
}
}

private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}

/**
* Construct a nar class loader.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static Provider getBcProvider(String loaderDirectory) throws IOException {
NarClassLoader ncl = NarClassLoader.getFromArchive(
new File(narPath),
Collections.emptySet(),
BCLoader.class.getClassLoader());
BCLoader.class.getClassLoader(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
String configStr = ncl.getServiceDefinition(BC_DEF_NAME);

BcNarDefinition nar = ObjectMapperFactory.getThreadLocalYaml()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,24 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private final ClassLoader instanceClassLoader;
private ClassLoader functionClassLoader;
private String narExtractionDirectory;

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
PulsarClient pulsarClient,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry) {
CollectorRegistry collectorRegistry,
String narExtractionDirectory) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
Expand Down Expand Up @@ -304,7 +307,7 @@ private ClassLoader loadJars() throws Exception {
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
instanceConfig.getInstanceName(),
jarFile);
jarFile, narExtractionDirectory);
} catch (FileNotFoundException e) {
// create the function class loader
fnCache.registerFunctionInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private static InstanceConfig createInstanceConfig(String outputSerde) {
private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
config, null, null, null, null, null, null);
config, null, null, null, null, null, null, null);
return javaInstanceRunnable;
}

Expand Down
Loading

0 comments on commit a28da84

Please sign in to comment.