Skip to content

Commit 72f3efe

Browse files
committed
Fix GH-9344: pgsql pipeline mode proposal.
Fix freeze pg_cancel_query. In pipeline mode it should be possible to receive part of the results.
1 parent d752406 commit 72f3efe

File tree

2 files changed

+110
-7
lines changed

2 files changed

+110
-7
lines changed

ext/pgsql/pgsql.c

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3517,6 +3517,9 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type)
35173517
pgsql_link_handle *link;
35183518
PGconn *pgsql;
35193519
PGresult *pgsql_result;
3520+
#ifdef LIBPQ_HAS_PIPELINING
3521+
bool is_pipeline_mode;
3522+
#endif
35203523

35213524
if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
35223525
RETURN_THROWS();
@@ -3526,10 +3529,17 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type)
35263529
CHECK_PGSQL_LINK(link);
35273530
pgsql = link->conn;
35283531

3529-
if (PQsetnonblocking(pgsql, 1)) {
3530-
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
3531-
RETURN_FALSE;
3532+
#ifdef LIBPQ_HAS_PIPELINING
3533+
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
3534+
if (!is_pipeline_mode) {
3535+
#endif
3536+
if (PQsetnonblocking(pgsql, 1)) {
3537+
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
3538+
RETURN_FALSE;
3539+
}
3540+
#ifdef LIBPQ_HAS_PIPELINING
35323541
}
3542+
#endif
35333543
switch(entry_type) {
35343544
case PHP_PG_ASYNC_IS_BUSY:
35353545
PQconsumeInput(pgsql);
@@ -3545,17 +3555,29 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type)
35453555
if (rc < 0) {
35463556
zend_error(E_WARNING, "cannot cancel the query: %s", err);
35473557
}
3548-
while ((pgsql_result = PQgetResult(pgsql))) {
3549-
PQclear(pgsql_result);
3558+
#ifdef LIBPQ_HAS_PIPELINING
3559+
if (!is_pipeline_mode) {
3560+
#endif
3561+
while ((pgsql_result = PQgetResult(pgsql))) {
3562+
PQclear(pgsql_result);
3563+
}
3564+
#ifdef LIBPQ_HAS_PIPELINING
35503565
}
3566+
#endif
35513567
PQfreeCancel(c);
35523568
break;
35533569
}
35543570
EMPTY_SWITCH_DEFAULT_CASE()
35553571
}
3556-
if (PQsetnonblocking(pgsql, 0)) {
3557-
php_error_docref(NULL, E_NOTICE, "Cannot set connection to blocking mode");
3572+
#ifdef LIBPQ_HAS_PIPELINING
3573+
if (!is_pipeline_mode) {
3574+
#endif
3575+
if (PQsetnonblocking(pgsql, 0)) {
3576+
php_error_docref(NULL, E_NOTICE, "Cannot set connection to blocking mode");
3577+
}
3578+
#ifdef LIBPQ_HAS_PIPELINING
35583579
}
3580+
#endif
35593581
convert_to_boolean(return_value);
35603582
}
35613583
/* }}} */

ext/pgsql/tests/pg_pipeline_sync.phpt

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,87 @@ if (($result = pg_get_result($db)) !== false) {
117117
}
118118
}
119119

120+
for ($i = 99; $i < 199; ++$i) {
121+
if (!pg_send_query_params($db, "select $1 as index, now() + ($1||' day')::interval as time, pg_sleep(0.001)", array($i))) {
122+
die('pg_send_query_params failed');
123+
}
124+
}
125+
126+
if (!pg_pipeline_sync($db)) {
127+
die('pg_pipeline_sync failed');
128+
}
129+
130+
usleep(10000);
131+
132+
pg_cancel_query($db);
133+
134+
if (pg_pipeline_status($db) !== PGSQL_PIPELINE_ON) {
135+
die('pg_pipeline_status failed');
136+
}
137+
138+
if (pg_connection_busy($db)) {
139+
$read = [$stream]; $write = $ex = [];
140+
while (!stream_select($read, $write, $ex, null, null)) { }
141+
}
142+
143+
$canceled_count = 0;
144+
for ($i = 99; $i < 199; ++$i) {
145+
if (!($result = pg_get_result($db))) {
146+
die('pg_get_result');
147+
}
148+
149+
$result_status = pg_result_status($result);
150+
if ($result_status === PGSQL_FATAL_ERROR) {
151+
if (pg_connection_status($db) !== PGSQL_CONNECTION_OK) {
152+
die('pg_cancel_query failed');
153+
}
154+
if (strpos(pg_last_error($db), 'canceling statement') === false) {
155+
die('pg_cancel_query failed');
156+
}
157+
pg_free_result($result);
158+
if ($result = pg_get_result($db)) {
159+
die('pg_get_result');
160+
}
161+
continue;
162+
}
163+
if ($result_status === 11/*PGSQL_STATUS_PIPELINE_ABORTED*/) {
164+
++$canceled_count;
165+
pg_free_result($result);
166+
if ($result = pg_get_result($db)) {
167+
die('pg_get_result');
168+
}
169+
continue;
170+
}
171+
if ($result_status !== PGSQL_TUPLES_OK) {
172+
die('pg_result_status failed');
173+
}
174+
175+
if (pg_num_rows($result) == -1) {
176+
die('pg_num_rows failed');
177+
}
178+
179+
if (!($row = pg_fetch_row($result, null))) {
180+
die('pg_fetch_row failed');
181+
}
182+
183+
pg_free_result($result);
184+
185+
if (pg_get_result($db) !== false) {
186+
die('pg_get_result failed');
187+
}
188+
}
189+
190+
if ($canceled_count < 1) {
191+
die('pg_cancel_query failed');
192+
}
193+
194+
if (($result = pg_get_result($db)) !== false) {
195+
if (pg_result_status($result) !== PGSQL_PIPELINE_SYNC) {
196+
die('pg_result_status failed');
197+
}
198+
}
199+
200+
120201
if (!pg_exit_pipeline_mode($db)) {
121202
die('pg_exit_pipeline_mode failed');
122203
}

0 commit comments

Comments
 (0)