Skip to content

Commit acc2ce3

Browse files
committed
add simple query versions of copy operations
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 41f5bac commit acc2ce3

File tree

3 files changed

+60
-16
lines changed

3 files changed

+60
-16
lines changed

tokio-postgres/src/client.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,14 @@ impl Client {
527527
copy_in::copy_in(self.inner(), statement).await
528528
}
529529

530+
/// Executes a `COPY FROM STDIN` query, returning a sink used to write the copy data.
531+
pub async fn copy_in_simple<U>(&self, query: &str) -> Result<CopyInSink<U>, Error>
532+
where
533+
U: Buf + 'static + Send,
534+
{
535+
copy_in::copy_in_simple(self.inner(), query).await
536+
}
537+
530538
/// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
531539
///
532540
/// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
@@ -538,6 +546,11 @@ impl Client {
538546
copy_out::copy_out(self.inner(), statement).await
539547
}
540548

549+
/// Executes a `COPY TO STDOUT` query, returning a stream of the resulting data.
550+
pub async fn copy_out_simple(&self, query: &str) -> Result<CopyOutStream, Error> {
551+
copy_out::copy_out_simple(self.inner(), query).await
552+
}
553+
541554
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
542555
/// data.
543556
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>

tokio-postgres/src/copy_in.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::query::extract_row_affected;
5-
use crate::{query, slice_iter, Error, Statement};
6-
use bytes::{Buf, BufMut, BytesMut};
5+
use crate::{query, simple_query, slice_iter, Error, Statement};
6+
use bytes::{Buf, BufMut, Bytes, BytesMut};
77
use futures_channel::mpsc;
88
use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt};
99
use log::debug;
@@ -188,14 +188,10 @@ where
188188
}
189189
}
190190

191-
pub async fn copy_in<T>(client: &InnerClient, statement: Statement) -> Result<CopyInSink<T>, Error>
191+
async fn start<T>(client: &InnerClient, buf: Bytes, simple: bool) -> Result<CopyInSink<T>, Error>
192192
where
193193
T: Buf + 'static + Send,
194194
{
195-
debug!("executing copy in statement {}", statement.name());
196-
197-
let buf = query::encode(client, &statement, slice_iter(&[]))?;
198-
199195
let (mut sender, receiver) = mpsc::channel(1);
200196
let receiver = CopyInReceiver::new(receiver);
201197
let mut responses = client.send(RequestMessages::CopyIn(receiver))?;
@@ -205,9 +201,11 @@ where
205201
.await
206202
.map_err(|_| Error::closed())?;
207203

208-
match responses.next().await? {
209-
Message::BindComplete => {}
210-
_ => return Err(Error::unexpected_message()),
204+
if !simple {
205+
match responses.next().await? {
206+
Message::BindComplete => {}
207+
_ => return Err(Error::unexpected_message()),
208+
}
211209
}
212210

213211
match responses.next().await? {
@@ -224,3 +222,23 @@ where
224222
_p2: PhantomData,
225223
})
226224
}
225+
226+
pub async fn copy_in<T>(client: &InnerClient, statement: Statement) -> Result<CopyInSink<T>, Error>
227+
where
228+
T: Buf + 'static + Send,
229+
{
230+
debug!("executing copy in statement {}", statement.name());
231+
232+
let buf = query::encode(client, &statement, slice_iter(&[]))?;
233+
start(client, buf, false).await
234+
}
235+
236+
pub async fn copy_in_simple<T>(client: &InnerClient, query: &str) -> Result<CopyInSink<T>, Error>
237+
where
238+
T: Buf + 'static + Send,
239+
{
240+
debug!("executing copy in query {}", query);
241+
242+
let buf = simple_query::encode(client, query)?;
243+
start(client, buf, true).await
244+
}

tokio-postgres/src/copy_out.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::{query, slice_iter, Error, Statement};
4+
use crate::{query, simple_query, slice_iter, Error, Statement};
55
use bytes::Bytes;
66
use futures_util::{ready, Stream};
77
use log::debug;
@@ -11,23 +11,36 @@ use std::marker::PhantomPinned;
1111
use std::pin::Pin;
1212
use std::task::{Context, Poll};
1313

14+
pub async fn copy_out_simple(client: &InnerClient, query: &str) -> Result<CopyOutStream, Error> {
15+
debug!("executing copy out query {}", query);
16+
17+
let buf = simple_query::encode(client, query)?;
18+
let responses = start(client, buf, true).await?;
19+
Ok(CopyOutStream {
20+
responses,
21+
_p: PhantomPinned,
22+
})
23+
}
24+
1425
pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
1526
debug!("executing copy out statement {}", statement.name());
1627

1728
let buf = query::encode(client, &statement, slice_iter(&[]))?;
18-
let responses = start(client, buf).await?;
29+
let responses = start(client, buf, false).await?;
1930
Ok(CopyOutStream {
2031
responses,
2132
_p: PhantomPinned,
2233
})
2334
}
2435

25-
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
36+
async fn start(client: &InnerClient, buf: Bytes, simple: bool) -> Result<Responses, Error> {
2637
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
2738

28-
match responses.next().await? {
29-
Message::BindComplete => {}
30-
_ => return Err(Error::unexpected_message()),
39+
if !simple {
40+
match responses.next().await? {
41+
Message::BindComplete => {}
42+
_ => return Err(Error::unexpected_message()),
43+
}
3144
}
3245

3346
match responses.next().await? {

0 commit comments

Comments
 (0)