adapter: Keep replica owners in sync with cluster #20812

Merged 2 commits on Jul 28, 2023
2 changes: 1 addition & 1 deletion doc/user/content/sql/create-cluster-replica.md
@@ -116,6 +116,6 @@ CREATE CLUSTER REPLICA c1.r1 SIZE = 'medium';

The privileges required to execute this statement are:

- `CREATE` privileges on the containing cluster.
- Ownership of `cluster_name`.

[AWS availability zone ID]: https://docs.aws.amazon.com/ram/latest/userguide/working-with-az-ids.html
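
For illustration, a minimal SQL sketch of the privilege change documented above; the non-owner role is hypothetical, and the cluster and size follow the doc's own example:

    -- As the owner of cluster c1 (or a superuser), this succeeds:
    CREATE CLUSTER REPLICA c1.r2 SIZE = 'medium';
    -- For a role that merely holds CREATE on c1 but does not own it, the same
    -- statement is now rejected: creating a replica is treated as altering the
    -- cluster, and altering requires ownership.
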
1 change: 0 additions & 1 deletion misc/python/materialize/checks/owners.py
@@ -72,7 +72,6 @@ def _alter_object_owners(self, i: int, expensive: bool = False) -> str:
ALTER SOURCE owner_source{i} OWNER TO other_owner
ALTER SINK owner_sink{i} OWNER TO other_owner
ALTER CLUSTER owner_cluster{i} OWNER TO other_owner
ALTER CLUSTER REPLICA owner_cluster{i}.owner_cluster_r{i} OWNER TO other_owner
"""
)

43 changes: 43 additions & 0 deletions src/adapter/src/catalog/migrate.rs
@@ -93,6 +93,7 @@ pub(crate) async fn migrate(
})
.await?;

sync_cluster_replica_owners(&cat, &mut tx, now)?;
sync_source_owners(&cat, &mut tx, now)?;

tx.commit().await?;
@@ -430,6 +431,48 @@ fn sync_source_owners(
Ok(())
}

// Update the owners of all cluster replicas so that their owners match their cluster.
//
// TODO(migration): delete in version v0.64 (released in v0.63 + 1 additional
// release)
fn sync_cluster_replica_owners(
catalog: &Catalog,
tx: &mut Transaction,
now: EpochMillis,
) -> Result<(), anyhow::Error> {
let mut updated_cluster_replicas = BTreeMap::new();
for cluster_replica in catalog.user_cluster_replicas() {
let cluster = catalog.get_cluster(cluster_replica.cluster_id());
let old_owner = cluster_replica.owner_id();
if old_owner != cluster.owner_id() {
let mut new_replica = cluster_replica.clone();
new_replica.owner_id = cluster.owner_id();
updated_cluster_replicas.insert(
cluster_replica.replica_id(),
(cluster_replica.cluster_id(), new_replica),
);
add_to_audit_log(
tx,
mz_audit_log::EventType::Alter,
mz_audit_log::ObjectType::ClusterReplica,
mz_audit_log::EventDetails::UpdateOwnerV1(mz_audit_log::UpdateOwnerV1 {
object_id: ObjectId::ClusterReplica((
cluster_replica.cluster_id(),
cluster_replica.replica_id(),
))
.to_string(),
old_owner_id: old_owner.to_string(),
new_owner_id: cluster.owner_id().to_string(),
}),
now,
)?;
}
}
tx.update_cluster_replicas(updated_cluster_replicas)
.expect("corrupt catalog");
Ok(())
}

fn add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
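
One way to sanity-check what this migration is meant to guarantee, assuming the `owner_id` and `cluster_id` columns exposed by `mz_catalog.mz_clusters` and `mz_catalog.mz_cluster_replicas`:

    -- Should return no rows once replica owners are in sync with their clusters.
    SELECT r.id, r.name
    FROM mz_cluster_replicas r
    JOIN mz_clusters c ON r.cluster_id = c.id
    WHERE r.owner_id <> c.owner_id;
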
16 changes: 10 additions & 6 deletions src/adapter/src/coord/sequencer/cluster.rs
@@ -18,6 +18,7 @@ use mz_controller::clusters::{
DEFAULT_REPLICA_LOGGING_INTERVAL_MICROS,
};
use mz_ore::cast::CastFrom;
use mz_repr::role_id::RoleId;
use mz_sql::catalog::{CatalogCluster, CatalogItem, CatalogItemType, ObjectType};
use mz_sql::names::ObjectId;
use mz_sql::plan::{
@@ -200,7 +201,6 @@ impl Coordinator {
for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
let id = self.catalog_mut().allocate_replica_id().await?;
self.create_managed_cluster_replica_op(
session,
cluster_id,
id,
replica_name,
@@ -210,6 +210,7 @@
&mut ops,
&mut az_helper,
disk,
*session.current_role_id(),
)?;
}

@@ -222,7 +223,6 @@

fn create_managed_cluster_replica_op(
&mut self,
session: &Session,
cluster_id: ClusterId,
id: ReplicaId,
name: String,
@@ -232,6 +232,7 @@
ops: &mut Vec<Op>,
az_helper: &mut AzHelper,
disk: bool,
owner_id: RoleId,
) -> Result<(), AdapterError> {
let availability_zone = az_helper.choose_az_and_increment();
let location = SerializedReplicaLocation::Managed {
@@ -269,7 +270,7 @@
id,
name,
config,
owner_id: *session.current_role_id(),
owner_id,
});
Ok(())
}
@@ -522,12 +523,14 @@
};

let id = self.catalog_mut().allocate_replica_id().await?;
// Replicas have the same owner as their cluster.
let owner_id = self.catalog().get_cluster(cluster_id).owner_id();
let op = catalog::Op::CreateClusterReplica {
cluster_id,
id,
name: name.clone(),
config,
owner_id: *session.current_role_id(),
owner_id,
};

self.catalog_transact(Some(session), vec![op]).await?;
@@ -729,6 +732,7 @@
) -> Result<(), AdapterError> {
let cluster = self.catalog.get_cluster(cluster_id);
let name = cluster.name().to_string();
let owner_id = cluster.owner_id();
let mut ops = vec![];

let (
@@ -808,7 +812,6 @@
for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
let id = self.catalog_mut().allocate_replica_id().await?;
self.create_managed_cluster_replica_op(
session,
cluster_id,
id,
name,
@@ -818,6 +821,7 @@
&mut ops,
&mut az_helper,
*new_disk,
owner_id,
)?;
create_cluster_replicas.push((cluster_id, id))
}
@@ -850,7 +854,6 @@
{
let id = self.catalog_mut().allocate_replica_id().await?;
self.create_managed_cluster_replica_op(
session,
cluster_id,
id,
name,
@@ -860,6 +863,7 @@
&mut ops,
&mut az_helper,
*new_disk,
owner_id,
)?;
create_cluster_replicas.push((cluster_id, id))
}
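
The user-visible effect of threading `owner_id` through these ops, sketched under the assumption of a managed cluster `c_managed` owned by role `r_owner` while the session runs as `r_member`, a member of `r_owner`:

    -- Adds a replica to the managed cluster.
    ALTER CLUSTER c_managed SET (REPLICATION FACTOR 2);
    -- Previously the new replica would have been owned by r_member (the session's
    -- current role); with this change it is owned by r_owner, the cluster's owner.
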
93 changes: 55 additions & 38 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -4730,50 +4730,67 @@ impl Coordinator {
new_owner,
}];

if let ObjectId::Item(global_id) = &id {
let entry = self.catalog().get_entry(global_id);
match &id {
ObjectId::Item(global_id) => {
let entry = self.catalog().get_entry(global_id);

// Cannot directly change the owner of an index.
if entry.is_index() {
let name = self
.catalog()
.resolve_full_name(entry.name(), Some(session.conn_id()))
.to_string();
session.add_notice(AdapterNotice::AlterIndexOwner { name });
return Ok(ExecuteResponse::AlteredObject(object_type));
}
// Cannot directly change the owner of an index.
if entry.is_index() {
let name = self
.catalog()
.resolve_full_name(entry.name(), Some(session.conn_id()))
.to_string();
session.add_notice(AdapterNotice::AlterIndexOwner { name });
return Ok(ExecuteResponse::AlteredObject(object_type));
}

// Alter owner cascades down to dependent indexes.
let dependent_index_ops = entry
.used_by()
.into_iter()
.filter(|id| self.catalog().get_entry(id).is_index())
.map(|id| Op::UpdateOwner {
id: ObjectId::Item(*id),
new_owner,
});
ops.extend(dependent_index_ops);
// Alter owner cascades down to dependent indexes.
let dependent_index_ops = entry
.used_by()
.into_iter()
.filter(|id| self.catalog().get_entry(id).is_index())
.map(|id| Op::UpdateOwner {
id: ObjectId::Item(*id),
new_owner,
});
ops.extend(dependent_index_ops);

// Alter owner cascades down to linked clusters and replicas.
if let Some(cluster) = self.catalog().get_linked_cluster(*global_id) {
let linked_cluster_replica_ops =
cluster.replicas_by_id.keys().map(|id| Op::UpdateOwner {
id: ObjectId::ClusterReplica((cluster.id(), *id)),
new_owner,
});
ops.extend(linked_cluster_replica_ops);
ops.push(Op::UpdateOwner {
id: ObjectId::Cluster(cluster.id()),
new_owner,
});
}

// Alter owner cascades down to linked clusters and replicas.
if let Some(cluster) = self.catalog().get_linked_cluster(*global_id) {
let linked_cluster_replica_ops =
cluster.replicas_by_id.keys().map(|id| Op::UpdateOwner {
id: ObjectId::ClusterReplica((cluster.id(), *id)),
// Alter owner cascades down to sub-sources and progress collections.
let dependent_subsources =
entry.subsources().into_iter().map(|id| Op::UpdateOwner {
id: ObjectId::Item(id),
new_owner,
});
ops.extend(linked_cluster_replica_ops);
ops.push(Op::UpdateOwner {
id: ObjectId::Cluster(cluster.id()),
new_owner,
});
ops.extend(dependent_subsources);
}

// Alter owner cascades down to sub-sources and progress collections.
let dependent_subsources = entry.subsources().into_iter().map(|id| Op::UpdateOwner {
id: ObjectId::Item(id),
new_owner,
});
ops.extend(dependent_subsources);
ObjectId::Cluster(cluster_id) => {
let cluster = self.catalog().get_cluster(*cluster_id);
// Alter owner cascades down to cluster replicas.
let managed_cluster_replica_ops =
cluster
.replicas_by_id
.keys()
.map(|replica_id| Op::UpdateOwner {
id: ObjectId::ClusterReplica((cluster.id(), *replica_id)),
new_owner,
});
ops.extend(managed_cluster_replica_ops);
}
_ => {}
}

self.catalog_transact(Some(session), ops)
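
In SQL terms, the new `ObjectId::Cluster` arm means an ownership change on a cluster now cascades to its replicas, for example (cluster and role names are hypothetical):

    -- Transfers the cluster and, with this change, all of its replicas.
    ALTER CLUSTER c1 OWNER TO new_owner;
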
18 changes: 6 additions & 12 deletions src/adapter/src/rbac.rs
@@ -391,7 +391,6 @@ fn generate_required_ownership(plan: &Plan) -> Vec<ObjectId> {
| Plan::CreateSchema(_)
| Plan::CreateRole(_)
| Plan::CreateCluster(_)
| Plan::CreateClusterReplica(_)
| Plan::CreateSource(_)
| Plan::CreateSources(_)
| Plan::CreateSecret(_)
@@ -437,6 +436,7 @@ fn generate_required_ownership(plan: &Plan) -> Vec<ObjectId> {
| Plan::AlterDefaultPrivileges(_)
| Plan::ValidateConnection(_)
| Plan::SideEffectingFunc(_) => Vec::new(),
Plan::CreateClusterReplica(plan) => vec![ObjectId::Cluster(plan.cluster_id)],
Plan::CreateIndex(plan) => vec![ObjectId::Item(plan.index.on)],
Plan::CreateView(CreateViewPlan { replace, .. })
| Plan::CreateMaterializedView(CreateMaterializedViewPlan { replace, .. }) => replace
@@ -602,17 +602,6 @@ fn generate_required_privileges(
}) => {
vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
}
Plan::CreateClusterReplica(CreateClusterReplicaPlan {
cluster_id,
name: _,
config: _,
}) => {
vec![(
SystemObjectId::Object(cluster_id.into()),
AclMode::CREATE,
role_id,
)]
Comment on lines -610 to -614

Member: Why don't we require this privilege anymore?

Contributor Author: Conceptually, we're changing how we treat creating a replica from "creating a new object in a cluster" to "altering an existing cluster". This matches the design and syntax of managed clusters, which will eventually become the default (or only) cluster type. So the privileges are transitioning from the ability to create something to the ability to alter something. There's no explicit alter privilege; instead, you must own an object to alter it.

}
Plan::CreateSource(CreateSourcePlan {
name,
source: _,
@@ -1101,6 +1090,11 @@
name: _,
options: _,
})
| Plan::CreateClusterReplica(CreateClusterReplicaPlan {
cluster_id: _,
name: _,
config: _,
})
| Plan::DiscardTemp
| Plan::DiscardAll
| Plan::EmptyQuery
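
A hedged sketch of how the required privilege shifts under this rbac change, using hypothetical role names:

    -- Granting CREATE on the cluster is no longer what gates replica creation:
    GRANT CREATE ON CLUSTER c1 TO dev_role;
    -- Ownership is what matters now:
    ALTER CLUSTER c1 OWNER TO dev_role;
    -- dev_role can now create and drop replicas on c1.
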
12 changes: 10 additions & 2 deletions src/sql/src/lib.rs
@@ -152,15 +152,23 @@ macro_rules! bail_never_supported {
($feature:expr, $docs:expr, $details:expr) => {
return Err(crate::plan::error::PlanError::NeverSupported {
feature: $feature.to_string(),
documentation_link: $docs.to_string(),
documentation_link: Some($docs.to_string()),
details: Some($details.to_string()),
}
.into())
};
($feature:expr, $docs:expr) => {
return Err(crate::plan::error::PlanError::NeverSupported {
feature: $feature.to_string(),
documentation_link: $docs.to_string(),
documentation_link: Some($docs.to_string()),
details: None,
}
.into())
};
($feature:expr) => {
return Err(crate::plan::error::PlanError::NeverSupported {
feature: $feature.to_string(),
documentation_link: None,
details: None,
}
.into())
7 changes: 5 additions & 2 deletions src/sql/src/plan/error.rs
@@ -47,7 +47,7 @@ pub enum PlanError {
/// This feature is not supported, and will likely never be supported.
NeverSupported {
feature: String,
documentation_link: String,
documentation_link: Option<String>,
details: Option<String>,
},
UnknownColumn {
@@ -358,7 +358,10 @@ impl fmt::Display for PlanError {
Ok(())
}
Self::NeverSupported { feature, documentation_link: documentation_path,.. } => {
write!(f, "{feature} is not supported, for more information consult the documentation at https://materialize.com/docs/{documentation_path}",)?;
write!(f, "{feature} is not supported",)?;
if let Some(documentation_path) = documentation_path {
write!(f, ", for more information consult the documentation at https://materialize.com/docs/{documentation_path}")?;
}
Ok(())
}
Self::UnknownColumn { table, column } => write!(