Skip to content

Commit

Permalink
adapter: Keep replica owners in sync with cluster (#20812)
Browse files Browse the repository at this point in the history
This commit updates the semantics of cluster replica owners, so that a
replica owner is always kept in sync with the cluster owner. Alter the
owner of a replica is no longer permitted.

This fits into the model of managed clusters, where replicas are part
of the same logical object as the cluster.

Fixes #20792
  • Loading branch information
jkosh44 authored Jul 28, 2023
1 parent 9bec863 commit ab3a145
Show file tree
Hide file tree
Showing 16 changed files with 287 additions and 165 deletions.
2 changes: 1 addition & 1 deletion doc/user/content/sql/create-cluster-replica.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ CREATE CLUSTER REPLICA c1.r1 SIZE = 'medium';

The privileges required to execute this statement are:

- `CREATE` privileges on the containing cluster.
- Ownership of `cluster_name`.

[AWS availability zone ID]: https://docs.aws.amazon.com/ram/latest/userguide/working-with-az-ids.html
1 change: 0 additions & 1 deletion misc/python/materialize/checks/owners.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def _alter_object_owners(self, i: int, expensive: bool = False) -> str:
ALTER SOURCE owner_source{i} OWNER TO other_owner
ALTER SINK owner_sink{i} OWNER TO other_owner
ALTER CLUSTER owner_cluster{i} OWNER TO other_owner
ALTER CLUSTER REPLICA owner_cluster{i}.owner_cluster_r{i} OWNER TO other_owner
"""
)

Expand Down
43 changes: 43 additions & 0 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub(crate) async fn migrate(
})
.await?;

sync_cluster_replica_owners(&cat, &mut tx, now)?;
sync_source_owners(&cat, &mut tx, now)?;

tx.commit().await?;
Expand Down Expand Up @@ -430,6 +431,48 @@ fn sync_source_owners(
Ok(())
}

// Update the owners of all cluster replicas so that their owners match their cluster.
//
// TODO(migration): delete in version v.64 (released in v0.63 + 1 additional
// release)
fn sync_cluster_replica_owners(
catalog: &Catalog,
tx: &mut Transaction,
now: EpochMillis,
) -> Result<(), anyhow::Error> {
let mut updated_cluster_replicas = BTreeMap::new();
for cluster_replica in catalog.user_cluster_replicas() {
let cluster = catalog.get_cluster(cluster_replica.cluster_id());
let old_owner = cluster_replica.owner_id();
if old_owner != cluster.owner_id() {
let mut new_replica = cluster_replica.clone();
new_replica.owner_id = cluster.owner_id();
updated_cluster_replicas.insert(
cluster_replica.replica_id(),
(cluster_replica.cluster_id(), new_replica),
);
add_to_audit_log(
tx,
mz_audit_log::EventType::Alter,
mz_audit_log::ObjectType::ClusterReplica,
mz_audit_log::EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
object_id: ObjectId::ClusterReplica((
cluster_replica.cluster_id(),
cluster_replica.replica_id(),
))
.to_string(),
old_owner_id: old_owner.to_string(),
new_owner_id: cluster.owner_id().to_string(),
}),
now,
)?;
}
}
tx.update_cluster_replicas(updated_cluster_replicas)
.expect("corrupt catalog");
Ok(())
}

fn add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
Expand Down
16 changes: 10 additions & 6 deletions src/adapter/src/coord/sequencer/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use mz_controller::clusters::{
DEFAULT_REPLICA_LOGGING_INTERVAL_MICROS,
};
use mz_ore::cast::CastFrom;
use mz_repr::role_id::RoleId;
use mz_sql::catalog::{CatalogCluster, CatalogItem, CatalogItemType, ObjectType};
use mz_sql::names::ObjectId;
use mz_sql::plan::{
Expand Down Expand Up @@ -200,7 +201,6 @@ impl Coordinator {
for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
let id = self.catalog_mut().allocate_replica_id().await?;
self.create_managed_cluster_replica_op(
session,
cluster_id,
id,
replica_name,
Expand All @@ -210,6 +210,7 @@ impl Coordinator {
&mut ops,
&mut az_helper,
disk,
*session.current_role_id(),
)?;
}

Expand All @@ -222,7 +223,6 @@ impl Coordinator {

fn create_managed_cluster_replica_op(
&mut self,
session: &Session,
cluster_id: ClusterId,
id: ReplicaId,
name: String,
Expand All @@ -232,6 +232,7 @@ impl Coordinator {
ops: &mut Vec<Op>,
az_helper: &mut AzHelper,
disk: bool,
owner_id: RoleId,
) -> Result<(), AdapterError> {
let availability_zone = az_helper.choose_az_and_increment();
let location = SerializedReplicaLocation::Managed {
Expand Down Expand Up @@ -269,7 +270,7 @@ impl Coordinator {
id,
name,
config,
owner_id: *session.current_role_id(),
owner_id,
});
Ok(())
}
Expand Down Expand Up @@ -522,12 +523,14 @@ impl Coordinator {
};

let id = self.catalog_mut().allocate_replica_id().await?;
// Replicas have the same owner as their cluster.
let owner_id = self.catalog().get_cluster(cluster_id).owner_id();
let op = catalog::Op::CreateClusterReplica {
cluster_id,
id,
name: name.clone(),
config,
owner_id: *session.current_role_id(),
owner_id,
};

self.catalog_transact(Some(session), vec![op]).await?;
Expand Down Expand Up @@ -729,6 +732,7 @@ impl Coordinator {
) -> Result<(), AdapterError> {
let cluster = self.catalog.get_cluster(cluster_id);
let name = cluster.name().to_string();
let owner_id = cluster.owner_id();
let mut ops = vec![];

let (
Expand Down Expand Up @@ -808,7 +812,6 @@ impl Coordinator {
for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
let id = self.catalog_mut().allocate_replica_id().await?;
self.create_managed_cluster_replica_op(
session,
cluster_id,
id,
name,
Expand All @@ -818,6 +821,7 @@ impl Coordinator {
&mut ops,
&mut az_helper,
*new_disk,
owner_id,
)?;
create_cluster_replicas.push((cluster_id, id))
}
Expand Down Expand Up @@ -850,7 +854,6 @@ impl Coordinator {
{
let id = self.catalog_mut().allocate_replica_id().await?;
self.create_managed_cluster_replica_op(
session,
cluster_id,
id,
name,
Expand All @@ -860,6 +863,7 @@ impl Coordinator {
&mut ops,
&mut az_helper,
*new_disk,
owner_id,
)?;
create_cluster_replicas.push((cluster_id, id))
}
Expand Down
93 changes: 55 additions & 38 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4736,50 +4736,67 @@ impl Coordinator {
new_owner,
}];

if let ObjectId::Item(global_id) = &id {
let entry = self.catalog().get_entry(global_id);
match &id {
ObjectId::Item(global_id) => {
let entry = self.catalog().get_entry(global_id);

// Cannot directly change the owner of an index.
if entry.is_index() {
let name = self
.catalog()
.resolve_full_name(entry.name(), Some(session.conn_id()))
.to_string();
session.add_notice(AdapterNotice::AlterIndexOwner { name });
return Ok(ExecuteResponse::AlteredObject(object_type));
}
// Cannot directly change the owner of an index.
if entry.is_index() {
let name = self
.catalog()
.resolve_full_name(entry.name(), Some(session.conn_id()))
.to_string();
session.add_notice(AdapterNotice::AlterIndexOwner { name });
return Ok(ExecuteResponse::AlteredObject(object_type));
}

// Alter owner cascades down to dependent indexes.
let dependent_index_ops = entry
.used_by()
.into_iter()
.filter(|id| self.catalog().get_entry(id).is_index())
.map(|id| Op::UpdateOwner {
id: ObjectId::Item(*id),
new_owner,
});
ops.extend(dependent_index_ops);
// Alter owner cascades down to dependent indexes.
let dependent_index_ops = entry
.used_by()
.into_iter()
.filter(|id| self.catalog().get_entry(id).is_index())
.map(|id| Op::UpdateOwner {
id: ObjectId::Item(*id),
new_owner,
});
ops.extend(dependent_index_ops);

// Alter owner cascades down to linked clusters and replicas.
if let Some(cluster) = self.catalog().get_linked_cluster(*global_id) {
let linked_cluster_replica_ops =
cluster.replicas_by_id.keys().map(|id| Op::UpdateOwner {
id: ObjectId::ClusterReplica((cluster.id(), *id)),
new_owner,
});
ops.extend(linked_cluster_replica_ops);
ops.push(Op::UpdateOwner {
id: ObjectId::Cluster(cluster.id()),
new_owner,
});
}

// Alter owner cascades down to linked clusters and replicas.
if let Some(cluster) = self.catalog().get_linked_cluster(*global_id) {
let linked_cluster_replica_ops =
cluster.replicas_by_id.keys().map(|id| Op::UpdateOwner {
id: ObjectId::ClusterReplica((cluster.id(), *id)),
// Alter owner cascades down to sub-sources and progress collections.
let dependent_subsources =
entry.subsources().into_iter().map(|id| Op::UpdateOwner {
id: ObjectId::Item(id),
new_owner,
});
ops.extend(linked_cluster_replica_ops);
ops.push(Op::UpdateOwner {
id: ObjectId::Cluster(cluster.id()),
new_owner,
});
ops.extend(dependent_subsources);
}

// Alter owner cascades down to sub-sources and progress collections.
let dependent_subsources = entry.subsources().into_iter().map(|id| Op::UpdateOwner {
id: ObjectId::Item(id),
new_owner,
});
ops.extend(dependent_subsources);
ObjectId::Cluster(cluster_id) => {
let cluster = self.catalog().get_cluster(*cluster_id);
// Alter owner cascades down to cluster replicas.
let managed_cluster_replica_ops =
cluster
.replicas_by_id
.keys()
.map(|replica_id| Op::UpdateOwner {
id: ObjectId::ClusterReplica((cluster.id(), *replica_id)),
new_owner,
});
ops.extend(managed_cluster_replica_ops);
}
_ => {}
}

self.catalog_transact(Some(session), ops)
Expand Down
18 changes: 6 additions & 12 deletions src/adapter/src/rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ fn generate_required_ownership(plan: &Plan) -> Vec<ObjectId> {
| Plan::CreateSchema(_)
| Plan::CreateRole(_)
| Plan::CreateCluster(_)
| Plan::CreateClusterReplica(_)
| Plan::CreateSource(_)
| Plan::CreateSources(_)
| Plan::CreateSecret(_)
Expand Down Expand Up @@ -437,6 +436,7 @@ fn generate_required_ownership(plan: &Plan) -> Vec<ObjectId> {
| Plan::AlterDefaultPrivileges(_)
| Plan::ValidateConnection(_)
| Plan::SideEffectingFunc(_) => Vec::new(),
Plan::CreateClusterReplica(plan) => vec![ObjectId::Cluster(plan.cluster_id)],
Plan::CreateIndex(plan) => vec![ObjectId::Item(plan.index.on)],
Plan::CreateView(CreateViewPlan { replace, .. })
| Plan::CreateMaterializedView(CreateMaterializedViewPlan { replace, .. }) => replace
Expand Down Expand Up @@ -602,17 +602,6 @@ fn generate_required_privileges(
}) => {
vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
}
Plan::CreateClusterReplica(CreateClusterReplicaPlan {
cluster_id,
name: _,
config: _,
}) => {
vec![(
SystemObjectId::Object(cluster_id.into()),
AclMode::CREATE,
role_id,
)]
}
Plan::CreateSource(CreateSourcePlan {
name,
source: _,
Expand Down Expand Up @@ -1101,6 +1090,11 @@ fn generate_required_privileges(
name: _,
options: _,
})
| Plan::CreateClusterReplica(CreateClusterReplicaPlan {
cluster_id: _,
name: _,
config: _,
})
| Plan::DiscardTemp
| Plan::DiscardAll
| Plan::EmptyQuery
Expand Down
12 changes: 10 additions & 2 deletions src/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,23 @@ macro_rules! bail_never_supported {
($feature:expr, $docs:expr, $details:expr) => {
return Err(crate::plan::error::PlanError::NeverSupported {
feature: $feature.to_string(),
documentation_link: $docs.to_string(),
documentation_link: Some($docs.to_string()),
details: Some($details.to_string()),
}
.into())
};
($feature:expr, $docs:expr) => {
return Err(crate::plan::error::PlanError::NeverSupported {
feature: $feature.to_string(),
documentation_link: $docs.to_string(),
documentation_link: Some($docs.to_string()),
details: None,
}
.into())
};
($feature:expr) => {
return Err(crate::plan::error::PlanError::NeverSupported {
feature: $feature.to_string(),
documentation_link: None,
details: None,
}
.into())
Expand Down
7 changes: 5 additions & 2 deletions src/sql/src/plan/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub enum PlanError {
/// This feature is not supported, and will likely never be supported.
NeverSupported {
feature: String,
documentation_link: String,
documentation_link: Option<String>,
details: Option<String>,
},
UnknownColumn {
Expand Down Expand Up @@ -358,7 +358,10 @@ impl fmt::Display for PlanError {
Ok(())
}
Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
write!(f, "{feature} is not supported, for more information consult the documentation at https://materialize.com/docs/{documentation_path}",)?;
write!(f, "{feature} is not supported",)?;
if let Some(documentation_path) = documentation_path {
write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
}
Ok(())
}
Self::UnknownColumn { table, column } => write!(
Expand Down
Loading

0 comments on commit ab3a145

Please sign in to comment.