validate that code inside Task.async_run is not blocking #4

Closed
komuw opened this issue Feb 23, 2019 · 6 comments

Comments

@komuw
Owner

komuw commented Feb 23, 2019

The code inside async_run (https://github.com/komuw/xyzabc/blob/ef3238de831e6b0c857328f8823b05295851bb17/xyzabc/task.py#L68) should be async code with no blocking calls.

However, a user could write something like this:

import asyncio

import xyzabc


class BlockingHttptask(xyzabc.task.Task):
    async def async_run(self):
        import requests

        # blocks for 23 seconds
        url = "https://httpbin.org/delay/23"
        resp = requests.get(url)


class NonBlockingTask(xyzabc.task.Task):
    async def async_run(self, bbb, a=5.5):
        result = bbb * a
        print("result: ", result)
        return result


blocking_task = BlockingHttptask(the_broker="the_broker", queue_name="HttpQueue")
non_blocking_task = NonBlockingTask(the_broker="the_broker", queue_name="MultiplierTaskQueue")

blocking_task_worker = xyzabc.Worker(the_task=blocking_task)
non_blocking_task_worker = xyzabc.Worker(the_task=non_blocking_task)


if __name__ == "__main__":

    async def async_main():
        gather_tasks = asyncio.gather(
            blocking_task_worker.consume_forever(), non_blocking_task_worker.consume_forever()
        )
        await gather_tasks

    asyncio.run(async_main(), debug=True)

The execution of blocking_task will also block the execution of non_blocking_task.

This is because Python's asyncio runs everything in one thread (in an event loop).
It is thus important that users only put non-blocking code inside async_run methods.
However, users may sometimes fail to do that.
If they put blocking IO (or CPU-bound code) there, we should figure out how to:

  • inform them
  • run that code in its own thread[1] if the task is IO-bound, or in its own process[2] if the task is CPU-bound.
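
As a minimal sketch of the second bullet, the snippet below hands a blocking call to a ThreadPoolExecutor through loop.run_in_executor so that another coroutine keeps running in the meantime. The names blocking_io, ticker and main are hypothetical and are not part of xyzabc; time.sleep stands in for a call such as requests.get.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(seconds):
    # Stand-in for a blocking call such as requests.get(url).
    time.sleep(seconds)
    return "done"


async def ticker(ticks, interval=0.01, count=5):
    # Appends a timestamp per iteration; it only advances while the loop is free.
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking function runs on a worker thread, not on the event loop.
        result, _ = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io, 0.1),
            ticker(ticks),
        )
    return result, ticks
```

For CPU-bound work, a ProcessPoolExecutor can be passed to run_in_executor in the same way, provided the function and its arguments can be pickled.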

We have already instituted measures to make sure that users do not change the signature of the async_run method to be non-async: https://github.com/komuw/xyzabc/blob/e046a061b538586e5ddbd7aa2b47ed448f3f28c8/xyzabc/task.py#L233-L242

However, we still need a way to check whether code inside async_run is blocking, and to act on it.

  • The worker class should have an attribute, task_is_blocking=False
  • when we execute the first task (or any other), we measure how long it took to execute
  • if it took longer than X milliseconds, we set task_is_blocking=True
    - if that attribute is True, we start a thread, and all further executions of that task are done on that thread.
  • Bonus: detect whether the task was IO-bound or CPU-bound and start the appropriate type of executor (thread or process)
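
The measuring steps above could be sketched roughly as follows. TimedRunner and threshold_seconds are hypothetical names used only for illustration; only the task_is_blocking attribute comes from the proposal.

```python
import asyncio
import time


class TimedRunner:
    """Hypothetical helper, not part of xyzabc: flags slow task executions."""

    def __init__(self, threshold_seconds=0.1):
        self.threshold_seconds = threshold_seconds
        self.task_is_blocking = False

    async def run(self, coro_func, *args, **kwargs):
        # Measure the wall-clock time of one execution of the coroutine.
        start = time.monotonic()
        result = await coro_func(*args, **kwargs)
        elapsed = time.monotonic() - start
        if elapsed > self.threshold_seconds:
            self.task_is_blocking = True
        return result
```

One limitation of this sketch: wall-clock time cannot tell a coroutine that blocks (time.sleep) from one that merely awaits for a long time (await asyncio.sleep), so both would be flagged.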

ref:

  1. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
  2. https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor
@komuw komuw changed the title validate that code inside Task. async_run is not blocking validate that code inside Task.async_run is not blocking Feb 23, 2019
@komuw
Owner Author

komuw commented Feb 23, 2019

  • when we execute the first task (or any other), we measure how long it took to execute

It appears that the method outlined above may not work.

@komuw
Owner Author

komuw commented Feb 23, 2019

David Beazley and Nathaniel J. Smith have weighed in[1] on the matter.
It appears that we could do it somehow, but it gets complicated[2] very fast.


ref:

  1. https://twitter.com/komu_wairagu/status/1099052582968922113
  2. Add a blocked task watchdog python-trio/trio#591

@komuw
Owner Author

komuw commented Feb 23, 2019

PR to add a blocked-task watchdog to trio:

ref:

  1. Add task blocked watchdog. python-trio/trio#596

@komuw komuw mentioned this issue Feb 23, 2019
@komuw
Owner Author

komuw commented Feb 23, 2019

Victor Stinner suggested[1]:

It should be possible to write something on top of `ptrace` or something like that.
Blocking I/O syscalls should be avoided from the thread running the event loop.

and also[2]:

I implemented this simple debugging tool early in asyncio to detect blocking IO blocking the event loop.
It uses a threshold of 100 ms by default, it is configurable

ref:

  1. https://twitter.com/VictorStinner/status/1099355649786474496
  2. https://twitter.com/VictorStinner/status/1099354977531768833
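
The second quote presumably refers to asyncio's debug mode, which logs a warning when a single step of a task or callback takes longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below, under that assumption, captures those warnings from the "asyncio" logger. ListHandler, blocking_coro and run_and_collect_warnings are hypothetical names used only for illustration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    # Collects formatted log messages in memory so they can be inspected.
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_coro():
    # Blocks the event loop thread for 200 ms.
    time.sleep(0.2)


async def main():
    loop = asyncio.get_running_loop()
    # Configurable threshold in seconds; 0.1 is the documented default.
    loop.slow_callback_duration = 0.1
    await blocking_coro()


def run_and_collect_warnings():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True enables the slow-callback check.
        asyncio.run(main(), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages
```

Debug mode can also be enabled without code changes by setting the PYTHONASYNCIODEBUG=1 environment variable.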

@komuw
Owner Author

komuw commented Feb 28, 2019

Fixed by:

  1. wiji/xyzabc/task.py

    Lines 233 to 242 in e046a06

    if not asyncio.iscoroutinefunction(self.async_run):
        raise ValueError(
            "The method: `async_run` of a class derived from: `xyzabc.task.Task` should be a python coroutine."
            "\nHint: did you forget to define the method using `async def` syntax?"
        )
    if not inspect.iscoroutinefunction(self.async_run):
        raise ValueError(
            "The method: `async_run` of a class derived from: `xyzabc.task.Task` should be a python coroutine."
            "\nHint: did you forget to define the method using `async def` syntax?"
        )

  2. https://github.com/komuw/wiji/blob/4f0d4c94b3c9e05d522c4f359ce615610d50b82a/wiji/watchdog.py

  3. wiji/wiji/worker.py

    Lines 117 to 138 in 65b7a8c

    self.watchdog.notify_alive_before()
    try:
        return_value = await self.the_task.async_run(*task_args, **task_kwargs)
        if self.the_task.chain:
            # TODO: (komuw) make sure that chains wait for the parents retries to end before running
            # Celery solves this by using/listening celery.exceptions.Retry(which you should never swallow)
            # enqueue the chained task using the return_value
            await self.the_task.chain.async_delay(return_value)
    except Exception as e:
        self._log(
            logging.ERROR,
            {
                "event": "wiji.Worker.run",
                "stage": "end",
                "state": "task execution error",
                "error": str(e),
            },
        )
    finally:
        if self.watchdog is not None:
            self.watchdog.notify_alive_after()
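
To illustrate how a notify_alive_before / notify_alive_after pair can be used from a separate thread, here is a minimal sketch. It is not the code in wiji/watchdog.py; SketchWatchdog, its parameters and the on_blocked callback are assumptions made for illustration, and only the two notify method names are taken from the snippet above.

```python
import threading
import time


class SketchWatchdog:
    """Hypothetical watchdog: reports executions that stay open too long."""

    def __init__(self, timeout=0.1, poll_interval=0.01, on_blocked=print):
        self.timeout = timeout
        self.poll_interval = poll_interval
        self.on_blocked = on_blocked
        self._lock = threading.Lock()
        self._started_at = None  # None means no task is executing
        self._reported = False
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._watch, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def notify_alive_before(self):
        with self._lock:
            self._started_at = time.monotonic()
            self._reported = False

    def notify_alive_after(self):
        with self._lock:
            self._started_at = None

    def _watch(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop.wait(self.poll_interval):
            with self._lock:
                started = self._started_at
                overdue = (
                    started is not None
                    and not self._reported
                    and time.monotonic() - started > self.timeout
                )
                if overdue:
                    self._reported = True  # report each execution at most once
            if overdue:
                self.on_blocked(time.monotonic() - started)
```

Because the check runs on its own thread, it can report while the event loop is still stuck, rather than only after the task returns.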

@komuw komuw closed this as completed Feb 28, 2019
@komuw
Owner Author

komuw commented Mar 2, 2019

Another option is to use Python's signal.alarm:

https://github.com/ask/mode/blob/2ffae126df4893c3b2036cf6bb05d933362961c2/mode/debug.py
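
A rough sketch of the alarm idea follows; it is not the code from mode/debug.py. It swaps signal.alarm (whole seconds only) for signal.setitimer so that sub-second timeouts work, and it is Unix-only and must run in the main thread. heartbeat and run_with_alarm are hypothetical names. A heartbeat coroutine keeps re-arming the timer; if the loop is blocked for longer than the timeout, the timer expires and SIGALRM is delivered.

```python
import asyncio
import signal
import time


async def heartbeat(timeout, interval):
    try:
        while True:
            # Re-arm the one-shot timer every time the loop lets us run.
            signal.setitimer(signal.ITIMER_REAL, timeout)
            await asyncio.sleep(interval)
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer


async def run_with_alarm(coro_func, timeout=0.1, interval=0.02):
    fired = []

    def on_alarm(signum, frame):
        # Called only if the heartbeat could not re-arm the timer in time.
        fired.append(time.monotonic())

    previous = signal.signal(signal.SIGALRM, on_alarm)
    beat = asyncio.create_task(heartbeat(timeout, interval))
    try:
        await asyncio.sleep(0)  # let the heartbeat arm the timer once
        await coro_func()
    finally:
        beat.cancel()
        await asyncio.gather(beat, return_exceptions=True)
        signal.signal(signal.SIGALRM, previous)  # restore the old handler
    return len(fired)
```

Unlike measuring elapsed time around the call, this approach does not flag a coroutine that simply awaits for a long time, because the heartbeat keeps running while the loop is free.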
