Paginator __aexit__ coroutine not correctly setting finish event #39
Comments
I have added a proposed pull request to fix this issue: @hellysmile, would you check this?
Hey, how do you call
Maybe we should convert all events to
Can you reproduce the error with an example?
I am not sure, to be honest. Asyncio is hard enough for me as it is; combining it with threading is beyond my understanding. I am trying to get Cassandra to work locally to see if I can find a suitable example. The error happened to me when I used the Paginator in an aiohttp request handler and I cancelled the request.
I have been trying to put together an example, but so far I cannot simulate the problem. I think it has to do with the fact that I am working with several hosts and tens of executor threads in Cassandra. I have two CQL queries I am processing asynchronously: one feeds some data into one asyncio queue, and the other processes the results from that queue into a second queue that is then consumed by a third (non-Cassandra) consumer. Maybe the problem is that those queues have fixed sizes, and once they are full, the task running the Paginator must wait until there is some free space.
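For context, a rough sketch of that kind of pipeline might look like the following. The function names, queue sizes, and timings are made up for illustration; only session.execute_futures comes from the thread above.

import asyncio


async def load_rows(session, stmt, raw_queue):
    # Stage 1: page through Cassandra results and feed them into a bounded
    # queue; put() suspends whenever the queue is full, so this task can sit
    # inside the paginator context for a long time.
    async with session.execute_futures(stmt) as paginator:
        async for row in paginator:
            await raw_queue.put(row)


async def transform_rows(raw_queue, out_queue):
    # Stage 2: take rows off the first queue and push them into a second
    # bounded queue that some third, non-Cassandra consumer drains.
    while True:
        row = await raw_queue.get()
        await out_queue.put(row)  # real code would transform the row here


async def run_pipeline(session, stmt):
    raw_queue = asyncio.Queue(maxsize=1000)  # fixed sizes create backpressure
    out_queue = asyncio.Queue(maxsize=1000)
    tasks = [
        asyncio.ensure_future(load_rows(session, stmt, raw_queue)),
        asyncio.ensure_future(transform_rows(raw_queue, out_queue)),
    ]
    try:
        await asyncio.sleep(10)  # stand-in for the consumer draining out_queue
    finally:
        for task in tasks:
            # Cancelling here injects CancelledError into the paginator task,
            # which is where the finish-event problem shows up.
            task.cancel()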
A small example is here: https://gist.github.com/mklokocka/1c459b2b3eadaa9bc4b229b4bc4470f5 If you cancel the request fast enough, you can sometimes reproduce the problem.
When you are dealing with ~20m rows, this happens so often that it sometimes makes the whole app hang and sends memory usage through the roof too. :(
If you close the HTTP connection to aiohttp, it automatically sends a cancellation into your request handler. Can you try the same without aiohttp? It is a common issue with aiohttp.
Well, that is the normal behaviour; the task retrieving data from Cassandra should and has to be cancelled. I do not need it bogging down the rest of the application. I cancel those tasks myself anyway, in the finally clause you can see in the snippet.
I mean, maybe the issue is caused by aio-libs/aiohttp#2098 or aio-libs/aiohttp#2374.
The thing is, I am not writing anything to the DB, just retrieving data using a paginator. This should not lead to the behaviour I experienced: the tasks processing the data from the DB should be cancelled so that the app does not go through all ~20m rows of data without anything acting on them.
I am really sorry, but I will try to explain one more time... It does not matter whether you write or read, or whether it is a HEAD/GET/POST request to aiohttp. If you are doing something inside an aiohttp request handler and the client disconnects, you will get an asyncio.CancelledError. Can you please try the same snippet without aiohttp? Because right now we are not sure whether aiohttp produces this issue or aiocassandra itself does.
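For illustration, a minimal aiohttp handler along the lines being described might look like this; the route, the sleep, and the print are stand-ins of my own, and the cancel-on-disconnect behaviour is the one discussed in the aiohttp issues linked above.

import asyncio

from aiohttp import web


async def handler(request):
    try:
        # Stand-in for long-running work such as paging through Cassandra rows.
        await asyncio.sleep(60)
        return web.Response(text='done')
    except asyncio.CancelledError:
        # aiohttp cancels the handler task when the client disconnects,
        # so the CancelledError surfaces here.
        print('client went away, handler was cancelled')
        raise


app = web.Application()
app.router.add_get('/', handler)
web.run_app(app)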
What I am trying to explain is that it should not matter whether it is aiohttp or not. It is not an aiohttp problem that I have a coroutine running the aiocassandra paginator which I cancel myself; that passes an asyncio.CancelledError into the coroutine. That is normal, expected behaviour. The problem is that the Paginator cannot handle this cancellation. I do not know the precise reason why, but I think it is because of wrong handling of the exit_event set, because changing it as I proposed fixes the problem fully on my side. It is not just the error messages about Event.wait, but the fact that for some reason the Paginator keeps hoarding data and working, and I need those tasks cancelled.
I have tried to find an example without aiohttp, but so far I have not succeeded in emulating the same behaviour; I think it is because aiohttp handles requests as they come, starting such coroutines (the ones handling Cassandra in my case) on the fly. But honestly, I do not have the time to spend days on this. I have not written this library and I do not claim to be a pro on asyncio and threading. I would be grateful if you looked at it; otherwise I am just going with my fork using the fix I proposed, which you somehow keep ignoring.
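For what it is worth, a reproduction attempt without aiohttp might look something like the sketch below: a task iterating the paginator is cancelled from the outside mid-iteration. The timings and the statement are guesses of mine, not verified code from this thread.

import asyncio

from aiocassandra import aiosession
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement


async def consume(session, stmt):
    # Iterate the paginator slowly so the task is very likely to be suspended
    # inside it when the cancellation arrives.
    async with session.execute_futures(stmt) as paginator:
        async for row in paginator:
            await asyncio.sleep(0.01)


async def main():
    cluster = Cluster(contact_points=['127.0.0.1'], port=9042)
    session = cluster.connect('system')
    aiosession(session)
    stmt = SimpleStatement('SELECT * from system.size_estimates', fetch_size=100)

    task = asyncio.ensure_future(consume(session, stmt))
    await asyncio.sleep(0.1)  # let the paginator start fetching pages
    task.cancel()             # mimic what aiohttp does when the client disconnects
    try:
        await task
    except asyncio.CancelledError:
        pass
    cluster.shutdown()


asyncio.run(main())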
Ah, I see the problem now. There are three events in play at the same time, and the wait on one of them never actually finishes. Therefore I think there must be a call to self._loop.call_soon_threadsafe, as in the fix I proposed.
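As a generic illustration of why such a call matters (this is not aiocassandra's actual code, just a minimal sketch): asyncio.Event is not thread-safe, so setting it directly from a worker thread is not guaranteed to wake a coroutine awaiting it, while handing the set() to the loop via call_soon_threadsafe is the documented way to do it.

import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    finished = asyncio.Event()

    def worker():
        # Calling finished.set() directly from this thread may never wake the
        # waiter below; scheduling it on the loop does.
        loop.call_soon_threadsafe(finished.set)

    threading.Thread(target=worker).start()
    await asyncio.wait_for(finished.wait(), timeout=5)
    print('waiter woke up')


asyncio.run(main())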
This sounds clearer, I'll try to find a fix / reproduce it over this weekend!
Sure! I am also having some memory issues which I am not sure are tied to this or not, so I will see about that (could it be that the unfinished await in the paginator keeps data in memory? I don't know).
Any news @hellysmile?
Ping @hellysmile
Hi @hellysmile, I found the same behavior, could you take a look? Steps to reproduce:
import asyncio

from aiocassandra import aiosession
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement


async def main():
    cluster = Cluster(
        contact_points=['127.0.0.1'],
        port=9042,
    )
    session = cluster.connect("system")
    aiosession(session)

    stmt = SimpleStatement("SELECT * from system.size_estimates ;", fetch_size=100)
    try:
        for _ in range(3):
            async with session.execute_futures(stmt) as paginator:
                n = 0
                async for row in paginator:
                    # Break out of the iteration early; leaving the async with
                    # block here is what triggers the problem.
                    if n == 200:
                        break
                    n += 1
    finally:
        cluster.shutdown()


if __name__ == '__main__':
    import logging

    logging.basicConfig(
        level=logging.ERROR,
        format="%(asctime)s - [%(levelname)s] - [%(name)s] - %(filename)s:%(lineno)d - %(message)s",
    )
    asyncio.run(main())

Received output:
Hello,
there is a problem with this line:
aiocassandra/aiocassandra.py, line 77 in 3ffd252
The __aexit__ coroutine is not correctly setting the self._finish_event, which leads to problems when a task working through the results of the pagination is suddenly cancelled. There are problems with the way aiocassandra uses threading; to correct this, we have to call self._loop.call_soon_threadsafe(self._finish_event.set).
Would it be possible to fix this, or should I make a custom fork for my use?
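For reference, the proposed change boils down to the pattern sketched below. This is a toy async context manager mirroring the shape of the fix, not the library's actual Paginator code; only the attribute names self._loop and self._finish_event follow the issue text.

import asyncio


class ToyPaginator:
    # Toy stand-in for the Paginator: something waits on _finish_event, and
    # __aexit__ has to set that event in a way that actually wakes the waiter.
    def __init__(self, loop):
        self._loop = loop
        self._finish_event = asyncio.Event()

    async def __aenter__(self):
        self._waiter = asyncio.ensure_future(self._finish_event.wait())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Proposed fix from this issue: schedule set() on the loop instead of
        # calling self._finish_event.set() from whatever thread we happen to
        # be on, so the pending wait() is reliably woken.
        self._loop.call_soon_threadsafe(self._finish_event.set)
        await self._waiter


async def main():
    loop = asyncio.get_running_loop()
    async with ToyPaginator(loop):
        pass
    print('exited cleanly, finish event was set')


asyncio.run(main())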