Merge pull request #13996 from mjibson/pgwire-extended
adapter: don't optimize read transactions for single statements
maddyblue authored Aug 3, 2022
2 parents b9525a9 + 1c23de2 commit a887755
Showing 11 changed files with 267 additions and 74 deletions.
24 changes: 20 additions & 4 deletions src/adapter/src/coord/command_handler.rs
@@ -262,10 +262,18 @@ impl<S: Append + 'static> Coordinator<S> {
// By this point we should be in a running transaction.
TransactionStatus::Default => unreachable!(),

// Started is almost always safe (started means there's a single statement
// being executed). Failed transactions have already been checked in pgwire for
// a safe statement (COMMIT, ROLLBACK, etc.) and can also proceed.
TransactionStatus::Started(_) | TransactionStatus::Failed(_) => {
// Failed transactions have already been checked in pgwire for a safe statement
// (COMMIT, ROLLBACK, etc.) and can proceed.
TransactionStatus::Failed(_) => {}

// Started is a deceptive name, and means different things depending on which
// protocol was used. It's either exactly one statement (known because this
// is the simple protocol and the parser parsed the entire string, and it had
// one statement). Or from the extended protocol, it means *some* query is
// being executed, but there might be others after it before the Sync (commit)
// message. Postgres handles this by teaching Started to eagerly commit certain
// statements that can't be run in a transaction block.
TransactionStatus::Started(_) => {
if let Statement::Declare(_) = stmt {
// Declare is an exception. Although it's not against any spec to execute
// it, it will always result in nothing happening, since all portals will be
@@ -278,6 +286,14 @@ impl<S: Append + 'static> Coordinator<S> {
session,
);
}

// TODO(mjibson): The current code causes DDL statements (well, any statement
// that doesn't call `add_transaction_ops`) to execute outside of the extended
// protocol transaction. For example, executing in extended a SELECT, then
// CREATE, then SELECT, followed by a Sync would register the transaction
// as read only in the first SELECT, then the CREATE ignores the transaction
// ops, and the last SELECT will use the timestamp from the first. This isn't
// correct, but this is an edge case that we can fix later.
}

// Implicit or explicit transactions.
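As a hypothetical illustration of the edge case this TODO describes (not part of the commit), the sequence in pgtest notation would look like the following; `t` and `t2` are assumed objects, and the expected responses are omitted because the current behavior is exactly what the TODO calls out as incorrect:

send
Parse {"query": "SELECT * FROM t"}
Bind
Execute
Parse {"query": "CREATE TABLE t2 (a INT)"}
Bind
Execute
Parse {"query": "SELECT * FROM t"}
Bind
Execute
Sync
----

Here the first Execute registers the transaction as read only at some timestamp, the CREATE ignores the transaction ops entirely, and the final SELECT reuses the timestamp chosen for the first.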
37 changes: 23 additions & 14 deletions src/adapter/src/coord/sequencer.rs
@@ -1792,20 +1792,16 @@ impl<S: Append + 'static> Coordinator<S> {

let timeline = self.validate_timeline(source_ids.clone())?;
let conn_id = session.conn_id();
let in_transaction = matches!(
session.transaction(),
&TransactionStatus::InTransaction(_) | &TransactionStatus::InTransactionImplicit(_)
);
// Queries are independent of the logical timestamp iff there are no referenced
// sources or indexes and there is no reference to `mz_logical_timestamp()`.
let timestamp_independent = source_ids.is_empty() && !source.contains_temporal();
// For explicit or implicit transactions that do not use AS OF, get the
// For queries that do not use AS OF, get the
// timestamp of the in-progress transaction or create one. If this is an AS OF
// query, we don't care about any possible transaction timestamp. If this is a
// single-statement transaction (TransactionStatus::Started), we don't need to
// worry about preventing compaction or choosing a valid timestamp for future
// queries.
let timestamp = if in_transaction && when == QueryWhen::Immediately {
// query, we don't care about any possible transaction timestamp. We do
// not do any optimization of the so-called single-statement transactions
// (TransactionStatus::Started) because multiple statements can actually be
// executed there in the extended protocol.
let timestamp = if when == QueryWhen::Immediately {
// If all previous statements were timestamp-independent and the current one is
// not, clear the transaction ops so it can get a new timestamp and timedomain.
if let Some(read_txn) = self.txn_reads.get(&conn_id) {
@@ -2404,11 +2400,24 @@ impl<S: Append + 'static> Coordinator<S> {
ExplainStageOld::Timestamp => {
let decorrelated_plan = decorrelate(&mut timings, raw_plan)?;
let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?;
self.validate_timeline(optimized_plan.depends_on())?;
let timeline = self.validate_timeline(optimized_plan.depends_on())?;
let source_ids = optimized_plan.depends_on();
let id_bundle = self
.index_oracle(compute_instance)
.sufficient_collections(&source_ids);
let id_bundle = if session.vars().transaction_isolation()
== &IsolationLevel::StrictSerializable
&& timeline.is_some()
{
self.index_oracle(compute_instance)
.sufficient_collections(&source_ids)
} else {
// Determine a timestamp that will be valid for anything in any schema
// referenced by the query.
self.timedomain_for(
&source_ids,
&timeline,
session.conn_id(),
compute_instance,
)?
};
// TODO: determine_timestamp takes a mut self to track table linearizability,
// so explaining a plan involving tables has side effects. Removing those side
// effects would be good.
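The branch above changes only how EXPLAIN TIMESTAMP picks its id bundle. A hypothetical pgtest-style probe (not part of the commit; `t` is an assumed table, which gives the query a timeline and so exercises the strict serializable arm, and the until block is omitted because the reported timestamps vary from run to run):

send
Query {"query": "SET transaction_isolation = 'strict serializable'"}
Query {"query": "EXPLAIN TIMESTAMP FOR SELECT * FROM t"}
----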
6 changes: 5 additions & 1 deletion src/adapter/src/session.rs
@@ -541,7 +541,11 @@ pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
pub enum TransactionStatus<T> {
/// Idle. Matches `TBLOCK_DEFAULT`.
Default,
/// Running a single-query transaction. Matches `TBLOCK_STARTED`.
/// Running a possibly single-query transaction. Matches
/// `TBLOCK_STARTED`. WARNING: This might not actually be
/// a single statement due to the extended protocol. Thus,
/// we should not perform optimizations based on this.
/// See: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>.
Started(Transaction<T>),
/// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
InTransaction(Transaction<T>),
52 changes: 20 additions & 32 deletions src/environmentd/tests/sql.rs
@@ -1353,23 +1353,22 @@ fn test_linearizability() -> Result<(), Box<dyn Error>> {
.runtime
.block_on(pg_client.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[]))?;

// Create user table in Materialize.
mz_client.batch_execute(&"DROP TABLE IF EXISTS t;")?;
mz_client.batch_execute(&"CREATE TABLE t (a INT);")?;

wait_for_view_population(&mut mz_client, view_name, 1)?;

// The user table's write frontier will be close to zero because we use a deterministic
// now function in this test. It may be slightly higher than zero because bootstrapping
// and background tasks push the global timestamp forward.
// The materialized view's write frontier will be close to the system time because it uses
// the system clock to close timestamps.
// Therefor queries that only involve the view will normally happen at a higher timestamp
// Therefore queries that only involve the view will normally happen at a higher timestamp
// than queries that involve the user table. However, we prevent this when in strict
// serializable mode.

mz_client.batch_execute(&"SET transaction_isolation = serializable")?;
let view_ts = get_explain_timestamp(view_name, &mut mz_client);
// Create user table in Materialize.
mz_client.batch_execute(&"DROP TABLE IF EXISTS t;")?;
mz_client.batch_execute(&"CREATE TABLE t (a INT);")?;
let join_ts = get_explain_timestamp(&format!("{view_name}, t"), &mut mz_client);
// In serializable transaction isolation, read timestamps can go backwards.
assert!(join_ts < view_ts);
@@ -1381,12 +1380,6 @@ fn test_linearizability() -> Result<(), Box<dyn Error>> {
// equal timestamp in strict serializable mode.
assert!(join_ts >= view_ts);

mz_client.batch_execute(&"SET transaction_isolation = serializable")?;
let view_ts = get_explain_timestamp(view_name, &mut mz_client);
let join_ts = get_explain_timestamp(&format!("{view_name}, t"), &mut mz_client);
// If we go back to serializable, then timestamps can revert again.
assert!(join_ts < view_ts);

cleanup_fn(&mut mz_client, &mut pg_client, &server.runtime)?;

Ok(())
@@ -1540,27 +1533,22 @@ fn wait_for_view_population(
view_name: &str,
source_rows: i64,
) -> Result<(), Box<dyn Error>> {
let _ = mz_client.query_one(&format!("SET transaction_isolation = SERIALIZABLE"), &[]);
Retry::default()
.max_duration(Duration::from_secs(10))
.retry(|_| {
let rows = mz_client
.query_one(&format!("SELECT COUNT(*) FROM {view_name};"), &[])
.unwrap()
.get::<_, i64>(0);
if rows == source_rows {
Ok(())
} else {
Err(format!(
"Waiting for {source_rows} row to be ingested. Currently at {rows}."
))
}
})
.unwrap();
let _ = mz_client.query_one(
&format!("SET transaction_isolation = 'strict serializable'"),
&[],
);
let mut rows = 0;
while rows != source_rows {
thread::sleep(Duration::from_millis(1));
// This is a bit hacky. We have no way of getting the freshest data in the view, without
// also advancing every other object in the time domain, which we usually want to avoid in
// these tests. Instead we query the view using AS OF a value close to the current system
// clock and hope it gives us fresh enough data.
let now = ((SYSTEM_TIME.as_secs() as EpochMillis) * 1_000) - 100;
rows = mz_client
.query_one(
&format!("SELECT COUNT(*) FROM {view_name} AS OF {now};"),
&[],
)
.map(|row| row.get::<_, i64>(0))
.unwrap_or(0);
}
Ok(())
}

70 changes: 70 additions & 0 deletions test/pgtest-mz/parse-started.pt
@@ -0,0 +1,70 @@
# Regression test for when the global timestamp has progressed during multiple
# Parse/Bind/Execute commands that don't have a Sync between them.

# This is a bit of an edge case in the Postgres protocol documentation and its
# implementation and code comments. The protocol docs say:
#
# At completion of each series of extended-query messages, the frontend should
# issue a Sync message. This parameterless message causes the backend to close
# the current transaction...
#
# In postgres.c's exec_parse_message function, a transaction is started
# via start_xact_command which will put the transaction into TBLOCK_STARTED
# and set a boolean to prevent doing that a second time. The comments for
# TBLOCK_STARTED say "running single-query transaction". The most common
# scenario is after Parse, a user will Bind the portal, Execute the portal,
# then Sync, which commits the transaction.
#
# However it is also completely within spec for clients to issue another
# Parse/Bind/Execute trio before the Sync. The protocol docs (until recently)
# don't specifically describe how the transaction handling should function
# in this situation (in contrast to the simple query workflow, which has
# a long description about how to handle multiple statements in the same
# query string). The comments for TBLOCK_STARTED imply it's a single-query
# transaction, but that is not always the case.
#
# In practice, the Npgsql .NET driver does exactly this on its startup when
# fetching from the catalog tables. Our code previously made assumptions that
# transactions in the TBLOCK_STARTED were always single statement, and would
thus skip some work. However, it is possible that there are other statements
# issued in that block, so we should instead never perform those kinds of
# optimizations because they are not in fact safe.
#
# The bug motivating this test was caused by sequence_peek adding a
# second transaction operation at a different timestamp to an existing
# transaction. The sleep here was required to force the global timestamp
# forward, which was the cause of the initial panic. See:
# https://github.com/MaterializeInc/materialize/blob/5afec7c55867d1d1a0a8f1e81c088b84dcff81d8/src/adapter/src/coord/sequencer.rs#L1955
#
# See: https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137
# See: https://www.postgresql.org/message-id/flat/17434-d9f7a064ce2a88a3%40postgresql.org

send
Parse {"query": "SELECT 1 FROM pg_type LIMIT 1"}
Bind
Execute
Parse {"query": "SELECT mz_internal.mz_sleep(1) FROM pg_type LIMIT 1"}
Bind
Execute
Parse {"query": "SELECT 1 FROM pg_type LIMIT 1"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
DataRow {"fields":["1"]}
CommandComplete {"tag":"SELECT 1"}
ParseComplete
BindComplete
DataRow {"fields":["NULL"]}
CommandComplete {"tag":"SELECT 1"}
ParseComplete
BindComplete
DataRow {"fields":["1"]}
CommandComplete {"tag":"SELECT 1"}
ReadyForQuery {"status":"I"}
90 changes: 90 additions & 0 deletions test/pgtest/ddl-extended.pt
@@ -0,0 +1,90 @@
# Test what happens with rollback'd DDLs in extended protocol.
#
# Note: This only works in versions of Postgres with
# https://github.com/postgres/postgres/commit/f92944137cdec3e80e826879d817a2d3dff68b5f
# applied, which at the time of writing is none of them. The following output
# was generated by a Postgres server built from source that included that
# commit.

send
Query {"query": "DROP DATABASE IF EXISTS a"}
Query {"query": "DROP DATABASE IF EXISTS b"}
----

until ignore=NoticeResponse
ReadyForQuery
ReadyForQuery
----
CommandComplete {"tag":"DROP DATABASE"}
ReadyForQuery {"status":"I"}
CommandComplete {"tag":"DROP DATABASE"}
ReadyForQuery {"status":"I"}

send
Parse {"query": "SELECT 1 FROM pg_type LIMIT 0"}
Bind
Execute
Parse {"query": "CREATE DATABASE a"}
Bind
Execute
Parse {"query": "ROLLBACK"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
CommandComplete {"tag":"SELECT 0"}
ParseComplete
BindComplete
CommandComplete {"tag":"CREATE DATABASE"}
ParseComplete
BindComplete
NoticeResponse {"fields":[{"typ":"S","value":"WARNING"},{"typ":"C","value":"25P01"},{"typ":"M","value":"there is no transaction in progress"}]}
CommandComplete {"tag":"ROLLBACK"}
ReadyForQuery {"status":"I"}

send
Parse {"query": "CREATE DATABASE b"}
Bind
Execute
Parse {"query": "SELECT 1 FROM pg_type LIMIT 0"}
Bind
Execute
Parse {"query": "ROLLBACK"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
CommandComplete {"tag":"CREATE DATABASE"}
ParseComplete
BindComplete
CommandComplete {"tag":"SELECT 0"}
ParseComplete
BindComplete
NoticeResponse {"fields":[{"typ":"S","value":"WARNING"},{"typ":"C","value":"25P01"},{"typ":"M","value":"there is no transaction in progress"}]}
CommandComplete {"tag":"ROLLBACK"}
ReadyForQuery {"status":"I"}

send
Query {"query": "SELECT datname FROM pg_catalog.pg_database WHERE datname IN ('a', 'b') ORDER BY datname"}
----

until
ReadyForQuery
----
RowDescription {"fields":[{"name":"datname"}]}
DataRow {"fields":["a"]}
DataRow {"fields":["b"]}
CommandComplete {"tag":"SELECT 2"}
ReadyForQuery {"status":"I"}
23 changes: 13 additions & 10 deletions test/sqllogictest/dates-times.slt
@@ -9,6 +9,19 @@

mode cockroach

statement ok
CREATE VIEW logical_timestamp_view AS SELECT mz_logical_timestamp()

# Equivalent to running `SELECT mz_logical_timestamp()` directly IF AND ONLY IF
# there are no other objects in the same time domain, and has the same somewhat
# confusing output when there are no dependencies on any views. If there are
# other objects in the same time domain as the view, they will dictate what is
# returned.
query T
SELECT * FROM logical_timestamp_view
----
18446744073709551615

statement ok
CREATE TABLE dateish (
a DATE
@@ -760,16 +773,6 @@ SELECT current_timestamp > TIMESTAMP '2016-06-13 00:00:00'
----
true

statement ok
CREATE VIEW logical_timestamp_view AS SELECT mz_logical_timestamp()

# Equivalent to running `SELECT mz_logical_timestamp()` directly and has the
# same somewhat confusing output when there are no dependencies on any views.
query T
SELECT * FROM logical_timestamp_view
----
18446744073709551615

statement ok
CREATE VIEW now_view AS SELECT now() AS ts

