coord: Optimize single statement read transaction (MaterializeInc#15255)
The PostgreSQL extended wire protocol
(https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY)
leaves it ambiguous whether the current statement will be a
single-statement implicit transaction or a multi-statement implicit
transaction. Due to this ambiguity, we previously removed all of our
single-statement read transaction optimizations in a887755. This ended
up negatively impacting query freshness and latency in the Serializable
isolation level.

PostgreSQL will eagerly commit implicit transactions after executing
certain statements but before receiving a Sync message. See
https://www.postgresql.org/message-id/flat/17434-d9f7a064ce2a88a3%40postgresql.org
and
https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137.

This commit will eagerly commit ALL statements in an implicit
transaction when using the extended protocol. This vastly simplifies
things and allows us to re-apply the optimizations we previously made
to single statement transactions.
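
The eager-commit rule above can be illustrated with a toy model. This is only a sketch under stated assumptions, not the coordinator's actual code: `TxnStatus`, `Session`, and the `fails` flag are hypothetical stand-ins, and the real system tracks a separate failed state for explicit transactions that is omitted here.

```rust
/// Simplified transaction states (hypothetical; the real enum carries more data).
#[derive(Debug, PartialEq)]
enum TxnStatus {
    /// Idle, no transaction open.
    Default,
    /// Implicit transaction opened by a single statement.
    Started,
    /// Explicit transaction opened by BEGIN.
    InTransaction,
}

/// Toy session: statements are buffered in `pending` until commit.
struct Session {
    status: TxnStatus,
    pending: Vec<String>,
    committed: Vec<String>,
}

impl Session {
    fn new() -> Self {
        Session {
            status: TxnStatus::Default,
            pending: Vec::new(),
            committed: Vec::new(),
        }
    }

    /// Opens an explicit transaction.
    fn begin(&mut self) {
        self.status = TxnStatus::InTransaction;
    }

    /// Makes all pending statements durable and returns to the idle state.
    fn commit(&mut self) {
        self.committed.append(&mut self.pending);
        self.status = TxnStatus::Default;
    }

    /// Models one Execute message. `fails` simulates a statement error.
    fn execute(&mut self, stmt: &str, fails: bool) -> Result<(), String> {
        if self.status == TxnStatus::Default {
            self.status = TxnStatus::Started;
        }
        if fails {
            // Simplification: abort and go idle, discarding pending work.
            self.pending.clear();
            self.status = TxnStatus::Default;
            return Err(format!("statement failed: {stmt}"));
        }
        self.pending.push(stmt.to_string());
        // Eager commit: an implicit transaction never outlives one Execute.
        if self.status == TxnStatus::Started {
            self.commit();
        }
        Ok(())
    }
}
```

In this model, three successful Execute messages followed by a failing one leave the first three statements committed, which mirrors the behavior exercised by the `ddl-extended.pt` test in this commit.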

Multi-statement implicit transactions using the extended protocol are
an extremely rare edge case. They are not possible unless a client
uses pipelining in psql or a custom driver. The PostgreSQL BEGIN
documentation even wrongly implies that they don't exist
(https://www.postgresql.org/docs/current/sql-begin.html). Therefore,
it's much better to apply the optimizations in the common case than to
support this edge case.

Much of this commit reverts certain parts of a887755.

Fixes #14696
Fixes #14038
jkosh44 authored Oct 11, 2022
1 parent 0739b02 commit 8404cde
Showing 17 changed files with 347 additions and 151 deletions.
35 changes: 12 additions & 23 deletions src/adapter/src/coord/sequencer.rs
@@ -2033,13 +2033,15 @@ impl<S: Append + 'static> Coordinator<S> {
// Queries are independent of the logical timestamp iff there are no referenced
// sources or indexes and there is no reference to `mz_now()`.
let timestamp_independent = source_ids.is_empty() && !source.contains_temporal();
// For queries that do not use AS OF, get the
// For transactions that do not use AS OF, get the
// timestamp of the in-progress transaction or create one. If this is an AS OF
// query, we don't care about any possible transaction timestamp. We do
// not do any optimization of the so-called single-statement transactions
// (TransactionStatus::Started) because multiple statements can actually be
// executed there in the extended protocol.
let timestamp = if when == QueryWhen::Immediately {
// query, we don't care about any possible transaction timestamp. If this is a
// single-statement transaction (TransactionStatus::Started), we don't need to
// worry about preventing compaction or choosing a valid timestamp for future
// queries.
let timestamp = if session.transaction().is_in_multi_statement_transaction()
&& when == QueryWhen::Immediately
{
// If all previous statements were timestamp-independent and the current one is
// not, clear the transaction ops so it can get a new timestamp and timedomain.
if let Some(read_txn) = self.txn_reads.get(&conn_id) {
@@ -2656,23 +2658,10 @@ impl<S: Append + 'static> Coordinator<S> {
let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?;
let timeline = self.validate_timeline(optimized_plan.depends_on())?;
let source_ids = optimized_plan.depends_on();
let id_bundle = if session.vars().transaction_isolation()
== &IsolationLevel::StrictSerializable
&& timeline.is_some()
{
self.index_oracle(compute_instance)
.sufficient_collections(&source_ids)
} else {
// Determine a timestamp that will be valid for anything in any schema
// referenced by the query.
self.timedomain_for(
&source_ids,
&timeline,
session.conn_id(),
compute_instance,
)?
};
// TODO: determine_timestamp takes a mut self to track table linearizability,
let id_bundle = self
.index_oracle(compute_instance)
.sufficient_collections(&source_ids);
// TODO: determine_timestamp takes a mut self to track linearizability,
// so explaining a plan involving tables has side effects. Removing those side
// effects would be good.
let timestamp = self.determine_timestamp(
2 changes: 1 addition & 1 deletion src/adapter/src/coord/timestamp_selection.rs
@@ -268,7 +268,7 @@ impl<S: Append + 'static> Coordinator<S> {
.collect::<Vec<_>>();
format!(
"Timestamp ({}) is not valid for all inputs: {:?}",
candidate, invalid
candidate, invalid,
)
}
}
26 changes: 21 additions & 5 deletions src/adapter/src/session.rs
@@ -616,11 +616,15 @@ pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
pub enum TransactionStatus<T> {
/// Idle. Matches `TBLOCK_DEFAULT`.
Default,
/// Running a possibly single-query transaction. Matches
/// `TBLOCK_STARTED`. WARNING: This might not actually be
/// a single statement due to the extended protocol. Thus,
/// we should not perform optimizations based on this.
/// See: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>.
/// Running a single-query transaction. Matches
/// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
/// may be upgraded into multi-statement implicit query (see [`Self::InTransactionImplicit`]).
/// Additionally, some statements may trigger an eager commit of the implicit transaction,
/// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
/// Materialize however, we eagerly commit all statements outside of an explicit transaction
/// when using the extended query protocol. Therefore, we can guarantee that this state will
/// always be a single-query transaction and never be upgraded into a multi-statement implicit
/// query.
Started(Transaction<T>),
/// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
InTransaction(Transaction<T>),
@@ -679,6 +683,18 @@ impl<T> TransactionStatus<T> {
}
}

/// Whether the transaction may contain multiple statements.
pub fn is_in_multi_statement_transaction(&self) -> bool {
match self {
TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
true
}
TransactionStatus::Default
| TransactionStatus::Started(_)
| TransactionStatus::Failed(_) => false,
}
}

/// Grants the write lock to the inner transaction.
///
/// # Panics
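
The way the new `is_in_multi_statement_transaction` helper feeds into the sequencer's timestamp decision can be sketched in isolation. This is a minimal illustration under assumptions: the enums below are stripped-down stand-ins (the real `TransactionStatus<T>` carries a `Transaction<T>` payload, and `QueryWhen` here has only two hypothetical variants), and `uses_transaction_timestamp` is a name invented for this example.

```rust
/// Simplified stand-in for the query's timing mode (assumed shape).
#[derive(Debug, Clone, Copy, PartialEq)]
enum QueryWhen {
    /// No AS OF clause: the system picks the timestamp.
    Immediately,
    /// AS OF a caller-supplied timestamp.
    AtTimestamp(u64),
}

/// Payload-free version of the transaction status enum from the diff.
enum TransactionStatus {
    Default,
    Started,
    InTransaction,
    InTransactionImplicit,
    Failed,
}

impl TransactionStatus {
    /// Whether the transaction may contain multiple statements.
    fn is_in_multi_statement_transaction(&self) -> bool {
        matches!(
            self,
            TransactionStatus::InTransaction | TransactionStatus::InTransactionImplicit
        )
    }
}

/// Hypothetical helper: true when the statement must read at (or establish)
/// the shared transaction timestamp. Single-statement transactions and
/// AS OF queries pick a timestamp on their own.
fn uses_transaction_timestamp(status: &TransactionStatus, when: QueryWhen) -> bool {
    status.is_in_multi_statement_transaction() && when == QueryWhen::Immediately
}
```

Because `Started` now always means exactly one statement, it falls on the `false` side of this check, so such a query does not need to hold back compaction or pick a timestamp that stays valid for later queries.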
45 changes: 29 additions & 16 deletions src/environmentd/tests/sql.rs
@@ -1407,6 +1407,12 @@ fn test_linearizability() -> Result<(), Box<dyn Error>> {
// equal timestamp in strict serializable mode.
assert!(join_ts >= view_ts);

mz_client.batch_execute("SET transaction_isolation = serializable")?;
let view_ts = get_explain_timestamp(view_name, &mut mz_client);
let join_ts = get_explain_timestamp(&format!("{view_name}, t"), &mut mz_client);
// If we go back to serializable, then timestamps can revert again.
assert!(join_ts < view_ts);

cleanup_fn(&mut mz_client, &mut pg_client, &server.runtime)?;

Ok(())
@@ -1731,22 +1737,29 @@ fn wait_for_view_population(
view_name: &str,
source_rows: i64,
) -> Result<(), Box<dyn Error>> {
let mut rows = 0;
while rows != source_rows {
thread::sleep(Duration::from_millis(1));
// This is a bit hacky. We have no way of getting the freshest data in the view, without
// also advancing every other object in the time domain, which we usually want to avoid in
// these tests. Instead we query the view using AS OF a value close to the current system
// clock and hope it gives us fresh enough data.
let now = ((SYSTEM_TIME.as_secs() as EpochMillis) * 1_000) - 100;
rows = mz_client
.query_one(
&format!("SELECT COUNT(*) FROM {view_name} AS OF {now};"),
&[],
)
.map(|row| row.get::<_, i64>(0))
.unwrap_or(0);
}
let current_isolation = mz_client
.query_one("SHOW transaction_isolation", &[])?
.get::<_, String>(0);
mz_client.batch_execute("SET transaction_isolation = SERIALIZABLE")?;
Retry::default()
.max_duration(Duration::from_secs(10))
.retry(|_| {
let rows = mz_client
.query_one(&format!("SELECT COUNT(*) FROM {view_name};"), &[])
.unwrap()
.get::<_, i64>(0);
if rows == source_rows {
Ok(())
} else {
Err(format!(
"Waiting for {source_rows} row to be ingested. Currently at {rows}."
))
}
})
.unwrap();
mz_client.batch_execute(&format!(
"SET transaction_isolation = '{current_isolation}'"
))?;
Ok(())
}
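
The rewritten `wait_for_view_population` polls until the row count matches or a deadline passes. The `Retry` builder it uses comes from the project's own utilities; the sketch below is not that API. It is a standard-library-only approximation of the same idea, with `retry_until` and its backoff constants chosen purely for illustration.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: calls `op` until it returns `Ok` or `max_duration`
/// has elapsed, sleeping with capped exponential backoff between attempts.
/// `op` receives the zero-based attempt number.
fn retry_until<T, F>(max_duration: Duration, mut op: F) -> Result<T, String>
where
    F: FnMut(u32) -> Result<T, String>,
{
    let start = Instant::now();
    let mut backoff = Duration::from_millis(1);
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Give up once the deadline has passed, returning the last error.
                if start.elapsed() >= max_duration {
                    return Err(err);
                }
                thread::sleep(backoff);
                // Double the delay each time, but never exceed 100 ms.
                backoff = (backoff * 2).min(Duration::from_millis(100));
                attempt += 1;
            }
        }
    }
}
```

A caller would pass a closure that runs the `SELECT COUNT(*)` query and returns `Err` with a progress message until the expected count is reached, much as the test above does.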

32 changes: 23 additions & 9 deletions src/pgwire/src/protocol.rs
@@ -361,15 +361,29 @@ where
let execute_root_span =
tracing::debug_span!(parent: None, "advance_ready", otel.name = message_name);
execute_root_span.follows_from(tracing::Span::current());
self.execute(
portal_name,
max_rows,
portal_exec_message,
None,
ExecuteTimeout::None,
)
.instrument(execute_root_span)
.await?
let state = self
.execute(
portal_name,
max_rows,
portal_exec_message,
None,
ExecuteTimeout::None,
)
.instrument(execute_root_span)
.await?;
// Close the current transaction if we are in an implicit transaction.
// In PostgreSQL, when using the extended query protocol, some statements may
// trigger an eager commit of the current implicit transaction,
// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>.
// In Materialize however, we eagerly commit all statements outside of an explicit
// transaction when using the extended query protocol. This allows us to remove
// the ambiguity between multiple and single statement implicit transactions when
// using the extended query protocol and apply some optimizations to single
// statement transactions.
if self.adapter_client.session().transaction().is_implicit() {
self.commit_transaction().await?;
}
state
}
Some(FrontendMessage::DescribeStatement { name }) => {
self.describe_statement(&name).await?
55 changes: 55 additions & 0 deletions test/pgtest-mz/ddl-extended.pt
@@ -0,0 +1,55 @@
# Test that multiple DDL statements in the same extended request are supported
send
Parse {"query": "CREATE TABLE a (a int)"}
Bind
Execute
Parse {"query": "CREATE TABLE b (a int)"}
Bind
Execute
Parse {"query": "INSERT INTO a VALUES (1)"}
Bind
Execute
Parse {"query": "SELECT 1/0"}
Bind
Execute
Sync
Query {"query": "SELECT * FROM a"}
----

until
ReadyForQuery
ReadyForQuery
----
ParseComplete
BindComplete
CommandComplete {"tag":"CREATE TABLE"}
ParseComplete
BindComplete
CommandComplete {"tag":"CREATE TABLE"}
ParseComplete
BindComplete
CommandComplete {"tag":"INSERT 0 1"}
ParseComplete
BindComplete
ErrorResponse {"fields":[{"typ":"S","value":"ERROR"},{"typ":"C","value":"XX000"},{"typ":"M","value":"division by zero"}]}
ReadyForQuery {"status":"I"}
RowDescription {"fields":[{"name":"a"}]}
DataRow {"fields":["1"]}
CommandComplete {"tag":"SELECT 1"}
ReadyForQuery {"status":"I"}

send
Parse {"query": "SELECT * FROM a"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
DataRow {"fields":["1"]}
CommandComplete {"tag":"SELECT 1"}
ReadyForQuery {"status":"I"}
4 changes: 2 additions & 2 deletions test/pgtest-mz/parse-started.pt
@@ -27,8 +27,8 @@
# fetching from the catalog tables. Our code previously made assumptions that
# transactions in the TBLOCK_STARTED were always single statement, and would
# thus skip some work. However it is possible that there are other statements
# issued in that block, so we should instead never perform those kinds of
# optimizations because they are not in fact safe.
# issued in that block, so we eagerly commit all statements after receiving an
# Execute message.
#
# The bug motivating this test was caused by sequence_peek adding a
# second transaction operation at a different timestamp to an existing
40 changes: 40 additions & 0 deletions test/pgtest-mz/portals.pt
@@ -0,0 +1,40 @@
# In Materialize we eagerly commit implicit transactions after Execute
# messages, causing portals to be destroyed.
send
Parse {"query": "VALUES (1), (2)"}
Bind {"portal": "c"}
Execute {"portal": "c", "max_rows": 1}
Execute {"portal": "c", "max_rows": 1}
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
DataRow {"fields":["1"]}
PortalSuspended
ErrorResponse {"fields":[{"typ":"S","value":"ERROR"},{"typ":"C","value":"34000"},{"typ":"M","value":"portal \"c\" does not exist"}]}
ReadyForQuery {"status":"I"}

# Verify that portals (cursors) are destroyed on Execute.
send
Parse {"query": "VALUES (1), (2)"}
Bind {"portal": "c"}
Execute {"portal": "c", "max_rows": 1}
Sync
Query {"query": "FETCH c"}
----

until
ReadyForQuery
ReadyForQuery
----
ParseComplete
BindComplete
DataRow {"fields":["1"]}
PortalSuspended
ReadyForQuery {"status":"I"}
ErrorResponse {"fields":[{"typ":"S","value":"ERROR"},{"typ":"C","value":"34000"},{"typ":"M","value":"cursor \"c\" does not exist"}]}
ReadyForQuery {"status":"I"}
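
The portal behavior checked by `portals.pt` follows from the eager commit: committing the implicit transaction drops its portals, so a second Execute on the same portal fails. The toy model below sketches that interaction. `Portal`, `PortalSession`, and the `in_explicit_txn` flag are hypothetical names for this example, and portals are reduced to a vector of integers.

```rust
use std::collections::HashMap;

/// A bound portal: the full result set plus a cursor into it.
struct Portal {
    rows: Vec<i32>,
    next: usize,
}

/// Toy session holding named portals (hypothetical, for illustration only).
struct PortalSession {
    portals: HashMap<String, Portal>,
    in_explicit_txn: bool,
}

impl PortalSession {
    fn new(in_explicit_txn: bool) -> Self {
        PortalSession {
            portals: HashMap::new(),
            in_explicit_txn,
        }
    }

    /// Models a Bind message: creates a portal over `rows`.
    fn bind(&mut self, name: &str, rows: Vec<i32>) {
        self.portals.insert(name.to_string(), Portal { rows, next: 0 });
    }

    /// Models an Execute message returning at most `max_rows` rows
    /// (0 means no limit, as in the wire protocol).
    fn execute(&mut self, name: &str, max_rows: usize) -> Result<Vec<i32>, String> {
        let portal = self
            .portals
            .get_mut(name)
            .ok_or_else(|| format!("portal \"{name}\" does not exist"))?;
        let limit = if max_rows == 0 { usize::MAX } else { max_rows };
        let end = portal.next.saturating_add(limit).min(portal.rows.len());
        let batch = portal.rows[portal.next..end].to_vec();
        portal.next = end;
        // Eager commit of an implicit transaction destroys all portals.
        if !self.in_explicit_txn {
            self.portals.clear();
        }
        Ok(batch)
    }
}
```

Outside an explicit transaction, the first `execute("c", 1)` returns one row and the second reports that portal "c" does not exist, matching the expected messages in the test. Inside an explicit transaction the portal survives and the second call returns the remaining row.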