Skip to content

Commit

Permalink
[Serve] Ensure strict traffic splitting (ray-project#5929)
Browse files Browse the repository at this point in the history
* [Serve] Ensure strict traffic splitting

* Fix test
  • Loading branch information
simon-mo authored Oct 21, 2019
1 parent bc4a0de commit 6b36ef1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 35 deletions.
58 changes: 29 additions & 29 deletions python/ray/experimental/serve/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,22 @@ def __init__(self):
# service_name -> traffic_policy
self.traffic = defaultdict(dict)

# backend_name -> worker queue
# backend_name -> worker request queue
self.workers = defaultdict(deque)

# backend_name -> worker payload queue
self.buffer_queues = defaultdict(deque)

def is_ready(self):
return True

def _serve_metric(self):
return {
"service_{}_queue_size".format(service_name): {
"backend_{}_queue_size".format(backend_name): {
"value": len(queue),
"type": "counter",
}
for service_name, queue in self.queues.items()
for backend_name, queue in self.buffer_queues.items()
}

def enqueue_request(self, service, request_args, request_kwargs,
Expand Down Expand Up @@ -129,39 +132,36 @@ def _get_available_backends(self, service):
return list(backends_in_policy.intersection(available_workers))

def _flush(self):
# perform traffic splitting for requests
for service, queue in self.queues.items():
# while there are incoming requests and there are backends
while len(queue) and len(self.traffic[service]):
backend_names = list(self.traffic[service].keys())
backend_weights = list(self.traffic[service].values())
chosen_backend = np.random.choice(
backend_names, p=backend_weights).squeeze()

request = queue.popleft()
self.buffer_queues[chosen_backend].append(request)

        # dispatch buffer queues to work queues
for service in self.queues.keys():
ready_backends = self._get_available_backends(service)

while len(queue) and len(ready_backends):
# Fast path, only one backend available.
if len(ready_backends) == 1:
backend = ready_backends[0]
request, work = (queue.popleft(),
self.workers[backend].popleft())
ray.worker.global_worker.put_object(
work.work_object_id, request)

# We have more than one backend available.
# We will roll a dice among the multiple backends.
else:
backend_weights = np.array([
self.traffic[service][backend_name]
for backend_name in ready_backends
])
# Normalize the weights to 1.
backend_weights /= backend_weights.sum()
chosen_backend = np.random.choice(
ready_backends, p=backend_weights).squeeze()

for backend in ready_backends:
# no work available
if len(self.buffer_queues[backend]) == 0:
continue

buffer_queue = self.buffer_queues[backend]
work_queue = self.workers[backend]
while len(buffer_queue) and len(work_queue):
request, work = (
queue.popleft(),
self.workers[chosen_backend].popleft(),
buffer_queue.popleft(),
work_queue.popleft(),
)
ray.worker.global_worker.put_object(
work.work_object_id, request)

ready_backends = self._get_available_backends(service)


@ray.remote
class CentralizedQueuesActor(CentralizedQueues):
Expand Down
13 changes: 7 additions & 6 deletions python/ray/experimental/serve/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ def test_single_prod_cons_queue(serve_instance):
def test_alter_backend(serve_instance):
q = CentralizedQueues()

q.set_traffic("svc", {"backend-1": 1})
result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
work_object_id = q.dequeue_request("backend-1")
q.set_traffic("svc", {"backend-1": 1})
got_work = ray.get(ray.ObjectID(work_object_id))
assert got_work.request_args == 1
ray.worker.global_worker.put_object(got_work.result_object_id, 2)
assert ray.get(ray.ObjectID(result_object_id)) == 2

q.set_traffic("svc", {"backend-2": 1})
result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
work_object_id = q.dequeue_request("backend-2")
q.set_traffic("svc", {"backend-2": 1})
got_work = ray.get(ray.ObjectID(work_object_id))
assert got_work.request_args == 1
ray.worker.global_worker.put_object(got_work.result_object_id, 2)
Expand All @@ -39,12 +39,13 @@ def test_alter_backend(serve_instance):
def test_split_traffic(serve_instance):
q = CentralizedQueues()

q.enqueue_request("svc", 1, "kwargs", None)
q.enqueue_request("svc", 1, "kwargs", None)
q.set_traffic("svc", {})
q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
# assume a 50% split; the probability that all 20 requests go to a
# single queue is 0.5^20 ~ 1e-6
for _ in range(20):
q.enqueue_request("svc", 1, "kwargs", None)
work_object_id_1 = q.dequeue_request("backend-1")
work_object_id_2 = q.dequeue_request("backend-2")
q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})

got_work = ray.get(
[ray.ObjectID(work_object_id_1),
Expand Down

0 comments on commit 6b36ef1

Please sign in to comment.