Skip to content

Commit

Permalink
Added standalone and distributed setup
Browse files Browse the repository at this point in the history
  • Loading branch information
Adarsh Mishra committed Dec 9, 2021
1 parent 9024992 commit a24bba7
Show file tree
Hide file tree
Showing 4,160 changed files with 894,594 additions and 0 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
Binary file added .DS_Store
Binary file not shown.
Binary file added Distributed Setup/.DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions Distributed Setup/metrics.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Sample container resource usage once per second for as long as any Docker
# container is running, appending each one-shot `docker stats` snapshot to
# metrics.csv in the current directory.
# NOTE(review): "$1" is passed straight to `docker ps -q`, which takes no
# positional argument — presumably vestigial; confirm before relying on it.
while [ $(docker ps -q $1 | wc -l) -gt 0 ];do docker stats --no-stream | tee --append metrics.csv;sleep 1;done
135 changes: 135 additions & 0 deletions Distributed Setup/run/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from fabric import Connection
from fabric import SerialGroup


def schedulerEnv(rootUri, n_ps, n_w):
    """Write the DMLC scheduler env file locally and push it to the root node.

    Creates a local file named ``scheduler`` holding the environment
    variables MXNet's parameter-server launcher reads for the scheduler
    role, then copies it to /tmp/mxnet_env on *rootUri* over SSH (fabric).

    rootUri -- IP/host of the scheduler (DMLC root) node
    n_ps    -- number of parameter servers in the cluster
    n_w     -- number of workers in the cluster
    """
    lines = [
        "DMLC_ROLE=scheduler\n",
        "DMLC_PS_ROOT_URI=" + str(rootUri) + "\n",
        "DMLC_PS_ROOT_PORT=9091\n",
        "DMLC_NUM_SERVER=" + str(n_ps) + "\n",
        "DMLC_NUM_WORKER=" + str(n_w) + "\n",
    ]
    # Context manager guarantees the handle is closed even if a write fails
    # (the original leaked the file object on error).
    with open("scheduler", "w") as env_file:
        env_file.writelines(lines)
    Connection(rootUri).run('sudo mkdir -p /tmp/mxnet_env && sudo chmod 777 /tmp/mxnet_env')
    Connection(rootUri).put('scheduler', '/tmp/mxnet_env')
    print(rootUri + " scheduler env set")

def serverEnv(rootUri, nodeHost, n_ps, n_w):
    """Write the DMLC server env file locally and push it to *nodeHost*.

    Creates a local file named ``server`` with the parameter-server role
    environment, then copies it to /tmp/mxnet_env on *nodeHost* over SSH.

    rootUri  -- IP/host of the scheduler (DMLC root) node
    nodeHost -- IP/host of this server node (also used as DMLC_NODE_HOST)
    n_ps     -- number of parameter servers in the cluster
    n_w      -- number of workers in the cluster
    """
    # DMLC_SERVER_ID is hard-coded to 0 on every node, matching the original.
    # The original had a missing comma after "DMLC_SERVER_ID=0\n" (implicit
    # string concatenation); fixed here — the file content is unchanged.
    lines = [
        "DMLC_ROLE=server\n",
        "DMLC_PS_ROOT_URI=" + str(rootUri) + "\n",
        "DMLC_PS_ROOT_PORT=9091\n",
        "DMLC_NODE_HOST=" + str(nodeHost) + "\n",
        "DMLC_SERVER_ID=0\n",
        "DMLC_NUM_SERVER=" + str(n_ps) + "\n",
        "DMLC_NUM_WORKER=" + str(n_w) + "\n",
    ]
    # Context manager closes the handle even on error.
    with open("server", "w") as env_file:
        env_file.writelines(lines)
    Connection(nodeHost).run('sudo mkdir -p /tmp/mxnet_env && sudo chmod 777 /tmp/mxnet_env')
    Connection(nodeHost).put('server', '/tmp/mxnet_env')
    print(nodeHost + " server env set")


def WorkerEnv(rootUri, nodeHost, n_ps, n_w):
    """Write the DMLC worker env file locally and push it to *nodeHost*.

    Creates a local file named ``worker`` with the worker-role environment,
    then copies it to /tmp/mxnet_env on *nodeHost* over SSH (fabric).

    rootUri  -- IP/host of the scheduler (DMLC root) node
    nodeHost -- IP/host of this worker node (also used as DMLC_NODE_HOST)
    n_ps     -- number of parameter servers in the cluster
    n_w      -- number of workers in the cluster
    """
    # As in serverEnv: DMLC_SERVER_ID is hard-coded to 0 and the original's
    # missing comma (implicit concatenation) is fixed without changing output.
    lines = [
        "DMLC_ROLE=worker\n",
        "DMLC_PS_ROOT_URI=" + str(rootUri) + "\n",
        "DMLC_PS_ROOT_PORT=9091\n",
        "DMLC_NODE_HOST=" + str(nodeHost) + "\n",
        "DMLC_SERVER_ID=0\n",
        "DMLC_NUM_SERVER=" + str(n_ps) + "\n",
        "DMLC_NUM_WORKER=" + str(n_w) + "\n",
    ]
    # Context manager closes the handle even on error.
    with open("worker", "w") as env_file:
        env_file.writelines(lines)
    Connection(nodeHost).run('sudo mkdir -p /tmp/mxnet_env && sudo chmod 777 /tmp/mxnet_env')
    Connection(nodeHost).put('worker', '/tmp/mxnet_env')
    print(nodeHost + " worker env set")

def startScript(name, idx, nodeHost, ps, worker, job):
    """Generate run.sh for one node and install it at /benchmarks on that node.

    name     -- role: 'scheduler', 'server' or 'worker'; a scheduler node
                additionally starts a co-located server container
    idx      -- 1-based index numbering the container within its role
    nodeHost -- IP/host the script is pushed to over SSH (fabric)
    ps       -- number of parameter servers (used only in log-file names)
    worker   -- number of workers (used only in log-file names)
    job      -- job label prefixed to the worker log-file name
    """

    def _docker_run(env_name, container_name):
        # The docker command is identical for every role apart from the env
        # file (which sets DMLC_ROLE etc.) and the container name; the
        # original duplicated this ~300-char string three times.
        return ("docker run -d --env-file /tmp/mxnet_env/" + env_name +
                " --name " + container_name +
                " -v /mnt/mxnet-test/incubator-mxnet:/incubator-mxnet"
                " -w /incubator-mxnet/example/image-classification/"
                " --net=host abhin99/mxnet python3 train_cifar10.py"
                " --network resnet --num-layers 110 --batch-size 64"
                " --num-epochs 5 --disp-batches 1 --loss ce"
                " --kv-store dist_sync\n")

    tempName = name
    name = str(name) + "_" + str(idx)
    ps = str(ps)
    worker = str(worker)

    lines = []
    # Stop/remove any leftover containers first so the names are free.
    lines.append("if [[ $(docker ps -q $1 | wc -l) -gt 0 ]] ; then docker stop $(docker ps -a -q) && docker rm $(docker ps -a -q); fi \n")

    w = _docker_run("worker", name)
    sch = _docker_run("scheduler", name)
    if tempName == 'scheduler':
        # The scheduler's host also runs a server container named server_<idx>.
        name = 'server' + "_" + str(idx)
    s = _docker_run("server", name)
    workerLogs = 'docker logs -f ' + name + ' &> ' + job + '_' + 'logs_' + name + '_' + ps + '_' + worker + '.log &\n'

    if tempName == 'worker':
        print("in worker")
        lines.append(w)
        lines.append(workerLogs)  # only workers stream logs to a file
    elif tempName == 'server':
        print("in server")
        lines.append(s)
    elif tempName == 'scheduler':
        print("in scheduler")
        lines.append(sch)
        lines.append(s)

    # Context manager closes run.sh even if a write fails (original leaked
    # the handle on error).
    with open("run.sh", "w") as script:
        script.writelines(lines)

    Connection(nodeHost).run('sudo mkdir -p /benchmarks && sudo chmod 777 /benchmarks')
    Connection(nodeHost).put('run.sh', '/benchmarks')
    Connection(nodeHost).run('sudo chmod 777 /benchmarks/run.sh')
    print(nodeHost + " " + name + " start script set")



# assume 1 VM = 1 Container
#
# Deployment driver: lays a distributed MXNet parameter-server job across
# `hosts`. The first `ps` hosts run servers (host 0 additionally runs the
# scheduler), the next `worker` hosts run workers. Env files and per-node
# run.sh scripts are pushed over SSH, every node is started serially, then
# a background metrics collector is kicked off on each node.

MasterIP = '10.142.0.2'
hosts = [MasterIP, '10.142.0.3', '10.142.0.4', '10.142.0.6']
rootUri = hosts[0]   # scheduler / DMLC root node
hostsDict = dict()   # host IP -> role label, used in metrics file names
ps = 2               # number of parameter servers
worker = 2           # number of workers
job = "resnet-110_64-cifar10"

# sanity check: one dedicated VM per server and per worker

if (ps + worker) > len(hosts):
    # fixed typo: "resouces" -> "resources"
    print("Error resources not sufficient")

else:

    toallocServer = 1  # track allocated resources (1-based server index)

    for i in range(ps):
        if i == 0:
            # Root host runs the scheduler alongside the first server.
            schedulerEnv(rootUri, ps, worker)
            hostsDict[hosts[i]] = "server_" + str(toallocServer)
            serverEnv(rootUri, hosts[i], ps, worker)
            startScript('scheduler', toallocServer, hosts[i], ps, worker, job)
        else:
            hostsDict[hosts[i]] = "server_" + str(toallocServer)
            serverEnv(rootUri, hosts[i], ps, worker)
            startScript('server', toallocServer, hosts[i], ps, worker, job)
        toallocServer = toallocServer + 1

    toallocWorker = 1  # 1-based worker index
    for i in range(ps, ps + worker):
        hostsDict[hosts[i]] = "worker_" + str(toallocWorker)
        WorkerEnv(rootUri, hosts[i], ps, worker)
        startScript('worker', toallocWorker, hosts[i], ps, worker, job)
        toallocWorker = toallocWorker + 1

    req = list()
    for i in range(ps + worker):
        req.append(hosts[i])

    # Run every node's /benchmarks/run.sh one after the other.
    SerialGroup(*req).run("cd /benchmarks && sudo bash run.sh")

    # Launch the per-node metrics collector in the background; the output
    # file name is keyed by job, role label and cluster size.
    for i in range(ps + worker):
        Connection(hosts[i]).run('cd /benchmarks && sudo rm -f metrics.csv && bash -c "( (nohup ./metrics.sh >' + job + '_' + 'metrics_' + hostsDict[hosts[i]] + '_' + str(ps) + '_' + str(worker) + '.csv 2>&1 &) )"')
        print("metrics", hosts[i])

# result = Connection('10.142.0.2').run('uname -s',hide = True)
# msg = "Ran {0.command!r} on {0.connection.host}, got stdout:\n{0.stdout}"
# print(msg.format(result))
17 changes: 17 additions & 0 deletions Distributed Setup/run/mxnet-mkl.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# MXNet CPU image based on the stock mxnet/python image, with the vanilla
# mxnet wheel swapped for the Intel MKL-DNN accelerated build.
FROM mxnet/python

LABEL MAINTAINER="abhin99"

RUN apt-get update

# Replace the plain mxnet package with the MKL-accelerated variant.
RUN pip3 uninstall mxnet -y

RUN pip3 install mxnet-mkl

# Cap OpenMP threads per container — several containers may share one VM.
ENV OMP_NUM_THREADS=2






3 changes: 3 additions & 0 deletions Distributed Setup/run/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Generated per-node launch script (emitted by startScript in benchmark.py);
# this checked-in copy is for worker node 2 of a 2-server / 2-worker run.
# Stop and remove any leftover containers so the container name is free.
if [[ $(docker ps -q $1 | wc -l) -gt 0 ]] ; then docker stop $(docker ps -a -q) && docker rm $(docker ps -a -q); fi
docker run -d --env-file /tmp/mxnet_env/worker --name worker_2 -v /mnt/mxnet-test/incubator-mxnet:/incubator-mxnet -w /incubator-mxnet/example/image-classification/ --net=host abhin99/mxnet python3 train_cifar10.py --network resnet --num-layers 110 --batch-size 64 --num-epochs 5 --disp-batches 1 --loss ce --kv-store dist_sync
# Stream the container's logs to a per-role log file in the background.
docker logs -f worker_2 &> resnet-110_64-cifar10_logs_worker_2_2_2.log &
5 changes: 5 additions & 0 deletions Distributed Setup/run/scheduler
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DMLC_ROLE=scheduler
DMLC_PS_ROOT_URI=10.142.0.2
DMLC_PS_ROOT_PORT=9091
DMLC_NUM_SERVER=2
DMLC_NUM_WORKER=2
7 changes: 7 additions & 0 deletions Distributed Setup/run/server
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DMLC_ROLE=server
DMLC_PS_ROOT_URI=10.142.0.2
DMLC_PS_ROOT_PORT=9091
DMLC_NODE_HOST=10.142.0.3
DMLC_SERVER_ID=0
DMLC_NUM_SERVER=2
DMLC_NUM_WORKER=2
7 changes: 7 additions & 0 deletions Distributed Setup/run/worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DMLC_ROLE=worker
DMLC_PS_ROOT_URI=10.142.0.2
DMLC_PS_ROOT_PORT=9091
DMLC_NODE_HOST=10.142.0.6
DMLC_SERVER_ID=0
DMLC_NUM_SERVER=2
DMLC_NUM_WORKER=2
90 changes: 90 additions & 0 deletions Epoch Estimation/EpochEstimator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import matplotlib.pyplot as plt
import numpy as np
import re
import argparse
import sys
import time
import threading
import math
from scipy.optimize import curve_fit
import random


def __loss_fit_func(x, a, b, c):
    """Hyperbolic-decay loss model: loss(x) = 1 / (a*x + b) + c."""
    denom = a * x + b
    return 1.0 / denom + c

def _loss_curve_fitting(epochs_arr, losses_arr):
    """Fit the 1/(a*x+b)+c loss model to observed per-epoch losses.

    The points are split into three contiguous segments and each later
    segment's sigma shrinks by a factor of 4, so curve_fit weights recent
    losses more heavily. Returns the fitted [a, b, c], each bounded >= 0.
    """
    NUM_SEGMENTS = 3
    n_points = len(epochs_arr)
    segment_size = math.ceil(1.0 * n_points / NUM_SEGMENTS)

    weights = np.ones(n_points)
    for point in range(n_points):
        segment = int(math.floor(point / segment_size))
        weights[point] /= 4 ** segment

    lower, upper = [0, 0, 0], [np.inf, np.inf, np.inf]
    popt, _ = curve_fit(__loss_fit_func, epochs_arr, losses_arr,
                        sigma=np.array(weights), absolute_sigma=False,
                        bounds=(lower, upper))
    return popt

def est_epoch(val_losses):
    """Estimate the epoch at which training converges.

    val_losses: dict mapping epoch index -> observed loss.
    Fits the 1/(a*x+b)+c model to the observed losses, then walks forward
    along the fitted curve and returns the first epoch where an 8-point
    window is flat (no drop larger than 0.005). Returns -1 when fewer than
    3 observations are available, the curve fit fails, or no convergence
    is found within 100 epochs.
    """
    if len(val_losses) >= 3:
        epoch_list = []
        loss_list = []
        for epoch, loss in val_losses.items():
            epoch_list.append(epoch)
            loss_list.append(loss)

        try:
            [a, b, c] = _loss_curve_fitting(epoch_list, loss_list) # could throw exception since the loss may not descend at the beginning

        except Exception as e:
            print("loss curve fitting error: ", e)
            return -1
        epoch = 0
        fitted_losses = []
        # Walk forward along the fitted curve until the window flattens.
        while True:
            fitted_losses.append(__loss_fit_func(epoch, a, b, c))
            flag = True  # assume converged until a significant drop is seen
            if len(fitted_losses) >= 8:
                for i in reversed(range(8)):
                    # any drop > 0.005 within the last 8 points => not converged
                    if fitted_losses[epoch - i] - fitted_losses[epoch] > 0.005:
                        flag = False
                        break
            else:
                # not enough fitted points for a full window yet
                epoch +=1
                continue
            if not flag:
                epoch += 1
                if epoch > 100: # each job must have at most 100 epochs
                    return -1
            else:
                return epoch
    else:
        return -1

def main():
    """Replay epoch-count estimation as training progresses.

    Starting from the first 5 recorded epochs, repeatedly re-fits the loss
    curve on the losses observed so far and prints how many epochs
    est_epoch predicts remain. Relies on the module-level `loss_re`
    (losses) and `idx` (epoch indices). The loop stops once the current
    epoch passes the estimate (an est_epoch failure of -1 also terminates,
    since -1 < n).
    """
    n = 5
    end_epoch = 6
    while n <= end_epoch:
        end_epoch = est_epoch({k: loss_re[k] for k in idx[:n]})
        # fixed typo in user-facing message: "Curent" -> "Current"
        print("Current Epoch:" + str(n) + " Remaining Epoch:" + str(end_epoch - n))
        n = n + 1


# --- Fit the loss model to a recorded training log and plot the result ---

# Matches MXNet log lines such as "...Epoch[3] Train-cross-entropy=1.234"
# and captures the loss value. Raw string avoids the invalid-escape-sequence
# warning the original non-raw literal triggers on modern Python; the
# compiled pattern is byte-identical.
LOSS_RE = re.compile(r'.*?]\sTrain-cross-entropy=([\d\.]+)')

# Context manager closes the log file (the original leaked the handle).
with open("resnet110.log") as log_file:
    log = log_file.read()

loss_re = [float(x) for x in LOSS_RE.findall(log)]
idx = np.arange(len(loss_re))
ep = 10  # fit on the first 10 epochs only, predict the rest
[a, b, c] = _loss_curve_fitting(idx[:ep], loss_re[:ep])
curvefitted = [__loss_fit_func(x, a, b, c) for x in idx]

# Plot actual vs predicted loss over all recorded epochs.
plt.figure(figsize=(8, 6))
plt.xlabel("Epoch")
plt.ylabel("LOSS")
plt.plot(idx, loss_re, 'o', linestyle='-', color="b",
         label="Actual Training loss")
plt.plot(idx, curvefitted, 'o', linestyle='-', color="r",
         label="Predicted Training loss")
plt.legend(loc="best")
#plt.show()
main()
Loading

0 comments on commit a24bba7

Please sign in to comment.