From 51aae5bcf6dd7ec1aaece74ec69744817e923011 Mon Sep 17 00:00:00 2001 From: Bence Simon Date: Wed, 5 May 2021 23:34:13 +0200 Subject: [PATCH] NIFI-8519 Adding HDFS support for NAR autoload - Refining classloader management with the help of @markap14 This closes #5059 Signed-off-by: Mark Payne --- .../java/org/apache/nifi/nar/NarProvider.java | 44 ++++ .../nar/NarProviderInitializationContext.java | 30 +++ .../org/apache/nifi/util/NiFiProperties.java | 33 +++ .../apache/nifi/util/NiFiPropertiesTest.java | 96 +++++++ .../main/asciidoc/administration-guide.adoc | 39 +++ .../hadoop/AbstractHadoopProcessor.java | 87 ------ .../hadoop/ExtendedConfiguration.java | 84 ++++++ .../nifi/processors/hadoop/HdfsResources.java | 52 ++++ .../org/apache/nifi/nar/NarAutoLoader.java | 58 +++- .../apache/nifi/nar/NarAutoLoaderTask.java | 3 +- .../org/apache/nifi/nar/NarProviderTask.java | 109 ++++++++ ...BasedNarProviderInitializationContext.java | 62 +++++ ...BasedNarProviderInitializationContext.java | 102 +++++++ .../org/apache/nifi/nar/ExtensionManager.java | 7 + .../nifi/nar/NarThreadContextClassLoader.java | 81 +++++- .../StandardExtensionDiscoveringManager.java | 6 + .../apache/nifi/web/server/JettyServer.java | 2 +- .../nifi/nar/hadoop/HDFSNarProvider.java | 248 ++++++++++++++++++ .../nifi/nar/hadoop/util/ExtensionFilter.java | 37 +++ .../services/org.apache.nifi.nar.NarProvider | 15 ++ .../nar/hadoop/util/ExtensionFilterTest.java | 103 ++++++++ .../hadoop/GetHDFSSequenceFileTest.java | 6 +- 22 files changed, 1193 insertions(+), 111 deletions(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/nar/NarProvider.java create mode 100644 nifi-api/src/main/java/org/apache/nifi/nar/NarProviderInitializationContext.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/ExtendedConfiguration.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HdfsResources.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarProviderTask.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/PropertyBasedNarProviderInitializationContext.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/test/java/org/apache/nifi/nar/TestPropertyBasedNarProviderInitializationContext.java create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/util/ExtensionFilter.java create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.nar.NarProvider create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/nar/hadoop/util/ExtensionFilterTest.java diff --git a/nifi-api/src/main/java/org/apache/nifi/nar/NarProvider.java b/nifi-api/src/main/java/org/apache/nifi/nar/NarProvider.java new file mode 100644 index 000000000000..63fad957cec1 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/nar/NarProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; + +/** + * Represents an external source from which NAR files can be acquired. Used by the NAR auto loader functionality + * in order to poll an external source for new NAR files to load. + */ +public interface NarProvider { + /** + * Initializes the NAR Provider based on the given set of properties. + */ + void initialize(NarProviderInitializationContext context); + + /** + * Performs a listing of all NARs that are available. + * + * @return A list of locations; the format of a location depends on the actual implementation. + */ + Collection<String> listNars() throws IOException; + + /** + * Fetches the NAR at the given location. The location should be one of the values returned by listNars(). + */ + InputStream fetchNarContents(String location) throws IOException; +} diff --git a/nifi-api/src/main/java/org/apache/nifi/nar/NarProviderInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/nar/NarProviderInitializationContext.java new file mode 100644 index 000000000000..26d6b7f4600f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/nar/NarProviderInitializationContext.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import java.util.Map; + +/** + * Contains the necessary information for initializing extensions of the NAR auto loader functionality. + */ +public interface NarProviderInitializationContext { + + /** + * @return The available properties. + */ + Map<String, String> getProperties(); +}
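To make the contract of the two interfaces above concrete, here is a minimal sketch of a hypothetical implementation that serves NARs from a local directory. The class name and the `source.directory` property key are illustrative only and are not part of this patch; they simply show how `initialize()`, `listNars()` and `fetchNarContents()` are expected to cooperate.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.stream.Collectors;

    import org.apache.nifi.nar.NarProvider;
    import org.apache.nifi.nar.NarProviderInitializationContext;

    // Hypothetical example only, not part of the patch.
    public class LocalDirectoryNarProvider implements NarProvider {
        private volatile File sourceDirectory;

        @Override
        public void initialize(final NarProviderInitializationContext context) {
            // Provider-specific properties arrive with the "nifi.nar.library.provider.<providerName>." prefix already stripped.
            sourceDirectory = new File(context.getProperties().get("source.directory"));
        }

        @Override
        public Collection<String> listNars() throws IOException {
            final File[] files = sourceDirectory.listFiles((dir, name) -> name.endsWith(".nar"));
            if (files == null) {
                throw new IOException("Cannot list " + sourceDirectory);
            }
            // The returned locations are the values later passed to fetchNarContents().
            return Arrays.stream(files).map(File::getName).collect(Collectors.toList());
        }

        @Override
        public InputStream fetchNarContents(final String location) throws IOException {
            return new FileInputStream(new File(sourceDirectory, location));
        }
    }

A real implementation is additionally registered in a `META-INF/services/org.apache.nifi.nar.NarProvider` file so the extension manager can discover it, as the HDFS provider does later in this patch.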
+ */ + Map getProperties(); +} diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 7370fa6969fd..371cc5866919 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -1914,6 +1914,39 @@ public Path getQuestDbStatusRepositoryPath() { return Paths.get(getProperty(STATUS_REPOSITORY_QUESTDB_PERSIST_LOCATION, DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_LOCATION)); } + /** + * Returns all properties where the property key starts with the prefix. + * + * @param prefix The exact string the returned properties should start with. Dots are considered, thus prefix "item" will return both + * properties starting with "item." and "items". Properties with empty value will be included as well. + * + * @return A map of properties starting with the prefix. + */ + public Map getPropertiesWithPrefix(final String prefix) { + return getPropertyKeys().stream().filter(key -> key.startsWith(prefix)).collect(Collectors.toMap(key -> key, key -> getProperty(key))); + } + + /** + * Returns with all the possible next "tokens" after the given prefix. An alphanumeric string between dots is considered as a "token". + * + * For example if there are "parent.sub1" and a "parent.sub2" properties are set, and the prefix is "parent", the method will return + * with a set, consisting of "sub1" and "sub2. Only directly subsequent tokens are considered, so in case of "parent.sub1.subsub1", the + * result will contain "sub1" as well. + * + * @param prefix The prefix of the request. + * + * @return A set of direct subsequent tokens. + */ + public Set getDirectSubsequentTokens(final String prefix) { + final String fixedPrefix = prefix.endsWith(".") ? prefix : prefix + "."; + + return getPropertyKeys().stream() + .filter(key -> key.startsWith(fixedPrefix)) + .map(key -> key.substring(fixedPrefix.length())) + .map(key -> key.indexOf('.') == -1 ? key : key.substring(0, key.indexOf('.'))) + .collect(Collectors.toSet()); + } + /** * Creates an instance of NiFiProperties. 
This should likely not be called * by any classes outside of the NiFi framework but can be useful by the diff --git a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java index f78c2de9a263..08a87bf244e9 100644 --- a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java +++ b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java @@ -374,4 +374,100 @@ public void testTlsConfigurationIsNotPresentWithNoProperties() { assertFalse(properties.isTlsConfigurationPresent()); } + + @Test + public void testGetPropertiesWithPrefixWithoutDot() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Map result = testSubject.getPropertiesWithPrefix("nifi.web.http"); + + // then + Assert.assertEquals(4, result.size()); + Assert.assertTrue(result.containsKey("nifi.web.http.host")); + Assert.assertTrue(result.containsKey("nifi.web.https.host")); + } + + @Test + public void testGetPropertiesWithPrefixWithDot() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Map result = testSubject.getPropertiesWithPrefix("nifi.web.http."); + + // then + Assert.assertEquals(2, result.size()); + Assert.assertTrue(result.containsKey("nifi.web.http.host")); + Assert.assertFalse(result.containsKey("nifi.web.https.host")); + } + + @Test + public void testGetPropertiesWithPrefixWhenNoResult() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Map result = testSubject.getPropertiesWithPrefix("invalid.property"); + + // then + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testGetDirectSubsequentTokensWithoutDot() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Set result = testSubject.getDirectSubsequentTokens("nifi.web.http"); + + // then + Assert.assertEquals(2, result.size()); + Assert.assertTrue(result.contains("host")); + Assert.assertTrue(result.contains("port")); + } + + @Test + public void testGetDirectSubsequentTokensWithDot() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Set result = testSubject.getDirectSubsequentTokens("nifi.web.http."); + + // then + Assert.assertEquals(2, result.size()); + Assert.assertTrue(result.contains("host")); + Assert.assertTrue(result.contains("port")); + } + + @Test + public void testGetDirectSubsequentTokensWithNonExistingToken() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Set result = testSubject.getDirectSubsequentTokens("lorem.ipsum"); + + // then + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testGetDirectSubsequentTokensWhenMoreTokensAfterward() { + // given + final NiFiProperties testSubject = loadNiFiProperties("/NiFiProperties/conf/nifi.properties", null); + + // when + final Set result = testSubject.getDirectSubsequentTokens("nifi.web"); + + // then + Assert.assertEquals(4, result.size()); + Assert.assertTrue(result.contains("http")); + Assert.assertTrue(result.contains("https")); + Assert.assertTrue(result.contains("war")); + 
Assert.assertTrue(result.contains("jetty")); + } } diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index e9731253289c..6b682828e1b7 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -4012,3 +4012,42 @@ Now, we must place our custom processor nar in the configured directory. The con Ensure that the file has appropriate permissions for the nifi user and group. Refresh the browser page and the custom processor should now be available when adding a new Processor to your flow. + +=== NAR Providers + +NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using NAR Providers. A NAR Provider serves as a connector between an external data store +and NiFi. + +When configured, a NAR Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to +the `nifi.nar.library.autoload.directory` for autoloading. + +A NAR Provider can be configured by adding the `nifi.nar.library.provider.<providerName>.implementation` property with a value containing the proper implementation class. Some implementations might need +further properties. These are defined by the implementation and must be prefixed with `nifi.nar.library.provider.<providerName>.`. + +The `<providerName>` is arbitrary and serves to correlate multiple properties together for a single provider. Multiple providers may be configured, each with a different `<providerName>`. Currently, NiFi provides an HDFS-based NAR Provider. + +==== HDFS NAR Provider + +This implementation is capable of downloading NAR files from an HDFS file system. + +The value of the `nifi.nar.library.provider.<providerName>.implementation` property must be `org.apache.nifi.nar.hadoop.HDFSNarProvider`. The provider defines the following additional properties: + +[options="header"] +|=== +| Name | Description +| resources | Comma-separated list of Hadoop configuration resources, such as `core-site.xml`. +| source.directory | The HDFS directory containing the NAR files. Note: the provider does not check for files recursively. +| kerberos.principal | Optional. Kerberos principal to authenticate as. +| kerberos.keytab | Optional. Kerberos keytab associated with the principal. +| kerberos.password | Optional. Kerberos password associated with the principal.
+|=== + +Example configuration: + + nifi.nar.library.provider.hdfs1.implementation=org.apache.nifi.nar.hadoop.HDFSNarProvider + nifi.nar.library.provider.hdfs1.resources=/etc/hadoop/core-site.xml + nifi.nar.library.provider.hdfs1.source.directory=/customNars + + nifi.nar.library.provider.hdfs2.implementation=org.apache.nifi.nar.hadoop.HDFSNarProvider + nifi.nar.library.provider.hdfs2.resources=/etc/hadoop/core-site.xml + nifi.nar.library.provider.hdfs2.source.directory=/other/dir/for/customNars \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 378b4d18ce89..1cce99663fed 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -37,7 +37,6 @@ import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.kerberos.KerberosCredentialsService; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -51,7 +50,6 @@ import javax.security.auth.login.LoginException; import java.io.File; import java.io.IOException; -import java.lang.ref.WeakReference; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.Socket; @@ -62,8 +60,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -592,36 +588,6 @@ private boolean isFileSystemAccessDenied(final URI fileSystemUri) { return accessDenied; } - static protected class HdfsResources { - private final Configuration configuration; - private final FileSystem fileSystem; - private final UserGroupInformation userGroupInformation; - private final KerberosUser kerberosUser; - - public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation, KerberosUser kerberosUser) { - this.configuration = configuration; - this.fileSystem = fileSystem; - this.userGroupInformation = userGroupInformation; - this.kerberosUser = kerberosUser; - } - - public Configuration getConfiguration() { - return configuration; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public UserGroupInformation getUserGroupInformation() { - return userGroupInformation; - } - - public KerberosUser getKerberosUser() { - return kerberosUser; - } - } - static protected class ValidationResources { private final ResourceReferences configResources; private final Configuration configuration; @@ -640,57 +606,4 @@ public Configuration getConfiguration() { } } - /** - * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be - * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load - * something that was previously not found, but might now be available. - * - * Reference the original getClassByNameOrNull from Configuration. 
- */ - static class ExtendedConfiguration extends Configuration { - - private final ComponentLog logger; - private final Map>>> CACHE_CLASSES = new WeakHashMap<>(); - - public ExtendedConfiguration(final ComponentLog logger) { - this.logger = logger; - } - - @Override - public Class getClassByNameOrNull(String name) { - final ClassLoader classLoader = getClassLoader(); - - Map>> map; - synchronized (CACHE_CLASSES) { - map = CACHE_CLASSES.get(classLoader); - if (map == null) { - map = Collections.synchronizedMap(new WeakHashMap<>()); - CACHE_CLASSES.put(classLoader, map); - } - } - - Class clazz = null; - WeakReference> ref = map.get(name); - if (ref != null) { - clazz = ref.get(); - } - - if (clazz == null) { - try { - clazz = Class.forName(name, true, classLoader); - } catch (ClassNotFoundException e) { - logger.error(e.getMessage(), e); - return null; - } - // two putters can race here, but they'll put the same class - map.put(name, new WeakReference<>(clazz)); - return clazz; - } else { - // cache hit - return clazz; - } - } - - } - } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/ExtendedConfiguration.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/ExtendedConfiguration.java new file mode 100644 index 000000000000..4ce869b9879b --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/ExtendedConfiguration.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.logging.ComponentLog; +import org.slf4j.Logger; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.function.BiConsumer; + +/** + * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be + * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load + * something that was previously not found, but might now be available. + * + * Reference the original getClassByNameOrNull from Configuration. 
+ */ +public class ExtendedConfiguration extends Configuration { + + private final BiConsumer loggerMethod; + private final Map>>> CACHE_CLASSES = new WeakHashMap<>(); + + public ExtendedConfiguration(final Logger logger) { + this.loggerMethod = logger::error; + } + + public ExtendedConfiguration(final ComponentLog logger) { + this.loggerMethod = logger::error; + } + + @Override + public Class getClassByNameOrNull(String name) { + final ClassLoader classLoader = getClassLoader(); + + Map>> map; + synchronized (CACHE_CLASSES) { + map = CACHE_CLASSES.get(classLoader); + if (map == null) { + map = Collections.synchronizedMap(new WeakHashMap<>()); + CACHE_CLASSES.put(classLoader, map); + } + } + + Class clazz = null; + WeakReference> ref = map.get(name); + if (ref != null) { + clazz = ref.get(); + } + + if (clazz == null) { + try { + clazz = Class.forName(name, true, classLoader); + } catch (ClassNotFoundException e) { + loggerMethod.accept(e.getMessage(), e); + return null; + } + // two putters can race here, but they'll put the same class + map.put(name, new WeakReference<>(clazz)); + return clazz; + } else { + // cache hit + return clazz; + } + } + +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HdfsResources.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HdfsResources.java new file mode 100644 index 000000000000..775587bd3876 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HdfsResources.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.security.krb.KerberosUser; + +public class HdfsResources { + private final Configuration configuration; + private final FileSystem fileSystem; + private final UserGroupInformation userGroupInformation; + private final KerberosUser kerberosUser; + + public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation, KerberosUser kerberosUser) { + this.configuration = configuration; + this.fileSystem = fileSystem; + this.userGroupInformation = userGroupInformation; + this.kerberosUser = kerberosUser; + } + + public Configuration getConfiguration() { + return configuration; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + public UserGroupInformation getUserGroupInformation() { + return userGroupInformation; + } + + public KerberosUser getKerberosUser() { + return kerberosUser; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoader.java index 8f1ab336140c..8415abd68536 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoader.java @@ -17,6 +17,7 @@ package org.apache.nifi.nar; import org.apache.nifi.util.FileUtils; +import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,37 +27,44 @@ import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchService; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; +import java.util.UUID; /** * Starts a thread to monitor the auto-load directory for new NARs. 
*/ public class NarAutoLoader { - private static final Logger LOGGER = LoggerFactory.getLogger(NarAutoLoader.class); + private static final String NAR_PROVIDER_PREFIX = "nifi.nar.library.provider."; + private static final String IMPLEMENTATION_PROPERTY = "implementation"; private static final long POLL_INTERVAL_MS = 5000; - private final File autoLoadDir; + private final NiFiProperties properties; private final NarLoader narLoader; + private final ExtensionManager extensionManager; + private volatile Set narProviderTasks; private volatile NarAutoLoaderTask narAutoLoaderTask; private volatile boolean started = false; - public NarAutoLoader(final File autoLoadDir, final NarLoader narLoader) { - this.autoLoadDir = Objects.requireNonNull(autoLoadDir); + public NarAutoLoader(final NiFiProperties properties, final NarLoader narLoader, final ExtensionManager extensionManager) { + this.properties = Objects.requireNonNull(properties); this.narLoader = Objects.requireNonNull(narLoader); + this.extensionManager = Objects.requireNonNull(extensionManager); } - public synchronized void start() throws IOException { + public synchronized void start() throws IllegalAccessException, InstantiationException, ClassNotFoundException, IOException { if (started) { return; } + final File autoLoadDir = properties.getNarAutoLoadDirectory(); FileUtils.ensureDirectoryExistAndCanRead(autoLoadDir); final WatchService watcher = FileSystems.getDefault().newWatchService(); - final Path autoLoadPath = autoLoadDir.toPath(); autoLoadPath.register(watcher, StandardWatchEventKinds.ENTRY_CREATE); @@ -69,19 +77,43 @@ public synchronized void start() throws IOException { LOGGER.info("Starting NAR Auto-Loader for directory {} ...", new Object[]{autoLoadPath}); - final Thread thread = new Thread(narAutoLoaderTask); - thread.setName("NAR Auto-Loader"); - thread.setDaemon(true); - thread.start(); + final Thread autoLoaderThread = new Thread(narAutoLoaderTask); + autoLoaderThread.setName("NAR Auto-Loader"); + autoLoaderThread.setDaemon(true); + autoLoaderThread.start(); + + narProviderTasks = new HashSet<>(); + + for (final String externalSourceName : properties.getDirectSubsequentTokens(NAR_PROVIDER_PREFIX)) { + LOGGER.info("NAR Provider {} found in configuration", externalSourceName); + + final NarProviderInitializationContext context = new PropertyBasedNarProviderInitializationContext(properties, externalSourceName); + final String implementationClass = properties.getProperty(NAR_PROVIDER_PREFIX + externalSourceName + "." + IMPLEMENTATION_PROPERTY); + final String providerId = UUID.randomUUID().toString(); + final NarProvider provider = NarThreadContextClassLoader.createInstance(extensionManager, implementationClass, NarProvider.class, properties, providerId); + provider.initialize(context); - LOGGER.info("NAR Auto-Loader started"); - started = true; + final ClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(providerId); + final ClassLoader providerClassLoader = instanceClassLoader == null ? 
provider.getClass().getClassLoader() : instanceClassLoader; + final NarProviderTask task = new NarProviderTask(provider, providerClassLoader, properties.getNarAutoLoadDirectory(), POLL_INTERVAL_MS); + narProviderTasks.add(task); + + final Thread providerThread = new Thread(task); + providerThread.setName("NAR Provider Task - " + externalSourceName); + providerThread.setDaemon(true); + providerThread.setContextClassLoader(provider.getClass().getClassLoader()); + providerThread.start(); + } } public synchronized void stop() { started = false; narAutoLoaderTask.stop(); + narAutoLoaderTask = null; + + narProviderTasks.forEach(NarProviderTask::stop); + narProviderTasks = null; + LOGGER.info("NAR Auto-Loader stopped"); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoaderTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoaderTask.java index a12b7a16d2f9..1a6ade61252a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoaderTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarAutoLoaderTask.java @@ -172,7 +172,6 @@ public Builder narLoader(final NarLoader narLoader) { public NarAutoLoaderTask build() { return new NarAutoLoaderTask(this); } - } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarProviderTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarProviderTask.java new file mode 100644 index 000000000000..44996859a92c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/NarProviderTask.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.nar; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +final class NarProviderTask implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(NarProviderTask.class); + private static final String NAR_EXTENSION = "nar"; + + // A unique id is necessary for temporary files not to collide with temporary files from other instances. + private final String id = UUID.randomUUID().toString(); + + private final NarProvider narProvider; + private final ClassLoader narProviderClassLoader; + private final long pollTimeInMs; + private final File extensionDirectory; + + private volatile boolean stopped = false; + + NarProviderTask(final NarProvider narProvider, final ClassLoader narProviderClassLoader, final File extensionDirectory, final long pollTimeInMs) { + this.narProvider = narProvider; + this.narProviderClassLoader = narProviderClassLoader; + this.pollTimeInMs = pollTimeInMs; + this.extensionDirectory = extensionDirectory; + } + + @Override + public void run() { + LOGGER.info("Nar provider task is started"); + + while (!stopped) { + try { + LOGGER.debug("Task starts fetching NARs from provider"); + final Set loadedNars = getLoadedNars(); + final Collection availableNars; + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narProviderClassLoader)) { + availableNars = narProvider.listNars(); + } + + for (final String availableNar : availableNars) { + if (!loadedNars.contains(availableNar)) { + final long startedAt = System.currentTimeMillis(); + final InputStream inputStream; + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(narProviderClassLoader)) { + inputStream = narProvider.fetchNarContents(availableNar); + } + + final File tempFile = new File(extensionDirectory, ".tmp_" + id + ".nar"); + final File targetFile = new File(extensionDirectory, availableNar); + Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + tempFile.renameTo(targetFile); + + LOGGER.info("Downloaded NAR {} in {} ms", availableNar, (System.currentTimeMillis() - startedAt)); + } + } + + LOGGER.debug("Task finished fetching NARs from provider"); + } catch (final Throwable e) { + LOGGER.error("Error during reaching the external source", e); + } + + try { + Thread.sleep(pollTimeInMs); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("NAR autoloader external source task is interrupted"); + stopped = true; + } + } + } + + private Set getLoadedNars() { + return Arrays.stream(extensionDirectory.listFiles(file -> file.isFile() && file.getName().toLowerCase().endsWith("." 
+ NAR_EXTENSION))) + .map(file -> file.getName()) + .collect(Collectors.toSet()); + } + + void stop() { + LOGGER.info("Nar provider task is stopped"); + stopped = true; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/PropertyBasedNarProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/PropertyBasedNarProviderInitializationContext.java new file mode 100644 index 000000000000..528b079b846a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/main/java/org/apache/nifi/nar/PropertyBasedNarProviderInitializationContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import org.apache.nifi.util.NiFiProperties; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A facade at front of {@code NiFiProperties} for auto loader extensions. Also limits the scope of the reachable properties. 
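To make that scoping concrete, here is a sketch of what a provider named `hdfs1` observes through the class defined below, assuming the example configuration from the administration guide above (`niFiProperties` stands for the loaded properties instance):

    // nifi.properties (administration guide example):
    //   nifi.nar.library.provider.hdfs1.implementation=org.apache.nifi.nar.hadoop.HDFSNarProvider
    //   nifi.nar.library.provider.hdfs1.resources=/etc/hadoop/core-site.xml
    //   nifi.nar.library.provider.hdfs1.source.directory=/customNars

    final NarProviderInitializationContext context =
            new PropertyBasedNarProviderInitializationContext(niFiProperties, "hdfs1");

    // The provider only sees its own properties, with the prefix stripped:
    //   { "resources" -> "/etc/hadoop/core-site.xml", "source.directory" -> "/customNars" }
    // The guarded "implementation" key is deliberately withheld.
    final Map<String, String> scoped = context.getProperties();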
+ */ +public class PropertyBasedNarProviderInitializationContext implements NarProviderInitializationContext { + private static Set GUARDED_PROPERTIES = new HashSet<>(Arrays.asList("implementation")); + static final String BASIC_PREFIX = "nifi.nar.library.provider."; + + private final Map properties; + private final String name; + + public PropertyBasedNarProviderInitializationContext(final NiFiProperties properties, final String name) { + this.properties = extractProperties(properties, name); + this.name = name; + } + + @Override + public Map getProperties() { + return properties; + } + + public Map extractProperties(final NiFiProperties properties, final String name) { + final String prefix = BASIC_PREFIX + name + "."; + final Map candidates = properties.getPropertiesWithPrefix(prefix); + final Map result = new HashMap<>(); + + for (final Map.Entry entry : candidates.entrySet()) { + final String parameterKey = entry.getKey().substring(prefix.length()); + + if (!parameterKey.isEmpty() && !GUARDED_PROPERTIES.contains(parameterKey)) { + result.put(parameterKey, entry.getValue()); + } + } + + return result; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/test/java/org/apache/nifi/nar/TestPropertyBasedNarProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/test/java/org/apache/nifi/nar/TestPropertyBasedNarProviderInitializationContext.java new file mode 100644 index 000000000000..d95d6995eb6d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-loading-utils/src/test/java/org/apache/nifi/nar/TestPropertyBasedNarProviderInitializationContext.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.nar; + +import org.apache.nifi.util.NiFiProperties; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.Map; + +@RunWith(MockitoJUnitRunner.class) +public class TestPropertyBasedNarProviderInitializationContext { + private static final String PROVIDER_NAME = "external"; + + private static final String PREFIX = PropertyBasedNarProviderInitializationContext.BASIC_PREFIX + PROVIDER_NAME + "."; + + @Mock + NiFiProperties properties; + + @Test + public void testEmptyProperties() { + // when + final PropertyBasedNarProviderInitializationContext testSubject = new PropertyBasedNarProviderInitializationContext(properties, PROVIDER_NAME); + final Map result = testSubject.getProperties(); + + // then + Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX); + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testGuardedPropertiesAreNotReturned() { + // given + final Map availableProperties = new HashMap<>(); + availableProperties.put(PREFIX + "implementation", "value"); + Mockito.when(properties.getPropertiesWithPrefix(PREFIX)).thenReturn(availableProperties); + + // when + final PropertyBasedNarProviderInitializationContext testSubject = new PropertyBasedNarProviderInitializationContext(properties, PROVIDER_NAME); + final Map result = testSubject.getProperties(); + + // then + Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX); + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testPropertiesWouldHaveEmptyKeyAreNotReturned() { + // given + final Map availableProperties = new HashMap<>(); + availableProperties.put(PREFIX, "value"); + Mockito.when(properties.getPropertiesWithPrefix(PREFIX)).thenReturn(availableProperties); + + // when + final PropertyBasedNarProviderInitializationContext testSubject = new PropertyBasedNarProviderInitializationContext(properties, PROVIDER_NAME); + final Map result = testSubject.getProperties(); + + // then + Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX); + Assert.assertTrue(result.isEmpty()); + } + + @Test + public void testPrefixIsRemoved() { + // given + final Map availableProperties = new HashMap<>(); + availableProperties.put(PREFIX + "key1", "value1"); + availableProperties.put(PREFIX + "key2", "value2"); + Mockito.when(properties.getPropertiesWithPrefix(PREFIX)).thenReturn(availableProperties); + + // when + final PropertyBasedNarProviderInitializationContext testSubject = new PropertyBasedNarProviderInitializationContext(properties, PROVIDER_NAME); + final Map result = testSubject.getProperties(); + + // then + Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX); + Assert.assertEquals(2, result.size()); + Assert.assertTrue(result.containsKey("key1")); + Assert.assertTrue(result.containsKey("key2")); + Assert.assertEquals("value1", result.get("key1")); + Assert.assertEquals("value2", result.get("key2")); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java index 61c96c7b7cb0..f96aee709cde 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java @@ -62,6 +62,13 @@ public interface ExtensionManager { */ InstanceClassLoader removeInstanceClassLoader(String instanceIdentifier); + /** + * Registers the given instance class loader so that it can be later retrieved via {@link #getInstanceClassLoader(String)} + * @param instanceIdentifier the instance identifier + * @param instanceClassLoader the class loader + */ + void registerInstanceClassLoader(String instanceIdentifier, InstanceClassLoader instanceClassLoader); + /** * Closes the given ClassLoader if it is an instance of URLClassLoader. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java index 982fd0b9b9c3..949d207342d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.nar; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.authentication.LoginIdentityProvider; import org.apache.nifi.authorization.AccessPolicyProvider; import org.apache.nifi.authorization.Authorizer; @@ -37,6 +38,7 @@ import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.util.NiFiProperties; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Constructor; @@ -44,8 +46,13 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Enumeration; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; /** * THREAD SAFE @@ -175,6 +182,7 @@ Class[] getExecutionStack() { } } + /** * Constructs an instance of the given type using either default no args * constructor or a constructor which takes a NiFiProperties object @@ -190,7 +198,27 @@ Class[] getExecutionStack() { * @throws ClassNotFoundException if the class cannot be found */ public static T createInstance(final ExtensionManager extensionManager, final String implementationClassName, final Class typeDefinition, final NiFiProperties nifiProperties) - throws InstantiationException, IllegalAccessException, ClassNotFoundException { + throws InstantiationException, IllegalAccessException, ClassNotFoundException { + return createInstance(extensionManager, implementationClassName, typeDefinition, nifiProperties, UUID.randomUUID().toString()); + } + + /** + * Constructs an instance of the given type using either default no args + * constructor or a constructor which takes a NiFiProperties object + * (preferred). 
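The new overload documented here is what `NarAutoLoader` uses for NAR providers; a sketch of the call pattern, with `extensionManager`, `implementationClassName` and `properties` standing for framework-supplied values and checked exceptions omitted:

    // The caller supplies the instance id so it can look the class loader up again afterwards.
    final String providerId = UUID.randomUUID().toString();
    final NarProvider provider = NarThreadContextClassLoader.createInstance(
            extensionManager, implementationClassName, NarProvider.class, properties, providerId);

    // For @RequiresInstanceClassLoading implementations (such as HDFSNarProvider) a dedicated
    // InstanceClassLoader has been created and registered under providerId; otherwise this returns
    // null and the bundle class loader is used instead.
    final ClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(providerId);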
+ * + * @param the type to create an instance for + * @param implementationClassName the implementation class name + * @param typeDefinition the type definition + * @param nifiProperties the NiFiProperties instance + * @param instanceId the UUID of the instance + * @return constructed instance + * @throws InstantiationException if there is an error instantiating the class + * @throws IllegalAccessException if there is an error accessing the type + * @throws ClassNotFoundException if the class cannot be found + */ + public static T createInstance(final ExtensionManager extensionManager, final String implementationClassName, final Class typeDefinition, final NiFiProperties nifiProperties, + final String instanceId) throws InstantiationException, IllegalAccessException, ClassNotFoundException { final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { final List bundles = extensionManager.getBundles(implementationClassName); @@ -202,11 +230,11 @@ public static T createInstance(final ExtensionManager extensionManager, fina } final Bundle bundle = bundles.get(0); - final ClassLoader detectedClassLoaderForType = bundle.getClassLoader(); - final Class rawClass = Class.forName(implementationClassName, true, detectedClassLoaderForType); + final ClassLoader instanceClassLoader = createClassLoader(implementationClassName, instanceId, bundle, extensionManager); + final Class instanceClass = Class.forName(implementationClassName, true, instanceClassLoader); - Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); - final Class desiredClass = rawClass.asSubclass(typeDefinition); + Thread.currentThread().setContextClassLoader(instanceClassLoader); + final Class desiredClass = instanceClass.asSubclass(typeDefinition); if(nifiProperties == null){ return typeDefinition.cast(desiredClass.newInstance()); } @@ -235,4 +263,47 @@ public static T createInstance(final ExtensionManager extensionManager, fina Thread.currentThread().setContextClassLoader(originalClassLoader); } } + + private static ClassLoader createClassLoader(final String implementationClassName, final String instanceId, final Bundle bundle, final ExtensionManager extensionManager) + throws ClassNotFoundException { + final ClassLoader bundleClassLoader = bundle.getClassLoader(); + final Class rawClass = Class.forName(implementationClassName, true, bundleClassLoader); + + final RequiresInstanceClassLoading instanceClassLoadingAnnotation = rawClass.getAnnotation(RequiresInstanceClassLoading.class); + if (instanceClassLoadingAnnotation == null) { + return bundleClassLoader; + } + + final Set instanceUrls = new LinkedHashSet<>(); + final Set narNativeLibDirs = new LinkedHashSet<>(); + + final NarClassLoader narBundleClassLoader = (NarClassLoader) bundleClassLoader; + narNativeLibDirs.add(narBundleClassLoader.getNARNativeLibDir()); + instanceUrls.addAll(Arrays.asList(narBundleClassLoader.getURLs())); + + ClassLoader ancestorClassLoader = narBundleClassLoader.getParent(); + + if (instanceClassLoadingAnnotation.cloneAncestorResources()) { + while (ancestorClassLoader instanceof NarClassLoader) { + final Bundle ancestorNarBundle = extensionManager.getBundle(ancestorClassLoader); + + // stop including ancestor resources when we reach one of the APIs, or when we hit the Jetty NAR + if (ancestorNarBundle == null || ancestorNarBundle.getBundleDetails().getCoordinate().getId().equals(NarClassLoaders.JETTY_NAR_ID)) { + break; + } + + final NarClassLoader ancestorNarClassLoader = (NarClassLoader) 
ancestorClassLoader; + + narNativeLibDirs.add(ancestorNarClassLoader.getNARNativeLibDir()); + Collections.addAll(instanceUrls, ancestorNarClassLoader.getURLs()); + + ancestorClassLoader = ancestorNarClassLoader.getParent(); + } + } + + final InstanceClassLoader instanceClassLoader = new InstanceClassLoader(instanceId, implementationClassName, instanceUrls, + Collections.emptySet(), narNativeLibDirs, ancestorClassLoader); + extensionManager.registerInstanceClassLoader(instanceId, instanceClassLoader); + return instanceClassLoader; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java index 6a68bafb0ef5..563775a798b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java @@ -101,6 +101,7 @@ public StandardExtensionDiscoveringManager() { definitionMap.put(ContentRepository.class, new HashSet<>()); definitionMap.put(StateProvider.class, new HashSet<>()); definitionMap.put(StatusAnalyticsModel.class, new HashSet<>()); + definitionMap.put(NarProvider.class, new HashSet<>()); } @Override @@ -462,6 +463,11 @@ public InstanceClassLoader removeInstanceClassLoader(final String instanceIdenti return classLoader; } + @Override + public void registerInstanceClassLoader(final String instanceIdentifier, final InstanceClassLoader instanceClassLoader) { + instanceClassloaderLookup.putIfAbsent(instanceIdentifier, instanceClassLoader); + } + @Override public void closeURLClassLoader(final String instanceIdentifier, final ClassLoader classLoader) { if ((classLoader instanceof URLClassLoader)) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index efcf271d501a..5beb7b557a6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -1225,7 +1225,7 @@ public void start() { extensionMapping, this); - narAutoLoader = new NarAutoLoader(props.getNarAutoLoadDirectory(), narLoader); + narAutoLoader = new NarAutoLoader(props, narLoader, extensionManager); narAutoLoader.start(); // dump the application url after confirming everything started successfully diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java new file mode 100644 index 000000000000..6c852b707e3d --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar.hadoop; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.nar.NarProvider; +import org.apache.nifi.nar.NarProviderInitializationContext; +import org.apache.nifi.nar.hadoop.util.ExtensionFilter; +import org.apache.nifi.processors.hadoop.ExtendedConfiguration; +import org.apache.nifi.processors.hadoop.HdfsResources; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +@RequiresInstanceClassLoading(cloneAncestorResources = true) +public class HDFSNarProvider implements NarProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(HDFSNarProvider.class); + + private static final String RESOURCES_PARAMETER = "resources"; + private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory"; + private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal"; + private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab"; + private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password"; + + private static final String NAR_EXTENSION = "nar"; + private static final String DELIMITER = "/"; + private static final int BUFFER_SIZE_DEFAULT = 4096; + private static final Object RESOURCES_LOCK = new Object(); + + private volatile List resources = null; + private volatile Path sourceDirectory = null; + + private volatile NarProviderInitializationContext context; + + private volatile boolean initialized = false; + + public void initialize(final NarProviderInitializationContext context) { + resources = Arrays.stream(Objects.requireNonNull( + context.getProperties().get(RESOURCES_PARAMETER)).split(",")).map(s -> s.trim()).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + + if (resources.isEmpty()) { + throw new IllegalArgumentException("At least one HDFS configuration resource is necessary"); + } + + final String sourceDirectory = 
context.getProperties().get(SOURCE_DIRECTORY_PARAMETER); + + if (sourceDirectory == null || sourceDirectory.isEmpty()) { + throw new IllegalArgumentException("Provider needs the source directory to be set"); + } + + this.sourceDirectory = new Path(sourceDirectory); + + this.context = context; + this.initialized = true; + } + + @Override + public Collection listNars() throws IOException { + if (!initialized) { + LOGGER.error("Provider is not initialized"); + } + + final HdfsResources hdfsResources = getHdfsResources(); + + + try { + final FileStatus[] fileStatuses = hdfsResources.getUserGroupInformation() + .doAs((PrivilegedExceptionAction) () -> hdfsResources.getFileSystem().listStatus(sourceDirectory, new ExtensionFilter(NAR_EXTENSION))); + + final List result = Arrays.stream(fileStatuses) + .filter(fileStatus -> fileStatus.isFile()) + .map(fileStatus -> fileStatus.getPath().getName()) + .collect(Collectors.toList()); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("The following NARs were found: " + String.join(", ", result)); + } + + return result; + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Provider cannot list NARs", e); + } + } + + @Override + public InputStream fetchNarContents(final String location) throws IOException { + if (!initialized) { + LOGGER.error("Provider is not initialized"); + } + + + final Path path = getNarLocation(location); + final HdfsResources hdfsResources = getHdfsResources(); + + try { + if (hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> !hdfsResources.getFileSystem().exists(path))) { + throw new IOException("Provider cannot find " + location); + } + + return hdfsResources.getUserGroupInformation() + .doAs((PrivilegedExceptionAction) () -> hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT)); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Error during acquiring file", e); + } + } + + private Path getNarLocation(final String location) { + String result = sourceDirectory.toString(); + + if (!result.endsWith(DELIMITER)) { + result += DELIMITER; + } + + return new Path(result + location); + } + + private HdfsResources getHdfsResources() throws IOException { + final Configuration config = new ExtendedConfiguration(LOGGER); + config.setClassLoader(Thread.currentThread().getContextClassLoader()); + + for (final String resource : resources) { + config.addResource(new Path(resource)); + } + + // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout + checkHdfsUriForTimeout(config); + + // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete restart + final String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme()); + config.set(disableCacheName, "true"); + + // If kerberos is enabled, create the file system as the kerberos principal + // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time + FileSystem fs; + UserGroupInformation ugi; + KerberosUser kerberosUser; + + synchronized (RESOURCES_LOCK) { + if (SecurityUtil.isSecurityEnabled(config)) { + final String principal = context.getProperties().get(KERBEROS_PRINCIPAL_PARAMETER); + final String keyTab = context.getProperties().get(KERBEROS_KEYTAB_PARAMETER); + final String password = context.getProperties().get(KERBEROS_PASSWORD_PARAMETER); + + if 
+                if (keyTab != null) {
+                    kerberosUser = new KerberosKeytabUser(principal, keyTab);
+                } else if (password != null) {
+                    kerberosUser = new KerberosPasswordUser(principal, password);
+                } else {
+                    throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
+                }
+
+                ugi = SecurityUtil.getUgiForKerberosUser(config, kerberosUser);
+            } else {
+                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+                config.set("hadoop.security.authentication", "simple");
+                ugi = SecurityUtil.loginSimple(config);
+                kerberosUser = null;
+            }
+
+            fs = getFileSystemAsUser(config, ugi);
+        }
+        LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
+
+        final Path workingDir = fs.getWorkingDirectory();
+        LOGGER.debug("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
+                new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
+
+        if (!fs.exists(sourceDirectory)) {
+            throw new IllegalArgumentException("Source directory does not exist");
+        }
+
+        return new HdfsResources(config, fs, ugi, kerberosUser);
+    }
+
+    private void checkHdfsUriForTimeout(final Configuration config) throws IOException {
+        final URI hdfsUri = FileSystem.getDefaultUri(config);
+        final String address = hdfsUri.getAuthority();
+        final int port = hdfsUri.getPort();
+
+        if (address == null || address.isEmpty() || port < 0) {
+            return;
+        }
+
+        final InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
+        final SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
+        Socket socket = null;
+
+        try {
+            socket = socketFactory.createSocket();
+            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
+        } finally {
+            IOUtils.closeQuietly(socket);
+        }
+    }
+
+    private FileSystem getFileSystemAsUser(final Configuration config, final UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (final InterruptedException e) {
+            throw new IOException("Unable to create file system: " + e.getMessage(), e);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/util/ExtensionFilter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/util/ExtensionFilter.java
new file mode 100644
index 000000000000..0f47a12f6483
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/util/ExtensionFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar.hadoop.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * HDFS listing filter which selects files based on extension.
+ */
+public class ExtensionFilter implements PathFilter {
+    private final String extension;
+
+    public ExtensionFilter(final String extension) {
+        this.extension = extension;
+    }
+
+    @Override
+    public boolean accept(final Path path) {
+        final String fileName = path.getName().toLowerCase();
+        return fileName.endsWith("." + extension);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.nar.NarProvider b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.nar.NarProvider
new file mode 100644
index 000000000000..dcda6e075cfa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.nar.NarProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.nar.hadoop.HDFSNarProvider
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/nar/hadoop/util/ExtensionFilterTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/nar/hadoop/util/ExtensionFilterTest.java
new file mode 100644
index 000000000000..914cc2b03824
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/nar/hadoop/util/ExtensionFilterTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar.hadoop.util;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExtensionFilterTest {
+
+    @Test
+    public void testValid() {
+        // given
+        final ExtensionFilter testSubject = new ExtensionFilter("txt");
+        final Path path = new Path("test.txt");
+
+        // when
+        final boolean result = testSubject.accept(path);
+
+        // then
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testValidWhenUppercase() {
+        // given
+        final ExtensionFilter testSubject = new ExtensionFilter("txt");
+        final Path path = new Path("test.TXT");
+
+        // when
+        final boolean result = testSubject.accept(path);
+
+        // then
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testInvalidWhenDifferentExtension() {
+        // given
+        final ExtensionFilter testSubject = new ExtensionFilter("txt");
+        final Path path = new Path("test.json");
+
+        // when
+        final boolean result = testSubject.accept(path);
+
+        // then
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void testInvalidWhenMistypedExtension() {
+        // given
+        final ExtensionFilter testSubject = new ExtensionFilter("txt");
+        final Path path = new Path("test.ttxt");
+
+        // when
+        final boolean result = testSubject.accept(path);
+
+        // then
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void testInvalidWhenMultipleExtension() {
+        // given
+        final ExtensionFilter testSubject = new ExtensionFilter("txt");
+        final Path path = new Path("test.txt.json");
+
+        // when
+        final boolean result = testSubject.accept(path);
+
+        // then
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void testFolder() {
+        // given
+        final ExtensionFilter testSubject = new ExtensionFilter("ttxt");
+        final Path path = new Path("testtxt");
+
+        // when
+        final boolean result = testSubject.accept(path);
+
+        // then
+        Assert.assertFalse(result);
+    }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index 459bc47fe4c9..ab49d638bf1e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -43,7 +43,7 @@
 import static org.mockito.Mockito.when;
 
 public class GetHDFSSequenceFileTest {
-    private AbstractHadoopProcessor.HdfsResources hdfsResources;
+    private HdfsResources hdfsResources;
     private GetHDFSSequenceFile getHDFSSequenceFile;
     private Configuration configuration;
     private FileSystem fileSystem;
@@ -55,7 +55,7 @@ public void setup() throws IOException {
         configuration = mock(Configuration.class);
         fileSystem = mock(FileSystem.class);
         userGroupInformation = mock(UserGroupInformation.class);
-        hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, userGroupInformation, null);
+        hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
         getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
         getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
         reloginTried = false;
@@ -86,7 +86,7 @@ public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws
 
     @Test
     public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
-        hdfsResources = new AbstractHadoopProcessor.HdfsResources(configuration, fileSystem, null, null);
+        hdfsResources = new HdfsResources(configuration, fileSystem, null, null);
         init();
         SequenceFileReader reader = mock(SequenceFileReader.class);
         Path file = mock(Path.class);
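
For reference, the short sketch below (not part of the patch) shows how a NarProvider implementation such as HDFSNarProvider could be exercised in isolation: it initializes the provider with the two mandatory parameters read by the class ("resources" pointing at a Hadoop configuration file and "source.directory" pointing at the HDFS directory to poll), lists the available NARs, and copies each one to a local directory. The driver class name, the example paths, the local "extensions" target directory, and the assumption that NarProviderInitializationContext exposes only getProperties() are illustrative; within NiFi this polling is driven by the framework's NAR auto-loading machinery configured through nifi.properties, not by user code.

    // Illustrative driver only; names, paths and the shape of the context are assumptions.
    import org.apache.nifi.nar.NarProvider;
    import org.apache.nifi.nar.NarProviderInitializationContext;
    import org.apache.nifi.nar.hadoop.HDFSNarProvider;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.HashMap;
    import java.util.Map;

    public class HdfsNarProviderSketch {

        public static void main(final String[] args) throws Exception {
            // Parameter keys mirror the constants read by HDFSNarProvider; the values are examples.
            final Map<String, String> properties = new HashMap<>();
            properties.put("resources", "/etc/hadoop/conf/core-site.xml");
            properties.put("source.directory", "/custom-nars");

            final NarProvider provider = new HDFSNarProvider();
            // Assumes the context interface only exposes getProperties(), which is all the provider uses.
            provider.initialize(new NarProviderInitializationContext() {
                @Override
                public Map<String, String> getProperties() {
                    return properties;
                }
            });

            // Poll the source once and pull every listed NAR into a local directory,
            // roughly what the framework's auto-loading task is expected to do on a schedule.
            final Path targetDirectory = Paths.get("extensions");
            Files.createDirectories(targetDirectory);

            for (final String narLocation : provider.listNars()) {
                try (InputStream narContents = provider.fetchNarContents(narLocation)) {
                    Files.copy(narContents, targetDirectory.resolve(narLocation), StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }

Keeping listNars() separate from fetchNarContents() presumably lets the framework decide which NAR locations are new before transferring any bytes from HDFS.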