Description
- asyncpg version: 0.13.0
- PostgreSQL version: 9.4
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
the issue with a local PostgreSQL install?: using the docker image aidanlister/postgres-hstore
- Python version: 3.6.3
- Platform: Fedora 27
- Do you use pgbouncer?: no
- Did you install asyncpg with pip?: yes
- If you built asyncpg locally, which version of Cython did you use?:
- Can the issue be reproduced under both asyncio and
uvloop?: didn't try uvloop
While my application is running some queries, I interrupt the connection by unplugging the ethernet cable from the computer. After doing so, some connections are never returned to the pool, even though a timeout is set for both the acquire() and fetch() calls. I know they are never returned to the pool because I print the pool's queue size every time a task finishes.
I can't post the whole code because it's quite extensive, but the database operations are concentrated in a single file:
```python
import src.controllers.configs as configs_controller
import asyncio
import logging
import asyncpg
import traceback
import decimal

QUERY_TRIES = 2
POOL_MAX_SIZE = 3

_databases = dict()
_logger = logging.getLogger("DatabaseController")


async def _create_pool(access_information):
    return await asyncpg.create_pool(
        **access_information,
        min_size=0,
        max_size=POOL_MAX_SIZE,
        max_queries=30,
        timeout=5,
        command_timeout=10,
        max_inactive_connection_lifetime=180
    )


async def connect():
    # Create a connection pool for each database defined in the configuration
    global _databases
    _databases = {
        database_name: await _create_pool(
            configs_controller.database_access[database_name])
        for database_name in configs_controller.database_access
    }


async def close_connections():
    for database_name, database_pool in _databases.items():
        await database_pool.close()


def check_database(database):
    if database not in _databases:
        error = f"Database '{database}' not initialized"
        _logger.error(error)
        raise Exception(error)


async def execute(database, query, *args):
    # Acquire a connection
    check_database(database)
    async with _databases[database].acquire() as connection:
        await connection.execute(query, *args)


async def executemany(database, query, *args):
    # Acquire a connection
    check_database(database)
    async with _databases[database].acquire() as connection:
        await connection.executemany(query, *args)


def _decimal_to_float(data):
    for row in data:
        for key, value in row.items():
            if isinstance(value, decimal.Decimal):
                row[key] = float(value)


async def _fetch_data(database, query, *args):
    # Acquire a connection
    async with _databases[database].acquire(timeout=20) as connection:
        try:
            result = await connection.fetch(query, *args)
            result = [dict(row) for row in result]
            _decimal_to_float(result)
            return result
        # Any exception while fetching the data shouldn't trigger a retry, so
        # they are caught here
        except asyncio.TimeoutError:
            _logger.error(f"Query timed out\n{query}{args}")


async def print_counts():
    for database_name, database in _databases.items():
        print(database_name, database._queue.qsize(), POOL_MAX_SIZE)


async def fetch(database, query, *args):
    check_database(database)
    # Try to run the query a number of times
    count = 0
    while count != QUERY_TRIES:
        count += 1
        try:
            return await _fetch_data(database, query, *args)
        # The following exceptions may trigger a retry
        except asyncpg.exceptions.SerializationError:
            _logger.info("Conflict with recovery, retrying")
        # A connection timeout, not a query timeout
        except asyncio.TimeoutError:
            _logger.info("Connection timed out, retrying")
        # Return None if any other exception is caught
        except:
            _logger.error(f"{traceback.format_exc()}\n{query} {args}")
            return None
        # Delay before retrying
        await asyncio.sleep(1)
```
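Stripped of logging, the retry loop in fetch() reduces to this pattern (the helper name and signature below are mine for illustration, not asyncpg's):

```python
import asyncio


async def retry(coro_factory, tries=2, delay=1.0,
                retryable=(asyncio.TimeoutError,)):
    # Run the coroutine up to `tries` times, sleeping between attempts.
    # Retryable errors loop again; any other error (or exhausting the
    # retries) yields None, mirroring fetch() above.
    for attempt in range(tries):
        try:
            return await coro_factory()
        except retryable:
            pass  # fetch() logs here, then retries
        except Exception:
            return None  # any other error aborts immediately
        if attempt + 1 < tries:
            await asyncio.sleep(delay)
    return None
```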
After removing the ethernet cable, I wait some time so that an external timeout is triggered (`await asyncio.wait(futures, timeout=30)`). When this happens, the application should have finished all its tasks (if everything went well) and I would be able to shut it down safely. Before letting the loop close, there is a delay during which I interrupt the execution with Ctrl+C. This works fine when there are no pending tasks, but when the connection was dropped, the "lost" tasks are interrupted, generating a stack trace like the following one.
```
[2017-11-01 00:09:25,800] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f109d68>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 00:09:25,804] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f0891f8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 00:09:25,808] (ERROR) asyncio: Fatal write error on socket transport
protocol: <asyncpg.protocol.protocol.Protocol object at 0x7f830f6bb588>
transport: <_SelectorSocketTransport fd=9>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
    await self._con.reset()
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
    await self.execute(reset_query)
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 762, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <coroutine object Pool.release.<locals>._release_impl at 0x7f830f197678>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 465, in _release_impl
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 203, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'
[2017-11-01 00:09:25,813] (ERROR) asyncio: Fatal write error on socket transport
protocol: <asyncpg.protocol.protocol.Protocol object at 0x7f830f6bb6d8>
transport: <_SelectorSocketTransport fd=10>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
    await self._con.reset()
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
    await self.execute(reset_query)
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 762, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <coroutine object Pool.release.<locals>._release_impl at 0x7f830f197990>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 465, in _release_impl
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 203, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'
[2017-11-01 00:09:25,817] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:52> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f1093a8>()]>>
[2017-11-01 00:09:25,821] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:52> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f089198>()]>>
[2017-11-01 00:09:25,825] (ERROR) DatabaseController: Traceback (most recent call last):
  File "./src/controllers/database.py", line 102, in fetch
    _logger.info("Conflict with recovery, retrying")
GeneratorExit
```
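The `shield.<locals>._done_callback` in the pending-task messages suggests the release is wrapped in `asyncio.shield()`, so cancelling the caller does not cancel the release itself. Here is a self-contained sketch of how that leaves an orphaned task (no asyncpg involved; `slow_release` is just my stand-in for a release that never completes over the dead link):

```python
import asyncio


async def slow_release():
    # Stand-in for a pool release that hangs forever because the TCP
    # peer disappeared and nothing times it out.
    await asyncio.sleep(3600)


async def caller():
    # shield() protects the release from the caller's cancellation: when
    # wait_for() times out, only the outer future is cancelled, and the
    # inner release task keeps running with nothing awaiting it.
    try:
        await asyncio.wait_for(asyncio.shield(slow_release()), timeout=0.05)
    except asyncio.TimeoutError:
        pass


async def count_orphans():
    await caller()
    # The shielded release task is still pending at this point.
    return sum(1 for t in asyncio.all_tasks()
               if t is not asyncio.current_task())
```

When the loop is then closed, that still-pending task is exactly what produces "Task was destroyed but it is pending!".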
I've tried adding timeouts in other places, but nothing I do makes the connection go back to the pool. I even added some logging to try to track down where it gets stuck, but couldn't find it.
A simplified version of the application is:
```python
async def run():
    queries = []
    futures = [database_controller.fetch(query) for query in queries]
    # Connection drops while executing this line
    await asyncio.wait(futures, timeout=30)
    # Prints a queue size smaller than the pool max size when the connection was lost
    await database_controller.print_counts()
    # Interrupting the execution here, after waiting much longer than every timeout set in the code
    await asyncio.sleep(1000)
```