Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Pg.pm
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ use 5.008001;
DBD::Pg::db->install_method('pg_result'); ## NOT duplicated below!
DBD::Pg::db->install_method('pg_rollback_to');
DBD::Pg::db->install_method('pg_savepoint');
DBD::Pg::db->install_method('pg_send_cancel');
DBD::Pg::db->install_method('pg_server_trace');
DBD::Pg::db->install_method('pg_server_untrace');
DBD::Pg::db->install_method('pg_type_info');
Expand Down Expand Up @@ -4208,6 +4209,14 @@ that would have been returned by the asynchronous L</do> or L</execute> if it ha

$result = $dbh->pg_result;

=item B<pg_send_cancel>

Send a request to cancel a running asynchronous query to the
server. Returns true if this succeeded, false otherwise. The actual
outcome of the query still needs to be determined in the ordinary
way. If a running query was actually cancelled, C<pg_result> will
return zero and the C<state> method will return 57014.

=back

=head3 Asynchronous Examples
Expand Down
7 changes: 7 additions & 0 deletions Pg.xs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,13 @@ pg_ready(dbh)
D_imp_dbh(dbh);
ST(0) = sv_2mortal(newSViv(pg_db_ready(dbh, imp_dbh)));

void
pg_send_cancel(dbh)
SV *dbh
CODE:
D_imp_dbh(dbh);
ST(0) = pg_db_send_cancel(dbh, imp_dbh) ? &PL_sv_yes : &PL_sv_no;

void
pg_cancel(dbh)
SV *dbh
Expand Down
171 changes: 72 additions & 99 deletions dbdimp.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,33 @@ static int pg_db_start_txn (pTHX_ SV *dbh, imp_dbh_t *imp_dbh);
static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int asyncflag);
static void pg_db_detect_client_encoding_utf8(pTHX_ imp_dbh_t *imp_dbh);

static int do_send_cancel(SV *h, imp_dbh_t *imp_dbh, char const *caller)
{
dTHX;
PGcancel *cancel;
char errbuf[256];

/* Get the cancel structure */
TRACE_PQGETCANCEL;
cancel = PQgetCancel(imp_dbh->conn);

/* This almost always works. If not, free our structure and complain loudly */
TRACE_PQCANCEL;
if (! PQcancel(cancel,errbuf,sizeof(errbuf))) {
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);
if (TRACEWARN_slow) { TRC(DBILOGFP, "%sPQcancel failed: %s\n", THEADER_slow, errbuf); }
_fatal_sqlstate(aTHX_ imp_dbh);
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "PQcancel failed");
if (TEND_slow) TRC(DBILOGFP, "%sEnd %s (error: cancel failed)\n", THEADER_slow, caller);
return DBDPG_FALSE;
}
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);

return DBDPG_TRUE;
}

/* ================================================================== */
void dbd_init (dbistate_t *dbistate)
{
Expand Down Expand Up @@ -5333,6 +5360,12 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
pg_error(aTHX_ h, status, PQerrorMessage(imp_dbh->conn));
break;
case PGRES_FATAL_ERROR:
/* query cancelled? */
if (0 == strncmp(imp_dbh->sqlstate, "57014", 5)) {
rows = 0;
break;
}

default:
rows = -2;
TRACE_PQERRORMESSAGE;
Expand Down Expand Up @@ -5422,87 +5455,52 @@ int pg_db_ready(SV *h, imp_dbh_t *imp_dbh)

/* ================================================================== */
/*
Attempt to cancel a running asynchronous query
Returns true if the cancel succeeded, and false if it did not
In this case, pg_cancel will return false.
NOTE: We only return true if we cancelled
Send a cancel request for a running asynchronus query to the server.
The result of the query - which may be "query was cancelled" (SQLSTATE 57014) -
still needs to be determined in the ordinary way.
*/
int pg_db_cancel(SV *h, imp_dbh_t *imp_dbh)
int pg_db_send_cancel(SV *h, imp_dbh_t *imp_dbh)
{
dTHX;
PGcancel *cancel;
char errbuf[256];
PGresult *result;
ExecStatusType status;

if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_cancel (async status: %d)\n",
if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_cancel (async status: %d)\n",
THEADER_slow, imp_dbh->async_status);

if (0 == imp_dbh->async_status) {
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running");
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: no async)\n", THEADER_slow);
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_cancel (error: no async)\n", THEADER_slow);
return DBDPG_FALSE;
}

if (-1 == imp_dbh->async_status) {
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Asychronous query has already been cancelled");
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: async cancelled)\n", THEADER_slow);
if (!do_send_cancel(h, imp_dbh, "pg_db_send_cancel"))
return DBDPG_FALSE;
}

/* Get the cancel structure */
TRACE_PQGETCANCEL;
cancel = PQgetCancel(imp_dbh->conn);
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_cancel\n", THEADER_slow);
return DBDPG_TRUE;
} /* end of pg_db_send_cancel */

/* This almost always works. If not, free our structure and complain loudly */
TRACE_PQGETCANCEL;
if (! PQcancel(cancel,errbuf,sizeof(errbuf))) {
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);
if (TRACEWARN_slow) { TRC(DBILOGFP, "%sPQcancel failed: %s\n", THEADER_slow, errbuf); }
_fatal_sqlstate(aTHX_ imp_dbh);
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "PQcancel failed");
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: cancel failed)\n", THEADER_slow);
return DBDPG_FALSE;
}
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);

/* Whatever else happens, we should no longer be inside of an async query */
imp_dbh->async_status = -1;
if (NULL != imp_dbh->async_sth)
imp_dbh->async_sth->async_status = -1;
/* ================================================================== */
/*
Attempt to cancel a running asynchronous query
Returns true if the cancel succeeded, and false if it did not
In this case, pg_cancel will return false.
NOTE: We only return true if we cancelled
*/
int pg_db_cancel(SV *h, imp_dbh_t *imp_dbh)
{
dTHX;

/* Read in the result - assume only one */
TRACE_PQGETRESULT;
result = PQgetResult(imp_dbh->conn);
status = _sqlstate(aTHX_ imp_dbh, result);
if (!result) {
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Failed to get a result after PQcancel");
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: no result)\n", THEADER_slow);
return DBDPG_FALSE;
}
if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_cancel (async status: %d)\n",
THEADER_slow, imp_dbh->async_status);

TRACE_PQCLEAR;
PQclear(result);
if (!pg_db_send_cancel(h, imp_dbh))
return DBDPG_FALSE;

/* If we actually cancelled a running query, just return true - the caller must rollback if needed */
if (0 == strncmp(imp_dbh->sqlstate, "57014", 5)) {
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel\n", THEADER_slow);
return DBDPG_TRUE;
}
pg_db_result(h, imp_dbh);

/* If we got any other error, make sure we report it */
if (0 != strncmp(imp_dbh->sqlstate, "00000", 5)) {
if (TRACEWARN_slow) TRC(DBILOGFP,
"%sQuery was not cancelled: was already finished\n", THEADER_slow);
TRACE_PQERRORMESSAGE;
pg_error(aTHX_ h, status, PQerrorMessage(imp_dbh->conn));
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error)\n", THEADER_slow);
}
else if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel\n", THEADER_slow);
return DBDPG_FALSE;

if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel\n", THEADER_slow);
return 0 == strncmp(imp_dbh->sqlstate, "57014", 5);
} /* end of pg_db_cancel */


Expand Down Expand Up @@ -5543,36 +5541,27 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
if (TRACE3_slow) { TRC(DBILOGFP, "%sCancelling old async command\n", THEADER_slow); }
TRACE_PQISBUSY;
if (PQisBusy(imp_dbh->conn)) {
PGcancel *cancel;
char errbuf[256];
int cresult;
if (TRACE3_slow) TRC(DBILOGFP, "%sAttempting to cancel query\n", THEADER_slow);
TRACE_PQGETCANCEL;
cancel = PQgetCancel(imp_dbh->conn);
TRACE_PQCANCEL;
cresult = PQcancel(cancel,errbuf,255);
if (! cresult) {
if (TRACEWARN_slow) { TRC(DBILOGFP, "%sPQcancel failed: %s\n", THEADER_slow, errbuf); }
_fatal_sqlstate(aTHX_ imp_dbh);
pg_error(aTHX_ handle, PGRES_FATAL_ERROR, "Could not cancel previous command");
if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async (error: could not cancel)\n", THEADER_slow);

if (!do_send_cancel(handle, imp_dbh, "handle_old_async"))
return -2;
}
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);

/* Suck up the cancellation notice */
TRACE_PQGETRESULT;
while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
TRACE_PQCLEAR;
PQclear(result);
}
/* We need to rollback! - reprepare!? */
TRACE_PQEXEC;
PQexec(imp_dbh->conn, "rollback");
imp_dbh->done_begin = DBDPG_FALSE;

if (PQTRANS_IDLE != pg_db_txn_status(aTHX_ imp_dbh)) {
/* We need to rollback! - reprepare!? */
TRACE_PQEXEC;
PQexec(imp_dbh->conn, "rollback");
imp_dbh->done_begin = DBDPG_FALSE;
}
}
}
else if (asyncflag & PG_OLDQUERY_WAIT || imp_dbh->async_status == -1) {
else if (asyncflag & PG_OLDQUERY_WAIT) {
/* Finish up the outstanding query and throw out the result, unless an error */
if (TRACE3_slow) { TRC(DBILOGFP, "%sWaiting for old async command to finish\n", THEADER_slow); }
TRACE_PQGETRESULT;
Expand Down Expand Up @@ -5630,27 +5619,11 @@ int dbd_st_cancel(SV *sth, imp_sth_t *imp_sth)
{
dTHX;
D_imp_dbh_from_sth;
PGcancel *cancel;
char errbuf[256];

if (TSTART_slow) TRC(DBILOGFP, "%sBegin dbd_st_cancel\n", THEADER_slow);

/* Get the cancel structure */
TRACE_PQGETCANCEL;
cancel = PQgetCancel(imp_dbh->conn);

/* This almost always works. If not, free our structure and complain loudly */
TRACE_PQGETCANCEL;
if (!PQcancel(cancel, errbuf, sizeof(errbuf))) {
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);
if (TRACEWARN_slow) TRC(DBILOGFP, "%sPQcancel failed: %s\n", THEADER_slow, errbuf);
pg_error(aTHX_ sth, PGRES_FATAL_ERROR, "PQcancel failed");
if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_st_cancel (error: cancel failed)\n", THEADER_slow);
if (!do_send_cancel(sth, imp_dbh, "dbd_st_cancel"))
return DBDPG_FALSE;
}
TRACE_PQFREECANCEL;
PQfreeCancel(cancel);

if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_st_cancel\n", THEADER_slow);
return DBDPG_TRUE;
Expand Down
2 changes: 2 additions & 0 deletions dbdimp.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh);

int pg_db_ready(SV *h, imp_dbh_t *imp_dbh);

int pg_db_send_cancel (SV *h, imp_dbh_t *imp_dbh);

int pg_db_cancel (SV *h, imp_dbh_t *imp_dbh);

int pg_db_cancel_sth (SV *sth, imp_sth_t *imp_sth);
Expand Down
8 changes: 4 additions & 4 deletions t/02attribs.t
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,11 @@ is ($sth->{pg_async_status}, 1, $t);
$t=q{Database handle attribute "pg_async_status" returns a 1 after an asynchronous execute};
is ($dbh->{pg_async_status}, 1, $t);

$t=q{Statement handle attribute "pg_async_status" returns a -1 after a cancel};
$t=q{Statement handle attribute "pg_async_status" returns a 0 after a cancel};
$dbh->pg_cancel();
is ($sth->{pg_async_status}, -1, $t);
$t=q{Database handle attribute "pg_async_status" returns a -1 after a cancel};
is ($dbh->{pg_async_status}, -1, $t);
is ($sth->{pg_async_status}, 0, $t);
$t=q{Database handle attribute "pg_async_status" returns a 0 after a cancel};
is ($dbh->{pg_async_status}, 0, $t);
sleep 3;

#
Expand Down
25 changes: 18 additions & 7 deletions t/08async.t
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use DBD::Pg ':async';
require 'dbdpg_test_setup.pl';
select(($|=1,select(STDERR),$|=1)[1]);

my $dbh = connect_database();
my $dbh = connect_database({AutoCommit => 1});

if (! $dbh) {
plan skip_all => 'Connection to database failed, cannot continue testing';
}

plan tests => 67;
plan tests => 70;

isnt ($dbh, undef, 'Connect to database for async testing');

Expand Down Expand Up @@ -82,9 +82,9 @@ is ($@, q{}, $t);
$t=q{Database method pg_cancel returns a false value when cancellation works but finished};
is ($res, q{}, $t);

$t=q{Database attribute "async_status" returns -1 after pg_cancel};
$t=q{Database attribute "async_status" returns 0 after pg_cancel};
$res = $dbh->{pg_async_status};
is ($res, -1, $t);
is ($res, 0, $t);

$t=q{Running do() after a cancelled query works};
eval {
Expand Down Expand Up @@ -263,6 +263,18 @@ SKIP: {

$sth->finish();

$t = q{Can get result of an async query which already finished after pg_send_cancel};
$dbh->do('select 123', { pg_async => PG_ASYNC});
sleep(1);
$dbh->pg_send_cancel();
$res = $dbh->pg_result();
is($res, 1, $t);

$dbh->do('select pg_sleep(10)', { pg_async => PG_ASYNC });
$dbh->pg_send_cancel();
$res = $dbh->pg_result();
is (0+$res, 0, 'pg_result returns zero after cancelled query');
is ($dbh->state(), '57014', 'state is 57014 after cancelled query');
} ## end of pg_sleep skip


Expand Down Expand Up @@ -309,8 +321,8 @@ like ($@, qr{previous async}, $t);

$dbh->pg_cancel;

$t=q{Directly after pg_cancel(), pg_async_status is -1};
is ($dbh->{pg_async_status}, -1, $t);
$t=q{Directly after pg_cancel(), pg_async_status is 0};
is ($dbh->{pg_async_status}, 0, $t);

$t=q{Method execute() works when prepare has PG_ASYNC flag};
$sth->execute();
Expand All @@ -337,7 +349,6 @@ $t=q{Method fetchall_arrayref returns correct result after pg_result};
is_deeply ($res, [[123]], $t);

$dbh->do('CREATE TABLE dbd_pg_test5(id INT, t TEXT)');
$dbh->commit();
$sth->execute();

$t=q{Method prepare() works when passed in PG_OLDQUERY_CANCEL};
Expand Down