Skip to content

Commit 33dbbcb

Browse files
committed
Support cancellation in synchronous client
1 parent a8fa2a4 commit 33dbbcb

File tree

11 files changed

+208
-25
lines changed

11 files changed

+208
-25
lines changed

postgres/src/cancel_token.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use tokio::runtime;
2+
use tokio_postgres::tls::MakeTlsConnect;
3+
use tokio_postgres::{Error, Socket};
4+
5+
/// The capability to request cancellation of in-progress queries on a
6+
/// connection.
7+
#[derive(Clone)]
8+
pub struct CancelToken(tokio_postgres::CancelToken);
9+
10+
impl CancelToken {
11+
pub(crate) fn new(inner: tokio_postgres::CancelToken) -> CancelToken {
12+
CancelToken(inner)
13+
}
14+
15+
/// Attempts to cancel the in-progress query on the connection associated
16+
/// with this `CancelToken`.
17+
///
18+
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
19+
/// only be returned if the client was unable to connect to the database.
20+
///
21+
/// Cancellation is inherently racy. There is no guarantee that the
22+
/// cancellation request will reach the server before the query terminates
23+
/// normally, or that the connection associated with this token is still
24+
/// active.
25+
pub fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
26+
where
27+
T: MakeTlsConnect<Socket>,
28+
{
29+
runtime::Builder::new()
30+
.enable_all()
31+
.basic_scheduler()
32+
.build()
33+
.unwrap() // FIXME don't unwrap
34+
.block_on(self.0.cancel_query(tls))
35+
}
36+
}

postgres/src/client.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction};
1+
use crate::{
2+
CancelToken, Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction,
3+
};
24
use std::ops::{Deref, DerefMut};
35
use tokio::runtime::Runtime;
46
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
@@ -443,6 +445,46 @@ impl Client {
443445
Ok(Transaction::new(&mut self.runtime, transaction))
444446
}
445447

448+
/// Constructs a cancellation token that can later be used to request
449+
/// cancellation of a query running on this connection.
450+
///
451+
/// # Examples
452+
///
453+
/// ```no_run
454+
/// use postgres::{Client, NoTls};
455+
/// use postgres::error::SqlState;
456+
/// use std::thread;
457+
/// use std::time::Duration;
458+
///
459+
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
460+
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
461+
///
462+
/// let cancel_token = client.cancel_token();
463+
///
464+
/// thread::spawn(move || {
465+
/// // Abort the query after 5s.
466+
/// thread::sleep(Duration::from_secs(5));
467+
/// cancel_token.cancel_query(NoTls);
468+
/// });
469+
///
470+
/// match client.simple_query("SELECT long_running_query()") {
471+
/// Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
472+
/// // Handle canceled query.
473+
/// }
474+
/// Err(err) => return Err(err.into()),
475+
/// Ok(rows) => {
476+
/// // ...
477+
/// }
478+
/// }
479+
/// // ...
480+
///
481+
/// # Ok(())
482+
/// # }
483+
/// ```
484+
pub fn cancel_token(&self) -> CancelToken {
485+
CancelToken::new(self.client.cancel_token())
486+
}
487+
446488
/// Determines if the client's connection has already closed.
447489
///
448490
/// If this returns `true`, the client is no longer usable.

postgres/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub use tokio_postgres::{
5454
error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement,
5555
};
5656

57+
pub use crate::cancel_token::CancelToken;
5758
pub use crate::client::*;
5859
pub use crate::config::Config;
5960
pub use crate::copy_in_writer::CopyInWriter;
@@ -68,6 +69,7 @@ pub use crate::tls::NoTls;
6869
pub use crate::transaction::*;
6970

7071
pub mod binary_copy;
72+
mod cancel_token;
7173
mod client;
7274
pub mod config;
7375
mod copy_in_writer;

postgres/src/test.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use std::io::{Read, Write};
2+
use std::thread;
3+
use std::time::Duration;
4+
use tokio_postgres::error::SqlState;
25
use tokio_postgres::types::Type;
36
use tokio_postgres::NoTls;
47

@@ -288,3 +291,21 @@ fn portal() {
288291
assert_eq!(rows.len(), 1);
289292
assert_eq!(rows[0].get::<_, i32>(0), 3);
290293
}
294+
295+
#[test]
296+
fn cancel_query() {
297+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
298+
299+
let cancel_token = client.cancel_token();
300+
let cancel_thread = thread::spawn(move || {
301+
thread::sleep(Duration::from_millis(100));
302+
cancel_token.cancel_query(NoTls).unwrap();
303+
});
304+
305+
match client.batch_execute("SELECT pg_sleep(100)") {
306+
Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {}
307+
t => panic!("unexpected return: {:?}", t),
308+
}
309+
310+
cancel_thread.join().unwrap();
311+
}

postgres/src/transaction.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement};
1+
use crate::{
2+
CancelToken, CopyInWriter, CopyOutReader, Portal, RowIter, Rt, Statement, ToStatement,
3+
};
24
use tokio::runtime::Runtime;
35
use tokio_postgres::types::{ToSql, Type};
46
use tokio_postgres::{Error, Row, SimpleQueryMessage};
@@ -168,6 +170,11 @@ impl<'a> Transaction<'a> {
168170
self.runtime.block_on(self.transaction.batch_execute(query))
169171
}
170172

173+
/// Like `Client::cancel_token`.
174+
pub fn cancel_token(&self) -> CancelToken {
175+
CancelToken::new(self.transaction.cancel_token())
176+
}
177+
171178
/// Like `Client::transaction`.
172179
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
173180
let transaction = self.runtime.block_on(self.transaction.transaction())?;

tokio-postgres/src/cancel_token.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use crate::config::SslMode;
2+
use crate::tls::TlsConnect;
3+
#[cfg(feature = "runtime")]
4+
use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect, Socket};
5+
use crate::{cancel_query_raw, Error};
6+
use tokio::io::{AsyncRead, AsyncWrite};
7+
8+
/// The capability to request cancellation of in-progress queries on a
9+
/// connection.
10+
#[derive(Clone)]
11+
pub struct CancelToken {
12+
#[cfg(feature = "runtime")]
13+
pub(crate) socket_config: Option<SocketConfig>,
14+
pub(crate) ssl_mode: SslMode,
15+
pub(crate) process_id: i32,
16+
pub(crate) secret_key: i32,
17+
}
18+
19+
impl CancelToken {
20+
/// Attempts to cancel the in-progress query on the connection associated
21+
/// with this `CancelToken`.
22+
///
23+
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
24+
/// only be returned if the client was unable to connect to the database.
25+
///
26+
/// Cancellation is inherently racy. There is no guarantee that the
27+
/// cancellation request will reach the server before the query terminates
28+
/// normally, or that the connection associated with this token is still
29+
/// active.
30+
///
31+
/// Requires the `runtime` Cargo feature (enabled by default).
32+
#[cfg(feature = "runtime")]
33+
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
34+
where
35+
T: MakeTlsConnect<Socket>,
36+
{
37+
cancel_query::cancel_query(
38+
self.socket_config.clone(),
39+
self.ssl_mode,
40+
tls,
41+
self.process_id,
42+
self.secret_key,
43+
)
44+
.await
45+
}
46+
47+
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
48+
/// connection itself.
49+
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
50+
where
51+
S: AsyncRead + AsyncWrite + Unpin,
52+
T: TlsConnect<S>,
53+
{
54+
cancel_query_raw::cancel_query_raw(
55+
stream,
56+
self.ssl_mode,
57+
tls,
58+
self.process_id,
59+
self.secret_key,
60+
)
61+
.await
62+
}
63+
}

tokio-postgres/src/client.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#[cfg(feature = "runtime")]
2-
use crate::cancel_query;
31
use crate::codec::BackendMessages;
42
use crate::config::{Host, SslMode};
53
use crate::connection::{Request, RequestMessages};
@@ -14,7 +12,7 @@ use crate::to_statement::ToStatement;
1412
use crate::types::{Oid, ToSql, Type};
1513
#[cfg(feature = "runtime")]
1614
use crate::Socket;
17-
use crate::{cancel_query_raw, copy_in, copy_out, query, CopyInSink, Transaction};
15+
use crate::{copy_in, copy_out, query, CancelToken, CopyInSink, Transaction};
1816
use crate::{prepare, SimpleQueryMessage};
1917
use crate::{simple_query, Row};
2018
use crate::{Error, Statement};
@@ -451,42 +449,43 @@ impl Client {
451449
Ok(Transaction::new(self))
452450
}
453451

452+
/// Constructs a cancellation token that can later be used to request
453+
/// cancellation of a query running on the connection associated with
454+
/// this client.
455+
pub fn cancel_token(&self) -> CancelToken {
456+
CancelToken {
457+
#[cfg(feature = "runtime")]
458+
socket_config: self.socket_config.clone(),
459+
ssl_mode: self.ssl_mode,
460+
process_id: self.process_id,
461+
secret_key: self.secret_key,
462+
}
463+
}
464+
454465
/// Attempts to cancel an in-progress query.
455466
///
456467
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
457468
/// only be returned if the client was unable to connect to the database.
458469
///
459470
/// Requires the `runtime` Cargo feature (enabled by default).
460471
#[cfg(feature = "runtime")]
472+
#[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
461473
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
462474
where
463475
T: MakeTlsConnect<Socket>,
464476
{
465-
cancel_query::cancel_query(
466-
self.socket_config.clone(),
467-
self.ssl_mode,
468-
tls,
469-
self.process_id,
470-
self.secret_key,
471-
)
472-
.await
477+
self.cancel_token().cancel_query(tls).await
473478
}
474479

475480
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
476481
/// connection itself.
482+
#[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
477483
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
478484
where
479485
S: AsyncRead + AsyncWrite + Unpin,
480486
T: TlsConnect<S>,
481487
{
482-
cancel_query_raw::cancel_query_raw(
483-
stream,
484-
self.ssl_mode,
485-
tls,
486-
self.process_id,
487-
self.secret_key,
488-
)
489-
.await
488+
self.cancel_token().cancel_query_raw(stream, tls).await
490489
}
491490

492491
/// Determines if the connection to the server has already closed.

tokio-postgres/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.5")]
100100
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
101101

102+
pub use crate::cancel_token::CancelToken;
102103
pub use crate::client::Client;
103104
pub use crate::config::Config;
104105
pub use crate::connection::Connection;
@@ -125,6 +126,7 @@ mod bind;
125126
#[cfg(feature = "runtime")]
126127
mod cancel_query;
127128
mod cancel_query_raw;
129+
mod cancel_token;
128130
mod client;
129131
mod codec;
130132
pub mod config;

tokio-postgres/src/transaction.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use crate::types::{ToSql, Type};
99
#[cfg(feature = "runtime")]
1010
use crate::Socket;
1111
use crate::{
12-
bind, query, slice_iter, Client, CopyInSink, Error, Portal, Row, SimpleQueryMessage, Statement,
13-
ToStatement,
12+
bind, query, slice_iter, CancelToken, Client, CopyInSink, Error, Portal, Row,
13+
SimpleQueryMessage, Statement, ToStatement,
1414
};
1515
use bytes::Buf;
1616
use futures::TryStreamExt;
@@ -249,21 +249,30 @@ impl<'a> Transaction<'a> {
249249
self.client.batch_execute(query).await
250250
}
251251

252+
/// Like `Client::cancel_token`.
253+
pub fn cancel_token(&self) -> CancelToken {
254+
self.client.cancel_token()
255+
}
256+
252257
/// Like `Client::cancel_query`.
253258
#[cfg(feature = "runtime")]
259+
#[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")]
254260
pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
255261
where
256262
T: MakeTlsConnect<Socket>,
257263
{
264+
#[allow(deprecated)]
258265
self.client.cancel_query(tls).await
259266
}
260267

261268
/// Like `Client::cancel_query_raw`.
269+
#[deprecated(since = "0.6.0", note = "use Transaction::cancel_token() instead")]
262270
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
263271
where
264272
S: AsyncRead + AsyncWrite + Unpin,
265273
T: TlsConnect<S>,
266274
{
275+
#[allow(deprecated)]
267276
self.client.cancel_query_raw(stream, tls).await
268277
}
269278

tokio-postgres/tests/test/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ async fn cancel_query_raw() {
304304
let client = connect("user=postgres").await;
305305

306306
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
307-
let cancel = client.cancel_query_raw(socket, NoTls);
307+
let cancel_token = client.cancel_token();
308+
let cancel = cancel_token.cancel_query_raw(socket, NoTls);
308309
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
309310

310311
let sleep = client.batch_execute("SELECT pg_sleep(100)");

tokio-postgres/tests/test/runtime.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ async fn target_session_attrs_err() {
7070
async fn cancel_query() {
7171
let client = connect("host=localhost port=5433 user=postgres").await;
7272

73-
let cancel = client.cancel_query(NoTls);
73+
let cancel_token = client.cancel_token();
74+
let cancel = cancel_token.cancel_query(NoTls);
7475
let cancel = time::delay_for(Duration::from_millis(100)).then(|()| cancel);
7576

7677
let sleep = client.batch_execute("SELECT pg_sleep(100)");

0 commit comments

Comments
 (0)