coord: Apply schema migrations to builtin tables and views (MaterializeInc#11741)

This commit adds the ability to migrate the schema of builtin
objects. It does so by dropping every builtin object whose schema has
changed, along with all objects that (transitively) depend on it, and
then recreating all dropped objects with new GlobalIds.

Previously, in the single-binary world, schema migrations were
accomplished by restarting the whole system. In the platform world,
each component restarts independently, so that approach no longer
works.

Works towards resolving #11435
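
The dependency-closure step described above can be illustrated with a small,
self-contained sketch. None of the names below come from the commit (the real
logic lives in src/adapter/src/catalog.rs, whose diff is not rendered on this
page); it only shows the shape of the algorithm: find every builtin whose
fingerprint changed, pull in everything that transitively depends on it, drop
the lot, and recreate each dropped object under a fresh id.

use std::collections::{BTreeMap, BTreeSet};

type Id = u64;

struct Object {
    fingerprint: u64,  // hash of the builtin's current schema/definition
    used_by: Vec<Id>,  // objects that depend on this one
}

/// Every object whose schema changed, plus everything that transitively
/// depends on one of those objects, must be dropped and recreated.
fn objects_to_migrate(
    catalog: &BTreeMap<Id, Object>,
    new_fingerprints: &BTreeMap<Id, u64>,
) -> BTreeSet<Id> {
    // Seed with objects whose fingerprint no longer matches the new definition.
    let mut stack: Vec<Id> = Vec::new();
    for (id, obj) in catalog {
        if new_fingerprints.get(id).map_or(false, |f| *f != obj.fingerprint) {
            stack.push(*id);
        }
    }
    // Expand to the transitive closure of dependents.
    let mut to_drop = BTreeSet::new();
    while let Some(id) = stack.pop() {
        if to_drop.insert(id) {
            stack.extend(&catalog[&id].used_by);
        }
    }
    to_drop
}

fn main() {
    // Builtin 1 changed schema, view 2 depends on it, object 3 is unrelated.
    let catalog = BTreeMap::from([
        (1, Object { fingerprint: 10, used_by: vec![2] }),
        (2, Object { fingerprint: 20, used_by: vec![] }),
        (3, Object { fingerprint: 30, used_by: vec![] }),
    ]);
    let new_fingerprints = BTreeMap::from([(1, 11), (2, 20), (3, 30)]);
    assert_eq!(objects_to_migrate(&catalog, &new_fingerprints), BTreeSet::from([1, 2]));
    // The coordinator would then recreate 1 and 2 under freshly allocated GlobalIds.
}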
jkosh44 authored Jul 29, 2022
1 parent a7f91bc commit 9c79808
Showing 10 changed files with 628 additions and 59 deletions.
348 changes: 318 additions & 30 deletions src/adapter/src/catalog.rs

Large diffs are not rendered by default.
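
Because the catalog.rs diff is not rendered, the definition of the new
BuiltinMigrationMetadata type does not appear on this page. Judging purely
from how Coordinator::bootstrap consumes it below, it plausibly carries
per-compute-instance lists of the dropped objects' old ids, roughly along
these lines (field names are from the commit; the exact types are inferred
and may differ):

// Inferred sketch only — not the commit's actual definition.
// Type aliases stand in for mz_repr::GlobalId and the compute instance id type.
type GlobalId = u64;
type ComputeInstanceId = u64;

pub struct BuiltinMigrationMetadata {
    /// Sinks to drop, grouped by the compute instance that hosts them.
    pub previous_sink_ids: Vec<(ComputeInstanceId, Vec<GlobalId>)>,
    /// Indexes to drop, grouped by compute instance.
    pub previous_index_ids: Vec<(ComputeInstanceId, Vec<GlobalId>)>,
    /// Materialized views to drop; each needs both its compute sink and its
    /// backing storage collection removed.
    pub previous_materialized_view_ids: Vec<(ComputeInstanceId, Vec<GlobalId>)>,
    /// Storage collections (e.g. the builtin tables) to drop.
    pub previous_source_ids: Vec<GlobalId>,
}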

2 changes: 2 additions & 0 deletions src/adapter/src/catalog/error.rs
@@ -73,6 +73,8 @@ pub enum ErrorKind {
this_version: &'static str,
cause: String,
},
#[error("failed to migrate schema of builtin objects: {0}")]
FailedBuiltinSchemaMigration(String),
#[error("failpoint {0} reached)")]
FailpointReached(String),
#[error("{0}")]
65 changes: 52 additions & 13 deletions src/adapter/src/catalog/storage.rs
@@ -7,12 +7,12 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::iter::once;

use bytes::BufMut;
use itertools::max;
use itertools::{max, Itertools};
use prost::{self, Message};
use serde_json::json;
use timely::progress::Timestamp;
@@ -41,6 +41,7 @@ use mz_storage::types::sources::Timeline;
use crate::catalog::builtin::BuiltinLog;
use crate::catalog::error::{Error, ErrorKind};
use crate::catalog::SerializedComputeInstanceReplicaConfig;
use crate::catalog::SystemObjectMapping;

const USER_VERSION: &str = "user_version";

@@ -584,34 +585,38 @@ impl<S: Append> Connection<S> {
.collect())
}

/// Persist mapping from system objects to global IDs. Each element of `mappings` should be
/// (schema-name, object-name, global-id).
/// Persist mapping from system objects to global IDs and fingerprints.
///
/// Panics if provided id is not a system id
pub async fn set_system_gids(
/// Panics if provided id is not a system id.
pub async fn set_system_object_mapping(
&mut self,
mappings: Vec<(&str, &str, GlobalId, u64)>,
mappings: Vec<SystemObjectMapping>,
) -> Result<(), Error> {
if mappings.is_empty() {
return Ok(());
}

let mappings = mappings
.into_iter()
.map(|(schema_name, object_name, id, fingerprint)| {
let mappings = mappings.into_iter().map(
|SystemObjectMapping {
schema_name,
object_name,
id,
fingerprint,
}| {
let id = if let GlobalId::System(id) = id {
id
} else {
panic!("non-system id provided")
};
(
GidMappingKey {
schema_name: schema_name.to_string(),
object_name: object_name.to_string(),
schema_name,
object_name,
},
GidMappingValue { id, fingerprint },
)
});
},
);
COLLECTION_SYSTEM_GID_MAPPING
.upsert(&mut self.stash, mappings)
.await
@@ -1145,6 +1150,40 @@ impl<'a, S: Append> Transaction<'a, S> {
Ok(())
}

/// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
/// of `mappings` should be (old-global-id, new-system-object-mapping).
///
/// Panics if provided id is not a system id.
pub fn update_system_object_mappings(
&mut self,
mappings: &HashMap<GlobalId, SystemObjectMapping>,
) -> Result<(), Error> {
let n = self.system_gid_mapping.update(|_k, v| {
if let Some(mapping) = mappings.get(&GlobalId::System(v.id)) {
let id = if let GlobalId::System(id) = mapping.id {
id
} else {
panic!("non-system id provided")
};
Some(GidMappingValue {
id,
fingerprint: mapping.fingerprint,
})
} else {
None
}
})?;

if usize::try_from(n).expect("update diff should fit into usize") != mappings.len() {
let id_str = mappings.keys().map(|id| id.to_string()).join(",");
return Err(Error {
kind: ErrorKind::FailedBuiltinSchemaMigration(id_str),
});
}

Ok(())
}

pub fn remove_timestamp(&mut self, timeline: Timeline) {
let timeline_str = timeline.to_string();
let n = self.timestamps.delete(|k, _v| k.id == timeline_str).len();
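
For orientation, a hypothetical call site for the two APIs added above. Only
set_system_object_mapping, update_system_object_mappings, SystemObjectMapping's
fields, and GlobalId::System are taken from the commit; the object name, ids,
fingerprints, and surrounding plumbing are made up, and imports are elided.

// On first boot: persist the gid/fingerprint mapping for a newly created builtin.
async fn persist_new_builtin<S: Append>(storage: &mut Connection<S>) -> Result<(), Error> {
    storage
        .set_system_object_mapping(vec![SystemObjectMapping {
            schema_name: "mz_catalog".into(),
            object_name: "mz_example_builtin".into(), // hypothetical builtin
            id: GlobalId::System(42),
            fingerprint: 1,
        }])
        .await
}

// During a schema migration: the builtin previously stored as System(42) has
// been dropped and recreated as System(43), with a new fingerprint to match.
fn record_migrated_builtin<S: Append>(tx: &mut Transaction<'_, S>) -> Result<(), Error> {
    let migrated = HashMap::from([(
        GlobalId::System(42),
        SystemObjectMapping {
            schema_name: "mz_catalog".into(),
            object_name: "mz_example_builtin".into(),
            id: GlobalId::System(43),
            fingerprint: 2,
        },
    )]);
    tx.update_system_object_mappings(&migrated)
}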
68 changes: 52 additions & 16 deletions src/adapter/src/coord.rs
@@ -107,8 +107,8 @@ use mz_transform::Optimizer;

use crate::catalog::builtin::{BUILTINS, MZ_VIEW_FOREIGN_KEYS, MZ_VIEW_KEYS};
use crate::catalog::{
self, storage, BuiltinTableUpdate, Catalog, CatalogItem, ClusterReplicaSizeMap, Sink,
SinkConnectionState, StorageHostSizeMap,
self, storage, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
ClusterReplicaSizeMap, Sink, SinkConnectionState, StorageHostSizeMap,
};
use crate::client::{Client, ConnectionId, Handle};
use crate::command::{Canceled, Command, ExecuteResponse};
@@ -323,6 +323,7 @@ impl<S: Append + 'static> Coordinator<S> {
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn bootstrap(
&mut self,
builtin_migration_metadata: BuiltinMigrationMetadata,
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
) -> Result<(), AdapterError> {
let mut persisted_log_ids = vec![];
@@ -362,6 +363,39 @@
)
.await;

// Migrate builtin objects.
for (compute_id, sink_ids) in builtin_migration_metadata.previous_sink_ids {
self.controller
.compute_mut(compute_id)
.unwrap()
.drop_sinks_unvalidated(sink_ids)
.await?;
}
for (compute_id, index_ids) in builtin_migration_metadata.previous_index_ids {
self.controller
.compute_mut(compute_id)
.unwrap()
.drop_indexes_unvalidated(index_ids)
.await?;
}
for (compute_id, recorded_view_ids) in
builtin_migration_metadata.previous_materialized_view_ids
{
self.controller
.compute_mut(compute_id)
.unwrap()
.drop_sinks_unvalidated(recorded_view_ids.clone())
.await?;
self.controller
.storage_mut()
.drop_sources_unvalidated(recorded_view_ids)
.await?;
}
self.controller
.storage_mut()
.drop_sources_unvalidated(builtin_migration_metadata.previous_source_ids)
.await?;

let mut entries: Vec<_> = self.catalog.entries().cloned().collect();
// Topologically sort entries based on the used_by relationship
entries.sort_unstable_by(|a, b| {
@@ -799,19 +833,20 @@ pub async fn serve<S: Append + 'static>(
// Coordinator::sequence_create_compute_instance_replica.
availability_zones.shuffle(&mut rand::thread_rng());

let (mut catalog, builtin_table_updates) = Catalog::open(catalog::Config {
storage,
unsafe_mode,
build_info,
now: now.clone(),
skip_migrations: false,
metrics_registry: &metrics_registry,
cluster_replica_sizes,
storage_host_sizes,
default_storage_host_size,
availability_zones,
})
.await?;
let (mut catalog, builtin_migration_metadata, builtin_table_updates) =
Catalog::open(catalog::Config {
storage,
unsafe_mode,
build_info,
now: now.clone(),
skip_migrations: false,
metrics_registry: &metrics_registry,
cluster_replica_sizes,
storage_host_sizes,
default_storage_host_size,
availability_zones,
})
.await?;
let cluster_id = catalog.config().cluster_id;
let session_id = catalog.config().session_id;
let start_instant = catalog.config().start_instant;
@@ -878,7 +913,8 @@
connection_context,
transient_replica_metadata: HashMap::new(),
};
let bootstrap = handle.block_on(coord.bootstrap(builtin_table_updates));
let bootstrap =
handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
let ok = bootstrap.is_ok();
bootstrap_tx.send(bootstrap).unwrap();
if ok {
20 changes: 20 additions & 0 deletions src/adapter/src/error.rs
@@ -14,13 +14,15 @@ use std::num::TryFromIntError;
use dec::TryFromDecimalError;
use tokio::sync::oneshot;

use mz_compute_client::controller::ComputeError;
use mz_expr::{EvalError, UnmaterializableFunc};
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::StrExt;
use mz_repr::explain_new::ExplainError;
use mz_repr::NotNullViolation;
use mz_sql::plan::PlanError;
use mz_sql::query_model::QGMError;
use mz_storage::controller::StorageError;
use mz_transform::TransformError;

use crate::catalog;
@@ -157,6 +159,10 @@ pub enum AdapterError {
WriteOnlyTransaction,
/// The transaction only supports single table writes
MultiTableWriteTransaction,
/// An error occurred in the storage layer
Storage(mz_storage::controller::StorageError),
/// An error occurred in the compute layer
Compute(mz_compute_client::controller::ComputeError),
}

impl AdapterError {
@@ -392,6 +398,8 @@ impl fmt::Display for AdapterError {
AdapterError::MultiTableWriteTransaction => {
f.write_str("write transactions only support writes to a single table")
}
AdapterError::Storage(e) => e.fmt(f),
AdapterError::Compute(e) => e.fmt(f),
}
}
}
@@ -477,4 +485,16 @@ impl From<oneshot::error::RecvError> for AdapterError {
}
}

impl From<StorageError> for AdapterError {
fn from(e: StorageError) -> Self {
AdapterError::Storage(e)
}
}

impl From<ComputeError> for AdapterError {
fn from(e: ComputeError) -> Self {
AdapterError::Compute(e)
}
}

impl Error for AdapterError {}
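
These From impls are what let the new migration code in Coordinator::bootstrap
apply `?` directly to storage and compute controller calls. A stripped-down,
self-contained illustration of the pattern (stub types, not the real ones):

#[derive(Debug)]
struct StorageError(String);

#[derive(Debug)]
enum AdapterError {
    Storage(StorageError),
}

impl From<StorageError> for AdapterError {
    fn from(e: StorageError) -> Self {
        AdapterError::Storage(e)
    }
}

fn drop_sources() -> Result<(), StorageError> {
    Err(StorageError("collection missing".into()))
}

fn bootstrap() -> Result<(), AdapterError> {
    drop_sources()?; // `?` converts StorageError into AdapterError via From
    Ok(())
}

fn main() {
    assert!(matches!(bootstrap(), Err(AdapterError::Storage(_))));
}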
64 changes: 64 additions & 0 deletions src/compute-client/src/controller.rs
@@ -476,6 +476,27 @@
self.allow_compaction(compaction_commands).await?;
Ok(())
}
/// Drops the read capability for the sinks and allows their resources to be reclaimed.
/// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
/// controller starts/restarts it has no durable state. That means that it has no way of
/// remembering any past commands sent. In the future we plan on persisting state for the
/// controller so that it is aware of past commands.
/// Therefore this method is for dropping sinks that we know to have been previously
/// created, but have been forgotten by the controller due to a restart.
/// Once command history becomes durable we can remove this method and use the normal
/// `drop_sinks`.
pub async fn drop_sinks_unvalidated(
&mut self,
identifiers: Vec<GlobalId>,
) -> Result<(), ComputeError> {
let compaction_commands = identifiers
.into_iter()
.map(|id| (id, Antichain::new()))
.collect();
self.allow_compaction_unvalidated(compaction_commands)
.await?;
Ok(())
}
/// Drops the read capability for the indexes and allows their resources to be reclaimed.
pub async fn drop_indexes(&mut self, identifiers: Vec<GlobalId>) -> Result<(), ComputeError> {
// Validate that the ids exist.
@@ -488,6 +509,27 @@
self.allow_compaction(compaction_commands).await?;
Ok(())
}
/// Drops the read capability for the indexes and allows their resources to be reclaimed.
/// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
/// controller starts/restarts it has no durable state. That means that it has no way of
/// remembering any past commands sent. In the future we plan on persisting state for the
/// controller so that it is aware of past commands.
/// Therefore this method is for dropping indexes that we know to have been previously
/// created, but have been forgotten by the controller due to a restart.
/// Once command history becomes durable we can remove this method and use the normal
/// `drop_indexes`.
pub async fn drop_indexes_unvalidated(
&mut self,
identifiers: Vec<GlobalId>,
) -> Result<(), ComputeError> {
let compaction_commands = identifiers
.into_iter()
.map(|id| (id, Antichain::new()))
.collect();
self.allow_compaction_unvalidated(compaction_commands)
.await?;
Ok(())
}
/// Initiate a peek request for the contents of `id` at `timestamp`.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn peek(
@@ -564,6 +606,28 @@
Ok(())
}

/// Downgrade the read capabilities of specific identifiers to specific frontiers.
///
/// Downgrading any read capability to the empty frontier will drop the item and eventually reclaim its resources.
/// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
/// controller starts/restarts it has no durable state. That means that it has no way of
/// remembering any past commands sent. In the future we plan on persisting state for the
/// controller so that it is aware of past commands.
/// Therefore this method is for allowing compaction on objects that we know to have been
/// previously created, but have been forgotten by the controller due to a restart.
/// Once command history becomes durable we can remove this method and use the normal
/// `allow_compaction`.
async fn allow_compaction_unvalidated(
&mut self,
frontiers: Vec<(GlobalId, Antichain<T>)>,
) -> Result<(), ComputeError> {
let policies = frontiers
.into_iter()
.map(|(id, frontier)| (id, ReadPolicy::ValidFrom(frontier)));
self.set_read_policy(policies.collect()).await?;
Ok(())
}

/// Assigns a read policy to specific identifiers.
///
/// The policies are assigned in the order presented, and repeated identifiers should
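
All of the unvalidated drop methods above bottom out in a read policy of
ReadPolicy::ValidFrom(Antichain::new()). The empty antichain contains no
times, so no timestamp remains readable under it, which is what allows the
controller to compact the object away entirely. A tiny illustration using
timely directly (assumes only the timely crate):

use timely::progress::Antichain;

fn main() {
    // The empty antichain is the "nothing is readable" frontier: it has no
    // elements, so no time t satisfies `frontier.less_equal(&t)`.
    let frontier: Antichain<u64> = Antichain::new();
    assert!(frontier.is_empty());
    assert!(!frontier.less_equal(&0));
    assert!(!frontier.less_equal(&u64::MAX));

    // Compare with a capability held at time 5: times >= 5 remain readable.
    let held = Antichain::from_elem(5u64);
    assert!(held.less_equal(&7));
    assert!(!held.less_equal(&3));
}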
1 change: 1 addition & 0 deletions src/pgwire/src/message.rs
@@ -420,6 +420,7 @@ impl ErrorResponse {
// code, so it's probably the best choice.
AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::MultiTableWriteTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::Storage(_) | AdapterError::Compute(_) => SqlState::INTERNAL_ERROR,
};
ErrorResponse {
severity,