Skip to content

Commit

Permalink
adapter: don't optimize read transactions for single statements
Browse files Browse the repository at this point in the history
This fixes the csharp test flakes. See parse-started.pt for the full
explanation.
  • Loading branch information
maddyblue committed Aug 2, 2022
1 parent 9850d79 commit bd13644
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 15 deletions.
24 changes: 20 additions & 4 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,18 @@ impl<S: Append + 'static> Coordinator<S> {
// By this point we should be in a running transaction.
TransactionStatus::Default => unreachable!(),

// Started is almost always safe (started means there's a single statement
// being executed). Failed transactions have already been checked in pgwire for
// a safe statement (COMMIT, ROLLBACK, etc.) and can also proceed.
TransactionStatus::Started(_) | TransactionStatus::Failed(_) => {
// Failed transactions have already been checked in pgwire for a safe statement
// (COMMIT, ROLLBACK, etc.) and can proceed.
TransactionStatus::Failed(_) => {}

// Started is a deceptive name, and means different things depending on which
// protocol was used. It either means exactly one statement (known because this
// is the simple protocol and the parser parsed the entire string, and it had
// one statement), or, from the extended protocol, it means that *some* query is
// being executed, but there might be others after it before the Sync (commit)
// message. Postgres handles this by teaching Started to eagerly commit certain
// statements that can't be run in a transaction block.
TransactionStatus::Started(_) => {
if let Statement::Declare(_) = stmt {
// Declare is an exception. Although it's not against any spec to execute
// it, it will always result in nothing happening, since all portals will be
Expand All @@ -277,6 +285,14 @@ impl<S: Append + 'static> Coordinator<S> {
session,
);
}

// TODO(mjibson): The current code causes DDL statements (well, any statement
// that doesn't call `add_transaction_ops`) to execute outside of the extended
// protocol transaction. For example, executing in extended a SELECT, then
// CREATE, then SELECT, followed by a Sync would register the transaction
// as read only in the first SELECT, then the CREATE ignores the transaction
// ops, and the last SELECT will use the timestamp from the first. This isn't
// correct, but this is an edge case that we can fix later.
}

// Implicit or explicit transactions.
Expand Down
16 changes: 6 additions & 10 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1780,20 +1780,16 @@ impl<S: Append + 'static> Coordinator<S> {

let timeline = self.validate_timeline(source_ids.clone())?;
let conn_id = session.conn_id();
let in_transaction = matches!(
session.transaction(),
&TransactionStatus::InTransaction(_) | &TransactionStatus::InTransactionImplicit(_)
);
// Queries are independent of the logical timestamp iff there are no referenced
// sources or indexes and there is no reference to `mz_logical_timestamp()`.
let timestamp_independent = source_ids.is_empty() && !source.contains_temporal();
// For explicit or implicit transactions that do not use AS OF, get the
// For queries that do not use AS OF, get the
// timestamp of the in-progress transaction or create one. If this is an AS OF
// query, we don't care about any possible transaction timestamp. If this is a
// single-statement transaction (TransactionStatus::Started), we don't need to
// worry about preventing compaction or choosing a valid timestamp for future
// queries.
let timestamp = if in_transaction && when == QueryWhen::Immediately {
// query, we don't care about any possible transaction timestamp. We do
// not do any optimization of the so-called single-statement transactions
// (TransactionStatus::Started) because multiple statements can actually be
// executed there in the extended protocol.
let timestamp = if when == QueryWhen::Immediately {
// If all previous statements were timestamp-independent and the current one is
// not, clear the transaction ops so it can get a new timestamp and timedomain.
if let Some(read_txn) = self.txn_reads.get(&conn_id) {
Expand Down
6 changes: 5 additions & 1 deletion src/adapter/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,11 @@ pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
pub enum TransactionStatus<T> {
/// Idle. Matches `TBLOCK_DEFAULT`.
Default,
/// Running a single-query transaction. Matches `TBLOCK_STARTED`.
/// Running a possibly single-query transaction. Matches
/// `TBLOCK_STARTED`. WARNING: This might not actually be
/// a single statement due to the extended protocol. Thus,
/// we should not perform optimizations based on this.
/// See: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>.
Started(Transaction<T>),
/// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
InTransaction(Transaction<T>),
Expand Down
70 changes: 70 additions & 0 deletions test/pgtest-mz/parse-started.pt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Regression test if the global timestamp has progressed during multiple
# Parse/Bind/Execute commands that don't have a Sync between them.

# This is a bit of an edge case in the Postgres protocol documentation and its
# implementation and code comments. The protocol docs say:
#
# At completion of each series of extended-query messages, the frontend should
# issue a Sync message. This parameterless message causes the backend to close
# the current transaction...
#
# In postgres.c's exec_parse_message function, a transaction is started
# via start_xact_command which will put the transaction into TBLOCK_STARTED
# and set a boolean to prevent doing that a second time. The comments for
# TBLOCK_STARTED say "running single-query transaction". The most common
# scenario is after Parse, a user will Bind the portal, Execute the portal,
# then Sync, which commits the transaction.
#
# However it is also completely within spec for clients to issue another
# Parse/Bind/Execute trio before the Sync. The protocol docs (until recently)
# don't specifically describe how the transaction handling should function
# in this situation (in contrast to the simple query workflow, which has
# a long description about how to handle multiple statements in the same
# query string). The comments for TBLOCK_STARTED imply it's a single-query
# transaction, but that is not always the case.
#
# In practice, the Npgsql .NET driver does exactly this on its startup when
# fetching from the catalog tables. Our code previously made assumptions that
# transactions in the TBLOCK_STARTED were always single statement, and would
# thus skip some work. However, it is possible that there are other statements
# issued in that block, so we should instead never perform those kinds of
# optimizations because they are not in fact safe.
#
# The bug motivating this test was caused by sequence_peek adding a
# second transaction operation at a different timestamp to an existing
# transaction. The sleep here was required to force the global timestamp
# forward, which was the cause of the initial panic. See:
# https://github.com/MaterializeInc/materialize/blob/5afec7c55867d1d1a0a8f1e81c088b84dcff81d8/src/adapter/src/coord/sequencer.rs#L1955
#
# See: https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137
# See: https://www.postgresql.org/message-id/flat/17434-d9f7a064ce2a88a3%40postgresql.org

send
Parse {"query": "SELECT 1 FROM pg_type LIMIT 1"}
Bind
Execute
Parse {"query": "SELECT mz_internal.mz_sleep(1) FROM pg_type LIMIT 1"}
Bind
Execute
Parse {"query": "SELECT 1 FROM pg_type LIMIT 1"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
DataRow {"fields":["1"]}
CommandComplete {"tag":"SELECT 1"}
ParseComplete
BindComplete
DataRow {"fields":["NULL"]}
CommandComplete {"tag":"SELECT 1"}
ParseComplete
BindComplete
DataRow {"fields":["1"]}
CommandComplete {"tag":"SELECT 1"}
ReadyForQuery {"status":"I"}
90 changes: 90 additions & 0 deletions test/pgtest/ddl-extended.pt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Test what happens with rollback'd DDLs in extended protocol.
#
# Note: This only works in versions of Postgres with
# https://github.com/postgres/postgres/commit/f92944137cdec3e80e826879d817a2d3dff68b5f
# applied, which at the time of writing is none of them. The following output
# was generated by a Postgres server built from source that included that
# commit.

send
Query {"query": "DROP DATABASE IF EXISTS a"}
Query {"query": "DROP DATABASE IF EXISTS b"}
----

until ignore=NoticeResponse
ReadyForQuery
ReadyForQuery
----
CommandComplete {"tag":"DROP DATABASE"}
ReadyForQuery {"status":"I"}
CommandComplete {"tag":"DROP DATABASE"}
ReadyForQuery {"status":"I"}

send
Parse {"query": "SELECT 1 FROM pg_type LIMIT 0"}
Bind
Execute
Parse {"query": "CREATE DATABASE a"}
Bind
Execute
Parse {"query": "ROLLBACK"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
CommandComplete {"tag":"SELECT 0"}
ParseComplete
BindComplete
CommandComplete {"tag":"CREATE DATABASE"}
ParseComplete
BindComplete
NoticeResponse {"fields":[{"typ":"S","value":"WARNING"},{"typ":"C","value":"25P01"},{"typ":"M","value":"there is no transaction in progress"}]}
CommandComplete {"tag":"ROLLBACK"}
ReadyForQuery {"status":"I"}

send
Parse {"query": "CREATE DATABASE b"}
Bind
Execute
Parse {"query": "SELECT 1 FROM pg_type LIMIT 0"}
Bind
Execute
Parse {"query": "ROLLBACK"}
Bind
Execute
Sync
----

until
ReadyForQuery
----
ParseComplete
BindComplete
CommandComplete {"tag":"CREATE DATABASE"}
ParseComplete
BindComplete
CommandComplete {"tag":"SELECT 0"}
ParseComplete
BindComplete
NoticeResponse {"fields":[{"typ":"S","value":"WARNING"},{"typ":"C","value":"25P01"},{"typ":"M","value":"there is no transaction in progress"}]}
CommandComplete {"tag":"ROLLBACK"}
ReadyForQuery {"status":"I"}

send
Query {"query": "SELECT datname FROM pg_catalog.pg_database WHERE datname IN ('a', 'b') ORDER BY datname"}
----

until
ReadyForQuery
----
RowDescription {"fields":[{"name":"datname"}]}
DataRow {"fields":["a"]}
DataRow {"fields":["b"]}
CommandComplete {"tag":"SELECT 2"}
ReadyForQuery {"status":"I"}

0 comments on commit bd13644

Please sign in to comment.