Skip to content

Extended query pipeline to execute multiple independent queries in a single batch #2068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ hex = "0.4.3"
tempdir = "0.3.7"
# Needed to test SQLCipher
libsqlite3-sys = { version = "*", features = ["bundled-sqlcipher"] }
# Used to test PgExtendedQueryPipeline
uuid = "1"
futures-util = "0.3"
either = "1.6.1"

#
# Any
Expand Down Expand Up @@ -275,6 +279,11 @@ name = "postgres-test-attr"
path = "tests/postgres/test-attr.rs"
required-features = ["postgres", "macros", "migrate"]

[[test]]
name = "postgres-pipeline"
path = "tests/postgres/pipeline.rs"
required-features = ["postgres", "macros", "migrate", "uuid"]

#
# Microsoft SQL Server (MSSQL)
#
Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ sqlformat = "0.2.0"
thiserror = "1.0.30"
time = { version = "0.3.2", features = ["macros", "formatting", "parsing"], optional = true }
tokio-stream = { version = "0.1.8", features = ["fs"], optional = true }
smallvec = "1.7.0"
smallvec = { version = "1.7.0", features = ["const_generics"] }
url = { version = "2.2.2", default-features = false }
uuid = { version = "1.0", default-features = false, optional = true, features = ["std"] }
webpki-roots = { version = "0.22.0", optional = true }
Expand All @@ -179,4 +179,4 @@ dotenvy = "0.15"

[dev-dependencies]
sqlx = { version = "0.6.1", path = "..", features = ["postgres", "sqlite", "mysql"] }
tokio = { version = "1", features = ["rt"] }
tokio = { version = "1", features = ["rt", "macros"] }
2 changes: 1 addition & 1 deletion sqlx-core/src/postgres/connection/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl TryFrom<u8> for TypCategory {
}

impl PgConnection {
pub(super) async fn handle_row_description(
pub(in crate::postgres) async fn handle_row_description(
&mut self,
desc: Option<RowDescription>,
should_fetch: bool,
Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/src/postgres/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ impl PgConnection {
Ok(())
}

pub(crate) fn write_sync(&mut self) {
pub(in crate::postgres) fn write_sync(&mut self) {
self.stream.write(message::Sync);

// all SYNC messages will return a ReadyForQuery
self.pending_ready_for_query_count += 1;
}

async fn get_or_prepare<'a>(
pub(in crate::postgres) async fn get_or_prepare<'a>(
&mut self,
sql: &str,
parameters: &[PgTypeInfo],
Expand Down
15 changes: 9 additions & 6 deletions sqlx-core/src/postgres/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct PgConnection {
// underlying TCP or UDS stream,
// wrapped in a potentially TLS stream,
// wrapped in a buffered stream
pub(crate) stream: PgStream,
pub(in crate::postgres) stream: PgStream,

// process id of this backend
// used to send cancel requests
Expand All @@ -56,13 +56,13 @@ pub struct PgConnection {
cache_type_oid: HashMap<UStr, Oid>,

// number of ReadyForQuery messages that we are currently expecting
pub(crate) pending_ready_for_query_count: usize,
pub(in crate::postgres) pending_ready_for_query_count: usize,

// current transaction status
transaction_status: TransactionStatus,
pub(crate) transaction_depth: usize,
pub(in crate::postgres) transaction_depth: usize,

log_settings: LogSettings,
pub(in crate::postgres) log_settings: LogSettings,
}

impl PgConnection {
Expand Down Expand Up @@ -100,7 +100,10 @@ impl PgConnection {
Ok(())
}

fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
pub(in crate::postgres) fn handle_ready_for_query(
&mut self,
message: Message,
) -> Result<(), Error> {
self.pending_ready_for_query_count -= 1;
self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;

Expand All @@ -110,7 +113,7 @@ impl PgConnection {
/// Queue a simple query (not prepared) to execute the next time this connection is used.
///
/// Used for rolling back transactions and releasing advisory locks.
pub(crate) fn queue_simple_query(&mut self, query: &str) {
pub(in crate::postgres) fn queue_simple_query(&mut self, query: &str) {
self.pending_ready_for_query_count += 1;
self.stream.write(Query(query));
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-core/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod io;
mod listener;
mod message;
mod options;
mod pipeline;
mod query_result;
mod row;
mod statement;
Expand All @@ -37,6 +38,7 @@ pub use error::{PgDatabaseError, PgErrorPosition};
pub use listener::{PgListener, PgNotification};
pub use message::PgSeverity;
pub use options::{PgConnectOptions, PgSslMode};
pub use pipeline::PgExtendedQueryPipeline;
pub use query_result::PgQueryResult;
pub use row::PgRow;
pub use statement::PgStatement;
Expand All @@ -51,6 +53,8 @@ pub type PgPool = crate::pool::Pool<Postgres>;
/// An alias for [`PoolOptions`][crate::pool::PoolOptions], specialized for Postgres.
pub type PgPoolOptions = crate::pool::PoolOptions<Postgres>;

/// An alias for [`Query`][crate::query::Query], specialized for Postgres.
pub type PgQuery<'q> = crate::query::Query<'q, Postgres, PgArguments>;
/// An alias for [`Executor<'_, Database = Postgres>`][Executor].
pub trait PgExecutor<'c>: Executor<'c, Database = Postgres> {}
impl<'c, T: Executor<'c, Database = Postgres>> PgExecutor<'c> for T {}
Expand Down
Loading