Skip to content

Commit

Permalink
EXP-454. Allow express.py to specify JAR files to exclude from the cl…
Browse files Browse the repository at this point in the history
…asspath

 - Remove the call to TmpJarsTool.
 - Add a new user-classpath entry precedence config key.
 - Clean up how Java library paths are computed.

issue: https://jira.kiji.org/browse/EXP-454
review: https://review.kiji.org/r/2080
  • Loading branch information
Christophe Taton committed Jul 30, 2014
1 parent b2a9f85 commit d41d2ec
Showing 1 changed file with 124 additions and 36 deletions.
160 changes: 124 additions & 36 deletions kiji-express-tools/src/main/scripts/express.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@


EXPRESS_TOOL = "org.kiji.express.flow.ExpressTool"
TMP_JARS_TOOL = "org.kiji.express.tool.TmpJarsTool"

EXPRESS_HOME = "EXPRESS_HOME"
HADOOP_HOME = "HADOOP_HOME"
Expand Down Expand Up @@ -225,6 +224,9 @@ def classpath(self):
return self._classpath


# --------------------------------------------------------------------------------------------------


class HadoopTool(HomedTool):
_RE_HADOOP_VERSION = re.compile(r"^Hadoop (.*)$")

Expand Down Expand Up @@ -257,6 +259,52 @@ def major_version(self):
"""Returns: the major version of this Hadoop installation (eg. 1 or 2)."""
return self.version.split(".")[0] # Pick major version

def _acquire_platform_name(self):
    """Queries a JVM for the Hadoop platform name.

    Runs the org.apache.hadoop.util.PlatformName class in a small JVM (32MB heap)
    with this tool's classpath, and reads the platform name from its standard output.

    Returns:
      The text printed by the PlatformName class, stripped of surrounding whitespace.
    """
    hadoop_classpath = ":".join(self.classpath)
    command = ["java", "-cp", hadoop_classpath, "-Xmx32m", "org.apache.hadoop.util.PlatformName"]
    platform = subprocess.check_output(command, universal_newlines=True).strip()
    logging.info("Using Hadoop platform: %r", platform)
    return platform

@property
def platform_name(self):
    """Returns: the Hadoop platform name.

    The name is computed on first access through _acquire_platform_name(),
    then cached on the instance as _platform_name for later accesses.
    """
    try:
        return self._platform_name
    except AttributeError:
        # Not cached yet: fall through and compute it below.
        pass
    self._platform_name = self._acquire_platform_name()
    return self._platform_name

def list_native_lib_paths(self):
    """Lists the paths of the Hadoop native libraries to specify in "java.library.path".

    Native libraries are expected under $HADOOP_HOME/lib/native
    and under $HADOOP_HOME/lib/native/${platform}.

    Returns:
      A list of the directory paths to the Hadoop native libraries.
      Only directories that actually exist are reported; the list may be empty.
    """
    native_dir_path = os.path.join(self.home_dir, "lib", "native")
    if not os.path.isdir(native_dir_path):
        # Without lib/native there cannot be a platform sub-directory either:
        # return early so the platform name (which costs a JVM launch) is never queried.
        return []

    lib_paths = [native_dir_path]

    # Hadoop wants a certain platform version, then we hope to use it
    platform_dir_path = os.path.join(native_dir_path, self.platform_name.replace(" ", "_"))
    if os.path.isdir(platform_dir_path):
        lib_paths.append(platform_dir_path)

    return lib_paths



# --------------------------------------------------------------------------------------------------


class HBaseTool(HomedTool):
@property
Expand Down Expand Up @@ -316,7 +364,19 @@ def list_libdir_jars(home_env_key=None, home=None, lib=None):


class ExpressTool(object):
def __init__(self, env=os.environ):
"""Wrapper for the KijiExpress installation."""

def __init__(
self,
env=os.environ,
cp_filter=None,
):
"""Initializes a new KijiExpress wrapper.
Args:
env: Dictionary of environment variables.
cp_filter: Optional filter for classpath entries.
"""
self._env = env
assert (self.home_env_key in self._env), \
("Environment variable undefined: %r" % self.home_env_key)
Expand All @@ -327,6 +387,8 @@ def __init__(self, env=os.environ):
self._hbase = HBaseTool(env=self._env)
self._kiji = KijiTool(env=self._env)

self._cp_filter = cp_filter

@property
def home_env_key(self):
return "EXPRESS_HOME"
Expand All @@ -339,17 +401,12 @@ def home_dir(self):
def hadoop(self):
return self._hadoop

@property
def hbase(self):
return self._hbase

@property
def kiji(self):
return self._kiji

def _list_classpath_entries(self):
# TODO: include --libjars

if KIJI_CLASSPATH in self._env:
user_classpath = self._env[KIJI_CLASSPATH].split(":")
yield from user_classpath
Expand All @@ -363,33 +420,39 @@ def _list_classpath_entries(self):
def get_classpath(self, lib_jars=()):
    """Reports the Express classpath.

    Note: classpath entries are normalized through normalize_classpath().
    In particular, classpath wildcards are expanded, duplicates eliminated.

    Args:
      lib_jars: Optional collection of user-specified JARs to include.
    Returns:
      An iterable of classpath entries.
      When a classpath filter is configured, this is a lazy filter() iterator
      that can be consumed only once.
    """
    # User-specified JARs are listed ahead of the Express entries.
    classpath = itertools.chain(lib_jars, self._list_classpath_entries())
    classpath = normalize_classpath(classpath)
    if self._cp_filter is not None:
        # Keep only the entries for which the configured filter returns a true value.
        classpath = filter(self._cp_filter, classpath)
    return classpath

def list_paths_for_dist_cache(self, lib_jars=()):
    """Lists the JAR files to send to the distributed cache.

    Only classpath entries that are existing regular files are reported;
    other entries (directories, missing paths) are skipped and logged at debug level.

    Args:
      lib_jars: Optional collection of user-specified JARs to include.
    Returns:
      List of fully-qualified path URIs to send to the distributed cache.
    """
    # Note: we might be including too many things in the dist cache.
    # Someday, we should investigate and determine what is the strict minimum.
    dc_entries = []
    for cp_entry in self.get_classpath(lib_jars=lib_jars):
        if os.path.isfile(cp_entry):
            # A relative path would produce a malformed URI ("file://rel/path" reads
            # "rel" as the host), hence the path is made absolute first.
            dc_entries.append("file://%s" % os.path.abspath(cp_entry))
        else:
            logging.debug("Skipping classpath entry for distributed cache: %r", cp_entry)

    logging.debug("JARs sent to the distributed cache:\n%s",
                  "\n".join(map(tab_indent, dc_entries)))
    return dc_entries


# --------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -474,7 +537,19 @@ def make_arg_parser():
epilog=ENV_VAR_HELP,
formatter_class=text_formatter,
)

# Global flags available in all commands:
parser.add_argument("--log-level", default="info", help="Logging level.")
# The --cp-filter value is a Python expression evaluated against each classpath entry.
parser.add_argument(
    "--cp-filter",
    default="",
    help=("Classpath entry filter, expressed as a Python expression. "
          "Available symbols are: 'path', 'name', 'os' and 're'. "
          "For example, the filter: --cp-filter='\"jasper\" not in path' "
          "excludes entries whose path includes the word 'jasper'. "
          """With regex: --cp-filter='not re.match(r"servlet.*[.]jar", name)' """
          "excludes any entry whose file name matches 'servlet*.jar'."),
)

subparsers = parser.add_subparsers(title="command", dest="command", help="Command to perform.")
classpath_parser = subparsers.add_parser(
Expand Down Expand Up @@ -562,7 +637,26 @@ def __init__(self, flags, args, env=os.environ):
self._flags = flags
self._args = args
self._env = env
self._express = ExpressTool(env=self._env)

# Construct a classpath filter, if any:
cp_filter = None
classpath_filter = flags.cp_filter
if classpath_filter is not None:
classpath_filter = classpath_filter.strip()
if (classpath_filter is not None) and (len(classpath_filter) > 0):
logging.debug("Constructing classpath filter for %r", classpath_filter)
def custom_filter(path):
    """Evaluates the user-specified filter expression against one classpath entry.

    Args:
      path: Classpath entry to test.
    Returns:
      The value of the filter expression, with the symbols 'path', 'name'
      (the base name of path), 'os' and 're' available to it.
    """
    # SECURITY: eval() executes arbitrary Python code taken from the --cp-filter flag.
    # This is only acceptable as long as the flag is supplied by the user running the tool;
    # never feed it a value coming from an untrusted source.
    # The namespace is not called 'globals' to avoid shadowing the builtin.
    symbols = dict(
        path=path,
        name=os.path.basename(path),
        os=os,
        re=re,
    )
    return eval(classpath_filter, symbols)

cp_filter = custom_filter

self._express = ExpressTool(env=self._env, cp_filter=cp_filter)

@property
def flags(self):
Expand Down Expand Up @@ -648,16 +742,7 @@ def hadoop_native_libs(env):
java_library_path.append(hadoop_native_dir_path)

# Hadoop wants a certain platform version, then we hope to use it
cmd = [
"java",
"-cp",
":".join(hadoop.classpath),
"-Xmx32m",
"org.apache.hadoop.util.PlatformName"
]
output = subprocess.check_output(cmd, universal_newlines=True)
java_platform = output.strip()
logging.info("Using Hadoop platform: %r", java_platform)
java_platform = self.hadoop.platform_name
native_dirs = os.path.join(hadoop_native_dir_path, java_platform.replace(" ", "_"))
if os.path.isdir(native_dirs):
java_library_path.append(native_dirs)
Expand Down Expand Up @@ -696,11 +781,9 @@ def job(self):
java_opts.append("-Djava.security.krb5.kdc=")

lib_path = self.env.get("JAVA_LIBRARY_PATH", "").split(":")
native_lib_path = self.hadoop_native_libs(self.env)
if len(native_lib_path) > 0:
lib_path.extend(native_lib_path)
lib_path = list(filter(None, map(str.strip, lib_path)))
lib_path.extend(self.express.hadoop.list_native_lib_paths())
if len(lib_path) > 0:
    # Join the formatted entries: a bare map() object would be logged as "<map object ...>".
    logging.debug("Using Java library paths:\n%s", "\n".join(map(tab_indent, lib_path)))
    java_opts.append("-Djava.library.path=%s" % ":".join(lib_path))

logging.debug("Using JVM options: %r", java_opts)
Expand All @@ -709,9 +792,11 @@ def job(self):
# Hadoop generic options:
hadoop_opts = list()
if self.flags.disable_user_jars_take_precedence:
hadoop_opts.append("-Dmapreduce.task.classpath.user.precedence=false")
hadoop_opts.append("-Dmapreduce.task.classpath.user.precedence=false") # Old key
hadoop_opts.append("-Dmapreduce.job.user.classpath.first=false") # New key
else:
hadoop_opts.append("-Dmapreduce.task.classpath.user.precedence=true")
hadoop_opts.append("-Dmapreduce.job.user.classpath.first=true")
hadoop_opts.extend(list(self.flags.hadoop_opts))
dist_cache_jars = self.express.list_paths_for_dist_cache(lib_jars=lib_jars)
hadoop_opts.append("-Dtmpjars=%s" % ",".join(dist_cache_jars))
Expand Down Expand Up @@ -865,6 +950,9 @@ def init(args):
print(err)
return os.EX_USAGE

logging.debug("Parsed global flags: %r", flags)
logging.debug("Unparsed command-line arguments: %r", unparsed_args)

# Run program:
sys.exit(main(parser, flags, unparsed_args))

Expand Down

0 comments on commit d41d2ec

Please sign in to comment.