No async free lunch
Is there any async IO low-hanging fruit we can pick?
So we're experimenting with Python async IO—see PR #610. That's great, but how much work would it be to convert QuantumLeap to use async IO efficiently? Is there any low-hanging fruit we can pick?
From the PR comments:
it would be ideal to go async with all external calls, this would be just a first step
Yeah, that's definitely an avenue we can explore, but we've got to think about this at the architecture level since to reap the benefits we'll have to move away from the current thread-per-request architecture to an event-loop one, which is quite a bit of work. In fact, just sprinkling our code with await/async keywords won't help much, I think. Here's what I mean.
One of the goals of PR #610 is to speed up the process of adding geo-coding info to incoming NGSI entities. Before the PR, the code was iterating over a list of entities and for each entity, it would call an external service to look up geo-coding info and then add it to the entity. This happened sequentially, so in particular the calls to the geo-coding service would happen one after the other. Abstracting away the details, the computation pattern was like this:
import time

def expensive_io_task(x: int) -> int:
    time.sleep(1)  # simulate expensive call to external service
    return x * 2

def for_each(xs: list[int]) -> list[int]:
    return [expensive_io_task(x) for x in xs]

def main():
    return for_each([1, 2, 3, 4, 5])  # simulate incoming entities
Let's plug this code into a simple benchmark script.
import time

def expensive_io_task(x: int) -> int:
    time.sleep(1)  # simulate expensive call to external service
    return x * 2

def for_each(xs: list[int]) -> list[int]:
    return [expensive_io_task(x) for x in xs]

def main():
    return for_each([1, 2, 3, 4, 5])  # simulate incoming entities

if __name__ == '__main__':
    start_time = time.time_ns()
    result = main()
    end_time = time.time_ns()
    diff_in_seconds = (end_time - start_time) / 1e9
    print(f"result: {result}")
    print(f"took: {diff_in_seconds} seconds")
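Running it, you should see something like the below—the result is deterministic, but the timing will of course vary from run to run:

result: [2, 4, 6, 8, 10]
took: 5.012012 seconds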
On my box, the script runs in 5.012012 seconds on average. The basic idea of PR #610 is to improve on this figure by taking advantage of async IO to run the expensive tasks concurrently. At first sight, it seems a pretty straightforward thing to do, but, as we'll see, the devil is in the details.
PR #610 makes the geo-coding call through an async-IO-enabled lib and then executes each call with asyncio.run. PR #610's approach to concurrency is equivalent to changing the for_each and expensive_io_task functions as shown below.
async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)  # simulate expensive call to external service
    return x * 2

def for_each(xs: list[int]) -> list[int]:
    return [asyncio.run(expensive_io_task(x)) for x in xs]
Let's replace the old functions with the new ones. Here's the updated benchmark script.
import asyncio
import time

async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)  # simulate expensive call to external service
    return x * 2

def for_each(xs: list[int]) -> list[int]:
    return [asyncio.run(expensive_io_task(x)) for x in xs]

def main():
    return for_each([1, 2, 3, 4, 5])

if __name__ == '__main__':
    start_time = time.time_ns()
    result = main()
    end_time = time.time_ns()
    diff_in_seconds = (end_time - start_time) / 1e9
    print(f"result: {result}")
    print(f"took: {diff_in_seconds} seconds")
On my machine, it runs in 5.026106 seconds on average. Uh? Yep, even though I made the IO task asynchronous, there's no improvement over the initial synchronous version. Actually, it's slower! What the heck is going on?
As it turns out, execution is still sequential. But there's more. We pay an extra cost for setting up the event loop every time we execute the task—the asyncio.run call—and if you take out the 5-second sleep time, you realise we're paying double the price with the async version. Here's a rough description, in pseudo-code, of what's going on under the bonnet:
for_each([1, 2, 3, 4, 5])
    set up event loop
    run on event loop: r1 = expensive_io_task(1)
    wait for task to complete
    tear down event loop

    set up event loop
    run on event loop: r2 = expensive_io_task(2)
    wait for task to complete
    tear down event loop

    ...

    set up event loop
    run on event loop: r5 = expensive_io_task(5)
    wait for task to complete
    tear down event loop

    return [r1, r2, ..., r5]
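If you want to see that set-up/tear-down cost in isolation, here's a little sketch—the noop coroutine is just a made-up stand-in, not QuantumLeap code—that times repeated asyncio.run calls on a coroutine that does nothing at all. The per-call figure you get will depend on your box, but it won't be zero:

import asyncio
import time

async def noop() -> None:
    pass  # no IO, no work: any elapsed time is pure event loop overhead

if __name__ == '__main__':
    runs = 100
    start_time = time.time_ns()
    for _ in range(runs):
        asyncio.run(noop())  # sets up and tears down an event loop each time
    diff_in_seconds = (time.time_ns() - start_time) / 1e9
    print(f"avg cost per asyncio.run call: {diff_in_seconds / runs * 1000:.3f} ms")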
Ouch! Tricksy async IO!
Surely there's a relatively easy fix in this case, since the result of each task is independent of the others. So we can build a list of tasks, execute them concurrently and then replace each task in the list with its result—similar to map-reduce, but without the reduce bit. With the mods below, the script runs in 1.003046 seconds on average, so the expensive_io_task calls actually happen concurrently.
async def for_each(xs: list[int]) -> list[int]:
    tasks = [expensive_io_task(x) for x in xs]
    return await asyncio.gather(*tasks)

def main():
    return asyncio.run(for_each([1, 2, 3, 4, 5]))
So we've got a many-fold performance improvement with this fix. But
notice the paradigm shift: async stuff bubbled up all the way to main
and we're thinking in terms of sets of tasks rather than single items.
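Incidentally, in a real deployment you probably don't want to fire an unbounded number of concurrent requests at the geo-coding service either. One common way to cap concurrency is asyncio.Semaphore; here's a sketch of how that could look in our toy setting, with the limit of 2 picked out of thin air just for illustration:

import asyncio

async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)  # simulate expensive call to external service
    return x * 2

async def for_each(xs: list[int], max_concurrent: int = 2) -> list[int]:
    limit = asyncio.Semaphore(max_concurrent)

    async def bounded(x: int) -> int:
        async with limit:  # at most max_concurrent tasks do their IO at once
            return await expensive_io_task(x)

    return await asyncio.gather(*(bounded(x) for x in xs))

def main():
    return asyncio.run(for_each([1, 2, 3, 4, 5]))

if __name__ == '__main__':
    print(main())  # ~3 seconds: 5 one-second tasks, 2 at a time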
And this was the easy case to tackle. Most of QuantumLeap is written in terms of a sequence of tasks that depend on each other:
r1 = t1(r0)
r2 = t2(r1)
...
As we've seen, sprinkling them with async/await keywords isn't actually going to make them go any faster—e.g. t2 can only run after t1 returns, so in practice execution would still be sequential. To get real concurrency, we've got to rewrite those flows according to a concurrent design, and we also have some shared global state to look out for if we go async.
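To make the first point concrete, here's a toy version of such a flow—the t1 and t2 below are made-up stand-ins mirroring the pseudo-code above. Everything is async, yet the data dependency means the two one-second tasks still take about two seconds of wall time:

import asyncio
import time

async def t1(x: int) -> int:
    await asyncio.sleep(1)  # simulate IO
    return x + 1

async def t2(x: int) -> int:
    await asyncio.sleep(1)  # simulate IO
    return x * 2

async def flow(x: int) -> int:
    r1 = await t1(x)     # t2 needs r1...
    return await t2(r1)  # ...so it can't start until t1 is done

if __name__ == '__main__':
    start_time = time.time_ns()
    result = asyncio.run(flow(0))
    diff_in_seconds = (time.time_ns() - start_time) / 1e9
    print(f"result: {result}")
    print(f"took: {diff_in_seconds} seconds")  # ~2 seconds, despite async/await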
For example, in the script, like in the PR, over and above running tasks concurrently, we should also try minimising IO as much as possible, e.g. by caching task executions: a call like for_each([1, 2, 3, 4, 5, 1, 2, 3]) would be wasteful since 3 out of the 8 IO task executions could actually be avoided. But now you've got to think about how to share cached state safely among concurrent tasks...
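In the toy setting, a cache can be as simple as deduplicating inputs before gathering—here's a sketch, again not the PR's code. The plain dict is safe here only because asyncio runs everything on one thread and the cache doesn't outlive the call; the moment cached state is shared across an application's concurrent requests, you'd need to be a lot more careful:

import asyncio

async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)  # simulate expensive call to external service
    return x * 2

async def for_each(xs: list[int]) -> list[int]:
    distinct = list(dict.fromkeys(xs))  # dedupe, keeping first-seen order
    results = await asyncio.gather(*(expensive_io_task(x) for x in distinct))
    cache = dict(zip(distinct, results))
    return [cache[x] for x in xs]  # duplicates read from the cache for free

if __name__ == '__main__':
    # 8 inputs but only 5 distinct values, so only 5 IO tasks actually run.
    print(asyncio.run(for_each([1, 2, 3, 4, 5, 1, 2, 3])))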
On a side note, to make our geo-coding more efficient, we could also schedule a task to run on the work queue that would process batches of entities and do bulk inserts to add missing geo-coding data. But that's a paradigm shift too and would take quite a bit of work to implement. Another downside is that optimised geo-coding wouldn't be available to folks who deploy QuantumLeap without the work queue.