
Allow to register application background tasks within an event loop #1092

Closed · f0t0n opened this issue Aug 16, 2016 · 9 comments

f0t0n (Contributor) commented Aug 16, 2016

Long story short

I run an aiohttp application with Gunicorn behind nginx.
The application exposes two websocket connection endpoints for mobile clients: one for RPC and the other for Pub/Sub.

When some event occurs in the system (e.g. there is news from the company for clients), I want to notify all the connected websockets, which I store e.g. in my app['pub_sub_websockets']. So basically I would iterate over them in a loop and send bytes to each one.
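Roughly, the per-worker fan-out would look like this (a sketch; it assumes a recent aiohttp where WebSocketResponse.send_bytes is a coroutine):

```python
async def broadcast(app, payload: bytes):
    # Fan the payload out to every websocket this worker currently holds.
    # Copy the collection so handlers may add/remove websockets meanwhile.
    for ws in set(app['pub_sub_websockets']):
        if not ws.closed:
            await ws.send_bytes(payload)
```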

Each application instance that Gunicorn runs in a separate process has its own collection of connected websockets. But I have to notify all the websockets connected to all workers (or even to all workers on all server nodes). Therefore I would proxy the event through some messaging system like Redis Pub/Sub or ZeroMQ. In my case it's ZeroMQ.

The problem is that I can't easily set up a "listener" coroutine that would run within the application's event loop, subscribe to the ZeroMQ proxy, and forward messages to connected websockets.

Therefore I feel like I have to create my own zmq.SUB ZeroMQ socket for each connected websocket inside my websocket request handler, and run two coroutines there side by side: the first listening on the ws and the second listening on the ZeroMQ socket. As a result, the number of zmq.SUB sockets will grow linearly with the number of clients connected to the application process via websockets.
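That per-connection workaround would look roughly like this (a sketch; the proxy address and topic are placeholders, and with pyzmq of that era it would additionally require ZMQEventLoop, which leads to the next problem):

```python
import asyncio
import zmq
import zmq.asyncio
from aiohttp import web


async def pub_sub_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # One zmq.SUB socket per connected websocket -- the part that scales badly.
    ctx = zmq.asyncio.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect('tcp://127.0.0.1:5556')      # placeholder proxy address
    sub.setsockopt(zmq.SUBSCRIBE, b'news')   # placeholder topic

    async def listen_ws():
        async for msg in ws:                 # drain client messages
            pass

    async def listen_zmq():
        while not ws.closed:
            payload = await sub.recv()       # wait for a proxied event
            await ws.send_bytes(payload)

    tasks = [asyncio.ensure_future(listen_ws()),
             asyncio.ensure_future(listen_zmq())]
    try:
        # Whichever side finishes first (client gone, socket error)
        # ends the handler; cancel the other listener.
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for t in tasks:
            t.cancel()
        sub.close()
    return ws
```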

Even with this approach I can't tell the application (actually the GunicornWebWorker) to use zmq.asyncio.ZMQEventLoop, which is required for zmq.asyncio.

Expected behaviour

1) I propose to provide an interface on web.Application, like Application.register_background_task(coro), that would allow registering as many tasks as needed during application setup. They would run alongside GunicornWebWorker._runner within the event loop the worker creates, say using asyncio.gather().

That way, in my particular case, I'd create only one ZeroMQ socket listening for a topic within the event loop, instead of one ZMQ socket per connected WebSocket.

And if I have to listen to more event sources (message queues from different providers, etc.), I'll add one more socket (or another consuming object, depending on the event source provider), not another thousand sockets for each thousand currently connected websockets. See the sketch below.
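For illustration, the proposed (non-existent) API would be used roughly like this; make_sub_socket is a hypothetical helper:

```python
from aiohttp import web


async def listen_zmq(app):
    # A single zmq.SUB socket for the whole worker process.
    sub = make_sub_socket()                # hypothetical helper
    while True:
        payload = await sub.recv()
        for ws in set(app['pub_sub_websockets']):
            await ws.send_bytes(payload)


app = web.Application()
app['pub_sub_websockets'] = set()
app.register_background_task(listen_zmq)   # proposed API, does not exist
```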

2) Also, I think there should be some API to choose which event loop class to use inside GunicornWebWorker.init_process().
If I understand ZMQ's documentation correctly, I must use zmq.asyncio.ZMQEventLoop to deal with ZeroMQ in my asynchronous application, but I can't set this type of loop in the worker without creating my own worker class inherited from GunicornWebWorker.

Maybe the loop creation logic should be separated into its own public method that could be overridden, or there should simply be an option to choose the event loop class used by the worker. By the way, maybe then we wouldn't need a separate GunicornUVLoopWebWorker class if we could set an event loop class somewhere?
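Today the only way I see is a custom worker class along these lines (a sketch against aiohttp's 2016-era worker internals; the class name is mine):

```python
import asyncio
import zmq.asyncio
from aiohttp.worker import GunicornWebWorker


class GunicornZMQWebWorker(GunicornWebWorker):
    def init_process(self):
        # Drop the loop inherited from the gunicorn master and install
        # a ZMQEventLoop before the worker starts serving.
        asyncio.get_event_loop().close()
        self.loop = zmq.asyncio.ZMQEventLoop()
        asyncio.set_event_loop(self.loop)
        # Skip GunicornWebWorker.init_process(), which would overwrite
        # self.loop with a fresh default loop.
        super(GunicornWebWorker, self).init_process()
```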

Please tell me if it's possible to introduce the stuff described above in the aiohttp library. Or maybe my architectural approach itself is totally wrong? Then I'd be glad to hear some guidance on correcting it.

@f0t0n f0t0n changed the title Allow to register application background tasks within event loop Allow to register application background tasks within an event loop Aug 17, 2016
asvetlov (Member) commented

From my point of view, you are asking for an on_startup signal in Application.

It should be pretty close to the already existing .on_shutdown and .on_cleanup signals (see the Graceful Shutdown doc section).

If you want to contribute the new signal, you are welcome; I'd really appreciate it.
The change is trivial from my perspective, but unfortunately it's not my number one priority, so please don't wait for me; create a Pull Request and I'll help with code review.
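Usage would mirror the existing shutdown signals; something like this (on_startup doesn't exist yet at this point, and the handler names are illustrative):

```python
from aiohttp import web


async def open_zmq_listener(app):
    # Runs once at startup, inside the worker's event loop.
    app['zmq_listener'] = ...  # set up the single SUB socket here


async def close_zmq_listener(app):
    ...  # tear the socket down


app = web.Application()
app.on_startup.append(open_zmq_listener)    # the proposed signal
app.on_cleanup.append(close_zmq_listener)   # already exists today
```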

Regarding zmq.asyncio.ZMQEventLoop:
It's based on zmq.Poller, which in turn is based on the zeromq C++ implementation.
That's not bad, but IIRC the poller is built on top of the poll syscall, not epoll, for non-zmq sockets.
It means the poller is super fast for zmq sockets but has performance problems with many open regular file descriptors (read: the poller is bad at supporting many parallel HTTP connections, and especially websockets).
aiozmq has a loop-less implementation which works with epoll natively (used by default). BTW, it allows you to run zmq sockets with uvloop.
I'd love to see loop-less support in zmq.asyncio, but personally I have no motivation to contribute the change now.
I used zmq for a while; that's why @popravich and I created the aiozmq library. But we don't use it in our daily job anymore.
If anybody raises an issue on the zmq project, I can point out the required operations (it's really tricky) and participate in reviewing. But, sorry, I don't want to write the code/tests/docs to solve the problem.

f0t0n (Contributor, Author) commented Aug 20, 2016

@asvetlov Thanks for such a thoughtful answer.

I've started checking the signals and even writing some code. But if we gather app.startup() with the request handler (see the code snippet below) inside web.run_app() (or inside aiohttp.worker.GunicornWebWorker), all the on_startup signal handlers will run consecutively in a for loop.
That way we can't add a few tasks that run independently alongside the request handler in the event loop. Say one on_startup task is still listening on a ZMQ socket; then another one can't start listening to something else, say Redis or whatever. How do you think we should deal with this?

```python
# inside the web.run_app() function:
server = loop.create_server(handler, host, port,
                            ssl=ssl_context, backlog=backlog)

loop.run_until_complete(asyncio.gather(server, app.startup()))
```

UPDATE

By the way, can we use nested asyncio.gather()? For example:


```python
import asyncio


async def some_stuff_to_run_first(app):
    pass


async def listen_zmq(app):
    pass


async def listen_redis(app):
    pass


async def listen_for_notifications(app):
    return await asyncio.gather(listen_zmq(app), listen_redis(app),
                                loop=app.loop)


app.on_startup.append(some_stuff_to_run_first)
app.on_startup.append(listen_for_notifications)
```

And then it would be called in web.run_app() and in the gunicorn worker as:

```python
loop.run_until_complete(asyncio.gather(server, app.startup()))
```

That way we'll run everything that needs to complete first in that for loop inside Signal, and after that run the long-running tasks, which will live as long as the application is alive, or e.g. until their sockets receive a termination message.
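One way around the sequential for loop might be for the startup handler to spawn the long-running work as a task instead of awaiting it, e.g. (a sketch reusing the listeners above; app.loop was available in aiohttp at the time):

```python
async def listen_for_notifications(app):
    # Don't await the listeners here -- that would block the startup
    # signal's for loop. Spawn them as one task and return immediately.
    app['notification_task'] = app.loop.create_task(
        asyncio.gather(listen_zmq(app), listen_redis(app)))


app.on_startup.append(listen_for_notifications)
```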

f0t0n (Contributor, Author) commented Aug 20, 2016

@asvetlov I've started working on this but ran into problems with the tests that I don't know how to solve (see: f0t0n#1 (comment)). I'd appreciate any input on this.

asvetlov (Member) commented

Your code snippet is slightly incorrect: .on_startup is a list-like object, not a callable.
To subscribe to the signal, use .on_startup.append(create_redis_listener).
.on_cleanup may send cancellation to all registered long-running tasks.
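For instance, building on the snippet above (a sketch):

```python
async def cancel_notifications(app):
    # Cancel the background task spawned in on_startup and let it finish.
    task = app['notification_task']
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


app.on_cleanup.append(cancel_notifications)
```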

f0t0n (Contributor, Author) commented Aug 20, 2016

Ah sure, you're right. Just edited it.

asvetlov (Member) commented

Should we close this, or do you prefer to fix the gunicorn worker first?

f0t0n (Contributor, Author) commented Aug 22, 2016

Let me fix the worker, yeah.

asvetlov (Member) commented

All work is done.
@f0t0n thank you again
