From c8cb1ef5bcf5ee3b6ce40b9696cdb51de6d19c57 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Mon, 7 Aug 2023 14:22:05 +0800
Subject: [PATCH] feat: add schema and catalog key migration tool (#2048)

* feat: add schema and catalog key migration tool

* chore: apply suggestions from CR
---
 src/cmd/src/cli/upgrade.rs  | 195 ++++++++++++++++++++++++++++--------
 src/cmd/src/error.rs        |   7 ++
 src/common/meta/src/util.rs |   1 +
 3 files changed, 164 insertions(+), 39 deletions(-)

diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs
index 37504ecb8c12..18c203328c62 100644
--- a/src/cmd/src/cli/upgrade.rs
+++ b/src/cmd/src/cli/upgrade.rs
@@ -12,26 +12,32 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use clap::Parser;
-use common_meta::helper::TableGlobalValue;
+use common_meta::error as MetaError;
+use common_meta::helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};
+use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue};
 use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
+use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
 use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
 use common_meta::key::table_name::{TableNameKey, TableNameValue};
 use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue};
 use common_meta::key::TableMetaKey;
-use common_meta::rpc::store::{
-    BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest, RangeResponse,
-};
+use common_meta::range_stream::PaginationStream;
+use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
+use common_meta::rpc::KeyValue;
 use common_meta::util::get_prefix_end_key;
 use common_telemetry::info;
 use etcd_client::Client;
+use futures::TryStreamExt;
 use meta_srv::service::store::etcd::EtcdStore;
-use meta_srv::service::store::kv::KvStoreRef;
+use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
 use snafu::ResultExt;
 
 use crate::cli::{Instance, Tool};
-use crate::error::{ConnectEtcdSnafu, Result};
+use crate::error::{self, ConnectEtcdSnafu, Result};
 
 #[derive(Debug, Default, Parser)]
 pub struct UpgradeCommand {
@@ -63,56 +69,167 @@ struct MigrateTableMetadata {
 
 #[async_trait]
 impl Tool for MigrateTableMetadata {
+    // migrates database's metadata from 0.3 to 0.4.
     async fn do_work(&self) -> Result<()> {
-        let mut key = b"__tg".to_vec();
+        self.migrate_table_global_values().await?;
+        self.migrate_catalog_keys().await?;
+        self.migrate_schema_keys().await?;
+        Ok(())
+    }
+}
+
+const PAGE_SIZE: usize = 1000;
+
+impl MigrateTableMetadata {
+    async fn migrate_schema_keys(&self) -> Result<()> {
+        // The schema key prefix.
+        let key = b"__s".to_vec();
         let range_end = get_prefix_end_key(&key);
-        let mut processed_keys = 0;
-        loop {
-            info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
+        let mut keys = Vec::new();
+        info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
+        let mut stream = PaginationStream::new(
+            KvBackendAdapter::wrap(self.etcd_store.clone()),
+            RangeRequest::new().with_range(key, range_end),
+            PAGE_SIZE,
+            Arc::new(|kv: KeyValue| {
+                let key_str =
+                    std::str::from_utf8(&kv.key).context(MetaError::ConvertRawKeySnafu)?;
+                let key = v1SchemaKey::parse(key_str)
+                    .unwrap_or_else(|e| panic!("schema key is corrupted: {e}, key: {key_str}"));
 
-            let req = RangeRequest::new()
-                .with_range(key, range_end.clone())
-                .with_limit(1000);
-            let resp = self.etcd_store.range(req).await.unwrap();
-            for kv in resp.kvs.iter() {
-                let key = String::from_utf8_lossy(kv.key());
-                let value = TableGlobalValue::from_bytes(kv.value())
-                    .unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));
+                Ok((key, ()))
+            }),
+        );
+        while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? {
+            let _ = self.migrate_schema_key(&key).await;
+            keys.push(key.to_string().as_bytes().to_vec());
+        }
+        info!("Total migrated SchemaKeys: {}", keys.len());
+        self.delete_migrated_keys(keys).await;
 
-                self.create_table_name_key(&value).await;
+        Ok(())
+    }
 
-                self.create_datanode_table_keys(&value).await;
+    async fn migrate_schema_key(&self, key: &v1SchemaKey) -> Result<()> {
+        let new_key = SchemaNameKey::new(&key.catalog_name, &key.schema_name);
+        let schema_name_value = SchemaNameValue;
 
-                self.split_table_global_value(&key, value).await;
-            }
+        info!("Creating '{new_key}'");
 
-            self.delete_migrated_keys(&resp).await;
+        if self.dryrun {
+            info!("Dryrun: do nothing");
+        } else {
+            self.etcd_store
+                .put(
+                    PutRequest::new()
+                        .with_key(new_key.as_raw_key())
+                        .with_value(schema_name_value.try_as_raw_value().unwrap()),
+                )
+                .await
+                .unwrap();
+        }
+
+        Ok(())
+    }
 
-            processed_keys += resp.kvs.len();
+    async fn migrate_catalog_keys(&self) -> Result<()> {
+        // The catalog key prefix.
+        let key = b"__c".to_vec();
+        let range_end = get_prefix_end_key(&key);
 
-            if resp.more {
-                key = get_prefix_end_key(resp.kvs.last().unwrap().key());
-            } else {
-                break;
-            }
+        let mut keys = Vec::new();
+        info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
+        let mut stream = PaginationStream::new(
+            KvBackendAdapter::wrap(self.etcd_store.clone()),
+            RangeRequest::new().with_range(key, range_end),
+            PAGE_SIZE,
+            Arc::new(|kv: KeyValue| {
+                let key_str =
+                    std::str::from_utf8(&kv.key).context(MetaError::ConvertRawKeySnafu)?;
+                let key = v1CatalogKey::parse(key_str)
+                    .unwrap_or_else(|e| panic!("catalog key is corrupted: {e}, key: {key_str}"));
+
+                Ok((key, ()))
+            }),
+        );
+        while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? {
+            let _ = self.migrate_catalog_key(&key).await;
+            keys.push(key.to_string().as_bytes().to_vec());
         }
-        info!("Total migrated TableGlobalKeys: {processed_keys}");
+        info!("Total migrated CatalogKeys: {}", keys.len());
+        self.delete_migrated_keys(keys).await;
+
         Ok(())
     }
-}
 
-impl MigrateTableMetadata {
-    async fn delete_migrated_keys(&self, resp: &RangeResponse) {
-        info!("Deleting {} TableGlobalKeys", resp.kvs.len());
-        let req = BatchDeleteRequest {
-            keys: resp.kvs.iter().map(|kv| kv.key().to_vec()).collect(),
-            prev_kv: false,
-        };
+    async fn migrate_catalog_key(&self, key: &v1CatalogKey) {
+        let new_key = CatalogNameKey::new(&key.catalog_name);
+        let catalog_name_value = CatalogNameValue;
+
+        info!("Creating '{new_key}'");
+
         if self.dryrun {
             info!("Dryrun: do nothing");
         } else {
-            self.etcd_store.batch_delete(req).await.unwrap();
+            self.etcd_store
+                .put(
+                    PutRequest::new()
+                        .with_key(new_key.as_raw_key())
+                        .with_value(catalog_name_value.try_as_raw_value().unwrap()),
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    async fn migrate_table_global_values(&self) -> Result<()> {
+        let key = b"__tg".to_vec();
+        let range_end = get_prefix_end_key(&key);
+
+        let mut keys = Vec::new();
+
+        info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
+        let mut stream = PaginationStream::new(
+            KvBackendAdapter::wrap(self.etcd_store.clone()),
+            RangeRequest::new().with_range(key, range_end.clone()),
+            PAGE_SIZE,
+            Arc::new(|kv: KeyValue| {
+                let key = String::from_utf8_lossy(kv.key()).to_string();
+                let value = TableGlobalValue::from_bytes(kv.value())
+                    .unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));
+
+                Ok((key, value))
+            }),
+        );
+        while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? {
+            self.create_table_name_key(&value).await;
+
+            self.create_datanode_table_keys(&value).await;
+
+            self.split_table_global_value(&key, value).await;
+
+            keys.push(key.as_bytes().to_vec());
+        }
+
+        info!("Total migrated TableGlobalKeys: {}", keys.len());
+        self.delete_migrated_keys(keys).await;
+
+        Ok(())
+    }
+
+    async fn delete_migrated_keys(&self, keys: Vec<Vec<u8>>) {
+        for keys in keys.chunks(PAGE_SIZE) {
+            info!("Deleting {} TableGlobalKeys", keys.len());
+            let req = BatchDeleteRequest {
+                keys: keys.to_vec(),
+                prev_kv: false,
+            };
+            if self.dryrun {
+                info!("Dryrun: do nothing");
+            } else {
+                self.etcd_store.batch_delete(req).await.unwrap();
+            }
         }
     }
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index c291ef5d4c10..e288f57d1725 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -23,6 +23,12 @@ use snafu::{Location, Snafu};
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
+    #[snafu(display("Failed to iter stream, source: {}", source))]
+    IterStream {
+        location: Location,
+        source: common_meta::error::Error,
+    },
+
     #[snafu(display("Failed to start datanode, source: {}", source))]
     StartDatanode {
         location: Location,
@@ -176,6 +182,7 @@ impl ErrorExt for Error {
             Error::ShutdownMetaServer { source, .. } => source.status_code(),
             Error::BuildMetaServer { source, .. } => source.status_code(),
             Error::UnsupportedSelectorType { source, .. } => source.status_code(),
+            Error::IterStream { source, .. } => source.status_code(),
             Error::MissingConfig { .. }
             | Error::LoadLayeredConfig { .. }
             | Error::IllegalConfig { .. }
diff --git a/src/common/meta/src/util.rs b/src/common/meta/src/util.rs
index 80742cece5c7..7a823aba9d30 100644
--- a/src/common/meta/src/util.rs
+++ b/src/common/meta/src/util.rs
@@ -39,6 +39,7 @@ pub fn get_next_prefix_key(key: &[u8]) -> Vec<u8> {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
 
     #[test]
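
Note on the migration pattern: each of the three migrate_* methods introduced above does the same three things. It scans a legacy key prefix page by page (PaginationStream with PAGE_SIZE = 1000), rewrites every entry under its 0.4-style key (or only logs the action when dryrun is set), and finally batch-deletes the legacy keys in PAGE_SIZE chunks via delete_migrated_keys. The Rust sketch below restates that pattern in a self-contained form; the KvStore trait, the migrate_prefix helper, and the rewrite closure are illustrative stand-ins, not APIs from the patch or from GreptimeDB.

// A deliberately small stand-in for the etcd-backed store used by the tool.
trait KvStore {
    fn scan_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
    fn put(&mut self, key: Vec<u8>, value: Vec<u8>);
    fn batch_delete(&mut self, keys: &[Vec<u8>]);
}

const PAGE_SIZE: usize = 1000;

/// Scan every entry under `old_prefix`, write it back under the key/value
/// produced by `rewrite`, then remove the legacy entries in PAGE_SIZE
/// chunks, mirroring `delete_migrated_keys` in the patch.
fn migrate_prefix<S, F>(store: &mut S, old_prefix: &[u8], dryrun: bool, rewrite: F)
where
    S: KvStore,
    F: Fn(&[u8], &[u8]) -> (Vec<u8>, Vec<u8>),
{
    let mut migrated: Vec<Vec<u8>> = Vec::new();
    for (old_key, old_value) in store.scan_prefix(old_prefix) {
        let (new_key, new_value) = rewrite(&old_key, &old_value);
        if dryrun {
            println!("Dryrun: would create {}", String::from_utf8_lossy(&new_key));
        } else {
            store.put(new_key, new_value);
        }
        // Remember the legacy key so it can be deleted after the rewrite pass.
        migrated.push(old_key);
    }
    for chunk in migrated.chunks(PAGE_SIZE) {
        if dryrun {
            println!("Dryrun: would delete {} keys", chunk.len());
        } else {
            store.batch_delete(chunk);
        }
    }
}

In the actual tool the rewrite step differs per key type: schema and catalog keys are re-encoded directly as SchemaNameKey and CatalogNameKey, while each TableGlobalValue is split into table-info, table-region, table-name, and datanode-table keys. That is why the patch keeps three separate methods rather than one generic helper.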