Skip to content
7 changes: 7 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5412,6 +5412,10 @@ impl Catalog {
self.state.resolve_builtin_cluster(cluster)
}

pub fn get_mz_introspections_cluster_id(&self) -> &ClusterId {
&self.resolve_builtin_cluster(&MZ_INTROSPECTION_CLUSTER).id
}

/// Resolves a [`Cluster`] for a TargetCluster.
pub fn resolve_target_cluster(
&self,
Expand All @@ -5423,6 +5427,9 @@ impl Catalog {
Ok(self.resolve_builtin_cluster(&MZ_INTROSPECTION_CLUSTER))
}
TargetCluster::Active => self.active_cluster(session),
TargetCluster::Transaction(cluster_id) => self
.try_get_cluster(cluster_id)
.ok_or(AdapterError::ConcurrentClusterDrop),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ pub enum TargetCluster {
Introspection,
/// The current user's active cluster.
Active,
/// The cluster selected at the start of a transaction.
Transaction(ClusterId),
}

/// A struct to hold information about the validity of plans and if they should be abandoned after
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/coord/introspection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ pub fn auto_run_on_introspection<'a, 's, 'p>(
| Plan::SideEffectingFunc(_) => return TargetCluster::Active,
};

// Use transaction cluster if we're mid-transaction
if let Some(cluster_id) = session.transaction().cluster() {
return TargetCluster::Transaction(cluster_id);
}

// Bail if the user has disabled it via the SessionVar.
if !session.vars().auto_route_introspection_queries() {
return TargetCluster::Active;
Expand Down
26 changes: 23 additions & 3 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1770,6 +1770,9 @@ impl Coordinator {
if &name == TRANSACTION_ISOLATION_VAR_NAME {
self.validate_set_isolation_level(session)?;
}
if &name == CLUSTER_VAR_NAME {
self.validate_set_cluster(session)?;
}

let vars = session.vars_mut();
let values = match plan.value {
Expand Down Expand Up @@ -1830,6 +1833,9 @@ impl Coordinator {
if &name == TRANSACTION_ISOLATION_VAR_NAME {
self.validate_set_isolation_level(session)?;
}
if &name == CLUSTER_VAR_NAME {
self.validate_set_cluster(session)?;
}
session
.vars_mut()
.reset(Some(self.catalog().system_config()), &name, false)?;
Expand Down Expand Up @@ -1873,6 +1879,14 @@ impl Coordinator {
}
}

fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
if session.transaction().contains_ops() {
Err(AdapterError::InvalidSetCluster)
} else {
Ok(())
}
}

pub(super) fn sequence_end_transaction(
&mut self,
mut ctx: ExecuteContext,
Expand Down Expand Up @@ -1911,7 +1925,7 @@ impl Coordinator {
});
return;
}
Ok((Some(TransactionOps::Peeks(determination)), _))
Ok((Some(TransactionOps::Peeks { determination, .. }), _))
if ctx.session().vars().transaction_isolation()
== &IsolationLevel::StrictSerializable =>
{
Expand Down Expand Up @@ -2562,11 +2576,17 @@ impl Coordinator {
// depend on whether or not reads have occurred in the txn.
let mut transaction_determination = determination.clone();
if when.is_transactional() {
session.add_transaction_ops(TransactionOps::Peeks(transaction_determination))?;
session.add_transaction_ops(TransactionOps::Peeks {
determination: transaction_determination,
cluster_id,
})?;
} else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
// If the query uses AS OF, then ignore the timestamp.
transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
session.add_transaction_ops(TransactionOps::Peeks(transaction_determination))?;
session.add_transaction_ops(TransactionOps::Peeks {
determination: transaction_determination,
cluster_id,
})?;
};

Ok(determination)
Expand Down
20 changes: 13 additions & 7 deletions src/adapter/src/coord/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,28 +805,34 @@ impl Coordinator {
schemas.insert((&name.qualifiers.database_spec, &name.qualifiers.schema_spec));
}

// If any of the system schemas is specified, add the rest of the
// system schemas.
let pg_catalog_schema = (
&ResolvedDatabaseSpecifier::Ambient,
&SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id().clone()),
);
let system_schemas = [
(
&ResolvedDatabaseSpecifier::Ambient,
&SchemaSpecifier::Id(self.catalog().get_mz_catalog_schema_id().clone()),
),
(
&ResolvedDatabaseSpecifier::Ambient,
&SchemaSpecifier::Id(self.catalog().get_pg_catalog_schema_id().clone()),
&SchemaSpecifier::Id(self.catalog().get_mz_internal_schema_id().clone()),
),
pg_catalog_schema.clone(),
(
&ResolvedDatabaseSpecifier::Ambient,
&SchemaSpecifier::Id(self.catalog().get_information_schema_id().clone()),
),
(
&ResolvedDatabaseSpecifier::Ambient,
&SchemaSpecifier::Id(self.catalog().get_mz_internal_schema_id().clone()),
),
];

if system_schemas.iter().any(|s| schemas.contains(s)) {
// If any of the system schemas is specified, add the rest of the
// system schemas.
schemas.extend(system_schemas);
} else if !schemas.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like having both of these if blocks. Feels like the right tradeoff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

// Always include the pg_catalog schema, if schemas is non-empty. The pg_catalog schemas is
// sometimes used by applications in followup queries.
schemas.insert(pg_catalog_schema);
}

// Gather the IDs of all items in all used schemas.
Expand Down
12 changes: 12 additions & 0 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub enum AdapterError {
},
/// SET TRANSACTION ISOLATION LEVEL was called in the middle of a transaction.
InvalidSetIsolationLevel,
/// SET cluster was called in the middle of a transaction.
InvalidSetCluster,
/// No such storage instance size has been configured.
InvalidStorageClusterSize {
size: String,
Expand All @@ -99,6 +101,8 @@ pub enum AdapterError {
},
/// Expression violated a column's constraint
ConstraintViolation(NotNullViolation),
/// Transaction cluster was dropped in the middle of a transaction.
ConcurrentClusterDrop,
/// Target cluster has no replicas to service query.
NoClusterReplicasAvailable(String),
/// The named operation cannot be run in a transaction.
Expand Down Expand Up @@ -382,10 +386,12 @@ impl AdapterError {
AdapterError::InvalidClusterReplicaAz { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidClusterReplicaSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidSetIsolationLevel => SqlState::ACTIVE_SQL_TRANSACTION,
AdapterError::InvalidSetCluster => SqlState::ACTIVE_SQL_TRANSACTION,
AdapterError::InvalidStorageClusterSize { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::SourceOrSinkSizeRequired { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::InvalidTableMutationSelection => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::ConstraintViolation(NotNullViolation(_)) => SqlState::NOT_NULL_VIOLATION,
AdapterError::ConcurrentClusterDrop => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::NoClusterReplicasAvailable(_) => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::OperationProhibitsTransaction(_) => SqlState::ACTIVE_SQL_TRANSACTION,
AdapterError::OperationRequiresTransaction(_) => SqlState::NO_ACTIVE_SQL_TRANSACTION,
Expand Down Expand Up @@ -499,6 +505,9 @@ impl fmt::Display for AdapterError {
f,
"SET TRANSACTION ISOLATION LEVEL must be called before any query"
),
AdapterError::InvalidSetCluster => {
write!(f, "SET cluster cannot be called in an active transaction")
}
AdapterError::InvalidStorageClusterSize { size, .. } => {
write!(f, "unknown source size {size}")
}
Expand All @@ -511,6 +520,9 @@ impl fmt::Display for AdapterError {
AdapterError::ConstraintViolation(not_null_violation) => {
write!(f, "{}", not_null_violation)
}
AdapterError::ConcurrentClusterDrop => {
write!(f, "the transaction's active cluster has been dropped")
}
AdapterError::NoClusterReplicasAvailable(cluster) => {
write!(
f,
Expand Down
72 changes: 57 additions & 15 deletions src/adapter/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_controller_types::ClusterId;
use mz_ore::now::EpochMillis;
use mz_pgrepr::Format;
use mz_repr::role_id::RoleId;
Expand Down Expand Up @@ -224,7 +225,7 @@ impl<T: TimestampManipulation> Session<T> {
// - Currently in `READ ONLY`
// - Already performed a query
let read_write_prohibited = match txn.ops {
TransactionOps::Peeks(_) | TransactionOps::Subscribe => {
TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
txn.access == Some(TransactionAccessMode::ReadOnly)
}
TransactionOps::None
Expand Down Expand Up @@ -424,7 +425,7 @@ impl<T: TimestampManipulation> Session<T> {
/// anomalies will occur if cleared.
pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
if let TransactionOps::Peeks(_) = ops {
if let TransactionOps::Peeks { .. } = ops {
let ops = std::mem::take(ops);
Some(
ops.timestamp_determination()
Expand All @@ -447,7 +448,7 @@ impl<T: TimestampManipulation> Session<T> {
match self.transaction.inner() {
Some(Transaction {
pcx: _,
ops: TransactionOps::Peeks(determination),
ops: TransactionOps::Peeks { determination, .. },
write_lock_guard: _,
access: _,
id: _,
Expand All @@ -462,10 +463,13 @@ impl<T: TimestampManipulation> Session<T> {
self.transaction.inner(),
Some(Transaction {
pcx: _,
ops: TransactionOps::Peeks(TimestampDetermination {
timestamp_context: TimestampContext::TimelineTimestamp(_, _),
ops: TransactionOps::Peeks {
determination: TimestampDetermination {
timestamp_context: TimestampContext::TimelineTimestamp(_, _),
..
},
..
}),
},
write_lock_guard: _,
access: _,
id: _,
Expand Down Expand Up @@ -953,6 +957,17 @@ impl<T: TimestampManipulation> TransactionStatus<T> {
}
}

/// The cluster of the transaction, if one exists.
pub fn cluster(&self) -> Option<ClusterId> {
match self {
TransactionStatus::Default => None,
TransactionStatus::Started(txn)
| TransactionStatus::InTransaction(txn)
| TransactionStatus::InTransactionImplicit(txn)
| TransactionStatus::Failed(txn) => txn.cluster(),
}
}

/// Reports whether any operations have been executed as part of this transaction
pub fn contains_ops(&self) -> bool {
match self.inner() {
Expand All @@ -978,8 +993,15 @@ impl<T: TimestampManipulation> TransactionStatus<T> {
}
*ops = add_ops;
}
TransactionOps::Peeks(determination) => match add_ops {
TransactionOps::Peeks(add_timestamp_determination) => {
TransactionOps::Peeks {
determination,
cluster_id,
} => match add_ops {
TransactionOps::Peeks {
determination: add_timestamp_determination,
cluster_id: add_cluster_id,
} => {
assert_eq!(*cluster_id, add_cluster_id);
match (
&determination.timestamp_context,
&add_timestamp_determination.timestamp_context,
Expand Down Expand Up @@ -1029,7 +1051,7 @@ impl<T: TimestampManipulation> TransactionStatus<T> {
}
// Iff peeks do not have a timestamp (i.e. they are
// constant), we can permit them.
TransactionOps::Peeks(determination)
TransactionOps::Peeks { determination, .. }
if !determination.timestamp_context.contains_timestamp() => {}
_ => {
return Err(AdapterError::WriteOnlyTransaction);
Expand Down Expand Up @@ -1084,18 +1106,33 @@ impl<T> Transaction<T> {
/// The timeline of the transaction, if one exists.
fn timeline(&self) -> Option<Timeline> {
match &self.ops {
TransactionOps::Peeks(TimestampDetermination {
timestamp_context: TimestampContext::TimelineTimestamp(timeline, _),
TransactionOps::Peeks {
determination:
TimestampDetermination {
timestamp_context: TimestampContext::TimelineTimestamp(timeline, _),
..
},
..
}) => Some(timeline.clone()),
TransactionOps::Peeks(_)
} => Some(timeline.clone()),
TransactionOps::Peeks { .. }
| TransactionOps::None
| TransactionOps::Subscribe
| TransactionOps::Writes(_)
| TransactionOps::SingleStatement { .. } => None,
}
}

/// The cluster of the transaction, if one exists.
pub fn cluster(&self) -> Option<ClusterId> {
match &self.ops {
TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
TransactionOps::None
| TransactionOps::Subscribe
| TransactionOps::Writes(_)
| TransactionOps::SingleStatement { .. } => None,
}
}

/// Reports whether any operations have been executed as part of this transaction
fn contains_ops(&self) -> bool {
!matches!(self.ops, TransactionOps::None)
Expand Down Expand Up @@ -1156,7 +1193,12 @@ pub enum TransactionOps<T> {
/// is has a timestamp, it must only do other peeks. However, if it doesn't
/// have a timestamp (i.e. the values are constants), the transaction can still
/// perform writes.
Peeks(TimestampDetermination<T>),
Peeks {
/// The timestamp and timestamp related metadata for the peek.
determination: TimestampDetermination<T>,
/// The cluster used to execute peeks.
cluster_id: ClusterId,
},
/// This transaction has done a `SUBSCRIBE` and must do nothing else.
Subscribe,
/// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
Expand All @@ -1174,7 +1216,7 @@ pub enum TransactionOps<T> {
impl<T> TransactionOps<T> {
fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
match self {
TransactionOps::Peeks(determination) => Some(determination),
TransactionOps::Peeks { determination, .. } => Some(determination),
TransactionOps::None
| TransactionOps::Subscribe
| TransactionOps::Writes(_)
Expand Down
33 changes: 33 additions & 0 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3034,3 +3034,36 @@ fn copy_from() {
.expect("success");
assert_eq!(rows.len(), 2);
}

// Test that a cluster dropped mid transaction results in an error.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // too slow
fn concurrent_cluster_drop() {
let server = util::start_server(util::Config::default()).unwrap();
let mut txn_client = server.connect(postgres::NoTls).unwrap();
let mut drop_client = server.connect(postgres::NoTls).unwrap();

txn_client
.execute("CREATE CLUSTER c REPLICAS (r1 (SIZE '1'));", &[])
.expect("failed to create cluster");
txn_client
.execute("CREATE TABLE t (a INT);", &[])
.expect("failed to create cluster");

txn_client
.execute("SET CLUSTER TO c", &[])
.expect("success");
txn_client.execute("BEGIN", &[]).expect("success");
let _ = txn_client.query("SELECT * FROM t", &[]).expect("success");

drop_client.execute("DROP CLUSTER c", &[]).expect("success");

let err = txn_client
.execute("SELECT * FROM t", &[])
.expect_err("error");

assert_eq!(
err.as_db_error().unwrap().message(),
"the transaction's active cluster has been dropped"
);
}
Loading