Skip to content

Commit

Permalink
feat: add schema and catalog key migration tool (GreptimeTeam#2048)
Browse files Browse the repository at this point in the history
* feat: add schema and catalog key migration tool

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Aug 7, 2023
1 parent d5cadee commit c8cb1ef
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 39 deletions.
195 changes: 156 additions & 39 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use clap::Parser;
use common_meta::helper::TableGlobalValue;
use common_meta::error as MetaError;
use common_meta::helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};
use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue};
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue};
use common_meta::key::TableMetaKey;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest, RangeResponse,
};
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
use common_meta::rpc::KeyValue;
use common_meta::util::get_prefix_end_key;
use common_telemetry::info;
use etcd_client::Client;
use futures::TryStreamExt;
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
use snafu::ResultExt;

use crate::cli::{Instance, Tool};
use crate::error::{ConnectEtcdSnafu, Result};
use crate::error::{self, ConnectEtcdSnafu, Result};

#[derive(Debug, Default, Parser)]
pub struct UpgradeCommand {
Expand Down Expand Up @@ -63,56 +69,167 @@ struct MigrateTableMetadata {

#[async_trait]
impl Tool for MigrateTableMetadata {
// migrates database's metadata from 0.3 to 0.4.
async fn do_work(&self) -> Result<()> {
let mut key = b"__tg".to_vec();
self.migrate_table_global_values().await?;
self.migrate_catalog_keys().await?;
self.migrate_schema_keys().await?;
Ok(())
}
}

const PAGE_SIZE: usize = 1000;

impl MigrateTableMetadata {
async fn migrate_schema_keys(&self) -> Result<()> {
// The schema key prefix.
let key = b"__s".to_vec();
let range_end = get_prefix_end_key(&key);

let mut processed_keys = 0;
loop {
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
let key_str =
std::str::from_utf8(&kv.key).context(MetaError::ConvertRawKeySnafu)?;
let key = v1SchemaKey::parse(key_str)
.unwrap_or_else(|e| panic!("schema key is corrupted: {e}, key: {key_str}"));

let req = RangeRequest::new()
.with_range(key, range_end.clone())
.with_limit(1000);
let resp = self.etcd_store.range(req).await.unwrap();
for kv in resp.kvs.iter() {
let key = String::from_utf8_lossy(kv.key());
let value = TableGlobalValue::from_bytes(kv.value())
.unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));
Ok((key, ()))
}),
);
while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? {
let _ = self.migrate_schema_key(&key).await;
keys.push(key.to_string().as_bytes().to_vec());
}
info!("Total migrated SchemaKeys: {}", keys.len());
self.delete_migrated_keys(keys).await;

self.create_table_name_key(&value).await;
Ok(())
}

self.create_datanode_table_keys(&value).await;
async fn migrate_schema_key(&self, key: &v1SchemaKey) -> Result<()> {
let new_key = SchemaNameKey::new(&key.catalog_name, &key.schema_name);
let schema_name_value = SchemaNameValue;

self.split_table_global_value(&key, value).await;
}
info!("Creating '{new_key}'");

self.delete_migrated_keys(&resp).await;
if self.dryrun {
info!("Dryrun: do nothing");
} else {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_value(schema_name_value.try_as_raw_value().unwrap()),
)
.await
.unwrap();
}

Ok(())
}

processed_keys += resp.kvs.len();
async fn migrate_catalog_keys(&self) -> Result<()> {
// The catalog key prefix.
let key = b"__c".to_vec();
let range_end = get_prefix_end_key(&key);

if resp.more {
key = get_prefix_end_key(resp.kvs.last().unwrap().key());
} else {
break;
}
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
let key_str =
std::str::from_utf8(&kv.key).context(MetaError::ConvertRawKeySnafu)?;
let key = v1CatalogKey::parse(key_str)
.unwrap_or_else(|e| panic!("catalog key is corrupted: {e}, key: {key_str}"));

Ok((key, ()))
}),
);
while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? {
let _ = self.migrate_catalog_key(&key).await;
keys.push(key.to_string().as_bytes().to_vec());
}
info!("Total migrated TableGlobalKeys: {processed_keys}");
info!("Total migrated CatalogKeys: {}", keys.len());
self.delete_migrated_keys(keys).await;

Ok(())
}
}

impl MigrateTableMetadata {
async fn delete_migrated_keys(&self, resp: &RangeResponse) {
info!("Deleting {} TableGlobalKeys", resp.kvs.len());
let req = BatchDeleteRequest {
keys: resp.kvs.iter().map(|kv| kv.key().to_vec()).collect(),
prev_kv: false,
};
async fn migrate_catalog_key(&self, key: &v1CatalogKey) {
let new_key = CatalogNameKey::new(&key.catalog_name);
let catalog_name_value = CatalogNameValue;

info!("Creating '{new_key}'");

if self.dryrun {
info!("Dryrun: do nothing");
} else {
self.etcd_store.batch_delete(req).await.unwrap();
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_value(catalog_name_value.try_as_raw_value().unwrap()),
)
.await
.unwrap();
}
}

async fn migrate_table_global_values(&self) -> Result<()> {
let key = b"__tg".to_vec();
let range_end = get_prefix_end_key(&key);

let mut keys = Vec::new();

info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
RangeRequest::new().with_range(key, range_end.clone()),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
let key = String::from_utf8_lossy(kv.key()).to_string();
let value = TableGlobalValue::from_bytes(kv.value())
.unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}"));

Ok((key, value))
}),
);
while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? {
self.create_table_name_key(&value).await;

self.create_datanode_table_keys(&value).await;

self.split_table_global_value(&key, value).await;

keys.push(key.as_bytes().to_vec());
}

info!("Total migrated TableGlobalKeys: {}", keys.len());
self.delete_migrated_keys(keys).await;

Ok(())
}

async fn delete_migrated_keys(&self, keys: Vec<Vec<u8>>) {
for keys in keys.chunks(PAGE_SIZE) {
info!("Deleting {} TableGlobalKeys", keys.len());
let req = BatchDeleteRequest {
keys: keys.to_vec(),
prev_kv: false,
};
if self.dryrun {
info!("Dryrun: do nothing");
} else {
self.etcd_store.batch_delete(req).await.unwrap();
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ use snafu::{Location, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to iter stream, source: {}", source))]
IterStream {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to start datanode, source: {}", source))]
StartDatanode {
location: Location,
Expand Down Expand Up @@ -176,6 +182,7 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::IterStream { source, .. } => source.status_code(),
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn get_next_prefix_key(key: &[u8]) -> Vec<u8> {

#[cfg(test)]
mod tests {

use super::*;

#[test]
Expand Down

0 comments on commit c8cb1ef

Please sign in to comment.