From bd13644d5d9f326930f16f6ffe6fb176294bfef5 Mon Sep 17 00:00:00 2001
From: Matt Jibson
Date: Mon, 1 Aug 2022 21:55:13 -0600
Subject: [PATCH 1/9] adapter: don't optimize read transactions for single
 statements

This fixes the csharp test flakes. See parse-started.pt for the full
explanation.
---
 src/adapter/src/coord/command_handler.rs | 24 +++++--
 src/adapter/src/coord/sequencer.rs       | 16 ++---
 src/adapter/src/session.rs               |  6 +-
 test/pgtest-mz/parse-started.pt          | 70 ++++++++++++++++++
 test/pgtest/ddl-extended.pt              | 90 ++++++++++++++++++++++++
 5 files changed, 191 insertions(+), 15 deletions(-)
 create mode 100644 test/pgtest-mz/parse-started.pt
 create mode 100644 test/pgtest/ddl-extended.pt

diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs
index 73c5c7e3e0a3..ce976bdf189a 100644
--- a/src/adapter/src/coord/command_handler.rs
+++ b/src/adapter/src/coord/command_handler.rs
@@ -261,10 +261,18 @@ impl Coordinator {
             // By this point we should be in a running transaction.
             TransactionStatus::Default => unreachable!(),

-            // Started is almost always safe (started means there's a single statement
-            // being executed). Failed transactions have already been checked in pgwire for
-            // a safe statement (COMMIT, ROLLBACK, etc.) and can also proceed.
-            TransactionStatus::Started(_) | TransactionStatus::Failed(_) => {
+            // Failed transactions have already been checked in pgwire for a safe statement
+            // (COMMIT, ROLLBACK, etc.) and can proceed.
+            TransactionStatus::Failed(_) => {}
+
+            // Started is a deceptive name and means different things depending on which
+            // protocol was used. In the simple protocol it means exactly one statement
+            // (known because the parser parsed the entire string and it contained a
+            // single statement). In the extended protocol it means *some* query is
+            // being executed, but there might be others after it before the Sync (commit)
+            // message. Postgres handles this by teaching Started to eagerly commit
+            // certain statements that can't be run in a transaction block.
+            TransactionStatus::Started(_) => {
                 if let Statement::Declare(_) = stmt {
                     // Declare is an exception. Although it's not against any spec to execute
                     // it, it will always result in nothing happening, since all portals will be
@@ -277,6 +285,14 @@ impl Coordinator {
                         session,
                     );
                 }
+
+                // TODO(mjibson): The current code causes DDL statements (well, any statement
+                // that doesn't call `add_transaction_ops`) to execute outside of the extended
+                // protocol transaction. For example, an extended-protocol SELECT, then CREATE,
+                // then SELECT, followed by a Sync would register the transaction as read only
+                // during the first SELECT; the CREATE then ignores the transaction ops, and
+                // the last SELECT will use the timestamp from the first. This isn't correct,
+                // but it is an edge case that we can fix later.
             }

             // Implicit or explicit transactions.
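
To make the Started handling above concrete, here is a minimal, self-contained
Rust sketch of the extended-protocol shape that breaks the "single statement"
assumption: every Parse/Bind/Execute trio before the Sync runs inside the same
Started transaction. The message enum and counter are hypothetical
illustrations, not Materialize's actual pgwire types.

    // Hypothetical model of the relevant frontend messages; illustration only.
    enum FrontendMessage {
        Parse(&'static str),
        Bind,
        Execute,
        Sync,
    }

    // Count the Execute messages that run inside one `Started` transaction:
    // everything up to the first Sync, since Sync is what commits it.
    fn executes_before_sync(messages: &[FrontendMessage]) -> usize {
        messages
            .iter()
            .take_while(|m| !matches!(m, FrontendMessage::Sync))
            .filter(|m| matches!(m, FrontendMessage::Execute))
            .count()
    }

    fn main() {
        use FrontendMessage::*;
        // The Npgsql-style startup pattern exercised by parse-started.pt below:
        // two full trios, then a single Sync.
        let msgs = [
            Parse("SELECT 1 FROM pg_type LIMIT 1"),
            Bind,
            Execute,
            Parse("SELECT mz_internal.mz_sleep(1) FROM pg_type LIMIT 1"),
            Bind,
            Execute,
            Sync,
        ];
        // Two statements shared the "single-statement" Started transaction.
        assert_eq!(executes_before_sync(&msgs), 2);
    }
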
diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs
index 38bf298d61d4..ab5b783af246 100644
--- a/src/adapter/src/coord/sequencer.rs
+++ b/src/adapter/src/coord/sequencer.rs
@@ -1780,20 +1780,16 @@ impl Coordinator {
         let timeline = self.validate_timeline(source_ids.clone())?;
         let conn_id = session.conn_id();
-        let in_transaction = matches!(
-            session.transaction(),
-            &TransactionStatus::InTransaction(_) | &TransactionStatus::InTransactionImplicit(_)
-        );
         // Queries are independent of the logical timestamp iff there are no referenced
         // sources or indexes and there is no reference to `mz_logical_timestamp()`.
         let timestamp_independent = source_ids.is_empty() && !source.contains_temporal();
-        // For explicit or implicit transactions that do not use AS OF, get the
+        // For queries that do not use AS OF, get the
         // timestamp of the in-progress transaction or create one. If this is an AS OF
-        // query, we don't care about any possible transaction timestamp. If this is a
-        // single-statement transaction (TransactionStatus::Started), we don't need to
-        // worry about preventing compaction or choosing a valid timestamp for future
-        // queries.
-        let timestamp = if in_transaction && when == QueryWhen::Immediately {
+        // query, we don't care about any possible transaction timestamp. We do
+        // not do any optimization of the so-called single-statement transactions
+        // (TransactionStatus::Started) because multiple statements can actually be
+        // executed there in the extended protocol.
+        let timestamp = if when == QueryWhen::Immediately {
             // If all previous statements were timestamp-independent and the current one is
             // not, clear the transaction ops so it can get a new timestamp and timedomain.
             if let Some(read_txn) = self.txn_reads.get(&conn_id) {
diff --git a/src/adapter/src/session.rs b/src/adapter/src/session.rs
index 92bad0e61d18..58b6ced93e19 100644
--- a/src/adapter/src/session.rs
+++ b/src/adapter/src/session.rs
@@ -541,7 +541,11 @@ pub type RowBatchStream = UnboundedReceiver;
 pub enum TransactionStatus {
     /// Idle. Matches `TBLOCK_DEFAULT`.
     Default,
-    /// Running a single-query transaction. Matches `TBLOCK_STARTED`.
+    /// Running a possibly single-query transaction. Matches
+    /// `TBLOCK_STARTED`. WARNING: This might not actually be
+    /// a single statement due to the extended protocol. Thus,
+    /// we should not perform optimizations based on this.
+    /// See: .
     Started(Transaction),
     /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
     InTransaction(Transaction),
diff --git a/test/pgtest-mz/parse-started.pt b/test/pgtest-mz/parse-started.pt
new file mode 100644
index 000000000000..56516dc5f89c
--- /dev/null
+++ b/test/pgtest-mz/parse-started.pt
@@ -0,0 +1,70 @@
+# Regression test for the case where the global timestamp has progressed during
+# multiple Parse/Bind/Execute commands that don't have a Sync between them.
+#
+# This is a bit of an edge case in the Postgres protocol documentation and its
+# implementation and code comments. The protocol docs say:
+#
+# At completion of each series of extended-query messages, the frontend should
+# issue a Sync message. This parameterless message causes the backend to close
+# the current transaction...
+#
+# In postgres.c's exec_parse_message function, a transaction is started
+# via start_xact_command, which will put the transaction into TBLOCK_STARTED
+# and set a boolean to prevent doing that a second time. The comments for
+# TBLOCK_STARTED say "running single-query transaction". The most common
+# scenario is that after Parse, a user will Bind the portal, Execute the
+# portal, then Sync, which commits the transaction.
+#
+# However, it is also completely within spec for clients to issue another
+# Parse/Bind/Execute trio before the Sync. The protocol docs (until recently)
+# don't specifically describe how the transaction handling should function
+# in this situation (in contrast to the simple query workflow, which has
+# a long description about how to handle multiple statements in the same
+# query string). The comments for TBLOCK_STARTED imply it's a single-query
+# transaction, but that is not always the case.
+#
+# In practice, the Npgsql .NET driver does exactly this on its startup when
+# fetching from the catalog tables. Our code previously assumed that
+# transactions in TBLOCK_STARTED were always single statement, and would
+# thus skip some work. However, it is possible that there are other statements
+# issued in that block, so we should instead never perform those kinds of
+# optimizations because they are not in fact safe.
+#
+# The bug motivating this test was caused by sequence_peek adding a
+# second transaction operation at a different timestamp to an existing
+# transaction. The sleep here was required to force the global timestamp
+# forward, which was the cause of the initial panic. See:
+# https://github.com/MaterializeInc/materialize/blob/5afec7c55867d1d1a0a8f1e81c088b84dcff81d8/src/adapter/src/coord/sequencer.rs#L1955
+#
+# See: https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137
+# See: https://www.postgresql.org/message-id/flat/17434-d9f7a064ce2a88a3%40postgresql.org
+
+send
+Parse {"query": "SELECT 1 FROM pg_type LIMIT 1"}
+Bind
+Execute
+Parse {"query": "SELECT mz_internal.mz_sleep(1) FROM pg_type LIMIT 1"}
+Bind
+Execute
+Parse {"query": "SELECT 1 FROM pg_type LIMIT 1"}
+Bind
+Execute
+Sync
+----
+
+until
+ReadyForQuery
+----
+ParseComplete
+BindComplete
+DataRow {"fields":["1"]}
+CommandComplete {"tag":"SELECT 1"}
+ParseComplete
+BindComplete
+DataRow {"fields":["NULL"]}
+CommandComplete {"tag":"SELECT 1"}
+ParseComplete
+BindComplete
+DataRow {"fields":["1"]}
+CommandComplete {"tag":"SELECT 1"}
+ReadyForQuery {"status":"I"}
diff --git a/test/pgtest/ddl-extended.pt b/test/pgtest/ddl-extended.pt
new file mode 100644
index 000000000000..f9cea9dd0f41
--- /dev/null
+++ b/test/pgtest/ddl-extended.pt
@@ -0,0 +1,90 @@
+# Test what happens with rollback'd DDLs in the extended protocol.
+#
+# Note: This only works in versions of Postgres with
+# https://github.com/postgres/postgres/commit/f92944137cdec3e80e826879d817a2d3dff68b5f
+# applied, which at the time of writing is none of them. The following output
+# was generated by a Postgres server built from source that included that
+# commit.
+ +send +Query {"query": "DROP DATABASE IF EXISTS a"} +Query {"query": "DROP DATABASE IF EXISTS b"} +---- + +until ignore=NoticeResponse +ReadyForQuery +ReadyForQuery +---- +CommandComplete {"tag":"DROP DATABASE"} +ReadyForQuery {"status":"I"} +CommandComplete {"tag":"DROP DATABASE"} +ReadyForQuery {"status":"I"} + +send +Parse {"query": "SELECT 1 FROM pg_type LIMIT 0"} +Bind +Execute +Parse {"query": "CREATE DATABASE a"} +Bind +Execute +Parse {"query": "ROLLBACK"} +Bind +Execute +Sync +---- + +until +ReadyForQuery +---- +ParseComplete +BindComplete +CommandComplete {"tag":"SELECT 0"} +ParseComplete +BindComplete +CommandComplete {"tag":"CREATE DATABASE"} +ParseComplete +BindComplete +NoticeResponse {"fields":[{"typ":"S","value":"WARNING"},{"typ":"C","value":"25P01"},{"typ":"M","value":"there is no transaction in progress"}]} +CommandComplete {"tag":"ROLLBACK"} +ReadyForQuery {"status":"I"} + +send +Parse {"query": "CREATE DATABASE b"} +Bind +Execute +Parse {"query": "SELECT 1 FROM pg_type LIMIT 0"} +Bind +Execute +Parse {"query": "ROLLBACK"} +Bind +Execute +Sync +---- + +until +ReadyForQuery +---- +ParseComplete +BindComplete +CommandComplete {"tag":"CREATE DATABASE"} +ParseComplete +BindComplete +CommandComplete {"tag":"SELECT 0"} +ParseComplete +BindComplete +NoticeResponse {"fields":[{"typ":"S","value":"WARNING"},{"typ":"C","value":"25P01"},{"typ":"M","value":"there is no transaction in progress"}]} +CommandComplete {"tag":"ROLLBACK"} +ReadyForQuery {"status":"I"} + +send +Query {"query": "SELECT datname FROM pg_catalog.pg_database WHERE datname IN ('a', 'b') ORDER BY datname"} +---- + +until +ReadyForQuery +---- +RowDescription {"fields":[{"name":"datname"}]} +DataRow {"fields":["a"]} +DataRow {"fields":["b"]} +CommandComplete {"tag":"SELECT 2"} +ReadyForQuery {"status":"I"} From bdb9c63978e0c1c8668e08109bf0f4c175a2a8a4 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 11:38:31 -0400 Subject: [PATCH 2/9] Fix tests --- test/sqllogictest/dates-times.slt | 23 ++++++++++++--------- test/testdrive/numeric.td | 2 +- test/testdrive/temporal.td | 2 +- test/testdrive/timestamps-debezium-kafka.td | 11 ---------- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/test/sqllogictest/dates-times.slt b/test/sqllogictest/dates-times.slt index c7d1373e6ad1..a05755ff928e 100644 --- a/test/sqllogictest/dates-times.slt +++ b/test/sqllogictest/dates-times.slt @@ -9,6 +9,19 @@ mode cockroach +statement ok +CREATE VIEW logical_timestamp_view AS SELECT mz_logical_timestamp() + +# Equivalent to running `SELECT mz_logical_timestamp()` directly IF AND ONLY IF +# there are no other objects in the same time domain, and has the same somewhat +# confusing output when there are no dependencies on any views. If there are +# other objects in the same time domain as the view, they will dictate what is +# returned. +query T +SELECT * FROM logical_timestamp_view +---- +18446744073709551615 + statement ok CREATE TABLE dateish ( a DATE @@ -760,16 +773,6 @@ SELECT current_timestamp > TIMESTAMP '2016-06-13 00:00:00' ---- true -statement ok -CREATE VIEW logical_timestamp_view AS SELECT mz_logical_timestamp() - -# Equivalent to running `SELECT mz_logical_timestamp()` directly and has the -# same somewhat confusing output when there are no dependencies on any views. 
-query T -SELECT * FROM logical_timestamp_view ----- -18446744073709551615 - statement ok CREATE VIEW now_view AS SELECT now() AS ts diff --git a/test/testdrive/numeric.td b/test/testdrive/numeric.td index 178d971cfc09..a55c1b39f9c9 100644 --- a/test/testdrive/numeric.td +++ b/test/testdrive/numeric.td @@ -37,7 +37,7 @@ a # statement ok > CREATE OR REPLACE MATERIALIZED VIEW numeric_cast_ok AS SELECT 1 - WHERE mz_logical_timestamp() > 1927418240000::numeric(38,0); + WHERE mz_logical_timestamp() < 18446744073709551615::numeric(38,0); > SELECT * FROM numeric_cast_ok; 1 diff --git a/test/testdrive/temporal.td b/test/testdrive/temporal.td index 4ba8aa7b5485..cc2f61165ff9 100644 --- a/test/testdrive/temporal.td +++ b/test/testdrive/temporal.td @@ -277,7 +277,7 @@ contains:Unsupported temporal predicate. Note: `mz_logical_timestamp()` must be > CREATE OR REPLACE MATERIALIZED VIEW numeric_trunc AS SELECT 1 - WHERE mz_logical_timestamp() > 1927418240000.1; + WHERE mz_logical_timestamp() <= 18446744073709551614.1; > SELECT * FROM numeric_trunc; 1 diff --git a/test/testdrive/timestamps-debezium-kafka.td b/test/testdrive/timestamps-debezium-kafka.td index 00d67c6033f4..2ff7247bd38a 100644 --- a/test/testdrive/timestamps-debezium-kafka.td +++ b/test/testdrive/timestamps-debezium-kafka.td @@ -189,17 +189,6 @@ b sum ------ 1 10 -# Under weaker isolation levels we can see the row at time 2 before all objects have closed time 2. -> SET transaction_isolation = serializable - -> SELECT * FROM bar; -b sum ------- -1 10 -3 30 - -> SET transaction_isolation = 'strict serializable' - > SELECT * FROM join; b foo_sum bar_sum ----------------- From 6d5844170a618faf617041456ced0cbe24418178 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 12:05:13 -0400 Subject: [PATCH 3/9] Fix explain timestamp --- src/adapter/src/coord/sequencer.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 0362586901d1..beae6e914436 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -2400,11 +2400,16 @@ impl Coordinator { ExplainStageOld::Timestamp => { let decorrelated_plan = decorrelate(&mut timings, raw_plan)?; let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?; - self.validate_timeline(optimized_plan.depends_on())?; + let timeline = self.validate_timeline(optimized_plan.depends_on())?; let source_ids = optimized_plan.depends_on(); - let id_bundle = self - .index_oracle(compute_instance) - .sufficient_collections(&source_ids); + // Determine a timestamp that will be valid for anything in any schema + // referenced by the query. + let id_bundle = self.timedomain_for( + &source_ids, + &timeline, + session.conn_id(), + compute_instance, + )?; // TODO: determine_timestamp takes a mut self to track table linearizability, // so explaining a plan involving tables has side effects. Removing those side // effects would be good. 
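
To illustrate the swap above from `sufficient_collections` to `timedomain_for`,
here is a hedged sketch of the difference between the two id bundles. The
helper names mirror the patch, but the types and schema layout are invented for
illustration; the real functions operate on per-instance index and storage id
bundles rather than flat sets.

    use std::collections::BTreeSet;

    type GlobalId = u64;

    // Hypothetical stand-in for `sufficient_collections`: only the query's
    // direct dependencies constrain the timestamp.
    fn sufficient_collections(query_ids: &BTreeSet<GlobalId>) -> BTreeSet<GlobalId> {
        query_ids.clone()
    }

    // Hypothetical stand-in for `timedomain_for`: every collection in any
    // schema the query touches constrains the timestamp, so the timestamp
    // chosen now is also valid for follow-up queries in the same transaction.
    fn timedomain_for(
        query_ids: &BTreeSet<GlobalId>,
        schemas: &[BTreeSet<GlobalId>],
    ) -> BTreeSet<GlobalId> {
        schemas
            .iter()
            .filter(|schema| !schema.is_disjoint(query_ids))
            .flatten()
            .copied()
            .collect()
    }

    fn main() {
        let query = BTreeSet::from([1]);
        let schemas = [BTreeSet::from([1, 2, 3]), BTreeSet::from([4])];
        assert_eq!(sufficient_collections(&query), BTreeSet::from([1]));
        // EXPLAIN TIMESTAMP now reports the schema-mates (2 and 3) as well,
        // which is why the csv-sources.td expected output grows in patch 4.
        assert_eq!(timedomain_for(&query, &schemas), BTreeSet::from([1, 2, 3]));
    }
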
From 38034a8342f028a5087bc47dc91cacf6d2394d93 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 12:52:19 -0400 Subject: [PATCH 4/9] Fix test --- test/testdrive/csv-sources.td | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/testdrive/csv-sources.td b/test/testdrive/csv-sources.td index 26312f6174d0..64fc6567bd65 100644 --- a/test/testdrive/csv-sources.td +++ b/test/testdrive/csv-sources.td @@ -137,7 +137,7 @@ contains:Expected a list of columns in parentheses, found EOF # since the definition of "static" means "will never change again". $ set-regex match=(\d{13}|u\d{1,3}) replacement=<> > EXPLAIN TIMESTAMP FOR SELECT * FROM static_csv -" timestamp: <>\n since:[<>]\n upper:[]\n has table: false\n\nsource materialize.public.static_csv (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n" +" timestamp: <>\n since:[<>]\n upper:[]\n has table: false\n\nsource materialize.public.mismatched_column_count (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.matching_column_names (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.matching_column_names_alias (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.mismatched_column_names (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.mismatched_column_names_count (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.static_csv (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n" # Static CSV with manual headers. > CREATE SOURCE static_csv_manual_header (city_man, state_man, zip_man) From 9d4bc483a6ad345e2eac7ad8d9fb2d114ed7893b Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 12:53:52 -0400 Subject: [PATCH 5/9] Fix source waiting in tests --- src/environmentd/tests/sql.rs | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 0778b19f0d6a..2053dcb309aa 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1311,7 +1311,7 @@ fn test_timeline_read_holds() -> Result<(), Box> { .block_on(pg_client.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[]))?; } - wait_for_view_population(&mut mz_client, view_name, source_rows)?; + wait_for_view_population(&mut mz_client, view_name, source_rows, Arc::clone(&now))?; // Make sure that the table and view are joinable immediately at some timestamp. let mut mz_join_client = server.connect(postgres::NoTls)?; @@ -1357,7 +1357,7 @@ fn test_linearizability() -> Result<(), Box> { mz_client.batch_execute(&"DROP TABLE IF EXISTS t;")?; mz_client.batch_execute(&"CREATE TABLE t (a INT);")?; - wait_for_view_population(&mut mz_client, view_name, 1)?; + wait_for_view_population(&mut mz_client, view_name, 1, Arc::clone(&now))?; // The user table's write frontier will be close to zero because we use a deterministic // now function in this test. 
It may be slightly higher than zero because bootstrapping @@ -1539,24 +1539,22 @@ fn wait_for_view_population( mz_client: &mut postgres::Client, view_name: &str, source_rows: i64, + now: Arc>, ) -> Result<(), Box> { let _ = mz_client.query_one(&format!("SET transaction_isolation = SERIALIZABLE"), &[]); - Retry::default() - .max_duration(Duration::from_secs(10)) - .retry(|_| { - let rows = mz_client - .query_one(&format!("SELECT COUNT(*) FROM {view_name};"), &[]) - .unwrap() - .get::<_, i64>(0); - if rows == source_rows { - Ok(()) - } else { - Err(format!( - "Waiting for {source_rows} row to be ingested. Currently at {rows}." - )) - } - }) - .unwrap(); + let mut rows = 0; + let mut current_ts = (SYSTEM_TIME.as_secs() as EpochMillis) * 1_000; + while rows != source_rows { + // Keep increasing `now` until we can see the source data. + current_ts += 1_000; + *now.lock().expect("lock poisoned") = current_ts; + thread::sleep(Duration::from_millis(1)); + rows = mz_client + .query_one(&format!("SELECT COUNT(*) FROM {view_name};"), &[]) + .unwrap() + .get::<_, i64>(0); + get_explain_timestamp(view_name, mz_client); + } let _ = mz_client.query_one( &format!("SET transaction_isolation = 'strict serializable'"), &[], From 14a26c18f91b94198f33c43a839b4bb57fbe7594 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 13:22:58 -0400 Subject: [PATCH 6/9] Fix linearizability test --- src/environmentd/tests/sql.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 2053dcb309aa..f73b808570ef 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1353,10 +1353,6 @@ fn test_linearizability() -> Result<(), Box> { .runtime .block_on(pg_client.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[]))?; - // Create user table in Materialize. - mz_client.batch_execute(&"DROP TABLE IF EXISTS t;")?; - mz_client.batch_execute(&"CREATE TABLE t (a INT);")?; - wait_for_view_population(&mut mz_client, view_name, 1, Arc::clone(&now))?; // The user table's write frontier will be close to zero because we use a deterministic @@ -1370,6 +1366,9 @@ fn test_linearizability() -> Result<(), Box> { mz_client.batch_execute(&"SET transaction_isolation = serializable")?; let view_ts = get_explain_timestamp(view_name, &mut mz_client); + // Create user table in Materialize. + mz_client.batch_execute(&"DROP TABLE IF EXISTS t;")?; + mz_client.batch_execute(&"CREATE TABLE t (a INT);")?; let join_ts = get_explain_timestamp(&format!("{view_name}, t"), &mut mz_client); // In serializable transaction isolation, read timestamps can go backwards. assert!(join_ts < view_ts); @@ -1381,12 +1380,6 @@ fn test_linearizability() -> Result<(), Box> { // equal timestamp in strict serializable mode. assert!(join_ts >= view_ts); - mz_client.batch_execute(&"SET transaction_isolation = serializable")?; - let view_ts = get_explain_timestamp(view_name, &mut mz_client); - let join_ts = get_explain_timestamp(&format!("{view_name}, t"), &mut mz_client); - // If we go back to serializable, then timestamps can revert again. 
- assert!(join_ts < view_ts); - cleanup_fn(&mut mz_client, &mut pg_client, &server.runtime)?; Ok(()) From 5bd3806f2591dbc6e6229e0d04ed319c050f8d55 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 15:37:13 -0400 Subject: [PATCH 7/9] Fix sql tests --- src/environmentd/tests/sql.rs | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index f73b808570ef..2c301a9db6f6 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1311,7 +1311,7 @@ fn test_timeline_read_holds() -> Result<(), Box> { .block_on(pg_client.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[]))?; } - wait_for_view_population(&mut mz_client, view_name, source_rows, Arc::clone(&now))?; + wait_for_view_population(&mut mz_client, view_name, source_rows)?; // Make sure that the table and view are joinable immediately at some timestamp. let mut mz_join_client = server.connect(postgres::NoTls)?; @@ -1353,14 +1353,14 @@ fn test_linearizability() -> Result<(), Box> { .runtime .block_on(pg_client.execute(&format!("INSERT INTO {view_name} VALUES (42);"), &[]))?; - wait_for_view_population(&mut mz_client, view_name, 1, Arc::clone(&now))?; + wait_for_view_population(&mut mz_client, view_name, 1)?; // The user table's write frontier will be close to zero because we use a deterministic // now function in this test. It may be slightly higher than zero because bootstrapping // and background tasks push the global timestamp forward. // The materialized view's write frontier will be close to the system time because it uses // the system clock to close timestamps. - // Therefor queries that only involve the view will normally happen at a higher timestamp + // Therefore queries that only involve the view will normally happen at a higher timestamp // than queries that involve the user table. However, we prevent this when in strict // serializable mode. @@ -1532,26 +1532,23 @@ fn wait_for_view_population( mz_client: &mut postgres::Client, view_name: &str, source_rows: i64, - now: Arc>, ) -> Result<(), Box> { - let _ = mz_client.query_one(&format!("SET transaction_isolation = SERIALIZABLE"), &[]); let mut rows = 0; - let mut current_ts = (SYSTEM_TIME.as_secs() as EpochMillis) * 1_000; while rows != source_rows { - // Keep increasing `now` until we can see the source data. - current_ts += 1_000; - *now.lock().expect("lock poisoned") = current_ts; thread::sleep(Duration::from_millis(1)); + // This is a bit hacky. We have no way of getting the freshest data in the view, without + // also advancing every other object in the time domain, which we usually want to avoid in + // these tests. Instead we query the view using AS OF a value close to the current system + // clock and hope it gives us fresh enough data. 
+ let now = ((SYSTEM_TIME.as_secs() as EpochMillis) * 1_000) - 100; rows = mz_client - .query_one(&format!("SELECT COUNT(*) FROM {view_name};"), &[]) - .unwrap() - .get::<_, i64>(0); - get_explain_timestamp(view_name, mz_client); + .query_one( + &format!("SELECT COUNT(*) FROM {view_name} AS OF {now};"), + &[], + ) + .map(|row| row.get::<_, i64>(0)) + .unwrap_or(0); } - let _ = mz_client.query_one( - &format!("SET transaction_isolation = 'strict serializable'"), - &[], - ); Ok(()) } From fede456c44382331362d8c2ae4122f90a069ff48 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 15:55:41 -0400 Subject: [PATCH 8/9] Clean up explain timestamp --- src/adapter/src/coord/sequencer.rs | 24 ++++++++++++++++-------- test/testdrive/csv-sources.td | 2 +- test/testdrive/explain-timestamps.td | 24 ++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 test/testdrive/explain-timestamps.td diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index beae6e914436..0bf512681a2c 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -2402,14 +2402,22 @@ impl Coordinator { let optimized_plan = self.view_optimizer.optimize(decorrelated_plan)?; let timeline = self.validate_timeline(optimized_plan.depends_on())?; let source_ids = optimized_plan.depends_on(); - // Determine a timestamp that will be valid for anything in any schema - // referenced by the query. - let id_bundle = self.timedomain_for( - &source_ids, - &timeline, - session.conn_id(), - compute_instance, - )?; + let id_bundle = if session.vars().transaction_isolation() + == &IsolationLevel::StrictSerializable + && timeline.is_some() + { + self.index_oracle(compute_instance) + .sufficient_collections(&source_ids) + } else { + // Determine a timestamp that will be valid for anything in any schema + // referenced by the query. + self.timedomain_for( + &source_ids, + &timeline, + session.conn_id(), + compute_instance, + )? + }; // TODO: determine_timestamp takes a mut self to track table linearizability, // so explaining a plan involving tables has side effects. Removing those side // effects would be good. diff --git a/test/testdrive/csv-sources.td b/test/testdrive/csv-sources.td index 64fc6567bd65..0d6555946ec9 100644 --- a/test/testdrive/csv-sources.td +++ b/test/testdrive/csv-sources.td @@ -137,7 +137,7 @@ contains:Expected a list of columns in parentheses, found EOF # since the definition of "static" means "will never change again". $ set-regex match=(\d{13}|u\d{1,3}) replacement=<> > EXPLAIN TIMESTAMP FOR SELECT * FROM static_csv -" timestamp: <>\n since:[<>]\n upper:[]\n has table: false\n\nsource materialize.public.mismatched_column_count (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.matching_column_names (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.matching_column_names_alias (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.mismatched_column_names (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.mismatched_column_names_count (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n\nsource materialize.public.static_csv (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n" +" timestamp: <>\n since:[<>]\n upper:[]\n has table: false\n\nsource materialize.public.static_csv (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n" # Static CSV with manual headers. 
> CREATE SOURCE static_csv_manual_header (city_man, state_man, zip_man) diff --git a/test/testdrive/explain-timestamps.td b/test/testdrive/explain-timestamps.td new file mode 100644 index 000000000000..78f2d0843aee --- /dev/null +++ b/test/testdrive/explain-timestamps.td @@ -0,0 +1,24 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set-regex match=(\d{13}|u\d{1,3}) replacement= + +> CREATE TABLE t1 (a INT); + +> CREATE TABLE t2 (a INT); + +# Strict serializable doesn't look at every object in the same time domain +> SET TRANSACTION_ISOLATION = 'STRICT SERIALIZABLE'; +> EXPLAIN TIMESTAMP FOR SELECT * FROM t1 +" timestamp: \n since:[]\n upper:[]\n has table: true\n table read ts: \n\nsource materialize.public.t1 (, storage):\n read frontier:[]\nwrite frontier:[]\n" + +# Serializable does look at every object in the same time domain +> SET TRANSACTION_ISOLATION = 'SERIALIZABLE'; +> EXPLAIN TIMESTAMP FOR SELECT * FROM t1 +" timestamp: \n since:[]\n upper:[]\n has table: true\n table read ts: \n\nsource materialize.public.t1 (, storage):\n read frontier:[]\nwrite frontier:[]\n\nsource materialize.public.t2 (, storage):\n read frontier:[]\nwrite frontier:[]\n" From 1c23de2ba5ddca7c26d7cd939bb805d841e4359c Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 3 Aug 2022 16:02:35 -0400 Subject: [PATCH 9/9] Fix csv test --- test/testdrive/csv-sources.td | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/testdrive/csv-sources.td b/test/testdrive/csv-sources.td index 0d6555946ec9..26312f6174d0 100644 --- a/test/testdrive/csv-sources.td +++ b/test/testdrive/csv-sources.td @@ -137,7 +137,7 @@ contains:Expected a list of columns in parentheses, found EOF # since the definition of "static" means "will never change again". $ set-regex match=(\d{13}|u\d{1,3}) replacement=<> > EXPLAIN TIMESTAMP FOR SELECT * FROM static_csv -" timestamp: <>\n since:[<>]\n upper:[]\n has table: false\n\nsource materialize.public.static_csv (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n" +" timestamp: <>\n since:[<>]\n upper:[]\n has table: false\n\nsource materialize.public.static_csv (<>, storage):\n read frontier:[<>]\nwrite frontier:[]\n" # Static CSV with manual headers. > CREATE SOURCE static_csv_manual_header (city_man, state_man, zip_man)
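
As a closing sketch of the isolation behavior the test changes in this series
assert (serializable read timestamps may move backwards when a lagging
collection joins the query; strict serializable ones may not), here is a
hedged, self-contained model. The oracle, frontiers, and numbers are invented
for illustration and are not the coordinator's real timestamp selection code.

    #[derive(Clone, Copy)]
    enum Isolation {
        Serializable,
        StrictSerializable,
    }

    struct TimestampOracle {
        // Largest read timestamp handed out so far.
        high_water: u64,
    }

    impl TimestampOracle {
        // Pick a read timestamp given the write frontiers ("uppers") of the
        // collections a query references: the largest time known complete.
        fn read_ts(&mut self, uppers: &[u64], iso: Isolation) -> u64 {
            let candidate = uppers.iter().min().unwrap().saturating_sub(1);
            match iso {
                // Serializable: use the candidate, even if it is earlier than
                // a timestamp we already served.
                Isolation::Serializable => candidate,
                // Strict serializable: never go backwards.
                Isolation::StrictSerializable => {
                    self.high_water = self.high_water.max(candidate);
                    self.high_water
                }
            }
        }
    }

    fn main() {
        // A view whose frontier tracks the wall clock vs. a freshly created
        // table whose frontier is still near zero.
        let (view_upper, table_upper) = (1_000, 10);

        let mut oracle = TimestampOracle { high_water: 0 };
        let view_ts = oracle.read_ts(&[view_upper], Isolation::Serializable);
        let join_ts = oracle.read_ts(&[view_upper, table_upper], Isolation::Serializable);
        assert!(join_ts < view_ts); // read timestamps went backwards

        let mut oracle = TimestampOracle { high_water: 0 };
        let view_ts = oracle.read_ts(&[view_upper], Isolation::StrictSerializable);
        let join_ts = oracle.read_ts(&[view_upper, table_upper], Isolation::StrictSerializable);
        assert!(join_ts >= view_ts); // linearizable: never backwards
    }
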