Skip to content

Commit

Permalink
indexes preprocessed lzos on EMR
Browse files Browse the repository at this point in the history
  • Loading branch information
nellore committed Feb 14, 2016
1 parent 1cc28a0 commit 274779b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/hadoop/relevant-elephant/SplitUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public static List<List<InputSplit>> getCombinedSplits(
List<InputSplit> oneInputSplits, long maxCombinedSplitSize, Configuration conf)
throws IOException, InterruptedException {
int combinedSplitCount = conf.getInt(COMBINED_SPLIT_COUNT, -1);
LOG.info("Combining splits into " + combinedSplitCount + " task(s).");
LOG.info("Distributing/combining splits into " + combinedSplitCount + " task(s).");
if (combinedSplitCount <= 0) {
// Do whatever Pig does if combined split count is unspecified or invalid
LOG.warn("Combined split count is either unspecified or invalid; combining splits using combine split size.");
Expand Down
20 changes: 18 additions & 2 deletions src/rna/driver/rna_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ def step(name, inputs, output,
action_on_failure='TERMINATE_JOB_FLOW', jar=_hadoop_streaming_jar,
tasks=0, partition_options=None, sort_options=None, archives=None,
files=None, multiple_outputs=False, mod_partitioner=False,
inputformat=None, extra_args=[]):
inputformat=None, outputformat=None, extra_args=[]):
""" Outputs JSON for a given step.
name: name of step
Expand All @@ -705,6 +705,7 @@ def step(name, inputs, output,
mod_partitioner: True iff the mod partitioner should be used for
the step; this partitioner assumes the key is a tuple of integers
inputformat: -inputformat option
outputformat: -outputformat option; overridden by multiple_outputs
extra_args: extra '-D' args
Return value: step dictionary
Expand Down Expand Up @@ -787,6 +788,10 @@ def step(name, inputs, output,
to_return['HadoopJarStep']['Args'].extend([
'-outputformat', 'edu.jhu.cs.MultipleOutputFormat'
])
elif outputformat is not None:
to_return['HadoopJarStep']['Args'].extend([
'-outputformat', outputformat
])
if inputformat is not None:
to_return['HadoopJarStep']['Args'].extend([
'-inputformat', inputformat
Expand Down Expand Up @@ -950,6 +955,8 @@ def steps(protosteps, action_on_failure, jar, step_dir, reducer_count,
),
inputformat=(protostep['inputformat']
if 'inputformat' in protostep else None),
outputformat=(protostep['outputformat']
if 'outputformat' in protostep else None),
extra_args=([extra_arg.format(task_count=reducer_count)
for extra_arg in protostep['extra_args']]
if 'extra_args' in protostep else [])
Expand Down Expand Up @@ -3708,9 +3715,18 @@ def protosteps(base, prep_dir, push_dir, elastic=False):
'no_input_prefix' : True,
'output' : push_dir if elastic else prep_dir,
'no_output_prefix' : True,
'outputformat' : (
'com.twitter.elephantbird.mapred.output'
'.DeprecatedLzoTextOutputFormat'
),
'inputformat' : (
'org.apache.hadoop.mapred.lib.NLineInputFormat'
)
),
'extra_args' : [
('mapreduce.output.fileoutputformat.compress.codec='
'org.apache.hadoop.io.compress.DefaultCodec'),
'elephantbird.lzo.output.index=true'
]
},
]
return steps_to_return
Expand Down

0 comments on commit 274779b

Please sign in to comment.