
No async free Lunch

Andrea Falconi edited this page Jan 13, 2022 · 2 revisions

Is there any async IO low-hanging fruit we can pick?

So we're experimenting with Python async IO (see PR #610). That's great, but how much work would it be to convert QuantumLeap to use async IO efficiently? Is there any low-hanging fruit we can pick?

From the PR comments:

it would be ideal to go async with all external calls, this would be just a first step

Yeah, that's definitely an avenue we can explore, but we've got to think about this at the architecture level: to reap the benefits we'll have to move away from the current thread-per-request architecture to an event-loop one, which is quite a bit of work. In fact, I think just sprinkling our code with async/await keywords won't help much. Here's what I mean.

Synchronous land

One of the goals of PR #610 is to speed up the process of adding geo-coding info to incoming NGSI entities. Before the PR, the code iterated over a list of entities and, for each entity, called an external service to look up geo-coding info and then added it to the entity. This happened sequentially, so the calls to the geo-coding service happened one after the other. Abstracting away the details, the computation pattern was like this:

def expensive_io_task(x: int) -> int:
    time.sleep(1)  # simulate expensive call to external service
    return x * 2


def for_each(xs: [int]) -> [int]:
    return [expensive_io_task(x) for x in xs]


def main():
    return for_each([1, 2, 3, 4, 5])  # simulate incoming entities

Let's plug this code into a simple benchmark script.

import time


def expensive_io_task(x: int) -> int:
    time.sleep(1)  # simulate expensive call to external service
    return x * 2


def for_each(xs: [int]) -> [int]:
    return [expensive_io_task(x) for x in xs]


def main():
    return for_each([1, 2, 3, 4, 5])  # simulate incoming entities


if __name__ == '__main__':
    start_time = time.time_ns()

    result = main()

    end_time = time.time_ns()
    diff_in_seconds = (end_time - start_time) / 1e9

    print(f"result: {result}")
    print(f"took: {diff_in_seconds} seconds")

On my box, the script runs in 5.012012 seconds on average. The basic idea of PR #610 is to improve on this figure by taking advantage of async IO to run the expensive tasks concurrently. At first sight, it seems a pretty straightforward thing to do, but, as we'll see, the devil is in the details.

First stab at async IO

PR #610 makes the geo-coding call through an async-IO-enabled lib and then executes each call with asyncio.run. PR #610's approach to concurrency is equivalent to changing the for_each and expensive_io_task functions as shown below.

async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)
    return x * 2


def for_each(xs: [int]) -> [int]:
    return [asyncio.run(expensive_io_task(x)) for x in xs]

Let's replace the old functions with the new ones. Here's the updated benchmark script.

import asyncio
import time


async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)
    return x * 2


def for_each(xs: [int]) -> [int]:
    return [asyncio.run(expensive_io_task(x)) for x in xs]


def main():
    return for_each([1, 2, 3, 4, 5])


if __name__ == '__main__':
    start_time = time.time_ns()

    result = main()

    end_time = time.time_ns()
    diff_in_seconds = (end_time - start_time) / 1e9

    print(f"result: {result}")
    print(f"took: {diff_in_seconds} seconds")

On my machine, it runs in 5.026106 seconds on average. Uh? Yep, even though I made the IO task asynchronous, there's no improvement over the initial synchronous version. Actually, it's slower! What the heck is going on?

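As it turns out, execution is still sequential: each asyncio.run call blocks until its task completes, so the next task can't start before the previous one is done. But there's more. We pay an extra cost for setting up and tearing down the event loop every time we execute the task (the asyncio.run call). If you take the 5 seconds of sleep time out of the two figures above, the overhead goes from about 0.012 to about 0.026 seconds, so we're paying roughly double the price with the async version. Here's a rough description, in pseudo-code, of what's going on under the bonnet: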

for_each([1, 2, 3, 4, 5])

  set up event loop
  run on event loop: r1 = expensive_io_task(1)
  wait for task to complete
  tear down event loop

  set up event loop
  run on event loop: r2 = expensive_io_task(2)
  wait for task to complete
  tear down event loop

  ...

  set up event loop
  run on event loop: r5 = expensive_io_task(5)
  wait for task to complete
  tear down event loop

  return [r1, r2, ..., r5]

Ouch! Tricksy async IO!
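To isolate that set-up and tear-down cost, here's a rough sketch that takes the sleep out of the picture and compares one asyncio.run call per item against a single asyncio.run for the whole list. The names noop_task, run_per_item and run_once are made up for illustration, they aren't in the PR, and the absolute numbers will vary from box to box.

```python
import asyncio
import time
from typing import List


async def noop_task(x: int) -> int:
    return x * 2  # no sleep, so only the event loop overhead is left


def run_per_item(xs: List[int]) -> List[int]:
    # one event loop set up and torn down per item
    return [asyncio.run(noop_task(x)) for x in xs]


async def _run_all(xs: List[int]) -> List[int]:
    return [await noop_task(x) for x in xs]


def run_once(xs: List[int]) -> List[int]:
    # a single event loop for the whole list
    return asyncio.run(_run_all(xs))


def timed(f, xs: List[int]):
    start = time.perf_counter()
    result = f(xs)
    return result, time.perf_counter() - start


if __name__ == '__main__':
    xs = list(range(100))
    r1, t1 = timed(run_per_item, xs)
    r2, t2 = timed(run_once, xs)
    assert r1 == r2  # same results, only the loop handling differs
    print(f"asyncio.run per item: {t1:.4f} seconds")
    print(f"single asyncio.run:   {t2:.4f} seconds")
```

Both variants are still sequential. The only difference is how many event loops get created along the way.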

Effective concurrency

Sure, there's a relatively easy fix in this case, since the result of each task is independent of the others. So we can build a list of tasks, execute them concurrently and then replace each task in the list with its result, similar to map-reduce but without the reduce bit. With the mods below, the script runs in 1.003046 seconds on average, so the expensive_io_task calls actually happen concurrently.

async def for_each(xs: [int]) -> [int]:
    tasks = [expensive_io_task(x) for x in xs]
    return await asyncio.gather(*tasks)


def main():
    return asyncio.run(for_each([1, 2, 3, 4, 5]))
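For completeness, here's how the full benchmark script might look with these mods applied. It's a sketch assembled from the snippets above. The only liberty taken is swapping the [int] annotations for typing.List hints.

```python
import asyncio
import time
from typing import List


async def expensive_io_task(x: int) -> int:
    await asyncio.sleep(1)  # simulate expensive call to external service
    return x * 2


async def for_each(xs: List[int]) -> List[int]:
    # calling the coroutine function only creates coroutine objects
    tasks = [expensive_io_task(x) for x in xs]
    # gather runs them on the same event loop and keeps the input order
    return await asyncio.gather(*tasks)


def main():
    # one event loop for the whole batch
    return asyncio.run(for_each([1, 2, 3, 4, 5]))


if __name__ == '__main__':
    start_time = time.time_ns()

    result = main()

    end_time = time.time_ns()
    diff_in_seconds = (end_time - start_time) / 1e9

    print(f"result: {result}")
    print(f"took: {diff_in_seconds} seconds")
```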

Paradigm shift

So we've got a roughly five-fold performance improvement with this fix. But notice the paradigm shift: async stuff bubbled up all the way to main and we're thinking in terms of sets of tasks rather than single items. And this was the easy case to tackle. Most of QuantumLeap is written in terms of a sequence of tasks that depend on each other:

r1 = t1(r0)
r2 = t2(r1)
...
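Here's a minimal sketch of what happens when a chain like this gets the async/await treatment. The bodies of t1 and t2 are hypothetical stand-ins, each simulating an IO call with a short sleep. They aren't actual QuantumLeap functions.

```python
import asyncio
import time


async def t1(r0: int) -> int:
    await asyncio.sleep(0.2)  # simulate an IO call
    return r0 + 1


async def t2(r1: int) -> int:
    await asyncio.sleep(0.2)  # simulate another IO call that needs t1's result
    return r1 * 2


async def flow(r0: int) -> int:
    r1 = await t1(r0)
    r2 = await t2(r1)  # can't start until r1 is available
    return r2


def timed_flow(r0: int):
    start = time.perf_counter()
    r2 = asyncio.run(flow(r0))
    return r2, time.perf_counter() - start


if __name__ == '__main__':
    r2, elapsed = timed_flow(1)
    print(f"result: {r2}")
    # expect roughly the sum of the two sleeps, not the longer of the two
    print(f"took: {elapsed:.3f} seconds")
```

The data dependency between t1 and t2 forces the two sleeps to happen one after the other, async or not.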

As we've seen, sprinkling these tasks with async/await keywords isn't actually going to make them go any faster, e.g. t2 can only run after t1 returns, so in practice execution would still be sequential. To get real concurrency, we've got to rewrite those flows according to a concurrent design, and we also have some shared global state to look out for if we go async. On top of running tasks concurrently, in the script as in the PR, we should also try to minimise IO as much as possible, e.g. by caching task executions. A call like

for_each([1, 2, 3, 4, 5, 1, 2, 3])

would be wasteful since 3 out of the 8 IO task executions could actually be avoided. But now you've got to think about how to share cached state safely among concurrent tasks...
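As a rough sketch of one way to do that within a single event loop, we could keep a dict of tasks keyed by input, so repeated inputs await the same task instead of triggering another IO call. The names for_each_cached, in_flight and calls are made up for illustration. This only covers tasks running on the same loop. Sharing a cache across threads or processes, as in a thread-per-request setup, would need more than this.

```python
import asyncio
from typing import Dict, List

calls: List[int] = []  # records which inputs actually hit the "external service"


async def expensive_io_task(x: int) -> int:
    calls.append(x)
    await asyncio.sleep(0.1)  # simulate expensive call to external service
    return x * 2


async def for_each_cached(xs: List[int]) -> List[int]:
    in_flight: Dict[int, asyncio.Task] = {}
    for x in xs:
        if x not in in_flight:
            # the first occurrence of x schedules the IO call
            in_flight[x] = asyncio.create_task(expensive_io_task(x))
    # repeated inputs await the same task, so they share its result
    return [await in_flight[x] for x in xs]


def main():
    return asyncio.run(for_each_cached([1, 2, 3, 4, 5, 1, 2, 3]))


if __name__ == '__main__':
    print(f"result: {main()}")
    print(f"IO calls made: {len(calls)}")
```

Since the dict is only touched between awaits on a single loop, no locking is needed here. That wouldn't hold if the cache were shared with other threads.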

Side note

On a side note, to make our geo-coding more efficient, we could also schedule a task to run on the work queue that would process batches of entities and do bulk inserts to add missing geo-coding data. But that's a paradigm shift too and would take quite a bit of work to implement. Another downside is that optimised geo-coding wouldn't be available to folks who deploy QuantumLeap without the work queue.