
Connection not being returned to the pool after connection loss #220

Closed
@GabrielSalla

Description

  • asyncpg version: 0.13.0
  • PostgreSQL version: 9.4
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: using the docker image aidanlister/postgres-hstore
  • Python version: 3.6.3
  • Platform: Fedora 27
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: yes
  • If you built asyncpg locally, which version of Cython did you use?:
  • Can the issue be reproduced under both asyncio and uvloop?: didn't try uvloop

While my application is running some queries, I interrupt the connection by unplugging the ethernet cable from the computer. After doing so, some connections are never returned to the pool, even though timeouts are set for both the acquire() and fetch() calls. I know they are never returned because I print the pool's queue size every time the batch of queries finishes.

I can't send the whole code because it's quite extensive, but the database operations are concentrated in a single file:

import src.controllers.configs as configs_controller
import asyncio
import logging
import asyncpg
import traceback
import decimal

QUERY_TRIES = 2
POOL_MAX_SIZE = 3
_databases = dict()
_logger = logging.getLogger("DatabaseController")


async def _create_pool(access_information):
    return await asyncpg.create_pool(
        **access_information,
        min_size=0,
        max_size=POOL_MAX_SIZE,
        max_queries=30,
        timeout=5,
        command_timeout=10,
        max_inactive_connection_lifetime=180
    )

async def connect():
    # Create a connection pool for each database defined in the configuration
    global _databases
    _databases = {
        database_name: await _create_pool(
            configs_controller.database_access[database_name])
        for database_name in configs_controller.database_access
    }


async def close_connections():
    for database_name, database_pool in _databases.items():
        await database_pool.close()


def check_database(database):
    if database not in _databases:
        error = f"Database '{database}' not initialized"
        _logger.error(error)
        raise Exception(error)


async def execute(database, query, *args):
    # Acquire a connection
    check_database(database)
    async with _databases[database].acquire() as connection:
        await connection.execute(query, *args)


async def executemany(database, query, *args):
    # Acquire a connection
    check_database(database)
    async with _databases[database].acquire() as connection:
        await connection.executemany(query, *args)


def _decimal_to_float(data):
    for row in data:
        for key, value in row.items():
            if isinstance(value, decimal.Decimal):
                row[key] = float(value)


async def _fetch_data(database, query, *args):
    # Acquire a connection
    async with _databases[database].acquire(timeout=20) as connection:
        try:
            result = await connection.fetch(query, *args)
            result = [dict(row) for row in result]
            _decimal_to_float(result)
            return result
        # Exceptions raised while fetching the data shouldn't trigger a
        # retry, so they are caught here
        except asyncio.TimeoutError:
            _logger.error(f"Query timed out\n{query}{args}")

async def print_counts():
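    # Debug helper: print each pool's internal free-connection queue size
    # (via the private _queue attribute) next to the configured maximum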
    for database_name, database in _databases.items():
        print(database_name, database._queue.qsize(), POOL_MAX_SIZE)

async def fetch(database, query, *args):
    check_database(database)
    # Try to run the query a number of times
    count = 0
    while count != QUERY_TRIES:
        count += 1
        try:
            return await _fetch_data(database, query, *args)
        # The following exceptions allow the fetch to be retried
        # SerializationError: conflict with recovery, so retry
        except asyncpg.exceptions.SerializationError:
            _logger.info("Conflict with recovery, retrying")
        # If caught TimeoutError (a connection timeout, not a query timeout)
        except asyncio.TimeoutError:
            _logger.info("Connection timed out, retrying")
        # Any other exception aborts the retries and returns None
        except:
            _logger.error(f"{traceback.format_exc()}\n{query} {args}")
            return None
        # Delay before retrying
        await asyncio.sleep(1)

After removing the ethernet cable, I wait for some time so an external timeout is triggered (await asyncio.wait(futures, timeout=30)). When this happens, the application should have finished all its tasks (if everything went well) and I would be able to shut it down safely. Before letting the loop close, there's a delay during which I interrupt the execution with Ctrl+C. It works fine when there are no pending tasks, but when the connection loss described above happens, some of the "lost" tasks get interrupted, generating a stack trace like the following one.

[2017-11-01 00:09:25,800] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f109d68>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 00:09:25,804] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<Pool.release.<locals>._release_impl() running at /usr/local/lib/python3.6/site-packages/asyncpg/pool.py:465> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f0891f8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:672]>
[2017-11-01 00:09:25,808] (ERROR) asyncio: Fatal write error on socket transport
protocol: <asyncpg.protocol.protocol.Protocol object at 0x7f830f6bb588>
transport: <_SelectorSocketTransport fd=9>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
    await self._con.reset()
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
    await self.execute(reset_query)
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 762, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <coroutine object Pool.release.<locals>._release_impl at 0x7f830f197678>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 465, in _release_impl
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 203, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'
[2017-11-01 00:09:25,813] (ERROR) asyncio: Fatal write error on socket transport
protocol: <asyncpg.protocol.protocol.Protocol object at 0x7f830f6bb6d8>
transport: <_SelectorSocketTransport fd=10>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
    await self._con.reset()
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
    await self.execute(reset_query)
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 762, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <coroutine object Pool.release.<locals>._release_impl at 0x7f830f197990>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 465, in _release_impl
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 203, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/pool.py", line 192, in release
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 986, in reset
  File "/usr/local/lib/python3.6/site-packages/asyncpg/connection.py", line 238, in execute
  File "asyncpg/protocol/protocol.pyx", line 296, in query
AttributeError: 'weakref' object has no attribute 'cline_in_traceback'
[2017-11-01 00:09:25,817] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:52> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f1093a8>()]>>
[2017-11-01 00:09:25,821] (ERROR) asyncio: Task was destroyed but it is pending!
task: <Task pending coro=<DefaultModule.run() running at ./src/models/module.py:52> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f830f089198>()]>>
[2017-11-01 00:09:25,825] (ERROR) DatabaseController: Traceback (most recent call last):
  File "./src/controllers/database.py", line 102, in fetch
    _logger.info("Conflict with recovery, retrying")
GeneratorExit

I've tried adding timeouts in other places, but nothing I do makes the connection go back to the pool. I also added some logging to try to track down where it gets stuck, but couldn't find it.
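For illustration, the kind of extra timeout I mean is an outer asyncio.wait_for around the whole helper (a rough sketch with an illustrative name, not my exact code); it bounds the caller, but the connection still doesn't come back to the pool:

async def fetch_with_guard(database, query, *args):
    # Hypothetical outer guard around the whole acquire/fetch/release
    # cycle: if it fires, this coroutine stops waiting, but the
    # connection is still missing from the pool's queue.
    try:
        return await asyncio.wait_for(
            _fetch_data(database, query, *args), timeout=30)
    except asyncio.TimeoutError:
        _logger.error(f"Guarded fetch timed out\n{query} {args}")
        return None

As far as I can tell from the log above, the coroutine that never finishes is asyncpg's own Pool.release (the _release_impl tasks), and it runs under asyncio.shield, so a timeout or cancellation on my side doesn't cancel it.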

A simple version of the application is:

async def run():
    queries = []
    futures = [database_controller.fetch(query) for query in queries]
    # Connection drops while executing this line
    await asyncio.wait(futures, timeout=30)
    # Prints a queue size smaller than the pool max size when the
    # connection was lost
    await database_controller.print_counts()
    # Interrupting the execution here, after waiting a lot longer than
    # every timeout set in the code
    await asyncio.sleep(1000)
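For completeness, a more careful shutdown of the sketch above would cancel and await whatever asyncio.wait reports as still pending before the loop closes; this is a generic asyncio pattern, not something specific to asyncpg, and it doesn't fix the pool itself:

async def run():
    queries = []
    futures = [database_controller.fetch(query) for query in queries]
    done, pending = await asyncio.wait(futures, timeout=30)
    # Cancel my own tasks that are still pending and wait for the
    # cancellations to finish, so none of them are destroyed while
    # still running when the loop closes.
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.wait(pending)
    await database_controller.print_counts()

Even with this kind of cleanup, the shielded release tasks inside the pool are not mine to cancel, so I'd still expect print_counts() to report a queue size below POOL_MAX_SIZE.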
