Skip to content

Commit

Permalink
add trackers.
Browse files Browse the repository at this point in the history
  • Loading branch information
starimpact committed Feb 23, 2018
1 parent ee1ee75 commit 4769dfc
Show file tree
Hide file tree
Showing 71 changed files with 8,794 additions and 0 deletions.
42 changes: 42 additions & 0 deletions dmlc-core/tracker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
DMLC Tracker
============
Job submission and tracking script for DMLC. To submit your job to a cluster,
use the following command:

```bash
dmlc-submit --mode <cluster-mode> [arguments] [command]
```

DMLC job will start executors, each act as role of worker or server.
It works for both parameter server based jobs as well as rabit allreduce jobs.

Parameters
----------
The following is a list of frequently used arguments available in the dmlc-submit command.
To get full list of arguments, you can run
```bash
dmlc-submit -h
```

- ```--cluster``` string, {'mpi', 'yarn', 'local', 'sge'}, default to ${DMLC_SUBMIT_CLUSTER}
- Job submission mode.
- ```--num-workers``` integer, required
- Number of workers in the job.
- ```--num-servers``` integer, default=0
- Number of servers in the job.
- ```--worker-cores``` integer, default=1
- Number of cores needed to be allocated for worker job.
- ```--server-cores``` integer, default=1
- Number of cores needed to be allocated for server job.
- ```--worker-memory``` string, default='1g'
- Memory needed for worker job.
- ```--server-memory``` string, default='1g'
- Memory needed for server job.
- ```--jobname``` string, default=auto specify
- Name of the job.
- ```--queue``` string, default='default'
- The submission queue we should submit the job to.
- ```--log-level``` string, {INFO, DEBUG}
- The logging level.
- ```--log-file``` string, default='None'
- Output log to the specific log file, the log is still printed on stderr.
9 changes: 9 additions & 0 deletions dmlc-core/tracker/dmlc-submit
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env python
"""Command line entry point for submitting DMLC jobs."""
import os
import sys

# Make the bundled dmlc_tracker package importable no matter which
# directory the script is launched from.
_CURR_PATH = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
sys.path.insert(0, _CURR_PATH)

from dmlc_tracker import submit

submit.main()
2 changes: 2 additions & 0 deletions dmlc-core/tracker/dmlc_tracker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"""DMLC Tracker modules for running jobs on different platforms."""
from __future__ import absolute_import
81 changes: 81 additions & 0 deletions dmlc-core/tracker/dmlc_tracker/launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python
# pylint: disable=invalid-name
"""The container launcher script that launches DMLC with the right env variable."""
import glob
import sys
import os
import subprocess

def unzip_archives(ar_list, env):
    """Extract every archive in *ar_list* into the current directory.

    Paths that do not exist are silently skipped. ``.zip`` files are
    extracted with ``unzip``; anything whose name contains ``.tar``
    (e.g. ``.tar``, ``.tar.gz``) goes through ``tar -xf``.
    """
    for archive in ar_list:
        if not os.path.exists(archive):
            continue
        if archive.endswith('.zip'):
            subprocess.call(args=['unzip', archive], env=env)
        elif '.tar' in archive:
            subprocess.call(args=['tar', '-xf', archive], env=env)

def main():
    """Main module of the launcher.

    Re-executes ``sys.argv[1:]`` as a child process with CLASSPATH,
    LD_LIBRARY_PATH and the DMLC role variables prepared for the cluster
    named in the ``DMLC_JOB_CLUSTER`` environment variable, then exits
    with the child's return code.
    """
    if len(sys.argv) < 2:
        print('Usage: launcher.py your command')
        sys.exit(0)

    hadoop_home = os.getenv('HADOOP_HOME')
    hdfs_home = os.getenv('HADOOP_HDFS_HOME')
    java_home = os.getenv('JAVA_HOME')
    # Older Hadoop installs expose HADOOP_PREFIX instead of HADOOP_HOME.
    hadoop_home = os.getenv('HADOOP_PREFIX') if hadoop_home is None else hadoop_home
    cluster = os.getenv('DMLC_JOB_CLUSTER')

    assert cluster is not None, 'need to have DMLC_JOB_CLUSTER'

    env = os.environ.copy()
    library_path = ['./']
    class_path = []

    if cluster == 'yarn':
        assert hadoop_home is not None, 'need to set HADOOP_HOME'
        assert hdfs_home is not None, 'need to set HADOOP_HDFS_HOME'
        assert java_home is not None, 'need to set JAVA_HOME'

    if cluster == 'sge':
        # On SGE the role is derived from the task index: the first
        # DMLC_NUM_WORKER tasks act as workers, the remainder as servers.
        num_worker = int(env['DMLC_NUM_WORKER'])
        task_id = int(env['DMLC_TASK_ID'])
        if task_id < num_worker:
            env['DMLC_ROLE'] = 'worker'
        else:
            env['DMLC_ROLE'] = 'server'

    if hadoop_home:
        library_path.append('%s/lib/native' % hdfs_home)
        library_path.append('%s/lib' % hdfs_home)
        (classpath, _) = subprocess.Popen('%s/bin/hadoop classpath' % hadoop_home,
                                          stdout=subprocess.PIPE, shell=True,
                                          env=os.environ).communicate()
        if not isinstance(classpath, str):
            # Python 3: communicate() yields bytes unless text mode is set;
            # decode before splitting, otherwise split(':') raises TypeError.
            classpath = classpath.decode('utf-8')
        for f in classpath.split(':'):
            class_path += glob.glob(f)

    if java_home:
        library_path.append('%s/jre/lib/amd64/server' % java_home)

    env['CLASSPATH'] = '${CLASSPATH}:' + (':'.join(class_path))

    # setup hdfs options
    if 'DMLC_HDFS_OPTS' in env:
        env['LIBHDFS_OPTS'] = env['DMLC_HDFS_OPTS']
    elif 'LIBHDFS_OPTS' not in env:
        # NOTE(review): JVM heap flags are normally single-dash ('-Xmx128m');
        # the double dash is kept as-is from upstream — confirm before changing.
        env['LIBHDFS_OPTS'] = '--Xmx128m'

    LD_LIBRARY_PATH = env['LD_LIBRARY_PATH'] if 'LD_LIBRARY_PATH' in env else ''
    env['LD_LIBRARY_PATH'] = LD_LIBRARY_PATH + ':' + ':'.join(library_path)

    # unzip the archives.
    if 'DMLC_JOB_ARCHIVES' in env:
        unzip_archives(env['DMLC_JOB_ARCHIVES'].split(':'), env)

    ret = subprocess.call(args=sys.argv[1:], env=env)
    sys.exit(ret)


if __name__ == '__main__':
    main()
72 changes: 72 additions & 0 deletions dmlc-core/tracker/dmlc_tracker/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Submission job for local jobs."""
# pylint: disable=invalid-name
from __future__ import absolute_import

import sys
import os
import subprocess
import logging
from threading import Thread
from . import tracker

def exec_cmd(cmd, role, taskid, pass_env):
    """Execute the command line command, retrying on failure.

    Parameters
    ----------
    cmd : list of str
        The command and its arguments; joined into a single shell string.
    role : str
        'worker' or 'server'; exported as DMLC_ROLE.
    taskid : int
        Task index, exported as DMLC_TASK_ID.
    pass_env : dict
        Extra environment variables passed to the child process.

    Raises
    ------
    RuntimeError
        If the command keeps failing after all retries (POSIX; on
        Windows the process exits with -1 instead).
    """
    if cmd[0].find('/') == -1 and os.path.exists(cmd[0]) and os.name != 'nt':
        cmd[0] = './' + cmd[0]
    cmd = ' '.join(cmd)
    env = os.environ.copy()
    for k, v in pass_env.items():
        env[k] = str(v)

    env['DMLC_TASK_ID'] = str(taskid)
    env['DMLC_ROLE'] = role
    env['DMLC_JOB_CLUSTER'] = 'local'

    num_retry = 0
    if 'DMLC_NUM_ATTEMPT' in env:
        # Environment values are strings; convert before the arithmetic
        # below (the original code decremented a str and raised TypeError).
        num_retry = int(env['DMLC_NUM_ATTEMPT'])

    while True:
        if os.name == 'nt':
            ret = subprocess.call(cmd, shell=True, env=env)
        else:
            ret = subprocess.call(cmd, shell=True, executable='bash', env=env)
        if ret == 0:
            logging.debug('Thread %d exit with 0', taskid)
            return
        else:
            num_retry -= 1
            if num_retry >= 0:
                continue
            if os.name == 'nt':
                sys.exit(-1)
            else:
                raise RuntimeError('Get nonzero return code=%d' % ret)


def submit(args):
    """Submit function of local jobs."""
    def mthread_submit(nworker, nserver, envs):
        """
        Customized submit function: start nworker + nserver daemon threads,
        each running the user command via exec_cmd with the proper role.
        Parameters
        ----------
        nworker: number of worker processes to start up
        nserver: number of server processes to start up
        envs: environment variables to be added to the starting programs
        """
        threads = {}
        for rank in range(nworker + nserver):
            role = 'worker' if rank < nworker else 'server'
            runner = Thread(target=exec_cmd, args=(args.command, role, rank, envs))
            runner.setDaemon(True)
            runner.start()
            threads[rank] = runner

    # call submit, with nslave, the commands to run each job and submit function
    tracker.submit(args.num_workers, args.num_servers, fun_submit=mthread_submit,
                   pscmd=(' '.join(args.command)))
104 changes: 104 additions & 0 deletions dmlc-core/tracker/dmlc_tracker/mesos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/usr/bin/env python
"""
DMLC submission script by mesos
One need to make sure all slaves machines are ssh-able.
"""
from __future__ import absolute_import

import os
import sys
import json
import uuid
import logging
from threading import Thread
from . import tracker
try:
    # Prefer PyMesos when it is installed: tasks are launched through the
    # Mesos scheduler API and their output stays visible to the tracker.
    import pymesos.subprocess
    logging.getLogger('pymesos').setLevel(logging.WARNING)

    def _run(prog, env, resources):
        # Run *prog* through PyMesos from the current working directory,
        # requesting the given cpu/mem resources.
        cwd = os.getcwd()
        pymesos.subprocess.check_call(
            prog, shell=True, env=env, cwd=cwd,
            cpus=resources['cpus'], mem=resources['mem']
        )

    _USE_PYMESOS = True

except ImportError:
    # Fall back to the mesos-execute CLI; task output is discarded.
    import subprocess
    DEVNULL = open(os.devnull, 'w')

    def _run(prog, env, resources):
        # Build a mesos-execute command line. MESOS_MASTER may omit the
        # port, in which case the Mesos default 5050 is appended.
        master = os.environ['MESOS_MASTER']
        if ':' not in master:
            master += ':5050'

        name = str(uuid.uuid4())
        cwd = os.getcwd()
        prog = "cd %s && %s" % (cwd, prog)

        # Resources are encoded as "cpus:N;mem:M" for mesos-execute.
        resources = ';'.join('%s:%s' % (k, v) for k, v in resources.items())
        # Escape single quotes: each value is spliced into a single-quoted
        # shell argument below.
        prog = prog.replace('\'', '\\\'')
        env = json.dumps(env).replace('\'', '\\\'')
        resources = resources.replace('\'', '\\\'')
        cmd = (
            'mesos-execute --master=%s --name=\'%s\''
            ' --command=\'%s\' --env=\'%s\' --resources=\'%s\'' %
            (master, name, prog, env, resources)
        )

        subprocess.check_call(
            cmd,
            shell=True,
            stdout=DEVNULL,
            stderr=subprocess.STDOUT)

    _USE_PYMESOS = False

def get_env():
    """Return the whitelisted system environment variables to forward."""
    wanted = ('OMP_NUM_THREADS', 'KMP_AFFINITY', 'LD_LIBRARY_PATH')
    forwarded = {}
    for key in wanted:
        if key in os.environ:
            forwarded[key] = os.environ[key]
    return forwarded


def submit(args):
    """Submit function for Mesos jobs; hands mesos_submit to tracker.submit."""
    def mesos_submit(nworker, nserver, pass_envs):
        """
        customized submit script
        """
        # launch jobs
        for i in range(nworker + nserver):
            resources = {}
            # NOTE: servers occupy the first nserver indices here (the
            # reverse of local.py, where workers come first).
            pass_envs['DMLC_ROLE'] = 'server' if i < nserver else 'worker'
            if i < nserver:
                pass_envs['DMLC_SERVER_ID'] = i
                resources['cpus'] = args.server_cores
                resources['mem'] = args.server_memory_mb
            else:
                pass_envs['DMLC_WORKER_ID'] = i - nserver
                resources['cpus'] = args.worker_cores
                resources['mem'] = args.worker_memory_mb

            # Each task runs in its own daemon thread via _run.
            env = {str(k): str(v) for k, v in pass_envs.items()}
            env.update(get_env())
            prog = ' '.join(args.command)
            thread = Thread(target=_run, args=(prog, env, resources))
            thread.setDaemon(True)
            thread.start()

        # NOTE(review): returning the enclosing function object from inside
        # itself looks like a leftover; tracker.submit ignores the return
        # value, so it is harmless — confirm intent upstream.
        return mesos_submit

    if not _USE_PYMESOS:
        logging.warning('No PyMesos found, use mesos-execute instead,'
                        ' no task output available')

    if args.mesos_master:
        os.environ['MESOS_MASTER'] = args.mesos_master

    assert 'MESOS_MASTER' in os.environ, 'No mesos master configured!'

    tracker.submit(args.num_workers, args.num_servers,
                   fun_submit=mesos_submit,
                   pscmd=(' '.join(args.command)))
82 changes: 82 additions & 0 deletions dmlc-core/tracker/dmlc_tracker/mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
DMLC submission script, MPI version
"""
# pylint: disable=invalid-name
from __future__ import absolute_import

import sys
import subprocess, logging
from threading import Thread
from . import tracker

def get_mpi_env(envs):
    """Build the mpirun/mpiexec flags that forward *envs* to launched ranks.

    Supports MS-MPI (Windows), Open MPI and MPICH2, identified by probing
    the ``mpirun`` usage banner.

    Parameters
    ----------
    envs : dict
        Environment variables to forward.

    Returns
    -------
    str
        Flags such as ``-x K=V`` (Open MPI) or ``-env K V`` (MPICH/MS-MPI).

    Raises
    ------
    RuntimeError
        If the MPI flavour cannot be identified.
    """
    # BUG FIX: cmd must be initialized before the win32 branch appends to it
    # (the original raised NameError on Windows).
    cmd = ''

    # windows hack: we will use msmpi
    if sys.platform == 'win32':
        for k, v in envs.items():
            cmd += ' -env %s %s' % (k, str(v))
        return cmd

    # decide MPI version by inspecting mpirun's banner on stderr.
    (_, err) = subprocess.Popen('mpirun',
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE).communicate()
    if not isinstance(err, str):
        # Python 3: communicate() yields bytes; decode before substring tests.
        err = err.decode('utf-8', 'replace')
    if 'Open MPI' in err:
        for k, v in envs.items():
            cmd += ' -x %s=%s' % (k, str(v))
    elif 'mpich' in err:
        for k, v in envs.items():
            cmd += ' -env %s %s' % (k, str(v))
    else:
        raise RuntimeError('Unknown MPI Version')
    return cmd


def submit(args):
    """Submission script with MPI.

    Starts the worker group and the server group each in a daemon thread
    that invokes mpirun/mpiexec, then hands control to tracker.submit.
    """
    def mpi_submit(nworker, nserver, pass_envs):
        """Internal closure for job submission."""
        def run(prog):
            """run the program"""
            subprocess.check_call(prog, shell=True)

        cmd = ''
        if args.host_file is not None:
            cmd = '--hostfile %s ' % (args.host_file)
        cmd += ' ' + ' '.join(args.command)

        pass_envs['DMLC_JOB_CLUSTER'] = 'mpi'

        # start workers
        if nworker > 0:
            logging.info('Start %d workers by mpirun' % nworker)
            pass_envs['DMLC_ROLE'] = 'worker'
            if sys.platform == 'win32':
                prog = 'mpiexec -n %d %s %s' % (nworker, get_mpi_env(pass_envs), cmd)
            else:
                prog = 'mpirun -n %d %s %s' % (nworker, get_mpi_env(pass_envs), cmd)
            thread = Thread(target=run, args=(prog,))
            thread.setDaemon(True)
            thread.start()

        # start servers
        if nserver > 0:
            logging.info('Start %d servers by mpirun' % nserver)
            pass_envs['DMLC_ROLE'] = 'server'
            if sys.platform == 'win32':
                # BUG FIX: the original launched `nworker` processes here,
                # starting the wrong number of servers on Windows.
                prog = 'mpiexec -n %d %s %s' % (nserver, get_mpi_env(pass_envs), cmd)
            else:
                prog = 'mpirun -n %d %s %s' % (nserver, get_mpi_env(pass_envs), cmd)
            thread = Thread(target=run, args=(prog,))
            thread.setDaemon(True)
            thread.start()

    tracker.submit(args.num_workers, args.num_servers,
                   fun_submit=mpi_submit,
                   pscmd=(' '.join(args.command)))
Loading

0 comments on commit 4769dfc

Please sign in to comment.