From d41d2ec87cb5aaa73ef77f690f032a2167f312b3 Mon Sep 17 00:00:00 2001 From: Christophe Taton Date: Wed, 30 Jul 2014 10:02:56 -0700 Subject: [PATCH] EXP-454. Allow express.py to specify JAR files to exclude from the classpath - Remove the call to TmpJarsTool. - Adds new user-classpath entry precedence config key. - Clean up how java lib paths are computed. issue: https://jira.kiji.org/browse/EXP-454 review: https://review.kiji.org/r/2080 --- .../src/main/scripts/express.py | 160 ++++++++++++++---- 1 file changed, 124 insertions(+), 36 deletions(-) diff --git a/kiji-express-tools/src/main/scripts/express.py b/kiji-express-tools/src/main/scripts/express.py index 96a19c17..1dcc024a 100755 --- a/kiji-express-tools/src/main/scripts/express.py +++ b/kiji-express-tools/src/main/scripts/express.py @@ -42,7 +42,6 @@ EXPRESS_TOOL = "org.kiji.express.flow.ExpressTool" -TMP_JARS_TOOL = "org.kiji.express.tool.TmpJarsTool" EXPRESS_HOME = "EXPRESS_HOME" HADOOP_HOME = "HADOOP_HOME" @@ -225,6 +224,9 @@ def classpath(self): return self._classpath +# -------------------------------------------------------------------------------------------------- + + class HadoopTool(HomedTool): _RE_HADOOP_VERSION = re.compile(r"^Hadoop (.*)$") @@ -257,6 +259,52 @@ def major_version(self): """Returns: the major version of this Hadoop installation (eg. 
1 or 2).""" return self.version.split(".")[0] # Pick major version + def _acquire_platform_name(self): + cmd = [ + "java", + "-cp", + ":".join(self.classpath), + "-Xmx32m", + "org.apache.hadoop.util.PlatformName" + ] + output = subprocess.check_output(cmd, universal_newlines=True) + java_platform = output.strip() + logging.info("Using Hadoop platform: %r", java_platform) + return java_platform + + @property + def platform_name(self): + """Returns: the Hadoop platform name.""" + if not hasattr(self, "_platform_name"): + self._platform_name = self._acquire_platform_name() + return self._platform_name + + def list_native_lib_paths(self): + """Lists the paths of the Hadoop native libraries to specify in "java.library.path". + + Native libraries are expected under $HADOOP_HOME/lib/native + and under $HADOOP_HOME/lib/native/${platform}. + + Returns: + An iterable of directory paths to the Hadoop native libraries. + """ + lib_paths = list() + + native_dir_path = os.path.join(self.home_dir, "lib", "native") + if os.path.isdir(native_dir_path): + lib_paths.append(native_dir_path) + + # Hadoop wants a certain platform version, then we hope to use it + native_dirs = os.path.join(native_dir_path, self.platform_name.replace(" ", "_")) + if os.path.isdir(native_dirs): + lib_paths.append(native_dirs) + + return lib_paths + + + +# -------------------------------------------------------------------------------------------------- + class HBaseTool(HomedTool): @property @@ -316,7 +364,19 @@ def list_libdir_jars(home_env_key=None, home=None, lib=None): class ExpressTool(object): - def __init__(self, env=os.environ): + """Wrapper for the KijiExpress installation.""" + + def __init__( + self, + env=os.environ, + cp_filter=None, + ): + """Initializes a new KijiExpress wrapper. + + Args: + env: Dictionary of environment variables. + cp_filter: Optional filter for classpath entries. 
+ """ self._env = env assert (self.home_env_key in self._env), \ ("Environment variable undefined: %r" % self.home_env_key) @@ -327,6 +387,8 @@ def __init__(self, env=os.environ): self._hbase = HBaseTool(env=self._env) self._kiji = KijiTool(env=self._env) + self._cp_filter = cp_filter + @property def home_env_key(self): return "EXPRESS_HOME" @@ -339,17 +401,12 @@ def home_dir(self): def hadoop(self): return self._hadoop - @property - def hbase(self): - return self._hbase @property def kiji(self): return self._kiji def _list_classpath_entries(self): - # TODO: include --libjars - if KIJI_CLASSPATH in self._env: user_classpath = self._env[KIJI_CLASSPATH].split(":") yield from user_classpath @@ -363,6 +420,9 @@ def _list_classpath_entries(self): def get_classpath(self, lib_jars=()): """Reports the Express classpath. + Note: classpath entries are normalized through normalize_classpath(). + In particular, classpath wildcards are expanded, duplicates eliminated. + Args: lib_jars: Optional collection of user-specified JARs to include. Returns: @@ -370,26 +430,29 @@ def get_classpath(self, lib_jars=()): """ express_classpath = self._list_classpath_entries() classpath = itertools.chain(lib_jars, express_classpath) - return normalize_classpath(classpath) + classpath = normalize_classpath(classpath) + if self._cp_filter is not None: + classpath = filter(self._cp_filter, classpath) + return classpath def list_paths_for_dist_cache(self, lib_jars): """Lists the JAR files to send to the distributed cache. Args: - lib_jars: Collection of JAR files to prepare for an Express job. + lib_jars: Optional collection of user-specified JARs to include. Returns: - Iterable of paths to send to the distributed cache. + Iterable of fully-qualified path URIs to send to the distributed cache. """ - express_classpath = ":".join(self.get_classpath(lib_jars=lib_jars)) - # Note: we might be including too many things in the dist cache. 
- # Someday, we should investigate and determine what is the strict minimum. - cmd = ["java", "-classpath", express_classpath, TMP_JARS_TOOL, express_classpath] - logging.debug("Running command:\n%s\n", " \\\n\t".join(map(repr, cmd))) - output = subprocess.check_output(cmd, universal_newlines=True).strip() - jars = output.split(",") - jars = sorted(jars) - logging.debug("JARs sent to the distributed cache:\n%s", "\n".join(map(tab_indent, jars))) - return jars + dc_entries = list() + for cp_entry in self.get_classpath(lib_jars=lib_jars): + if os.path.isfile(cp_entry): + dc_entries.append("file://%s" % cp_entry) + else: + logging.debug("Skipping classpath entry for distributed cache: %r", cp_entry) + + logging.debug("JARs sent to the distributed cache:\n%s", + "\n".join(map(tab_indent, dc_entries))) + return dc_entries # -------------------------------------------------------------------------------------------------- @@ -474,7 +537,19 @@ def make_arg_parser(): epilog=ENV_VAR_HELP, formatter_class=text_formatter, ) + + # Global flags available in all commands: parser.add_argument("--log-level", default="info", help="Logging level.") + parser.add_argument( + "--cp-filter", + default="", + help=("Classpath entry filter, expressed as a Python expression. " + "Available symbols are: 'path', 'name', 'os' and 're'. " + "For example, the filter: --cp-filter='\"jasper\" not in path' " + "excludes entries whose path includes the word 'jasper'. 
" + """With regex: --cp-filter='not re.match(r"servlet.*[.]jar", name)' """ + "excludes any entry whose file name matchs 'servlet*.jar'."), + ) subparsers = parser.add_subparsers(title="command", dest="command", help="Command to perform.") classpath_parser = subparsers.add_parser( @@ -562,7 +637,26 @@ def __init__(self, flags, args, env=os.environ): self._flags = flags self._args = args self._env = env - self._express = ExpressTool(env=self._env) + + # Construct a classpath filter, if any: + cp_filter = None + classpath_filter = flags.cp_filter + if classpath_filter is not None: + classpath_filter = classpath_filter.strip() + if (classpath_filter is not None) and (len(classpath_filter) > 0): + logging.debug("Constructing classpath filter for %r", classpath_filter) + def custom_filter(path): + globals = dict( + path=path, + name=os.path.basename(path), + os=os, + re=re, + ) + return eval(classpath_filter, globals) + + cp_filter = custom_filter + + self._express = ExpressTool(env=self._env, cp_filter=cp_filter) @property def flags(self): @@ -648,16 +742,7 @@ def hadoop_native_libs(env): java_library_path.append(hadoop_native_dir_path) # Hadoop wants a certain platform version, then we hope to use it - cmd = [ - "java", - "-cp", - ":".join(hadoop.classpath), - "-Xmx32m", - "org.apache.hadoop.util.PlatformName" - ] - output = subprocess.check_output(cmd, universal_newlines=True) - java_platform = output.strip() - logging.info("Using Hadoop platform: %r", java_platform) + java_platform = self.hadoop.platform_name native_dirs = os.path.join(hadoop_native_dir_path, java_platform.replace(" ", "_")) if os.path.isdir(native_dirs): java_library_path.append(native_dirs) @@ -696,11 +781,9 @@ def job(self): java_opts.append("-Djava.security.krb5.kdc=") lib_path = self.env.get("JAVA_LIBRARY_PATH", "").split(":") - native_lib_path = self.hadoop_native_libs(self.env) - if len(native_lib_path) > 0: - lib_path.extend(native_lib_path) - lib_path = list(filter(None, map(str.strip, 
lib_path))) + lib_path.extend(self.express.hadoop.list_native_lib_paths()) if len(lib_path) > 0: + logging.debug("Using Java library paths:\n%s", map(tab_indent, lib_path)) java_opts.append("-Djava.library.path=%s" % ":".join(lib_path)) logging.debug("Using JVM options: %r", java_opts) @@ -709,9 +792,11 @@ def job(self): # Hadoop generic options: hadoop_opts = list() if self.flags.disable_user_jars_take_precedence: - hadoop_opts.append("-Dmapreduce.task.classpath.user.precedence=false") + hadoop_opts.append("-Dmapreduce.task.classpath.user.precedence=false") # Old key + hadoop_opts.append("-Dmapreduce.job.user.classpath.first=false") # New key else: hadoop_opts.append("-Dmapreduce.task.classpath.user.precedence=true") + hadoop_opts.append("-Dmapreduce.job.user.classpath.first=true") hadoop_opts.extend(list(self.flags.hadoop_opts)) dist_cache_jars = self.express.list_paths_for_dist_cache(lib_jars=lib_jars) hadoop_opts.append("-Dtmpjars=%s" % ",".join(dist_cache_jars)) @@ -865,6 +950,9 @@ def init(args): print(err) return os.EX_USAGE + logging.debug("Parsed global flags: %r", flags) + logging.debug("Unparsed command-line arguments: %r", unparsed_args) + # Run program: sys.exit(main(parser, flags, unparsed_args))