@@ -114,6 +114,48 @@ const FAKE_OID: u32 = 0;
114114/// # }
115115/// ```
116116///
117+ /// For more complex cases, an immutable reference to the connection need to be used:
118+ /// ```rust
119+ /// # include!("../doctest_setup.rs");
120+ /// use diesel_async::RunQueryDsl;
121+ ///
122+ /// #
123+ /// # #[tokio::main(flavor = "current_thread")]
124+ /// # async fn main() {
125+ /// # run_test().await.unwrap();
126+ /// # }
127+ /// #
128+ /// # async fn run_test() -> QueryResult<()> {
129+ /// # use diesel::sql_types::{Text, Integer};
130+ /// # let conn = &mut establish_connection().await;
131+ /// #
132+ /// async fn fn12(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
133+ /// let f1 = diesel::select(1_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
134+ /// let f2 = diesel::select(2_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
135+ ///
136+ /// futures_util::try_join!(f1, f2)
137+ /// }
138+ ///
139+ /// async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
140+ /// let f3 = diesel::select(3_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
141+ /// let f4 = diesel::select(4_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
142+ ///
143+ /// futures_util::try_join!(f3, f4)
144+ /// }
145+ ///
146+ /// let f12 = fn12(&conn);
147+ /// let f34 = fn34(&conn);
148+ ///
149+ /// let ((r1, r2), (r3, r4)) = futures_util::try_join!(f12, f34).unwrap();
150+ ///
151+ /// assert_eq!(r1, 1);
152+ /// assert_eq!(r2, 2);
153+ /// assert_eq!(r3, 3);
154+ /// assert_eq!(r4, 4);
155+ /// # Ok(())
156+ /// # }
157+ /// ```
158+ ///
117159/// ## TLS
118160///
119161/// Connections created by [`AsyncPgConnection::establish`] do not support TLS.
@@ -136,6 +178,12 @@ pub struct AsyncPgConnection {
136178}
137179
138180impl SimpleAsyncConnection for AsyncPgConnection {
181+ async fn batch_execute ( & mut self , query : & str ) -> QueryResult < ( ) > {
182+ SimpleAsyncConnection :: batch_execute ( & mut & * self , query) . await
183+ }
184+ }
185+
186+ impl SimpleAsyncConnection for & AsyncPgConnection {
139187 async fn batch_execute ( & mut self , query : & str ) -> QueryResult < ( ) > {
140188 self . record_instrumentation ( InstrumentationEvent :: start_query ( & StrQueryHelper :: new (
141189 query,
@@ -167,6 +215,38 @@ impl AsyncConnectionCore for AsyncPgConnection {
167215 type Row < ' conn , ' query > = PgRow ;
168216 type Backend = diesel:: pg:: Pg ;
169217
218+ fn load < ' conn , ' query , T > ( & ' conn mut self , source : T ) -> Self :: LoadFuture < ' conn , ' query >
219+ where
220+ T : AsQuery + ' query ,
221+ T :: Query : QueryFragment < Self :: Backend > + QueryId + ' query ,
222+ {
223+ AsyncConnectionCore :: load ( & mut & * self , source)
224+ }
225+
226+ fn execute_returning_count < ' conn , ' query , T > (
227+ & ' conn mut self ,
228+ source : T ,
229+ ) -> Self :: ExecuteFuture < ' conn , ' query >
230+ where
231+ T : QueryFragment < Self :: Backend > + QueryId + ' query ,
232+ {
233+ AsyncConnectionCore :: execute_returning_count ( & mut & * self , source)
234+ }
235+ }
236+
237+ impl AsyncConnectionCore for & AsyncPgConnection {
238+ type LoadFuture < ' conn , ' query > =
239+ <AsyncPgConnection as AsyncConnectionCore >:: LoadFuture < ' conn , ' query > ;
240+
241+ type ExecuteFuture < ' conn , ' query > =
242+ <AsyncPgConnection as AsyncConnectionCore >:: ExecuteFuture < ' conn , ' query > ;
243+
244+ type Stream < ' conn , ' query > = <AsyncPgConnection as AsyncConnectionCore >:: Stream < ' conn , ' query > ;
245+
246+ type Row < ' conn , ' query > = <AsyncPgConnection as AsyncConnectionCore >:: Row < ' conn , ' query > ;
247+
248+ type Backend = <AsyncPgConnection as AsyncConnectionCore >:: Backend ;
249+
170250 fn load < ' conn , ' query , T > ( & ' conn mut self , source : T ) -> Self :: LoadFuture < ' conn , ' query >
171251 where
172252 T : AsQuery + ' query ,
@@ -962,4 +1042,37 @@ mod tests {
9621042 assert_eq ! ( r1, 1 ) ;
9631043 assert_eq ! ( r2, 2 ) ;
9641044 }
1045+
1046+ #[ tokio:: test]
1047+ async fn pipelining_with_composed_futures ( ) {
1048+ let database_url =
1049+ std:: env:: var ( "DATABASE_URL" ) . expect ( "DATABASE_URL must be set in order to run tests" ) ;
1050+ let conn = crate :: AsyncPgConnection :: establish ( & database_url)
1051+ . await
1052+ . unwrap ( ) ;
1053+
1054+ async fn fn12 ( mut conn : & AsyncPgConnection ) -> QueryResult < ( i32 , i32 ) > {
1055+ let f1 = diesel:: select ( 1_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut conn) ;
1056+ let f2 = diesel:: select ( 2_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut conn) ;
1057+
1058+ futures_util:: try_join!( f1, f2)
1059+ }
1060+
1061+ async fn fn34 ( mut conn : & AsyncPgConnection ) -> QueryResult < ( i32 , i32 ) > {
1062+ let f3 = diesel:: select ( 3_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut conn) ;
1063+ let f4 = diesel:: select ( 4_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut conn) ;
1064+
1065+ futures_util:: try_join!( f3, f4)
1066+ }
1067+
1068+ let f12 = fn12 ( & conn) ;
1069+ let f34 = fn34 ( & conn) ;
1070+
1071+ let ( ( r1, r2) , ( r3, r4) ) = futures_util:: try_join!( f12, f34) . unwrap ( ) ;
1072+
1073+ assert_eq ! ( r1, 1 ) ;
1074+ assert_eq ! ( r2, 2 ) ;
1075+ assert_eq ! ( r3, 3 ) ;
1076+ assert_eq ! ( r4, 4 ) ;
1077+ }
9651078}
0 commit comments