Skip to content

Commit 57b57ea

Browse files
committed
qsub test added
1 parent 94b10af commit 57b57ea

File tree

3 files changed

+51
-21
lines changed

3 files changed

+51
-21
lines changed

.idea/workspace.xml

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dkeras/utils/qsub_functions.py

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,20 @@
1414
from dkeras.utils.sys_functions import get_port, get_addr
1515

1616

17+
def _cmd(cmd):
18+
return subprocess.check_output(cmd.split(' ')).decode()
19+
20+
1721
def _which_qsub():
    """Return the output of ``which qsub`` with all newlines removed.

    :return: path text reported by ``which`` for ``qsub``
    :raises subprocess.CalledProcessError: if ``which`` exits non-zero
    """
    raw_output = subprocess.check_output(['which', 'qsub'])
    located = raw_output.decode()
    return located.replace('\n', '')
1923

2024

2125
def _which_qstat():
    """Return the output of ``which qstat`` with all newlines removed.

    :return: path text reported by ``which`` for ``qstat``
    :raises subprocess.CalledProcessError: if ``which`` exits non-zero
    """
    raw_output = subprocess.check_output(['which', 'qstat'])
    located = raw_output.decode()
    return located.replace('\n', '')
2327

2428

2529
def _which_qdel():
    """Return the output of ``which qdel`` with all newlines removed.

    :return: path text reported by ``which`` for ``qdel``
    :raises subprocess.CalledProcessError: if ``which`` exits non-zero
    """
    raw_output = subprocess.check_output(['which', 'qdel'])
    located = raw_output.decode()
    return located.replace('\n', '')
2731

2832

2933
create_worker_script = """
@@ -42,19 +46,19 @@ def wait_for_workers(n_workers, timeout=300):
4246
print("Waiting for {} workers".format(n_workers))
4347
while True:
4448
n_nodes = ray.get(_get_n_nodes.remote())
45-
if len(n_nodes) >= n_workers:
49+
if n_nodes >= n_workers:
4650
return True
4751
if (start_time - time.time() >= timeout):
4852
return False
4953

5054

5155
def rm_existing_workers(qstat_path='qstat', qdel_path='qdel'):
    """Delete queued/running jobs whose ``qstat`` line mentions worker_script.

    Runs a shell pipeline: list jobs with ``qstat_path``, keep the lines
    containing ``worker_script``, take the first space-separated field of
    each line and pass those fields to ``qdel_path`` through ``xargs``.

    :param qstat_path: command (or path) used to list jobs
    :param qdel_path: command (or path) used to delete jobs
    :return: None; the exit status of the pipeline is not inspected
    """
    # The option must be spelled "-d ' '": with "- d ' '" cut treats "-",
    # "d" and " " as input files and never sets the delimiter, so complete
    # qstat lines (not just the first field) were handed to qdel.
    # Both paths are interpolated into the shell command unquoted.
    cmd = "{} | grep worker_script | cut -d ' ' -f1 | xargs {}".format(
        qstat_path, qdel_path)
    os.system(cmd)
5559

5660

57-
def init_pbs_ray(n_workers=3, rm_existing=True, iface_name='eno1', worker_time=3600):
61+
def init_pbs_ray(n_workers=3, rm_existing=True, iface_name='eno1', worker_time=3600, verbose=True):
5862
"""
5963
6064
:param n_workers:
@@ -65,25 +69,28 @@ def init_pbs_ray(n_workers=3, rm_existing=True, iface_name='eno1', worker_time=3
6569
"""
6670
if ray.is_initialized():
6771
if rm_existing:
68-
ray.shutdown()
72+
_cmd('ray stop')
6973

7074
qsub_path = _which_qsub()
7175
qstat_path = _which_qstat()
7276
qdel_path = _which_qdel()
7377

74-
rm_existing_workers(qstat_path=qstat_path, qdel_path=qdel_path)
78+
# rm_existing_workers(qstat_path=qstat_path, qdel_path=qdel_path)
79+
rm_existing_workers()
7580

7681
addresses = get_addr('eno1')
7782
addr = addresses[0]
7883

79-
print("Address: ", addr)
84+
if verbose:
85+
print("Address: ", addr)
8086
if addr == 'No IP addr':
8187
raise Exception("Address not found for {}".format(iface_name))
8288

8389
port = get_port()[1]
84-
print("Port ", port)
85-
print("ray start --head --redis-port={}".format(port))
86-
os.system('ray start --head --redis-port={}'.format(port))
90+
if verbose:
91+
print("Port ", port)
92+
print("ray start --head --redis-port={}".format(port))
93+
_cmd('ray start --head --redis-port={}'.format(port))
8794

8895
temp_dir = 'temp_{}'.format('_'.join(str(time.time()).split('.')))
8996
if not os.path.exists(temp_dir):
@@ -93,28 +100,29 @@ def init_pbs_ray(n_workers=3, rm_existing=True, iface_name='eno1', worker_time=3
93100
worker_script = create_worker_script.format(addr, port, worker_time)
94101
with open(worker_file, 'w') as f:
95102
f.write(worker_script)
96-
print("Worker file ", worker_file)
103+
104+
if verbose:
105+
print("Worker file ", worker_file)
97106

98107
qsub_pids = []
99108
for i in range(n_workers):
100-
print("{} {}".format(qsub_path, worker_file))
101-
print(list(qsub_path))
102-
print(list(worker_file))
109+
if verbose:
110+
print("{} {}".format(qsub_path, worker_file))
111+
print(list(qsub_path))
112+
print(list(worker_file))
103113

104-
qsub_pid = subprocess.check_output([qsub_path, '-l', 'nodes=1:ppn=2', worker_file])
114+
qsub_pid = subprocess.check_output([qsub_path, '-lselect=1', '-lplace=excl', worker_file])
105115
qsub_pid = qsub_pid.decode()[:-1].split('.')[0]
106116
qsub_pids.append(qsub_pid)
107117

108-
# os.system('{} -l nodes=1:ppn=2 {}'.format(qsub_path, worker_file))
109-
# print("{} {}".format(qsub_path, worker_file))
110-
print("{}:{}".format(addr, port))
111118
ray.init(redis_address='{}:{}'.format(addr, port))
112119
print("Ray initialized")
113120
return wait_for_workers(n_workers + 1), qsub_pids
114121

115122

116123
def main():
    """Start the PBS-backed Ray cluster with default settings and print
    the nodes Ray reports."""
    init_pbs_ray()
    cluster_nodes = ray.nodes()
    print(cluster_nodes)
118126

119127

120128
if __name__ == "__main__":

testing/qsub_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Manual benchmark: start a PBS-backed Ray cluster, build a dKeras ResNet50
# and time worker start-up and a single predict() call.
from tensorflow.keras.applications import ResNet50
from dkeras import dKeras
from dkeras.utils.qsub_functions import init_pbs_ray
import numpy as np
import time
import ray

# Bring up the Ray head plus PBS workers, then show what Ray can see.
init_pbs_ray()
print(ray.nodes())

# Uniform random values in [-1, 1) with an image-batch shape.
input_shape = (10000, 224, 224, 3)
data = np.random.uniform(-1, 1, input_shape)

# Time construction of the distributed model.
t_begin = time.time()
model = dKeras(ResNet50, init_ray=False, wait_for_workers=True, n_workers=500)
print("Workers initialized after {}".format(time.time() - t_begin))

# Time one full prediction pass over the batch.
t_begin = time.time()
preds = model.predict(data)
print("Preds after {}".format(time.time() - t_begin))

0 commit comments

Comments
 (0)