added job trigger with synchronization (#66)
* added synchronization

* add sync

* checkpoint

* added tests

* fix job allocate bugs

* updated test

* updated test

* changed condition

* updated test

* updated comments

Co-authored-by: Ubuntu <ubuntu@ip-10-65-154-198.us-west-2.compute.internal>
Co-authored-by: Aurick Qiao <aurickq@users.noreply.github.com>
3 people authored Nov 1, 2020
1 parent 2f82aee commit 5c342c6
Showing 3 changed files with 204 additions and 61 deletions.
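At its core, this commit runs the allocator as two coroutines guarded by a single asyncio.Lock: a watch loop that allocates each newly added job right away, and a periodic loop that re-optimizes all existing jobs every 60 seconds. Below is a minimal, self-contained sketch of just that synchronization pattern, not the project's code; AllocatorSketch, the _new_jobs queue, and the print stubs are illustrative stand-ins for the Kubernetes watch stream and the real allocation methods shown in the diff.

import asyncio

class AllocatorSketch:
    """Minimal sketch of the synchronization pattern introduced by this commit."""

    def __init__(self):
        # One lock shared by both coroutines, so allocating a single newly
        # added job and re-optimizing all jobs never run at the same time.
        self._lock = asyncio.Lock()
        # Stands in for the Kubernetes watch stream of ADDED job events.
        self._new_jobs = asyncio.Queue()

    async def run(self):
        # Run both loops concurrently in the same event loop.
        await asyncio.gather(self._allocate_one_loop(),
                             self._optimize_all_loop())

    async def _allocate_one_loop(self):
        while True:
            job = await self._new_jobs.get()
            async with self._lock:
                await self._allocate_one(job)

    async def _optimize_all_loop(self):
        while True:
            async with self._lock:
                await self._optimize_all()
            await asyncio.sleep(60)  # same 60-second period as the real loop

    async def _allocate_one(self, job):
        print("allocate newly added job:", job)

    async def _optimize_all(self):
        print("re-optimize allocations for all existing jobs")

Holding one lock around both the single-job allocation and the full optimization pass is what keeps the new job trigger from racing the periodic loop over the same job statuses.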
196 changes: 137 additions & 59 deletions sched/adaptdl_sched/allocator.py
@@ -38,27 +38,98 @@ class AdaptDLAllocator(object):
def __init__(self, expander):
self._core_api = kubernetes.client.CoreV1Api()
self._objs_api = kubernetes.client.CustomObjectsApi()
self._custom_resource = ("adaptdl.petuum.com", "v1",
"", "adaptdljobs")
self._cluster_expander = expander
self._policy = PolluxPolicy()
        # lock shared by the two coroutines in run()
self._lock = asyncio.Lock()

async def run(self):
        # two functions: (1) watch for newly added jobs and allocate them
        # immediately if possible; (2) periodically re-optimize allocations
        # for all existing jobs
await asyncio.gather(
self._allocate_one_loop(),
self._optimize_all_loop()
)

async def _allocate_one_loop(self):
async with kubernetes.watch.Watch() as watch:
while True:
async for event in watch.stream(
self._objs_api.list_namespaced_custom_object,
*self._custom_resource, timeout_seconds=60):
                    if event["type"] == "ADDED":  # there is an arriving job
async with self._lock:
await self._allocate_one(event)

async def _allocate_one(self, event):
        # re-read the job to get its latest status (the watch event may be stale)
job = event["object"]
namespace = job["metadata"]["namespace"]
name = job["metadata"]["name"]

try:
job = await self._objs_api.get_namespaced_custom_object(
"adaptdl.petuum.com", "v1", namespace, "adaptdljobs", name
)
except kubernetes.client.rest.ApiException as exc:
if exc.status == 404:
return
raise # unexpected

# some other coroutine has handled this
if job.get("status", {}).get("allocation") is not None or\
job.get("status", {}).get("group") is not None:
return

namespace = job["metadata"]["namespace"]
name = job["metadata"]["name"]
        # if this is a restarted job, skip it
        LOG.info("detected an added job %s/%s.",
                 namespace, name)
        # parse the job information
job_info = self._get_job_info(job)

# find available nodes.
node_infos, _ = await self._find_nodes()

# get the node to allocate
new_allocation = self._policy.allocate_job(
job_info, node_infos)
patch = {"status": {"allocation": new_allocation}}
        LOG.info("Patch AdaptDLJob %s/%s: %s",
                 namespace, name, patch)
await patch_job_status(self._objs_api, namespace, name, patch)

    async def _optimize_all_loop(self):
        while True:
            # try to gain the lock before re-optimizing all jobs
            async with self._lock:
                await self._optimize_all()

            LOG.info("Sleep for 60 seconds")
            await asyncio.sleep(60)

async def _optimize_all(self):
LOG.info("Running allocator loop")
nodes, node_template = await self._find_nodes(
pod_label_selector="!adaptdl/job"
)
LOG.info("Node resources: %s",
{k: v.resources for k, v in nodes.items()})
jobs, prev_allocations = \
await self._find_jobs_and_allocations()
LOG.info("Job resources: %s",
{k: v.resources for k, v in jobs.items()})
start = time.time()
allocations = self._allocate(jobs, nodes, prev_allocations,
node_template)
duration = time.time() - start
LOG.info("Allocations (in %.3f sec): %s", duration,
allocations)
await self._update_allocations(allocations)

async def _update_allocations(self, allocations):
job_list = await self._objs_api.list_namespaced_custom_object(
"adaptdl.petuum.com", "v1", "", "adaptdljobs")
@@ -72,7 +143,7 @@ async def _update_allocations(self, allocations):
LOG.info("Patch AdaptDLJob %s/%s: %s", namespace, name, patch)
await patch_job_status(self._objs_api, namespace, name, patch)

    async def _find_nodes(self, pod_label_selector=None):
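        # pod_label_selector: optional label selector used when listing the
        # pods that occupy node resources. _optimize_all passes
        # "!adaptdl/job" so only non-AdaptDL pods are counted, while
        # _allocate_one leaves it as None so every pod (including running
        # AdaptDL replicas) is counted.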
node_infos = {}
node_list = await self._core_api.list_node()
# Find all non-AdaptDL pods which are taking up resources and subtract
@@ -81,7 +152,7 @@ async def _find_nodes(self):
# also check if we have reached the pod limit on the node. This number
# denotes (allocatable pods - Non-terminated pods) on that node.
        pod_list = await self._core_api.list_pod_for_all_namespaces(
            label_selector=pod_label_selector)
for node in node_list.items:
if allowed_taints(node.spec.taints):
resources = get_node_unrequested(node, pod_list.items)
@@ -102,62 +173,69 @@ async def _find_nodes(self):
node_template = NodeInfo(max_resources, True)
return node_infos, node_template

def _get_job_info(self, job):
job["spec"]["template"]["spec"] = \
set_default_resources(job["spec"]["template"]["spec"])
resources = get_pod_requests(job["spec"]["template"]["spec"])
hints = job.get("status", {}).get("train", {})
max_replicas = max(2 * hints.get("maxProfiledReplicas", 0), 1)
if job["spec"].get("maxReplicas"):
max_replicas = min(max_replicas, job["spec"]["maxReplicas"])
min_replicas = job["spec"].get("minReplicas", 0)
        # max_replicas should be greater than or equal to min_replicas
max_replicas = max(max_replicas, min_replicas)
preemptible = job["spec"].get("preemptible", True)
if {"perfParams", "initBatchSize"} <= hints.keys() and preemptible:
max_batch_size = (hints.get("maxBatchSize")
or hints["initBatchSize"])
if hints.get("localBszBounds"):
min_local_bsz = hints["localBszBounds"][0] or 1
# Make sure max_batch_size / replicas >= min_local_bsz
if max_batch_size < min_local_bsz * max_replicas:
max_replicas = int(max_batch_size / min_local_bsz)
perf_params = PerfParams(*[hints["perfParams"][k]
for k in PERF_PARAMS.keys()])
if "gradParams" in hints:
grad_params = GradParams(hints["gradParams"]["norm"],
hints["gradParams"]["var"])
else:
grad_params = GradParams(0.0, 1.0)
goodput_fn = GoodputFunction(perf_params, grad_params,
hints["initBatchSize"])
speedup_fn = SpeedupFunction(
goodput_fn,
hints.get("maxBatchSize"),
hints.get("localBszBounds"),
hints.get("gradientAccumulation", False))
else:
speedup_fn = lambda n, r: r # noqa: E731
creation_ts = dateutil.parser.isoparse(
job["metadata"]["creationTimestamp"])
return JobInfo(
resources, speedup_fn, creation_ts, min_replicas,
max_replicas, preemptible)

async def _find_jobs_and_allocations(self):
job_list = await self._objs_api.list_namespaced_custom_object(
"adaptdl.petuum.com", "v1", "", "adaptdljobs")
job_infos = {}
allocations = {}

for job in job_list["items"]:
if job.get("status", {}).get("phase") \
not in ["Pending", "Running", "Starting", "Stopping"]:
continue

namespace = job["metadata"]["namespace"]
name = job["metadata"]["name"]

            if "allocation" in job.get("status", {}):
                allocations[namespace, name] = \
                    list(job["status"]["allocation"])

            job_info = self._get_job_info(job)
            job_infos[(namespace, name)] = job_info

return job_infos, allocations

def _allocate(self, jobs, nodes, prev_allocations, node_template):
39 changes: 37 additions & 2 deletions sched/adaptdl_sched/policy/pollux.py
@@ -40,6 +40,40 @@ def __init__(self):
self._min_util = 0.35
self._max_util = 0.65

def allocate_job(self, job_info, nodes):
"""
        A simple first-fit strategy that finds the first node with enough
        free resources for a new job.
        This method is intended to allocate a single arriving job. It expects
        the node resources to take into account both adaptdl and non-adaptdl
        pods.
        Arguments:
            job_info (JobInfo): JobInfo object of the job.
            nodes (dict): dict from node name to NodeInfo.
        Returns:
            list(str): allocation for the job, e.g.
            [node name 0, node name 1, ...] if an available node was
            found, else an empty list.
"""
job_resources = job_info.resources
min_replicas = max(job_info.min_replicas, 1)
node_list = []
nodes = self._sort_nodes(nodes)
for node_name, node in nodes.items():
            # number of replicas that fit on this node
replica_this = min(node.resources.get(key, 0) // val
for key, val in job_resources.items())
if replica_this >= min_replicas:
node_list = [node_name] * min_replicas
return node_list
        else:
            return []

def _sort_nodes(self, nodes):
return OrderedDict( # Sort preemptible nodes last.
sorted(nodes.items(), key=lambda kv: (kv[1].preemptible,
kv[0])))

def _allocations_to_state(self, allocations, jobs, nodes):
jobs_index = {key: idx for idx, key in enumerate(jobs)}
nodes_index = {key: idx for idx, key in enumerate(nodes)}
@@ -110,6 +144,8 @@ def _desired_nodes(self, utilities, values, nodes):
def optimize(self, jobs, nodes, base_allocations, node_template):
"""
Run one optimization cycle of the Pollux scheduling policy.
This method expects the node resources to only take into account
non-adaptdl pods.
Arguments:
jobs (dict): map from job keys to `JobInfo` objects which
@@ -143,8 +179,7 @@ def ispinned(key, job):
key=lambda kv: (not ispinned(kv[0], kv[1]),
kv[1].min_replicas,
kv[1].creation_timestamp)))
        nodes = self._sort_nodes(nodes)
base_state = np.concatenate(
(self._allocations_to_state(base_allocations, jobs, nodes),
np.zeros((len(jobs), len(nodes)), dtype=np.int)), axis=1)
30 changes: 30 additions & 0 deletions sched/adaptdl_sched/policy/pollux_test.py
@@ -71,6 +71,36 @@ def test_optimize(num_nodes, total_devices=16):
assert count <= nodes[node_key].resources["pods"]


def test_allocate_job():
nodes = {
"0": NodeInfo({"gpu": 1, "cpu": 500, "pods": 32}, preemptible=False),
"1": NodeInfo({"gpu": 2, "cpu": 2000, "pods": 32}, preemptible=False),
"2": NodeInfo({"gpu": 2, "cpu": 3000, "pods": 32}, preemptible=True),
}
perf_params = PerfParams(0.121, 0.00568, 0.0236, 0.00634,
0.0118, 0.00317, 1.14)
grad_params = GradParams(sqr=0.00136, var=0.000502)
goodput_fn = GoodputFunction(perf_params, grad_params, 128)
speedup_fn = SpeedupFunction(goodput_fn, max_batch_size=1280,
atomic_bsz_range=(64, 256))
now = datetime.now()
min_replicas = 0
job_1 = JobInfo({"gpu": 1, "cpu": 500, "pods": 1}, speedup_fn,
now + timedelta(minutes=0), min_replicas, max_replicas=1)
job_2 = JobInfo({"gpu": 1, "cpu": 1000, "pods": 1}, speedup_fn,
now + timedelta(minutes=1), min_replicas, max_replicas=1)
job_3 = JobInfo({"gpu": 1, "cpu": 1000, "pods": 1}, speedup_fn,
now + timedelta(minutes=1), 2, max_replicas=2)
job_4 = JobInfo({"gpu": 1, "cpu": 2000, "pods": 1}, speedup_fn,
now + timedelta(minutes=1), 2, max_replicas=2)
policy = PolluxPolicy()
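    # Expected results, given first-fit over nodes sorted with preemptible
    # nodes last (search order "0", "1", "2"):
    #   job_1: 1 x (gpu 1, cpu 500)  -> fits on node "0"            -> ["0"]
    #   job_2: 1 x (gpu 1, cpu 1000) -> node "0" lacks cpu, "1" fits -> ["1"]
    #   job_3: 2 x (gpu 1, cpu 1000) -> node "1" fits both replicas  -> ["1", "1"]
    #   job_4: 2 x (gpu 1, cpu 2000) -> no node fits two replicas    -> []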

assert(policy.allocate_job(job_1, nodes) == ["0"])
assert(policy.allocate_job(job_2, nodes) == ["1"])
assert(policy.allocate_job(job_3, nodes) == ["1", "1"])
assert(policy.allocate_job(job_4, nodes) == [])


def test_unusable_node():
# Test where one of the nodes can't be used due to one resource type.
nodes = {
