1+ mod utils;
2+
13use crate :: AsyncConnectionCore ;
24use diesel:: associations:: HasTable ;
35use diesel:: query_builder:: IntoUpdateTarget ;
46use diesel:: result:: QueryResult ;
57use diesel:: AsChangeset ;
68use futures_core:: future:: BoxFuture ;
7- use futures_core:: Stream ;
8- use futures_util:: { future, stream, FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
9+ #[ cfg( any( feature = "mysql" , feature = "postgres" ) ) ]
10+ use futures_util:: FutureExt ;
11+ use futures_util:: { stream, StreamExt , TryStreamExt } ;
912use std:: future:: Future ;
10- use std:: pin:: Pin ;
1113
1214/// The traits used by `QueryDsl`.
1315///
@@ -22,7 +24,7 @@ pub mod methods {
2224 use diesel:: expression:: QueryMetadata ;
2325 use diesel:: query_builder:: { AsQuery , QueryFragment , QueryId } ;
2426 use diesel:: query_dsl:: CompatibleType ;
25- use futures_util:: { Future , Stream , TryFutureExt } ;
27+ use futures_util:: { Future , Stream } ;
2628
2729 /// The `execute` method
2830 ///
@@ -74,6 +76,7 @@ pub mod methods {
7476 type LoadFuture < ' conn > : Future < Output = QueryResult < Self :: Stream < ' conn > > > + Send
7577 where
7678 Conn : ' conn ;
79+
7780 /// The inner stream returned by [`LoadQuery::internal_load`]
7881 type Stream < ' conn > : Stream < Item = QueryResult < U > > + Send
7982 where
@@ -96,10 +99,7 @@ pub mod methods {
9699 ST : ' static ,
97100 {
98101 type LoadFuture < ' conn >
99- = future:: MapOk <
100- Conn :: LoadFuture < ' conn , ' query > ,
101- fn ( Conn :: Stream < ' conn , ' query > ) -> Self :: Stream < ' conn > ,
102- >
102+ = utils:: MapOk < Conn :: LoadFuture < ' conn , ' query > , Self :: Stream < ' conn > >
103103 where
104104 Conn : ' conn ;
105105
@@ -112,33 +112,13 @@ pub mod methods {
112112 Conn : ' conn ;
113113
114114 fn internal_load ( self , conn : & mut Conn ) -> Self :: LoadFuture < ' _ > {
115- conn. load ( self )
116- . map_ok ( map_result_stream_future :: < U , _ , _ , DB , ST > )
115+ utils:: MapOk :: new ( conn. load ( self ) , |stream| {
116+ stream. map ( |row| {
117+ U :: build_from_row ( & row?) . map_err ( diesel:: result:: Error :: DeserializationError )
118+ } )
119+ } )
117120 }
118121 }
119-
120- #[ allow( clippy:: type_complexity) ]
121- fn map_result_stream_future < ' s , ' a , U , S , R , DB , ST > (
122- stream : S ,
123- ) -> stream:: Map < S , fn ( QueryResult < R > ) -> QueryResult < U > >
124- where
125- S : Stream < Item = QueryResult < R > > + Send + ' s ,
126- R : diesel:: row:: Row < ' a , DB > + ' s ,
127- DB : Backend + ' static ,
128- U : FromSqlRow < ST , DB > + ' static ,
129- ST : ' static ,
130- {
131- stream. map ( map_row_helper :: < _ , DB , U , ST > )
132- }
133-
134- fn map_row_helper < ' a , R , DB , U , ST > ( row : QueryResult < R > ) -> QueryResult < U >
135- where
136- U : FromSqlRow < ST , DB > ,
137- R : diesel:: row:: Row < ' a , DB > ,
138- DB : Backend ,
139- {
140- U :: build_from_row ( & row?) . map_err ( diesel:: result:: Error :: DeserializationError )
141- }
142122}
143123
144124/// The return types produced by the various [`RunQueryDsl`] methods
@@ -149,37 +129,24 @@ pub mod methods {
149129// the same connection
150130#[ allow( type_alias_bounds) ] // we need these bounds otherwise we cannot use GAT's
151131pub mod return_futures {
132+ use crate :: run_query_dsl:: utils;
133+
152134 use super :: methods:: LoadQuery ;
153- use diesel:: QueryResult ;
154- use futures_util:: { future, stream} ;
135+ use futures_util:: stream;
155136 use std:: pin:: Pin ;
156137
157138 /// The future returned by [`RunQueryDsl::load`](super::RunQueryDsl::load)
158139 /// and [`RunQueryDsl::get_results`](super::RunQueryDsl::get_results)
159140 ///
160141 /// This is essentially `impl Future<Output = QueryResult<Vec<U>>>`
161- pub type LoadFuture < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > = future:: AndThen <
162- Q :: LoadFuture < ' conn > ,
163- stream:: TryCollect < Q :: Stream < ' conn > , Vec < U > > ,
164- fn ( Q :: Stream < ' conn > ) -> stream:: TryCollect < Q :: Stream < ' conn > , Vec < U > > ,
165- > ;
142+ pub type LoadFuture < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > =
143+ utils:: AndThen < Q :: LoadFuture < ' conn > , stream:: TryCollect < Q :: Stream < ' conn > , Vec < U > > > ;
166144
167145 /// The future returned by [`RunQueryDsl::get_result`](super::RunQueryDsl::get_result)
168146 ///
169147 /// This is essentially `impl Future<Output = QueryResult<U>>`
170- pub type GetResult < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > = future:: AndThen <
171- Q :: LoadFuture < ' conn > ,
172- future:: Map <
173- stream:: StreamFuture < Pin < Box < Q :: Stream < ' conn > > > > ,
174- fn ( ( Option < QueryResult < U > > , Pin < Box < Q :: Stream < ' conn > > > ) ) -> QueryResult < U > ,
175- > ,
176- fn (
177- Q :: Stream < ' conn > ,
178- ) -> future:: Map <
179- stream:: StreamFuture < Pin < Box < Q :: Stream < ' conn > > > > ,
180- fn ( ( Option < QueryResult < U > > , Pin < Box < Q :: Stream < ' conn > > > ) ) -> QueryResult < U > ,
181- > ,
182- > ;
148+ pub type GetResult < ' conn , ' query , Q : LoadQuery < ' query , Conn , U > , Conn , U > =
149+ utils:: AndThen < Q :: LoadFuture < ' conn > , utils:: LoadNext < Pin < Box < Q :: Stream < ' conn > > > > > ;
183150}
184151
185152/// Methods used to execute queries.
@@ -346,13 +313,7 @@ pub trait RunQueryDsl<Conn>: Sized {
346313 Conn : AsyncConnectionCore ,
347314 Self : methods:: LoadQuery < ' query , Conn , U > + ' query ,
348315 {
349- fn collect_result < U , S > ( stream : S ) -> stream:: TryCollect < S , Vec < U > >
350- where
351- S : Stream < Item = QueryResult < U > > ,
352- {
353- stream. try_collect ( )
354- }
355- self . internal_load ( conn) . and_then ( collect_result :: < U , _ > )
316+ utils:: AndThen :: new ( self . internal_load ( conn) , |stream| stream. try_collect ( ) )
356317 }
357318
358319 /// Executes the given query, returning a [`Stream`] with the returned rows.
@@ -547,29 +508,9 @@ pub trait RunQueryDsl<Conn>: Sized {
547508 Conn : AsyncConnectionCore ,
548509 Self : methods:: LoadQuery < ' query , Conn , U > + ' query ,
549510 {
550- #[ allow( clippy:: type_complexity) ]
551- fn get_next_stream_element < S , U > (
552- stream : S ,
553- ) -> future:: Map <
554- stream:: StreamFuture < Pin < Box < S > > > ,
555- fn ( ( Option < QueryResult < U > > , Pin < Box < S > > ) ) -> QueryResult < U > ,
556- >
557- where
558- S : Stream < Item = QueryResult < U > > ,
559- {
560- fn map_option_to_result < U , S > (
561- ( o, _) : ( Option < QueryResult < U > > , Pin < Box < S > > ) ,
562- ) -> QueryResult < U > {
563- match o {
564- Some ( s) => s,
565- None => Err ( diesel:: result:: Error :: NotFound ) ,
566- }
567- }
568-
569- Box :: pin ( stream) . into_future ( ) . map ( map_option_to_result)
570- }
571-
572- self . load_stream ( conn) . and_then ( get_next_stream_element)
511+ utils:: AndThen :: new ( self . internal_load ( conn) , |stream| {
512+ utils:: LoadNext :: new ( Box :: pin ( stream) )
513+ } )
573514 }
574515
575516 /// Runs the command, returning an `Vec` with the affected rows.
0 commit comments