Skip to content

Commit

Permalink
catalog: Allow for more builtin roles (MaterializeInc#14128)
Browse files Browse the repository at this point in the history
This commit updates role ids to have two separate namespaces. One for
builtin roles and one for user roles. This will allow us to more easily
add builtin roles in the future.

This is a breaking change.

Works towards resolving: MaterializeInc/cloud#3292
  • Loading branch information
jkosh44 authored Aug 10, 2022
1 parent e5c76e0 commit d370474
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 147 deletions.
24 changes: 15 additions & 9 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use mz_sql::catalog::{
};
use mz_sql::names::{
Aug, DatabaseId, FullObjectName, ObjectQualifiers, PartialObjectName, QualifiedObjectName,
QualifiedSchemaName, RawDatabaseSpecifier, ResolvedDatabaseSpecifier, SchemaId,
QualifiedSchemaName, RawDatabaseSpecifier, ResolvedDatabaseSpecifier, RoleId, SchemaId,
SchemaSpecifier,
};
use mz_sql::plan::{
Expand All @@ -71,7 +71,7 @@ use mz_transform::Optimizer;

use crate::catalog::builtin::{
Builtin, BuiltinLog, BuiltinStorageCollection, BuiltinTable, BuiltinType, Fingerprint,
BUILTINS, BUILTIN_ROLES, INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA,
BUILTINS, BUILTIN_ROLE_PREFIXES, INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_INTERNAL_SCHEMA,
MZ_TEMP_SCHEMA, PG_CATALOG_SCHEMA,
};
pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
Expand Down Expand Up @@ -1169,7 +1169,7 @@ pub struct Schema {
#[derive(Debug, Serialize, Clone)]
pub struct Role {
pub name: String,
pub id: u64,
pub id: RoleId,
#[serde(skip)]
pub oid: u32,
}
Expand Down Expand Up @@ -1830,8 +1830,7 @@ impl<S: Append> Catalog<S> {
}

let roles = catalog.storage().await.load_roles().await?;
let builtin_roles = BUILTIN_ROLES.iter().map(|b| (b.id, b.name.to_owned()));
for (id, name) in roles.into_iter().chain(builtin_roles) {
for (id, name) in roles {
let oid = catalog.allocate_oid().await?;
catalog.state.roles.insert(
name.clone(),
Expand Down Expand Up @@ -3180,7 +3179,7 @@ impl<S: Append> Catalog<S> {
schema_name: String,
},
CreateRole {
id: u64,
id: RoleId,
oid: u32,
name: String,
},
Expand Down Expand Up @@ -3329,7 +3328,7 @@ impl<S: Append> Catalog<S> {
)));
}
vec![Action::CreateRole {
id: tx.insert_role(&name)?,
id: tx.insert_user_role(&name)?,
oid,
name,
}]
Expand Down Expand Up @@ -3481,6 +3480,11 @@ impl<S: Append> Catalog<S> {
}]
}
Op::DropRole { name } => {
if is_reserved_name(&name) {
return Err(AdapterError::Catalog(Error::new(
ErrorKind::ReservedRoleName(name),
)));
}
tx.remove_role(&name)?;
builtin_table_updates.push(self.state.pack_role_update(&name, -1));
vec![Action::DropRole { name }]
Expand Down Expand Up @@ -4253,7 +4257,9 @@ impl<S: Append> Catalog<S> {
}

fn is_reserved_name(name: &str) -> bool {
name.starts_with("mz_") || name.starts_with("pg_")
BUILTIN_ROLE_PREFIXES
.iter()
.any(|prefix| name.starts_with(prefix))
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -4762,7 +4768,7 @@ impl mz_sql::catalog::CatalogRole for Role {
&self.name
}

fn id(&self) -> u64 {
fn id(&self) -> RoleId {
self.id
}
}
Expand Down
19 changes: 11 additions & 8 deletions src/adapter/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ pub struct BuiltinFunc {
pub inner: &'static mz_sql::func::Func,
}

pub static BUILTIN_ROLE_PREFIXES: Lazy<Vec<&str>> = Lazy::new(|| vec!["mz_", "pg_"]);

pub struct BuiltinRole {
/// Name of the builtin role.
///
/// IMPORTANT: Must start with a prefix from [`BUILTIN_ROLE_PREFIXES`].
pub name: &'static str,
pub id: u64,
}

pub trait Fingerprint {
Expand Down Expand Up @@ -1134,7 +1138,7 @@ pub static MZ_ROLES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_roles",
schema: MZ_CATALOG_SCHEMA,
desc: RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("id", ScalarType::String.nullable(false))
.with_column("oid", ScalarType::Oid.nullable(false))
.with_column("name", ScalarType::String.nullable(false)),
});
Expand Down Expand Up @@ -2101,10 +2105,7 @@ AS SELECT
FROM mz_catalog.mz_roles r",
};

pub const MZ_SYSTEM: BuiltinRole = BuiltinRole {
name: "mz_system",
id: 0,
};
pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: "mz_system" };

pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
let mut builtins = vec![
Expand Down Expand Up @@ -2293,6 +2294,7 @@ pub static BUILTIN_ROLES: Lazy<Vec<BuiltinRole>> = Lazy::new(|| vec![MZ_SYSTEM])
#[allow(non_snake_case)]
pub mod BUILTINS {
use super::*;

pub fn logs() -> impl Iterator<Item = &'static BuiltinLog> {
BUILTINS_STATIC.iter().filter_map(|b| match b {
Builtin::Log(log) => Some(*log),
Expand Down Expand Up @@ -2327,15 +2329,16 @@ mod tests {
use std::collections::{HashMap, HashSet};
use std::env;

use mz_ore::now::NOW_ZERO;
use tokio_postgres::NoTls;

use crate::catalog::{Catalog, CatalogItem, SYSTEM_CONN_ID};
use mz_ore::now::NOW_ZERO;
use mz_ore::task;
use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID};
use mz_sql::catalog::{CatalogSchema, SessionCatalog};
use mz_sql::names::{PartialObjectName, ResolvedDatabaseSpecifier};

use crate::catalog::{Catalog, CatalogItem, SYSTEM_CONN_ID};

use super::*;

// Connect to a running Postgres server and verify that our builtin
Expand Down
3 changes: 1 addition & 2 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ impl CatalogState {
BuiltinTableUpdate {
id: self.resolve_builtin_table(&MZ_ROLES),
row: Row::pack_slice(&[
// TODO(jkosh44) when Uint64 is supported change below to Datum::Uint64
Datum::Int64(role.id as i64),
Datum::String(&role.id.to_string()),
Datum::UInt32(role.oid),
Datum::String(&name),
]),
Expand Down
74 changes: 60 additions & 14 deletions src/adapter/src/catalog/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::Hash;
use std::iter::once;
use std::str::FromStr;

use bytes::BufMut;
use itertools::{max, Itertools};
Expand All @@ -18,7 +19,6 @@ use serde_json::json;
use timely::progress::Timestamp;
use uuid::Uuid;

use crate::catalog;
use mz_audit_log::{VersionedEvent, VersionedStorageMetrics};
use mz_compute_client::command::ReplicaId;
use mz_compute_client::controller::ComputeInstanceId;
Expand All @@ -31,14 +31,15 @@ use mz_repr::global_id::ProtoGlobalId;
use mz_repr::GlobalId;
use mz_sql::catalog::CatalogError as SqlCatalogError;
use mz_sql::names::{
DatabaseId, ObjectQualifiers, QualifiedObjectName, ResolvedDatabaseSpecifier, SchemaId,
DatabaseId, ObjectQualifiers, QualifiedObjectName, ResolvedDatabaseSpecifier, RoleId, SchemaId,
SchemaSpecifier,
};
use mz_sql::plan::ComputeInstanceIntrospectionConfig;
use mz_stash::{Append, AppendBatch, Stash, StashError, TableTransaction, TypedCollection};
use mz_storage::types::sources::Timeline;

use crate::catalog::builtin::BuiltinLog;
use crate::catalog;
use crate::catalog::builtin::{BuiltinLog, BUILTIN_ROLES};
use crate::catalog::error::{Error, ErrorKind};
use crate::catalog::SerializedComputeInstanceReplicaConfig;
use crate::catalog::SystemObjectMapping;
Expand All @@ -57,7 +58,8 @@ const DEFAULT_REPLICA_ID: u64 = 1;

const DATABASE_ID_ALLOC_KEY: &str = "database";
const SCHEMA_ID_ALLOC_KEY: &str = "schema";
const ROLE_ID_ALLOC_KEY: &str = "role";
const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
const SYSTEM_ROLE_ID_ALLOC_KEY: &str = "system_role";
const COMPUTE_ID_ALLOC_KEY: &str = "compute";
const REPLICA_ID_ALLOC_KEY: &str = "replica";
pub(crate) const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
Expand Down Expand Up @@ -112,12 +114,18 @@ async fn migrate<S: Append>(
)?;
txn.id_allocator.insert(
IdAllocKey {
name: ROLE_ID_ALLOC_KEY.into(),
name: USER_ROLE_ID_ALLOC_KEY.into(),
},
IdAllocValue {
next_id: MATERIALIZE_ROLE_ID + 1,
},
)?;
txn.id_allocator.insert(
IdAllocKey {
name: SYSTEM_ROLE_ID_ALLOC_KEY.into(),
},
IdAllocValue { next_id: 1 },
)?;
txn.id_allocator.insert(
IdAllocKey {
name: COMPUTE_ID_ALLOC_KEY.into(),
Expand Down Expand Up @@ -195,7 +203,7 @@ async fn migrate<S: Append>(
)?;
txn.roles.insert(
RoleKey {
id: MATERIALIZE_ROLE_ID,
id: RoleId::User(MATERIALIZE_ROLE_ID).to_string(),
},
RoleValue {
name: "materialize".into(),
Expand Down Expand Up @@ -496,12 +504,33 @@ impl<S: Append> Connection<S> {
.collect())
}

pub async fn load_roles(&mut self) -> Result<Vec<(u64, String)>, Error> {
pub async fn load_roles(&mut self) -> Result<Vec<(RoleId, String)>, Error> {
// Add in any new builtin roles.
let mut tx = self.transaction().await?;
let role_names: HashSet<_> = tx
.roles
.items()
.into_values()
.map(|value| value.name)
.collect();
for builtin_role in &*BUILTIN_ROLES {
if !role_names.contains(builtin_role.name) {
tx.insert_system_role(builtin_role.name)?;
}
}
tx.commit().await?;

Ok(COLLECTION_ROLE
.peek_one(&mut self.stash)
.await?
.into_iter()
.map(|(k, v)| (k.id, v.name))
.map(|(k, v)| {
(
RoleId::from_str(&k.id)
.unwrap_or_else(|_| panic!("Invalid persisted role id {}", k.id)),
v.name,
)
})
.collect())
}

Expand Down Expand Up @@ -945,10 +974,27 @@ impl<'a, S: Append> Transaction<'a, S> {
}
}

pub fn insert_role(&mut self, role_name: &str) -> Result<u64, Error> {
let id = self.get_and_increment_id(ROLE_ID_ALLOC_KEY.to_string())?;
pub fn insert_user_role(&mut self, role_name: &str) -> Result<RoleId, Error> {
self.insert_role(role_name, USER_ROLE_ID_ALLOC_KEY, RoleId::User)
}

fn insert_system_role(&mut self, role_name: &str) -> Result<RoleId, Error> {
self.insert_role(role_name, SYSTEM_ROLE_ID_ALLOC_KEY, RoleId::System)
}

fn insert_role<F>(
&mut self,
role_name: &str,
id_alloc_key: &str,
role_id_variant: F,
) -> Result<RoleId, Error>
where
F: Fn(u64) -> RoleId,
{
let id = self.get_and_increment_id(id_alloc_key.to_string())?;
let id = role_id_variant(id);
match self.roles.insert(
RoleKey { id },
RoleKey { id: id.to_string() },
RoleValue {
name: role_name.to_string(),
},
Expand Down Expand Up @@ -1654,8 +1700,8 @@ impl_codec!(ItemValue);

#[derive(Clone, Message, PartialOrd, PartialEq, Eq, Ord, Hash)]
struct RoleKey {
#[prost(uint64)]
id: u64,
#[prost(string)]
id: String,
}
impl_codec!(RoleKey);

Expand Down
4 changes: 2 additions & 2 deletions src/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use uuid::Uuid;
use crate::func::Func;
use crate::names::{
Aug, DatabaseId, FullObjectName, PartialObjectName, QualifiedObjectName, QualifiedSchemaName,
ResolvedDatabaseSpecifier, SchemaSpecifier,
ResolvedDatabaseSpecifier, RoleId, SchemaSpecifier,
};
use crate::plan::statement::StatementDesc;

Expand Down Expand Up @@ -255,7 +255,7 @@ pub trait CatalogRole {
fn name(&self) -> &str;

/// Returns a stable ID for the role.
fn id(&self) -> u64;
fn id(&self) -> RoleId;
}

/// A compute instance in a [`SessionCatalog`].
Expand Down
33 changes: 33 additions & 0 deletions src/sql/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

//! Structured name types for SQL objects.

use anyhow::anyhow;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
Expand Down Expand Up @@ -663,6 +664,38 @@ impl FromStr for DatabaseId {
}
}

/// The identifier for a role.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum RoleId {
System(u64),
User(u64),
}

impl FromStr for RoleId {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() < 2 {
return Err(anyhow!("couldn't parse role id {}", s));
}
let val: u64 = s[1..].parse()?;
match s.chars().next().unwrap() {
's' => Ok(Self::System(val)),
'u' => Ok(Self::User(val)),
_ => Err(anyhow!("couldn't parse role id {}", s)),
}
}
}

impl fmt::Display for RoleId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::System(id) => write!(f, "s{}", id),
Self::User(id) => write!(f, "u{}", id),
}
}
}

#[derive(Debug)]
pub struct NameResolver<'a> {
catalog: &'a dyn SessionCatalog,
Expand Down
Loading

0 comments on commit d370474

Please sign in to comment.