Skip to content

Commit

Permalink
adds notifications processing after every PQexec
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Konoval authored and dvarrazzo committed Oct 10, 2024
1 parent 362cb00 commit 282360d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
10 changes: 10 additions & 0 deletions psycopg/connection_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,11 @@ conn_set_session(connectionObject *self, int autocommit,
}
}

Py_BLOCK_THREADS;
conn_notifies_process(self);
conn_notice_process(self);
Py_UNBLOCK_THREADS;

if (autocommit != SRV_STATE_UNCHANGED) {
self->autocommit = autocommit;
}
Expand Down Expand Up @@ -1408,6 +1413,11 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc)
goto endlock;
}

Py_BLOCK_THREADS;
conn_notifies_process(self);
conn_notice_process(self);
Py_UNBLOCK_THREADS;

endlock:
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
Expand Down
2 changes: 2 additions & 0 deletions psycopg/pqpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ pq_abort(connectionObject *conn)
retvalue = pq_abort_locked(conn, &_save);

Py_BLOCK_THREADS;
conn_notifies_process(conn);
conn_notice_process(conn);
Py_UNBLOCK_THREADS;

Expand Down Expand Up @@ -539,6 +540,7 @@ pq_reset(connectionObject *conn)

Py_BLOCK_THREADS;
conn_notice_process(conn);
conn_notifies_process(conn);
Py_UNBLOCK_THREADS;

pthread_mutex_unlock(&conn->lock);
Expand Down
67 changes: 61 additions & 6 deletions tests/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

import os
import unittest
from collections import deque

import psycopg2
from psycopg2 import extensions
from psycopg2.extensions import Notify
from .testutils import ConnectingTestCase, skip_if_crdb, slow
from .testutils import ConnectingTestCase, skip_if_crdb, skip_if_windows, slow
from .testconfig import dsn

import sys
Expand Down Expand Up @@ -74,7 +75,9 @@ def notify(self, name, sec=0, payload=None):
module=psycopg2.__name__,
dsn=dsn, sec=sec, name=name, payload=payload))

return Popen([sys.executable, '-c', script], stdout=PIPE)
env = os.environ.copy()
env.pop("PSYCOPG_DEBUG", None)
return Popen([sys.executable, '-c', script], stdout=PIPE, env=env)

@slow
def test_notifies_received_on_poll(self):
Expand Down Expand Up @@ -127,16 +130,68 @@ def test_notifies_received_on_execute(self):
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
@skip_if_windows
def test_notifies_received_on_commit(self):
self.listen("foo")
self.listen('foo')
self.conn.commit()
self.conn.cursor().execute("select 1;")
pid = int(self.notify("foo").communicate()[0])
self.conn.cursor().execute('select 1;')
pid = int(self.notify('foo').communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
self.conn.commit()
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual("foo", self.conn.notifies[0][1])
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
@skip_if_windows
def test_notifies_received_on_rollback(self):
self.listen('foo')
self.conn.commit()
self.conn.cursor().execute('select 1;')
pid = int(self.notify('foo').communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
self.conn.rollback()
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
@skip_if_windows
def test_notifies_received_on_reset(self):
self.listen('foo')
self.conn.commit()
pid = int(self.notify('foo').communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
self.conn.reset()
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
@skip_if_windows
def test_notifies_received_on_set_session(self):
self.listen('foo')
self.conn.commit()
pid = int(self.notify('foo').communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
self.conn.set_session(autocommit=True, readonly=True)
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
@skip_if_windows
def test_notifies_received_on_set_client_encoding(self):
self.listen('foo')
self.conn.commit()
pid = int(self.notify('foo').communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
self.conn.set_client_encoding(
'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8'
)
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
def test_notify_object(self):
Expand Down

0 comments on commit 282360d

Please sign in to comment.