Replies: 6 comments 5 replies
-
Hello, your example is very difficult to reason on. It uses asyncio + psycopg2, which is not really a supported combination, it uses global variables, signals. I have no idea what goes on there, probably a problem at asyncio level, nothing wrong in psycopg there. You would be much better off to move to psycopg 3. Your program, working, is as simple as: import asyncio
import psycopg
db_name = "piro"
db_user = "piro"
async def connect_to_db_and_listen():
conn = await psycopg.AsyncConnection.connect(
f"dbname={db_name} user={db_user}", autocommit=True
)
await conn.execute("LISTEN test_update")
print("Listening for notifications on channel 'test_update'.")
async for notify in conn.notifies():
print(f"Got NOTIFY: {notify.pid}, {notify.channel}.")
async def main():
while True:
try:
await connect_to_db_and_listen()
except psycopg.OperationalError as e:
print(f"Caught an OperationalError: {e}")
print("Attempting to reconnect to the database in 5 seconds.")
await asyncio.sleep(5)
asyncio.run(main()) In shell 1:
In shell 2, running
In psycopg2 every connection is isolated, I don't believe it is a problem in psycopg but I think that something breaks probably with |
Beta Was this translation helpful? Give feedback.
-
Thank you very much! That works great in psycopg 3 - I am going to try migrating the project over. Btw, |
Beta Was this translation helpful? Give feedback.
-
In order to handle a ctrl-c and a graceful exit you can use normal Python exceptions: import asyncio
import psycopg
db_name = "piro"
db_user = "piro"
async def connect_to_db_and_listen() -> None:
conn = await psycopg.AsyncConnection.connect(
f"dbname={db_name} user={db_user}", autocommit=True
)
await conn.execute("LISTEN test_update")
print("Listening for notifications on channel 'test_update'.")
async for notify in conn.notifies():
print(f"Got NOTIFY: {notify.pid}, {notify.channel}.")
async def main() -> None:
while True:
try:
await connect_to_db_and_listen()
except psycopg.OperationalError as e:
print(f"Caught {type(e)}: {e}")
print("Attempting to reconnect to the database in 5 seconds.")
await asyncio.sleep(5)
except asyncio.CancelledError as e:
print(f"Caught {type(e)}")
break
try:
asyncio.run(main())
except KeyboardInterrupt as e:
print(f"Caught {type(e)}") |
Beta Was this translation helpful? Give feedback.
-
And, instead of having your own retry loop, you can use a pool with one connection and rely on it to handle the reconnection, which would use some backoff itself to wait for the server restart. You can check the pool behaviour from the logging messages: import asyncio
import logging
import psycopg
from psycopg_pool import AsyncConnectionPool
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logging.getLogger("psycopg").setLevel(logging.INFO)
logger = logging.getLogger()
db_name = "piro"
db_user = "piro"
pool = AsyncConnectionPool(
f"dbname={db_name} user={db_user}",
kwargs={"autocommit": True},
min_size=1,
open=False,
)
async def connect_to_db_and_listen() -> None:
async with pool.connection() as conn:
await conn.execute("LISTEN test_update")
logger.info("Listening for notifications on channel 'test_update'.")
async for notify in conn.notifies():
logger.info(f"Got NOTIFY: {notify.pid}, {notify.channel}.")
async def main() -> None:
await pool.open(wait=True)
while True:
try:
await connect_to_db_and_listen()
except psycopg.OperationalError as e:
logger.warning(f"Caught {type(e).__name__}: {e}")
try:
asyncio.run(main())
except KeyboardInterrupt as e:
logger.info(f"Caught {type(e).__name__}") Runtime may look like:
|
Beta Was this translation helpful? Give feedback.
-
Fantastic - using a connection pool like that seems to provide a great separation of concerns. I'll try to set that up in the migration. Thanks! |
Beta Was this translation helpful? Give feedback.
-
I noticed one functional difference between the original code and the new one, to do with 'packets' of notifications. In the original code I would sometimes get a list of notifications all in one go (in 'one packet'/generated in a single transaction) in the handler, whereas with the With the former method I could then batch-process the I'm loving the async features of psycopg 3. Is there a way to maintain the old behaviour (of processing 'packets' of To test, one can issue in psql:
|
Beta Was this translation helpful? Give feedback.
-
Thank you so much for this package!
Below is a script that demonstrates the problem. It
LISTEN
s to notifications on a channeltest_update
, and prints them when they arrive. It also reconnects and re-listens after a few seconds if the PostgreSQL server is restarted. For some reasonNOTIFY
s no longer come through after the reconnect + re-LISTEN
(but the connection works otherwise).To test:
In psql:
NOTIFY test_update;
results in the Python script printing:To restart PostgreSQL:
sudo systemctl restart postgresql
The Python script then outputs:
So far, so good. I can confirm that the reconnection works (e.g. by issuing a
SELECT 1+1;
and fetching the result). But now issuingNOTIFY test_update;
in psql yields no result in the Python script.What's even stranger is that if I create another connection (creating a copy of
conn
calledconn2
and a correspondinghandle_notify2
), then:Beta Was this translation helpful? Give feedback.
All reactions