-
Notifications
You must be signed in to change notification settings - Fork 985
DRILL-4726: Dynamic UDFs support #574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0baceaf
fe5aebe
6d74fe6
25b74e1
9e9586c
f9362d3
eb67ff8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,9 @@ | |
| import java.net.URL; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
|
|
||
| import com.google.common.collect.Lists; | ||
| import org.apache.drill.common.config.DrillConfig; | ||
| import org.apache.drill.common.scanner.persistence.ScanResult; | ||
|
|
||
|
|
@@ -75,4 +77,23 @@ public static ScanResult fromPrescan(DrillConfig config) { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Scans packages retrieved from config. | ||
| * Returns scan result with list of packages, classes and annotations found. | ||
| * Is used to scan specific jars not associated with classpath at runtime. | ||
| * | ||
| * @param config to retrieve the packages to scan | ||
| * @param markedPath list of paths where to scan | ||
| * @return the scan result with list of packages, classes and annotations found | ||
| */ | ||
| public static ScanResult dynamicPackageScan(DrillConfig config, Set<URL> markedPath) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain the purposes of this a bit more? What is a "marked path"? What is a "prescanned annotation?" At a higher level, what service does this provide? Allows me to locate (what?) given (what?)...
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. |
||
| List<String> packagePrefixes = ClassPathScanner.getPackagePrefixes(config); | ||
| return ClassPathScanner.scan( | ||
| markedPath, | ||
| packagePrefixes, | ||
| Lists.<String>newArrayList(), | ||
| PRESCANNED.getScannedAnnotations(), | ||
| ClassPathScanner.emptyResult()); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -324,6 +324,22 @@ if [ -n "$DRILL_CLASSPATH" ]; then | |
| CP="$CP:$DRILL_CLASSPATH" | ||
| fi | ||
|
|
||
| # Drill temporary directory is used as base for temporary storage of Dynamic UDF jars. | ||
| # If tmp dir is given, it must exist. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please explain that DRILL_TMP_DIR is used for temporary storage of Dynamic UDF jars. (This comment helps folks understand that this is not the same as the tmp dir used for spill-to-disk...)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| if [ -n "$DRILL_TMP_DIR" ]; then | ||
| if [[ ! -d "$DRILL_TMP_DIR" ]]; then | ||
| fatal_error "Temporary dir does not exist:" $DRILL_TMP_DIR | ||
| fi | ||
| else | ||
| # Otherwise, use the default | ||
| DRILL_TMP_DIR="/tmp" | ||
| fi | ||
|
|
||
| mkdir -p "$DRILL_TMP_DIR" | ||
| if [[ ! -d "$DRILL_TMP_DIR" || ! -w "$DRILL_TMP_DIR" ]]; then | ||
| fatal_error "Temporary directory does not exist or is not writable: $DRILL_TMP_DIR" | ||
| fi | ||
|
|
||
| # Test for cygwin | ||
| is_cygwin=false | ||
| case "`uname`" in | ||
|
|
@@ -371,6 +387,7 @@ if $is_cygwin; then | |
| DRILL_HOME=`cygpath -w "$DRILL_HOME"` | ||
| DRILL_CONF_DIR=`cygpath -w "$DRILL_CONF_DIR"` | ||
| DRILL_LOG_DIR=`cygpath -w "$DRILL_LOG_DIR"` | ||
| DRILL_TMP_DIR=`cygpath -w "$DRILL_TMP_DIR"` | ||
| CP=`cygpath -w -p "$CP"` | ||
| if [ -z "$HADOOP_HOME" ]; then | ||
| export HADOOP_HOME=${DRILL_HOME}/winutils | ||
|
|
@@ -391,6 +408,7 @@ export is_cygwin | |
| export DRILL_HOME | ||
| export DRILL_CONF_DIR | ||
| export DRILL_LOG_DIR | ||
| export DRILL_TMP_DIR | ||
| export CP | ||
| export JAVA_HOME | ||
| export JAVA | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,6 +114,11 @@ if "test%DRILL_LOG_DIR%" == "test" ( | |
| set DRILL_LOG_DIR=%DRILL_HOME%\log | ||
| ) | ||
|
|
||
| @rem Drill temporary directory is used as base for temporary storage of Dynamic UDF jars. | ||
| if "test%DRILL_TMP_DIR%" == "test" ( | ||
| set DRILL_TMP_DIR=%TEMP% | ||
| ) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. %DRILL_CONF_DIR%\udf |
||
| rem ---- | ||
| rem Deal with Hadoop JARs, if HADOOP_HOME was specified | ||
| rem ---- | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,8 @@ | |
| "REFRESH", | ||
| "METADATA", | ||
| "DATABASE", | ||
| "IF" | ||
| "IF", | ||
| "JAR" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these be alphabetical?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really, previous keywords were added in order of appearance. |
||
| ] | ||
|
|
||
| # List of methods for parsing custom SQL statements. | ||
|
|
@@ -53,7 +54,9 @@ | |
| "SqlShowFiles()", | ||
| "SqlCreateTable()", | ||
| "SqlDropTable()", | ||
| "SqlRefreshMetadata()" | ||
| "SqlRefreshMetadata()", | ||
| "SqlCreateFunction()", | ||
| "SqlDropFunction()" | ||
| ] | ||
|
|
||
| # List of methods for parsing custom literals. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -297,4 +297,44 @@ SqlNode SqlDescribeSchema() : | |
| { | ||
| return new SqlDescribeSchema(pos, schema); | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks fine conceptually, but have someone double-check this who is more familiar with the details of the parser implementation. |
||
|
|
||
| /** | ||
| * Parse create UDF statement | ||
| * CREATE FUNCTION USING JAR 'jar_name' | ||
| */ | ||
| SqlNode SqlCreateFunction() : | ||
| { | ||
| SqlParserPos pos; | ||
| SqlNode jar; | ||
| } | ||
| { | ||
| <CREATE> { pos = getPos(); } | ||
| <FUNCTION> | ||
| <USING> | ||
| <JAR> | ||
| jar = StringLiteral() | ||
| { | ||
| return new SqlCreateFunction(pos, jar); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Parse drop UDF statement | ||
| * DROP FUNCTION USING JAR 'jar_name' | ||
| */ | ||
| SqlNode SqlDropFunction() : | ||
| { | ||
| SqlParserPos pos; | ||
| SqlNode jar; | ||
| } | ||
| { | ||
| <DROP> { pos = getPos(); } | ||
| <FUNCTION> | ||
| <USING> | ||
| <JAR> | ||
| jar = StringLiteral() | ||
| { | ||
| return new SqlDropFunction(pos, jar); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,6 @@ | |
| import org.apache.drill.exec.physical.impl.common.HashTable; | ||
| import org.apache.drill.exec.rpc.user.InboundImpersonationManager; | ||
| import org.apache.drill.exec.server.options.OptionValidator; | ||
| import org.apache.drill.exec.server.options.TypeValidators.AdminOptionValidator; | ||
| import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; | ||
| import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator; | ||
| import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator; | ||
|
|
@@ -106,10 +105,23 @@ public interface ExecConstants { | |
| String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS = | ||
| "drill.exec.debug.return_error_for_failure_in_cancelled_fragments"; | ||
|
|
||
| String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types"; | ||
|
|
||
| /** | ||
| * Configuration properties connected with dynamic UDFs support | ||
| */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these only for dynamic UDFs? Do we have settings for (non-dynamic) UDFs? If so, should we have drill.exec.udf
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't have properties for non-dynamic UDFs. |
||
| String UDF_RETRY_ATTEMPTS = "drill.exec.udf.retry-attempts"; | ||
| String UDF_DIRECTORY_FS = "drill.exec.udf.directory.fs"; | ||
| String UDF_DIRECTORY_ROOT = "drill.exec.udf.directory.root"; | ||
| String UDF_DIRECTORY_LOCAL = "drill.exec.udf.directory.local"; | ||
| String UDF_DIRECTORY_STAGING = "drill.exec.udf.directory.staging"; | ||
| String UDF_DIRECTORY_REGISTRY = "drill.exec.udf.directory.registry"; | ||
| String UDF_DIRECTORY_TMP = "drill.exec.udf.directory.tmp"; | ||
|
|
||
|
|
||
| String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types"; | ||
| /** | ||
| * Local temporary directory is used as base for temporary storage of Dynamic UDF jars. | ||
| */ | ||
| String DRILL_TMP_DIR = "drill.tmp-dir"; | ||
|
|
||
| String OUTPUT_FORMAT_OPTION = "store.format"; | ||
| OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); | ||
|
|
@@ -296,15 +308,13 @@ public interface ExecConstants { | |
| * such as changing system options. | ||
| */ | ||
| String ADMIN_USERS_KEY = "security.admin.users"; | ||
| StringValidator ADMIN_USERS_VALIDATOR = | ||
| new AdminOptionValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName()); | ||
| StringValidator ADMIN_USERS_VALIDATOR = new StringValidator(ADMIN_USERS_KEY, ImpersonationUtil.getProcessUserName(), true); | ||
|
|
||
| /** | ||
| * Option whose value is a comma separated list of admin usergroups. | ||
| */ | ||
| String ADMIN_USER_GROUPS_KEY = "security.admin.user_groups"; | ||
| StringValidator ADMIN_USER_GROUPS_VALIDATOR = new AdminOptionValidator(ADMIN_USER_GROUPS_KEY, ""); | ||
|
|
||
| StringValidator ADMIN_USER_GROUPS_VALIDATOR = new StringValidator(ADMIN_USER_GROUPS_KEY, "", true); | ||
| /** | ||
| * Option whose value is a string representing list of inbound impersonation policies. | ||
| * | ||
|
|
@@ -337,4 +347,7 @@ public interface ExecConstants { | |
| String CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS = "prepare.statement.create_timeout_ms"; | ||
| OptionValidator CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR = | ||
| new PositiveLongValidator(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS, Integer.MAX_VALUE, 10000); | ||
|
|
||
| String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; | ||
| BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. exec.udf.dynamic.enable ?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are separated since exec.udf.dynamic.enable is an option, and other 6 are configuration properties, they are stored in different places in ExecContants.java. |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,8 +31,12 @@ | |
| import org.apache.curator.framework.recipes.cache.PathChildrenCache; | ||
| import org.apache.drill.common.collections.ImmutableEntry; | ||
| import org.apache.drill.common.exceptions.DrillRuntimeException; | ||
| import org.apache.drill.exec.exception.VersionMismatchException; | ||
| import org.apache.drill.exec.store.sys.store.DataChangeVersion; | ||
| import org.apache.zookeeper.CreateMode; | ||
| import org.apache.zookeeper.KeeperException; | ||
| import org.apache.zookeeper.KeeperException.NodeExistsException; | ||
| import org.apache.zookeeper.data.Stat; | ||
|
|
||
| /** | ||
| * A namespace aware Zookeeper client. | ||
|
|
@@ -133,13 +137,52 @@ public byte[] get(final String path) { | |
| * the check is eventually consistent. | ||
| * | ||
| * @param path target path | ||
| * @param consistent consistency flag | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps explan this a bit more. The comment line above says "the check is eventually consistent." How does the consistent flag change the semantics of this check?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Diff didn't show the whole comment which is: |
||
| */ | ||
| public byte[] get(final String path, final boolean consistent) { | ||
| return get(path, consistent, null); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the value corresponding to the given key, null otherwise. | ||
| * | ||
| * The check is consistent as it is made against Zookeeper directly. | ||
| * | ||
| * Passes version holder to get data change version. | ||
| * | ||
| * @param path target path | ||
| * @param version version holder | ||
| */ | ||
| public byte[] get(final String path, DataChangeVersion version) { | ||
| return get(path, true, version); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the value corresponding to the given key, null otherwise. | ||
| * | ||
| * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise, | ||
| * the check is eventually consistent. | ||
| * | ||
| * If consistency flag is set to true and version holder is not null, passes version holder to get data change version. | ||
| * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed. | ||
| * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes | ||
| * | ||
| * @param path target path | ||
| * @param consistent consistency check | ||
| * @param version version holder | ||
| */ | ||
| public byte[] get(final String path, final boolean consistent, final DataChangeVersion version) { | ||
| Preconditions.checkNotNull(path, "path is required"); | ||
|
|
||
| final String target = PathUtils.join(root, path); | ||
| if (consistent) { | ||
| try { | ||
| if (version != null) { | ||
| Stat stat = new Stat(); | ||
| final byte[] bytes = curator.getData().storingStatIn(stat).forPath(target); | ||
| version.setVersion(stat.getVersion()); | ||
| return bytes; | ||
| } | ||
| return curator.getData().forPath(target); | ||
| } catch (final Exception ex) { | ||
| throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex); | ||
|
|
@@ -179,6 +222,26 @@ public void create(final String path) { | |
| * @param data data to store | ||
| */ | ||
| public void put(final String path, final byte[] data) { | ||
| put(path, data, null); | ||
| } | ||
|
|
||
| /** | ||
| * Puts the given byte sequence into the given path. | ||
| * | ||
| * If path does not exists, this call creates it. | ||
| * | ||
| * If version holder is not null and path already exists, passes given version for comparison. | ||
| * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed. | ||
| * If we pass version that doesn't match the actual version of the data, | ||
| * the update will fail {@link org.apache.zookeeper.KeeperException.BadVersionException}. | ||
| * We catch such exception and re-throw it as {@link VersionMismatchException}. | ||
| * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes | ||
| * | ||
| * @param path target path | ||
| * @param data data to store | ||
| * @param version version holder | ||
| */ | ||
| public void put(final String path, final byte[] data, DataChangeVersion version) { | ||
| Preconditions.checkNotNull(path, "path is required"); | ||
| Preconditions.checkNotNull(data, "data is required"); | ||
|
|
||
|
|
@@ -199,9 +262,45 @@ public void put(final String path, final byte[] data) { | |
| } | ||
| } | ||
| if (hasNode) { | ||
| curator.setData().forPath(target, data); | ||
| if (version != null) { | ||
| try { | ||
| curator.setData().withVersion(version.getVersion()).forPath(target, data); | ||
| } catch (final KeeperException.BadVersionException e) { | ||
| throw new VersionMismatchException("Unable to put data. Version mismatch is detected.", version.getVersion(), e); | ||
| } | ||
| } else { | ||
| curator.setData().forPath(target, data); | ||
| } | ||
| } | ||
| getCache().rebuildNode(target); | ||
| } catch (final VersionMismatchException e) { | ||
| throw e; | ||
| } catch (final Exception e) { | ||
| throw new DrillRuntimeException("unable to put ", e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Puts the given byte sequence into the given path if path is does not exist. | ||
| * | ||
| * @param path target path | ||
| * @param data data to store | ||
| * @return null if path was created, else data stored for the given path | ||
| */ | ||
| public byte[] putIfAbsent(final String path, final byte[] data) { | ||
| Preconditions.checkNotNull(path, "path is required"); | ||
| Preconditions.checkNotNull(data, "data is required"); | ||
|
|
||
| final String target = PathUtils.join(root, path); | ||
| try { | ||
| try { | ||
| curator.create().withMode(mode).forPath(target, data); | ||
| getCache().rebuildNode(target); | ||
| return null; | ||
| } catch (NodeExistsException e) { | ||
| // do nothing | ||
| } | ||
| return curator.getData().forPath(target); | ||
| } catch (final Exception e) { | ||
| throw new DrillRuntimeException("unable to put ", e); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for? Is the caller responsible for loading the defaults? If this is primarily for unit testing, then the comment should say so and describe how to use this. (The config, for example, must provide all required properties; nothing will be picked up from defaults.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We pass Config object which contains URL to drill-module.conf in our jar. drill-module.conf contains packages that allowed to be scanned. Based on this URL, drill config is created. During creation drill finds all packages listed in drill-module.conf and adds them in drill config.
Example of created drill config:
{
# jar:file:/home/osboxes/git_repo/drill/exec/java-exec/target/1473346938237-0/DrillUDF-1.0.jar!/drill-module.conf: 1
"drill" : {
# jar:file:/home/osboxes/git_repo/drill/exec/java-exec/target/1473346938237-0/DrillUDF-1.0.jar!/drill-module.conf: 1
"classpath" : {
# jar:file:/home/osboxes/git_repo/drill/exec/java-exec/target/1473346938237-0/DrillUDF-1.0.jar!/drill-module.conf: 1
"scanning" : {
# jar:file:/home/osboxes/git_repo/drill/exec/java-exec/target/1473346938237-0/DrillUDF-1.0.jar!/drill-module.conf: 1
"packages" : [
# jar:file:/home/osboxes/git_repo/drill/exec/java-exec/target/1473346938237-0/DrillUDF-1.0.jar!/drill-module.conf: 1
"com.drill.udf"
]
}
}
}
}