Skip to content

Commit 91ce1d0

Browse files
authored
Merge pull request #58 from naim94a/main
postgres: add cancel_token
2 parents d9e2195 + 52d1b24 commit 91ce1d0

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

src/pg/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,11 @@ impl AsyncPgConnection {
296296
Ok(conn)
297297
}
298298

299+
/// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client.
300+
pub fn cancel_token(&self) -> tokio_postgres::CancelToken {
301+
self.conn.cancel_token()
302+
}
303+
299304
async fn set_config_options(&mut self) -> QueryResult<()> {
300305
use crate::run_query_dsl::RunQueryDsl;
301306

tests/lib.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use diesel::prelude::{ExpressionMethods, OptionalExtension, QueryDsl};
2-
use diesel::QueryResult;
2+
use diesel::{sql_function, QueryResult};
33
use diesel_async::*;
44
use scoped_futures::ScopedFutureExt;
55
use std::fmt::Debug;
@@ -111,6 +111,45 @@ async fn setup(connection: &mut TestConnection) {
111111
.unwrap();
112112
}
113113

114+
#[cfg(feature = "postgres")]
115+
sql_function!(fn pg_sleep(interval: diesel::sql_types::Double));
116+
117+
#[cfg(feature = "postgres")]
118+
#[tokio::test]
119+
async fn postgres_cancel_token() {
120+
use std::time::Duration;
121+
122+
use diesel::result::{DatabaseErrorKind, Error};
123+
124+
let conn = &mut connection().await;
125+
126+
let token = conn.cancel_token();
127+
128+
// execute a query that runs for a long time
129+
let long_running_query = diesel::select(pg_sleep(5.0)).execute(conn);
130+
131+
// execute the query elsewhere...
132+
let task = tokio::spawn(async move {
133+
long_running_query
134+
.await
135+
.expect_err("query should have been canceled.")
136+
});
137+
138+
// let the task above have some time to actually start...
139+
tokio::time::sleep(Duration::from_millis(500)).await;
140+
141+
// invoke the cancellation token.
142+
token.cancel_query(tokio_postgres::NoTls).await.unwrap();
143+
144+
// make sure the query task resulted in a cancellation error
145+
let err = task.await.unwrap();
146+
match err {
147+
Error::DatabaseError(DatabaseErrorKind::Unknown, v)
148+
if v.message() == "canceling statement due to user request" => {}
149+
_ => panic!("unexpected error: {:?}", err),
150+
}
151+
}
152+
114153
#[cfg(feature = "postgres")]
115154
async fn setup(connection: &mut TestConnection) {
116155
diesel::sql_query(

0 commit comments

Comments
 (0)