coord: Apply schema migrations to builtin tables and views #11741

Merged: 15 commits, Jul 29, 2022
Changes from 12 commits
348 changes: 318 additions & 30 deletions src/adapter/src/catalog.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/adapter/src/catalog/error.rs
@@ -73,6 +73,8 @@ pub enum ErrorKind {
this_version: &'static str,
cause: String,
},
#[error("failed to migrate schema of builtin objects: {0}")]
FailedBuiltinSchemaMigration(String),
#[error("failpoint {0} reached)")]
FailpointReached(String),
#[error("{0}")]
63 changes: 51 additions & 12 deletions src/adapter/src/catalog/storage.rs
@@ -7,12 +7,12 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::iter::once;

use bytes::BufMut;
use itertools::max;
use itertools::{max, Itertools};
use prost::{self, Message};
use serde_json::json;
use timely::progress::Timestamp;
@@ -41,6 +41,7 @@ use mz_storage::types::sources::Timeline;
use crate::catalog::builtin::BuiltinLog;
use crate::catalog::error::{Error, ErrorKind};
use crate::catalog::SerializedComputeInstanceReplicaConfig;
use crate::catalog::SystemObjectMapping;

const USER_VERSION: &str = "user_version";

@@ -584,34 +585,38 @@ impl<S: Append> Connection<S> {
.collect())
}

/// Persist mapping from system objects to global IDs. Each element of `mappings` should be
/// (schema-name, object-name, global-id).
/// Persist mapping from system objects to global IDs and fingerprints.
///
/// Panics if a provided id is not a system id.
pub async fn set_system_gids(
pub async fn set_system_object_mapping(
&mut self,
mappings: Vec<(&str, &str, GlobalId, u64)>,
mappings: Vec<SystemObjectMapping>,
) -> Result<(), Error> {
if mappings.is_empty() {
return Ok(());
}

let mappings = mappings
.into_iter()
.map(|(schema_name, object_name, id, fingerprint)| {
let mappings = mappings.into_iter().map(
|SystemObjectMapping {
schema_name,
object_name,
id,
fingerprint,
}| {
let id = if let GlobalId::System(id) = id {
id
} else {
panic!("non-system id provided")
};
(
GidMappingKey {
schema_name: schema_name.to_string(),
object_name: object_name.to_string(),
schema_name,
object_name,
},
GidMappingValue { id, fingerprint },
)
});
},
);
COLLECTION_SYSTEM_GID_MAPPING
.upsert(&mut self.stash, mappings)
.await
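
For context on the new fingerprint field: each builtin object's definition is fingerprinted so that a later version can detect that a builtin's schema changed and needs migrating. A minimal sketch of one way to compute such a fingerprint (hashing the create SQL with `DefaultHasher` is an illustrative assumption, not necessarily the scheme this PR uses):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash a builtin's definition so a changed definition yields a different
// fingerprint across versions. Note: DefaultHasher is not guaranteed stable
// across Rust releases, so a real scheme would pin a specific hash function.
fn fingerprint(create_sql: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    create_sql.hash(&mut hasher);
    hasher.finish()
}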
Expand Down Expand Up @@ -1145,6 +1150,40 @@ impl<'a, S: Append> Transaction<'a, S> {
Ok(())
}

/// Updates the persisted mapping from system objects to global IDs and fingerprints. Each
/// entry of `mappings` maps an old global ID to its new system object mapping.
///
/// Panics if a provided id is not a system id.
pub fn update_system_object_mappings(
&mut self,
mappings: &HashMap<GlobalId, SystemObjectMapping>,
) -> Result<(), Error> {
let n = self.system_gid_mapping.update(|_k, v| {
if let Some(mapping) = mappings.get(&GlobalId::System(v.id)) {
let id = if let GlobalId::System(id) = mapping.id {
id
} else {
panic!("non-system id provided")
};
Some(GidMappingValue {
id,
fingerprint: mapping.fingerprint,
})
} else {
None
}
})?;

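// `n` counts the stash entries the closure above actually rewrote; any
// shortfall against `mappings.len()` means some old id had no matching entry.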
if usize::try_from(n).expect("update diff should fit into usize") != mappings.len() {
let id_str = mappings.keys().map(|id| id.to_string()).join(",");
return Err(Error {
kind: ErrorKind::FailedBuiltinSchemaMigration(id_str),
});
}

Ok(())
}

pub fn remove_timestamp(&mut self, timeline: Timeline) {
let timeline_str = timeline.to_string();
let n = self.timestamps.delete(|k, _v| k.id == timeline_str).len();
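A sketch of how a caller might drive `update_system_object_mappings`, assuming `SystemObjectMapping` holds owned `String` names and a `u64` fingerprint, as the destructuring above suggests (the ids and values are illustrative):

use std::collections::HashMap;

// Remap one builtin from its old system id to a freshly allocated one,
// recording its new fingerprint (illustrative values only).
fn migrate_one(tx: &mut Transaction<'_, impl Append>) -> Result<(), Error> {
    let mut mappings = HashMap::new();
    mappings.insert(
        GlobalId::System(123),
        SystemObjectMapping {
            schema_name: "mz_catalog".to_string(),
            object_name: "mz_tables".to_string(),
            id: GlobalId::System(456),
            fingerprint: 42,
        },
    );
    tx.update_system_object_mappings(&mappings)?;
    Ok(())
}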
68 changes: 52 additions & 16 deletions src/adapter/src/coord.rs
@@ -107,8 +107,8 @@ use mz_transform::Optimizer;

use crate::catalog::builtin::{BUILTINS, MZ_VIEW_FOREIGN_KEYS, MZ_VIEW_KEYS};
use crate::catalog::{
self, storage, BuiltinTableUpdate, Catalog, CatalogItem, ClusterReplicaSizeMap, Sink,
SinkConnectionState, StorageHostSizeMap,
self, storage, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
ClusterReplicaSizeMap, Sink, SinkConnectionState, StorageHostSizeMap,
};
use crate::client::{Client, ConnectionId, Handle};
use crate::command::{Canceled, Command, ExecuteResponse};
@@ -323,6 +323,7 @@ impl<S: Append + 'static> Coordinator<S> {
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn bootstrap(
&mut self,
builtin_migration_metadata: BuiltinMigrationMetadata,
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
) -> Result<(), AdapterError> {
let mut persisted_log_ids = vec![];
@@ -362,6 +363,39 @@
)
.await;

// Migrate builtin objects
for (compute_id, sink_ids) in builtin_migration_metadata.previous_sink_ids {
self.controller
.compute_mut(compute_id)
.unwrap()
.drop_sinks_unvalidated(sink_ids)
.await?;
}
for (compute_id, index_ids) in builtin_migration_metadata.previous_index_ids {
self.controller
.compute_mut(compute_id)
.unwrap()
.drop_indexes_unvalidated(index_ids)
.await?;
}
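// A materialized view is both a compute sink and a storage source, so its
// old ids must be dropped from both controllers.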
for (compute_id, recorded_view_ids) in
builtin_migration_metadata.previous_materialized_view_ids
{
self.controller
.compute_mut(compute_id)
.unwrap()
.drop_sinks_unvalidated(recorded_view_ids.clone())
.await?;
self.controller
.storage_mut()
.drop_sources_unvalidated(recorded_view_ids)
.await?;
}
self.controller
.storage_mut()
.drop_sources_unvalidated(builtin_migration_metadata.previous_source_ids)
.await?;

let mut entries: Vec<_> = self.catalog.entries().cloned().collect();
// Topologically sort entries based on the used_by relationship
entries.sort_unstable_by(|a, b| {
@@ -799,19 +833,20 @@ pub async fn serve<S: Append + 'static>(
// Coordinator::sequence_create_compute_instance_replica.
availability_zones.shuffle(&mut rand::thread_rng());

let (mut catalog, builtin_table_updates) = Catalog::open(catalog::Config {
storage,
unsafe_mode,
build_info,
now: now.clone(),
skip_migrations: false,
metrics_registry: &metrics_registry,
cluster_replica_sizes,
storage_host_sizes,
default_storage_host_size,
availability_zones,
})
.await?;
let (mut catalog, builtin_migration_metadata, builtin_table_updates) =
Catalog::open(catalog::Config {
storage,
unsafe_mode,
build_info,
now: now.clone(),
skip_migrations: false,
metrics_registry: &metrics_registry,
cluster_replica_sizes,
storage_host_sizes,
default_storage_host_size,
availability_zones,
})
.await?;
let cluster_id = catalog.config().cluster_id;
let session_id = catalog.config().session_id;
let start_instant = catalog.config().start_instant;
@@ -878,7 +913,8 @@
connection_context,
transient_replica_metadata: HashMap::new(),
};
let bootstrap = handle.block_on(coord.bootstrap(builtin_table_updates));
let bootstrap =
handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
let ok = bootstrap.is_ok();
bootstrap_tx.send(bootstrap).unwrap();
if ok {
20 changes: 20 additions & 0 deletions src/adapter/src/error.rs
@@ -14,13 +14,15 @@ use std::num::TryFromIntError;
use dec::TryFromDecimalError;
use tokio::sync::oneshot;

use mz_compute_client::controller::ComputeError;
use mz_expr::{EvalError, UnmaterializableFunc};
use mz_ore::stack::RecursionLimitError;
use mz_ore::str::StrExt;
use mz_repr::explain_new::ExplainError;
use mz_repr::NotNullViolation;
use mz_sql::plan::PlanError;
use mz_sql::query_model::QGMError;
use mz_storage::controller::StorageError;
use mz_transform::TransformError;

use crate::catalog;
@@ -157,6 +159,10 @@ pub enum AdapterError {
WriteOnlyTransaction,
/// The transaction only supports single table writes
MultiTableWriteTransaction,
/// An error occurred in the storage layer
Storage(mz_storage::controller::StorageError),
/// An error occurred in the compute layer
Compute(mz_compute_client::controller::ComputeError),
}

impl AdapterError {
@@ -392,6 +398,8 @@ impl fmt::Display for AdapterError {
AdapterError::MultiTableWriteTransaction => {
f.write_str("write transactions only support writes to a single table")
}
AdapterError::Storage(e) => e.fmt(f),
AdapterError::Compute(e) => e.fmt(f),
}
}
}
@@ -477,4 +485,16 @@ impl From<oneshot::error::RecvError> for AdapterError {
}
}

impl From<StorageError> for AdapterError {
fn from(e: StorageError) -> Self {
AdapterError::Storage(e)
}
}

impl From<ComputeError> for AdapterError {
fn from(e: ComputeError) -> Self {
AdapterError::Compute(e)
}
}

impl Error for AdapterError {}
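
These `From` impls are what let the new bootstrap code apply `?` directly to controller calls. A minimal sketch of the conversion (`cleanup` is a hypothetical stand-in for a storage controller call):

// With From<StorageError> for AdapterError in scope, `?` converts the
// error automatically.
fn bootstrap_step(cleanup: impl Fn() -> Result<(), StorageError>) -> Result<(), AdapterError> {
    cleanup()?;
    Ok(())
}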
64 changes: 64 additions & 0 deletions src/compute-client/src/controller.rs
@@ -476,6 +476,27 @@ where
self.allow_compaction(compaction_commands).await?;
Ok(())
}
/// Drops the read capability for the sinks and allows their resources to be reclaimed.
/// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
/// controller starts/restarts it has no durable state. That means that it has no way of
/// remembering any past commands sent. In the future we plan on persisting state for the
/// controller so that it is aware of past commands.
/// Therefore this method is for dropping sinks that we know to have been previously
/// created, but have been forgotten by the controller due to a restart.
/// Once command history becomes durable we can remove this method and use the normal
/// `drop_sinks`
pub async fn drop_sinks_unvalidated(
&mut self,
identifiers: Vec<GlobalId>,
) -> Result<(), ComputeError> {
let compaction_commands = identifiers
.into_iter()
.map(|id| (id, Antichain::new()))
.collect();
self.allow_compaction_unvalidated(compaction_commands)
.await?;
Ok(())
}
/// Drops the read capability for the indexes and allows their resources to be reclaimed.
pub async fn drop_indexes(&mut self, identifiers: Vec<GlobalId>) -> Result<(), ComputeError> {
// Validate that the ids exist.
@@ -488,6 +509,27 @@
self.allow_compaction(compaction_commands).await?;
Ok(())
}
/// Drops the read capability for the indexes and allows their resources to be reclaimed.
/// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
/// controller starts/restarts it has no durable state. That means that it has no way of
/// remembering any past commands sent. In the future we plan on persisting state for the
/// controller so that it is aware of past commands.
/// Therefore this method is for dropping indexes that we know to have been previously
/// created, but have been forgotten by the controller due to a restart.
/// Once command history becomes durable we can remove this method and use the normal
/// `drop_indexes`
pub async fn drop_indexes_unvalidated(
&mut self,
identifiers: Vec<GlobalId>,
) -> Result<(), ComputeError> {
let compaction_commands = identifiers
.into_iter()
.map(|id| (id, Antichain::new()))
.collect();
self.allow_compaction_unvalidated(compaction_commands)
.await?;
Ok(())
}
/// Initiate a peek request for the contents of `id` at `timestamp`.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn peek(
@@ -564,6 +606,28 @@
Ok(())
}

/// Downgrade the read capabilities of specific identifiers to specific frontiers.
///
/// Downgrading any read capability to the empty frontier will drop the item and eventually reclaim its resources.
/// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
/// controller starts/restarts it has no durable state. That means that it has no way of
/// remembering any past commands sent. In the future we plan on persisting state for the
/// controller so that it is aware of past commands.
/// Therefore this method is for allowing compaction on objects that we know to have been
/// previously created, but have been forgotten by the controller due to a restart.
/// Once command history becomes durable we can remove this method and use the normal
/// `allow_compaction`
async fn allow_compaction_unvalidated(
&mut self,
frontiers: Vec<(GlobalId, Antichain<T>)>,
) -> Result<(), ComputeError> {
let policies = frontiers
.into_iter()
.map(|(id, frontier)| (id, ReadPolicy::ValidFrom(frontier)));
self.set_read_policy(policies.collect()).await?;
Ok(())
}

/// Assigns a read policy to specific identifiers.
///
/// The policies are assigned in the order presented, and repeated identifiers should
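All three unvalidated variants reduce to the same mechanism: downgrading an object's read frontier to the empty antichain, which tells the controller that no time will ever be read again, so the object can be compacted away entirely. A minimal illustration of that convention using timely's `Antichain` type:

use timely::progress::Antichain;

// The empty antichain is the frontier past all times: downgrading a read
// capability to it promises that nothing will ever be read again, which is
// what allows the controller to reclaim the object's resources.
let empty: Antichain<u64> = Antichain::new();
assert!(empty.elements().is_empty());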
1 change: 1 addition & 0 deletions src/pgwire/src/message.rs
@@ -420,6 +420,7 @@ impl ErrorResponse {
// code, so it's probably the best choice.
AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::MultiTableWriteTransaction => SqlState::INVALID_TRANSACTION_STATE,
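// Controller failures have no precise SQLSTATE; XX000 (internal_error)
// is the conventional catch-all.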
AdapterError::Storage(_) | AdapterError::Compute(_) => SqlState::INTERNAL_ERROR,
};
ErrorResponse {
severity,