
Async actor microbenchmark Script #8275

Merged: 3 commits, May 3, 2020
Changes from 2 commits
62 changes: 62 additions & 0 deletions python/ray/ray_perf.py
@@ -1,5 +1,6 @@
"""This is the script for `ray microbenchmark`."""

import asyncio
import os
import time
import numpy as np
@@ -22,6 +23,22 @@ def small_value_batch(self, n):
ray.get([small_value.remote() for _ in range(n)])


@ray.remote
class AsyncActor:
async def private_coroutine(self):
Contributor:
I don't think we even need this; it's adding some noise to the benchmark. What we want this to show is how much overhead async actors have vs. sync actors, and this adds some additional overhead from asyncio coroutines. I would just have the handlers return immediately like the other benchmarks.

Contributor Author (@rkooo567, May 1, 2020):

I actually tested both versions, with and without it, and the perf looks almost identical (without it was ~1% better). But I understand it adds noise. Will remove it!

Contributor Author:
Fixed!

await asyncio.sleep(0)
return b"ok"

async def small_value(self):
return await self.private_coroutine()

async def small_value_with_arg(self, x):
return await self.private_coroutine()

async def small_value_batch(self, n):
await asyncio.wait([small_value.remote() for _ in range(n)])


@ray.remote(num_cpus=0)
class Client:
def __init__(self, servers):
@@ -190,6 +207,51 @@ def actor_multi2_direct_arg():
timeit("n:n actor calls with arg async", actor_multi2_direct_arg,
n * len(clients))

a = AsyncActor.remote()

def actor_sync():
ray.get(a.small_value.remote())

timeit("1:1 async-actor calls sync", actor_sync)

a = AsyncActor.remote()

def async_actor():
ray.get([a.small_value.remote() for _ in range(1000)])

timeit("1:1 async-actor calls async", async_actor, 1000)

a = AsyncActor.remote()

def async_actor():
ray.get([a.small_value_with_arg.remote(i) for i in range(1000)])

timeit("1:1 async-actor calls with args async", async_actor, 1000)

n = 5000
n_cpu = multiprocessing.cpu_count() // 2
actors = [AsyncActor.remote() for _ in range(n_cpu)]
client = Client.remote(actors)

def async_actor_async():
ray.get(client.small_value_batch.remote(n))

timeit("1:n async-actor calls async", async_actor_async, n * len(actors))

n = 5000
m = 4
n_cpu = multiprocessing.cpu_count() // 2
a = [AsyncActor.remote() for _ in range(n_cpu)]

@ray.remote
def async_actor_work(actors):
ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)])

def async_actor_multi():
ray.get([async_actor_work.remote(a) for _ in range(m)])

timeit("n:n async-actor calls async", async_actor_multi, m * n)


if __name__ == "__main__":
main()
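The review thread above debates whether the extra `await asyncio.sleep(0)` hop in `private_coroutine` distorts the measurement. The Ray-free sketch below illustrates the question with plain asyncio: it times a handler that returns immediately against one that first yields to the event loop. It is not part of the PR; the `timeit` helper here is a hypothetical, simplified stand-in for the one in `ray_perf.py`, and the absolute numbers will differ from Ray's.

```python
import asyncio
import time


async def returns_immediately():
    # Mirrors the reviewer's suggestion: the handler does no extra awaiting.
    return b"ok"


async def yields_first():
    # Mirrors the original PR code: one extra trip through the event loop.
    await asyncio.sleep(0)
    return b"ok"


def timeit(name, coro_fn, n=10000):
    """Run coro_fn n times sequentially and report calls per second.

    A simplified stand-in for ray_perf.py's timeit harness.
    """
    async def run():
        start = time.perf_counter()
        for _ in range(n):
            await coro_fn()
        return time.perf_counter() - start

    elapsed = asyncio.run(run())
    print("%s: %.0f calls/s" % (name, n / elapsed))
    return elapsed


if __name__ == "__main__":
    timeit("handler returning immediately", returns_immediately)
    timeit("handler with asyncio.sleep(0)", yields_first)
```

On most machines the `asyncio.sleep(0)` variant is measurably slower per call, which is the "noise" the reviewer wanted out of the benchmark; the author's observation that the end-to-end difference was only ~1% suggests Ray's own call overhead dominates either way.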