-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathlmr
executable file
·97 lines (82 loc) · 3.08 KB
/
lmr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/bin/bash
function usage() { echo "Usage: $0 [-k] <BLOCKSIZE> <NUM_HASHING_SEGS> <MAPPER> <REDUCER> <OUTPUT_DIR>" 1>&2; exit 1; }
keep_mapper_out=false
while getopts ":k" OPTION
do
case $OPTION in
k) keep_mapper_out=true;;
?) usage;;
esac
done
shift $((OPTIND-1))
[ $# -ne 5 ] && usage
BLOCKSIZE="$1"
NUM_HASHING_SEGS="$2"
MAPPER="$3"
REDUCER="$4"
OUTPUT_DIR="$5"
mkdir "$OUTPUT_DIR" || exit 1
function timer()
{
local stime=$1
[[ -z $stime ]] && stime=$START_TIME
[[ -z $stime ]] && stime=0
dt=$((SECONDS - stime))
date -d@$dt -u +%H:%M:%S 2>/dev/null || date -u -r $dt +%T
}
HASHING_SCRIPT=`mktemp hashing.py.XXXX`
{
echo $' \e[1;32m>>>\e[m Temporary mapper hashing script created: \e[1;32m'$HASHING_SCRIPT$'\e[m'
}>&2
TEMPDIR=`mktemp -d mapper_tmp.XXXX`
{
mkdir `seq -f $TEMPDIR'/reducer-%02g' 0 $((NUM_HASHING_SEGS-1))`
echo $' \e[1;32m>>>\e[m Temporary mapper output directory created: \e[1;32m'$TEMPDIR$'\e[m'
}>&2
function clean_up() {
echo $' \e[1;32m>>>\e[m Cleaning...'
sleep 1
if [ $keep_mapper_out = false ] ; then
rm -r "$TEMPDIR" &&
echo $' \e[1;32m>>>\e[m' Temporary directory deleted: $'\e[1;32m'"$TEMPDIR"$'\e[m' ||
echo $' \e[1;31m*\e[m' Failed to delete temporary directoy: $'\e[1;32m'"$TEMPDIR"$'\e[m'
else
echo $' \e[1;33m*\e[m' Mapper output directory: $'\e[1;32m'"$TEMPDIR"$'\e[m'
fi
rm "$HASHING_SCRIPT" &&
echo $' \e[1;32m>>>\e[m' Temporary hashing script deleted: $'\e[1;32m'"$HASHING_SCRIPT"$'\e[m' ||
echo $' \e[1;31m*\e[m' Failed to delete hashing script: $'\e[1;32m'"$HASHING_SCRIPT"$'\e[m'
} >&2
trap 'clean_up' EXIT
trap 'exit' SIGHUP SIGINT SIGTERM
cat <<EOF > "${HASHING_SCRIPT}"
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os, fileinput
N_REDUCER, MAPPER_ID, BASE_DIR = int(sys.argv[1]), int(sys.argv[2]), sys.argv[3]
seg_file = [open(os.path.join(BASE_DIR, f"reducer-{seg_id:02}", f"mapper-{MAPPER_ID:02}"), 'w')
for seg_id in range(N_REDUCER)]
for line in fileinput.input(sys.argv[4:]):
key, _, value = line.rstrip().partition('\t')
print(key, value, sep='\t', file=seg_file[hash(key) % N_REDUCER])
EOF
START_TIME=timer
echo $' \e[1;33m>>>\e[m' Mappers running... $'\e[1;33m'$(timer START_TIME)$'\e[m'
echo
parallel --pipe --block-size "${BLOCKSIZE}" --ungroup --halt now,fail=1 \
"echo -n $'\e[s\e[F\e[2K #{#}\e[u'; $MAPPER | PYTHONHASHSEED=0 python $HASHING_SCRIPT ${NUM_HASHING_SEGS} {#} $TEMPDIR" &&
{
echo $' \e[1;33m>>>\e[m' Sort parts running... $'\e[1;33m'$(timer START_TIME)$'\e[m'
parallel --bar --ungroup --halt now,fail=1 \
"LC_ALL=C sort -k1,1 -t $'\t' -o {} {}" ::: "${TEMPDIR}"/*/*
} &&
{
echo $' \e[1;33m>>>\e[m' Reducer running... $'\e[1;33m'$(timer START_TIME)$'\e[m'
parallel --bar --ungroup --halt now,fail=1 \
"LC_ALL=C sort -m -k1,1 -t $'\t' {}/* | $REDUCER > '$OUTPUT_DIR/{/.}'" ::: "${TEMPDIR}"/*
} &&
{
echo
echo $' \e[1;33m*\e[m' Output directory: $'\e[1;32m'"$OUTPUT_DIR"$'\e[m'
echo $' \e[1;33m*\e[m' Elasped time: $'\e[1;32m'$(timer START_TIME)$'\e[m'
} >&2