Skip to content

Commit

Permalink
adapter: Add initial support for account limits (MaterializeInc#14047)
Browse files Browse the repository at this point in the history
This commit adds the initial infrastructure needed to support SQL
account limits. This includes the following:
- System configuration stash collection.
- System configuration in-memory catalog map.
- ALTER SYSTEM SET parsing, statement, plan, and sequencing.

This commit has some items missing needed to fully support account
limits, which will be added in a future commit.

Works towards resolving: MaterializeInc/cloud#3292
  • Loading branch information
jkosh44 authored Aug 9, 2022
1 parent 0e2c93a commit 064b435
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub struct CatalogState {
storage_host_sizes: StorageHostSizeMap,
default_storage_host_size: Option<String>,
availability_zones: Vec<String>,
system_configuration: HashMap<String, mz_sql_parser::ast::Value>,
}

impl CatalogState {
Expand Down Expand Up @@ -655,6 +656,16 @@ impl CatalogState {
.cloned()
}

/// Insert system configuration `name` with `value`.
fn insert_system_configuration(&mut self, name: String, value: mz_sql_parser::ast::Value) {
self.system_configuration.insert(name, value);
}

/// Remove system configuration `name`.
fn remove_system_configuration(&mut self, name: &str) -> Option<mz_sql_parser::ast::Value> {
self.system_configuration.remove(name)
}

/// Gets the schema map for the database matching `database_spec`.
fn resolve_schema_in_database(
&self,
Expand Down Expand Up @@ -1657,6 +1668,7 @@ impl<S: Append> Catalog<S> {
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
system_configuration: HashMap::new(),
},
transient_revision: 0,
storage: Arc::new(Mutex::new(config.storage)),
Expand Down Expand Up @@ -1926,6 +1938,11 @@ impl<S: Append> Catalog<S> {
.insert_compute_instance_replica(instance_id, name, replica_id, config);
}

let system_config = catalog.storage().await.load_system_configuration().await?;
for (name, value) in system_config {
catalog.state.insert_system_configuration(name, value);
}

if !config.skip_migrations {
let last_seen_version = catalog
.storage()
Expand Down Expand Up @@ -3104,6 +3121,10 @@ impl<S: Append> Catalog<S> {
UpdateComputeInstanceStatus {
event: ComputeInstanceEvent,
},
UpdateServerConfiguration {
name: String,
value: mz_sql_parser::ast::Value,
},
}

let drop_ids: HashSet<_> = ops
Expand Down Expand Up @@ -3571,6 +3592,10 @@ impl<S: Append> Catalog<S> {
)?;
vec![]
}
Op::UpdateServerConfiguration { name, value } => {
tx.upsert_system_config(&name, value.clone()).await?;
vec![Action::UpdateServerConfiguration { name, value }]
}
});
}

Expand Down Expand Up @@ -3810,6 +3835,10 @@ impl<S: Append> Catalog<S> {
builtin_table_updates.push(update);
}
}
Action::UpdateServerConfiguration { name, value } => {
state.remove_system_configuration(&name);
state.insert_system_configuration(name.clone(), value.clone());
}
}
}

Expand Down Expand Up @@ -4135,6 +4164,10 @@ pub enum Op {
object_id: Option<String>,
size_bytes: u64,
},
UpdateServerConfiguration {
name: String,
value: mz_sql_parser::ast::Value,
},
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
93 changes: 85 additions & 8 deletions src/adapter/src/catalog/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,23 @@ impl<S: Append> Connection<S> {
.collect())
}

/// Load the persisted server configurations.
pub async fn load_system_configuration(
&mut self,
) -> Result<BTreeMap<String, mz_sql_parser::ast::Value>, Error> {
COLLECTION_SYSTEM_CONFIGURATION
.peek_one(&mut self.stash)
.await?
.into_iter()
.map(|(k, v)| {
let name = k.name;
let value: mz_sql_parser::ast::Value = serde_json::from_slice(&v.value)
.map_err(|err| Error::from(StashError::from(err.to_string())))?;
Ok((name, value))
})
.collect()
}

/// Persist mapping from system objects to global IDs and fingerprints.
///
/// Panics if provided id is not a system id.
Expand Down Expand Up @@ -796,10 +813,11 @@ pub async fn transaction<'a, S: Append>(stash: &'a mut S) -> Result<Transaction<
.peek_one(stash)
.await?;
let id_allocator = COLLECTION_ID_ALLOC.peek_one(stash).await?;
let collection_config = COLLECTION_CONFIG.peek_one(stash).await?;
let collection_setting = COLLECTION_SETTING.peek_one(stash).await?;
let collection_timestamps = COLLECTION_TIMESTAMP.peek_one(stash).await?;
let collection_system_gid_mapping = COLLECTION_SYSTEM_GID_MAPPING.peek_one(stash).await?;
let configs = COLLECTION_CONFIG.peek_one(stash).await?;
let settings = COLLECTION_SETTING.peek_one(stash).await?;
let timestamps = COLLECTION_TIMESTAMP.peek_one(stash).await?;
let system_gid_mapping = COLLECTION_SYSTEM_GID_MAPPING.peek_one(stash).await?;
let system_configurations = COLLECTION_SYSTEM_CONFIGURATION.peek_one(stash).await?;

Ok(Transaction {
stash,
Expand All @@ -815,10 +833,11 @@ pub async fn transaction<'a, S: Append>(stash: &'a mut S) -> Result<Transaction<
}),
introspection_sources: TableTransaction::new(introspection_sources, |_a, _b| false),
id_allocator: TableTransaction::new(id_allocator, |_a, _b| false),
configs: TableTransaction::new(collection_config, |_a, _b| false),
settings: TableTransaction::new(collection_setting, |_a, _b| false),
timestamps: TableTransaction::new(collection_timestamps, |_a, _b| false),
system_gid_mapping: TableTransaction::new(collection_system_gid_mapping, |_a, _b| false),
configs: TableTransaction::new(configs, |_a, _b| false),
settings: TableTransaction::new(settings, |_a, _b| false),
timestamps: TableTransaction::new(timestamps, |_a, _b| false),
system_gid_mapping: TableTransaction::new(system_gid_mapping, |_a, _b| false),
system_configurations: TableTransaction::new(system_configurations, |_a, _b| false),
audit_log_updates: Vec::new(),
storage_usage_updates: Vec::new(),
})
Expand All @@ -840,6 +859,7 @@ pub struct Transaction<'a, S> {
settings: TableTransaction<SettingKey, SettingValue>,
timestamps: TableTransaction<TimestampKey, TimestampValue>,
system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
// Don't make this a table transaction so that it's not read into the stash
// memory cache.
audit_log_updates: Vec<(AuditLogKey, (), i64)>,
Expand Down Expand Up @@ -1216,6 +1236,37 @@ impl<'a, S: Append> Transaction<'a, S> {
Ok(())
}

/// Upserts persisted system configuration `name` to `value`.
pub async fn upsert_system_config(
&mut self,
name: &str,
value: mz_sql_parser::ast::Value,
) -> Result<(), Error> {
let key = ServerConfigurationKey {
name: name.to_string(),
};
let value = ServerConfigurationValue {
value: serde_json::to_vec(&value)
.map_err(|err| Error::from(StashError::from(err.to_string())))?,
};
self.system_configurations.delete(|k, _v| k == &key);
self.system_configurations.insert(key, value)?;
Ok(())
}

/// Removes persisted system configuration `name`.
pub async fn remove_system_config(&mut self, name: &str) {
let key = ServerConfigurationKey {
name: name.to_string(),
};
self.system_configurations.delete(|k, _v| k == &key);
}

/// Removes all persisted system configurations.
pub async fn clear_system_configs(&mut self) {
self.system_configurations.delete(|_k, _v| true);
}

pub fn remove_timestamp(&mut self, timeline: Timeline) {
let timeline_str = timeline.to_string();
let n = self.timestamps.delete(|k, _v| k.id == timeline_str).len();
Expand Down Expand Up @@ -1333,6 +1384,13 @@ impl<'a, S: Append> Transaction<'a, S> {
self.system_gid_mapping.pending(),
)
.await?;
add_batch(
self.stash,
&mut batches,
&COLLECTION_SYSTEM_CONFIGURATION,
self.system_configurations.pending(),
)
.await?;
add_batch(
self.stash,
&mut batches,
Expand Down Expand Up @@ -1404,6 +1462,7 @@ pub async fn initialize_stash<S: Append>(stash: &mut S) -> Result<(), Error> {
add_batch(stash, &mut batches, &COLLECTION_ITEM).await?;
add_batch(stash, &mut batches, &COLLECTION_ROLE).await?;
add_batch(stash, &mut batches, &COLLECTION_TIMESTAMP).await?;
add_batch(stash, &mut batches, &COLLECTION_SYSTEM_CONFIGURATION).await?;
add_batch(stash, &mut batches, &COLLECTION_AUDIT_LOG).await?;
add_batch(stash, &mut batches, &COLLECTION_STORAGE_USAGE).await?;
stash.append(batches).await.map_err(|e| e.into())
Expand Down Expand Up @@ -1648,6 +1707,20 @@ struct TimestampValue {
}
impl_codec!(TimestampValue);

#[derive(Clone, Message, PartialOrd, PartialEq, Eq, Ord, Hash)]
struct ServerConfigurationKey {
#[prost(string)]
name: String,
}
impl_codec!(ServerConfigurationKey);

#[derive(Clone, Message, PartialOrd, PartialEq, Eq, Ord)]
struct ServerConfigurationValue {
#[prost(bytes)]
value: Vec<u8>,
}
impl_codec!(ServerConfigurationValue);

static COLLECTION_CONFIG: TypedCollection<String, ConfigValue> = TypedCollection::new("config");
static COLLECTION_SETTING: TypedCollection<SettingKey, SettingValue> =
TypedCollection::new("setting");
Expand All @@ -1672,6 +1745,10 @@ static COLLECTION_ITEM: TypedCollection<ItemKey, ItemValue> = TypedCollection::n
static COLLECTION_ROLE: TypedCollection<RoleKey, RoleValue> = TypedCollection::new("role");
static COLLECTION_TIMESTAMP: TypedCollection<TimestampKey, TimestampValue> =
TypedCollection::new("timestamp");
static COLLECTION_SYSTEM_CONFIGURATION: TypedCollection<
ServerConfigurationKey,
ServerConfigurationValue,
> = TypedCollection::new("system_configuration");
static COLLECTION_AUDIT_LOG: TypedCollection<AuditLogKey, ()> = TypedCollection::new("audit_log");
static COLLECTION_STORAGE_USAGE: TypedCollection<StorageMetricsKey, ()> =
TypedCollection::new("storage_usage");
1 change: 1 addition & 0 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ impl SessionClient {
| ExecuteResponse::Updated(_)
| ExecuteResponse::AlteredObject(_)
| ExecuteResponse::AlteredIndexLogicalCompaction
| ExecuteResponse::AlteredSystemConfiguraion
| ExecuteResponse::Deallocate { all: _ }
| ExecuteResponse::Prepare => {
results.push(SimpleResult::Ok);
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ pub enum ExecuteResponse {
AlteredObject(ObjectType),
// The index was altered.
AlteredIndexLogicalCompaction,
// The system configuration was altered.
AlteredSystemConfiguraion,
// The query was canceled.
Canceled,
/// The requested cursor was closed.
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl<S: Append + 'static> Coordinator<S> {
Statement::AlterIndex(_)
| Statement::AlterSecret(_)
| Statement::AlterObjectRename(_)
| Statement::AlterSystem(_)
| Statement::CreateConnection(_)
| Statement::CreateDatabase(_)
| Statement::CreateIndex(_)
Expand Down
32 changes: 28 additions & 4 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ use mz_sql::catalog::{CatalogComputeInstance, CatalogError, CatalogItemType, Cat
use mz_sql::names::QualifiedObjectName;
use mz_sql::plan::{
AlterIndexResetOptionsPlan, AlterIndexSetOptionsPlan, AlterItemRenamePlan, AlterSecretPlan,
ComputeInstanceReplicaConfig, CreateComputeInstancePlan, CreateComputeInstanceReplicaPlan,
CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan,
CreateTablePlan, CreateTypePlan, CreateViewPlan, CreateViewsPlan,
AlterSystemPlan, ComputeInstanceReplicaConfig, CreateComputeInstancePlan,
CreateComputeInstanceReplicaPlan, CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan,
CreateMaterializedViewPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, CreateViewsPlan,
DropComputeInstanceReplicaPlan, DropComputeInstancesPlan, DropDatabasePlan, DropItemsPlan,
DropRolesPlan, DropSchemaPlan, ExecutePlan, ExplainPlan, ExplainPlanNew, ExplainPlanOld,
FetchPlan, HirRelationExpr, IndexOption, InsertPlan, MaterializedView, MutationKind,
Expand Down Expand Up @@ -279,6 +279,9 @@ impl<S: Append + 'static> Coordinator<S> {
Plan::AlterSecret(plan) => {
tx.send(self.sequence_alter_secret(&session, plan).await, session);
}
Plan::AlterSystem(plan) => {
tx.send(self.sequence_alter_system(&session, plan).await, session);
}
Plan::DiscardTemp => {
self.drop_temp_items(&session).await;
tx.send(Ok(ExecuteResponse::DiscardedTemp), session);
Expand Down Expand Up @@ -3106,6 +3109,27 @@ impl<S: Append + 'static> Coordinator<S> {
return Ok(Vec::from(payload));
}

async fn sequence_alter_system(
&mut self,
session: &Session,
AlterSystemPlan { name, value }: AlterSystemPlan,
) -> Result<ExecuteResponse, AdapterError> {
use mz_sql::ast::SetVariableValue;
// TODO(jkosh44) name and value should be validated against some predetermined list of
// system configurations.
let value = match value {
SetVariableValue::Literal(value) => value,
SetVariableValue::Ident(ident) => ident.into(),
SetVariableValue::Default => {
return Err(AdapterError::Unsupported("default system configurations"))
}
};
let op = catalog::Op::UpdateServerConfiguration { name, value };
self.catalog_transact(Some(session), vec![op], |_| Ok(()))
.await?;
Ok(ExecuteResponse::AlteredSystemConfiguraion)
}

// Returns the name of the portal to execute.
fn sequence_execute(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions src/pgwire/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ where
ExecuteResponse::Updated(n) => command_complete!("UPDATE {}", n),
ExecuteResponse::AlteredObject(o) => command_complete!("ALTER {}", o),
ExecuteResponse::AlteredIndexLogicalCompaction => command_complete!("ALTER INDEX"),
ExecuteResponse::AlteredSystemConfiguraion => command_complete!("ALTER SYSTEM"),
ExecuteResponse::Prepare => command_complete!("PREPARE"),
ExecuteResponse::Deallocate { all } => {
command_complete!("DEALLOCATE{}", if all { " ALL" } else { "" })
Expand Down
1 change: 1 addition & 0 deletions src/sql-parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ anyhow = "1.0.58"
itertools = "0.10.3"
mz-ore = { path = "../ore", default-features = false, features = ["stack"] }
phf = { version = "0.11.0", features = ["uncased"] }
serde = { version = "1.0.140", features = ["derive"] }
tracing = "0.1.35"
uncased = "0.9.7"

Expand Down
20 changes: 20 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum Statement<T: AstInfo> {
AlterObjectRename(AlterObjectRenameStatement),
AlterIndex(AlterIndexStatement<T>),
AlterSecret(AlterSecretStatement<T>),
AlterSystem(AlterSystemStatement),
Discard(DiscardStatement),
DropDatabase(DropDatabaseStatement),
DropSchema(DropSchemaStatement),
Expand Down Expand Up @@ -118,6 +119,7 @@ impl<T: AstInfo> AstDisplay for Statement<T> {
Statement::AlterObjectRename(stmt) => f.write_node(stmt),
Statement::AlterIndex(stmt) => f.write_node(stmt),
Statement::AlterSecret(stmt) => f.write_node(stmt),
Statement::AlterSystem(stmt) => f.write_node(stmt),
Statement::Discard(stmt) => f.write_node(stmt),
Statement::DropDatabase(stmt) => f.write_node(stmt),
Statement::DropSchema(stmt) => f.write_node(stmt),
Expand Down Expand Up @@ -2447,6 +2449,24 @@ impl AstDisplay for NoticeSeverity {
}
impl_display!(NoticeSeverity);

/// `ALTER SYSTEM ...`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AlterSystemStatement {
pub name: Ident,
pub value: SetVariableValue,
}

impl AstDisplay for AlterSystemStatement {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("ALTER SYSTEM SET ");
f.write_node(&self.name);
f.write_str(" = ");
f.write_node(&self.value);
}
}

impl_display!(AlterSystemStatement);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AsOf<T: AstInfo> {
At(Expr<T>),
Expand Down
Loading

0 comments on commit 064b435

Please sign in to comment.