From ab75d768a8ba2b9acdd52fcded6e6ff5ed68fc66 Mon Sep 17 00:00:00 2001 From: Fabrice Clementz Date: Sun, 22 May 2022 23:48:23 +0200 Subject: [PATCH] =?UTF-8?q?chore:=20add=20a=20release=20script,=20version?= =?UTF-8?q?=20number=20in=20metadata.json=20file=20and=E2=80=A6=20(#129)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: add a release script, version number in metadata.json file and rename backups to dumps * refactor: add a trait to easily add more migrations --- Cargo.lock | 6 +- README.md | 2 +- docs/DESIGN.md | 8 +- dump-parser/Cargo.toml | 6 +- release.sh | 84 ++++++ replibyte/Cargo.toml | 2 +- replibyte/src/cli.rs | 33 ++- replibyte/src/commands/dump.rs | 17 +- replibyte/src/datastore/local_disk.rs | 244 ++++++++++----- replibyte/src/datastore/mod.rs | 60 ++-- replibyte/src/datastore/s3.rs | 280 ++++++++++++------ replibyte/src/main.rs | 8 + replibyte/src/migration/mod.rs | 246 +++++++++++++++ .../src/migration/rename_backups_to_dumps.rs | 95 ++++++ .../src/migration/update_version_number.rs | 84 ++++++ replibyte/src/tasks/full_dump.rs | 3 - replibyte/src/tasks/full_restore.rs | 11 +- replibyte/src/utils.rs | 4 + subset/Cargo.toml | 2 +- website/docs/faq.md | 4 +- .../docs/guides/deploy-replibyte/container.md | 2 +- 21 files changed, 971 insertions(+), 230 deletions(-) create mode 100755 release.sh create mode 100644 replibyte/src/migration/mod.rs create mode 100644 replibyte/src/migration/rename_backups_to_dumps.rs create mode 100644 replibyte/src/migration/update_version_number.rs diff --git a/Cargo.lock b/Cargo.lock index e9bc9983..b6e30244 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -909,7 +909,7 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" [[package]] name = "dump-parser" -version = "0.1.0" +version = "0.7.3" dependencies = [ "bson", "crc", @@ -2205,7 +2205,7 @@ dependencies = [ [[package]] name = "replibyte" -version = "0.1.0" +version = "0.7.3" dependencies = [ "aes-gcm", "anyhow", @@ -2693,7 +2693,7 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "subset" -version = "0.1.0" +version = "0.7.3" dependencies = [ "dump-parser", "md5", diff --git a/README.md b/README.md index be871533..9e165c07 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ replibyte -c conf.yaml dump restore remote -v latest ## Features -- [x] Support data backup and restore for PostgreSQL, MySQL and MongoDB +- [x] Support data dump and restore for PostgreSQL, MySQL and MongoDB - [x] Replace sensitive data with fake data - [x] Works on large database (> 10GB) - [x] Database Subsetting: Scale down a production database to a more reasonable size 🔥 diff --git a/docs/DESIGN.md b/docs/DESIGN.md index f5fd997a..b8de4a08 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -57,20 +57,20 @@ replicate the data in a stream of bytes and does not store anything on a local d ### Limitations - Tested with PostgreSQL 13 and 14. It should work with prior versions. -- RepliByte as not been designed to run multiple backups targeting the same Bridge. The Index File does not manage concurrent write (ATM). +- RepliByte as not been designed to run multiple dumps targeting the same Bridge. The Index File does not manage concurrent write (ATM). ### Index file structure -An index file describe the structure of your backups and all of them. +An index file describe the structure of your dumps and all of them. 
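The manifest described here maps onto the `IndexFile` and `Dump` structs that this patch introduces in `replibyte/src/datastore/mod.rs`; a minimal sketch of those types, with field names, types, and serde derives taken from the diff further down:

```rust
use serde::{Deserialize, Serialize};

// In-memory model of metadata.json, as defined in replibyte/src/datastore/mod.rs.
#[derive(Debug, Serialize, Deserialize)]
pub struct IndexFile {
    // RepliByte version that wrote the manifest (new in this patch).
    pub v: Option<String>,
    // One entry per dump stored in the datastore.
    pub dumps: Vec<Dump>,
}

#[derive(Debug, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq)]
pub struct Dump {
    pub directory_name: String,
    pub size: usize,
    pub created_at: u128,
    pub compressed: bool,
    pub encrypted: bool,
}
```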
Here is the manifest file that you can find at the root of your target `Bridge` (E.g: S3). ```json { - "backups": [ + "dumps": [ { "size": 1024000, - "directory_name": "backup-{epoch timestamp}", + "directory_name": "dump-{epoch timestamp}", "created_at": "epoch timestamp", "compressed": true, "encrypted": true diff --git a/dump-parser/Cargo.toml b/dump-parser/Cargo.toml index 2762de7f..6de6d6a4 100644 --- a/dump-parser/Cargo.toml +++ b/dump-parser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dump-parser" -version = "0.1.0" +version = "0.7.3" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -9,11 +9,11 @@ edition = "2021" bson = "2.1" serde = "1.0" -########## WARNING ############# +########## WARNING ############# # DO NOT UPGRADE THE CRC CRATE # # version 2 (or higher) is not compatible with the current crc64 algorithm that 'mongorestore' uses in its archive parser. # mongorestore ECMA: https://go.dev/src/hash/crc64/crc64.go#L28 # crc-rs 1.8 ECMA: https://github.com/mrhooray/crc-rs/blob/1.8.1/build.rs#L41 (COMPATIBLE) # crc-rs ^2.0 ECMA: https://github.com/akhilles/crc-catalog/blob/2.0.1/src/catalog.rs#L104 (INCOMPATIBLE) -crc = "1.8" +crc = "1.8" ################################ diff --git a/release.sh b/release.sh new file mode 100755 index 00000000..0c90d8ab --- /dev/null +++ b/release.sh @@ -0,0 +1,84 @@ +#!/usr/bin/env bash + +set -e + +TOML_FILES="\ +replibyte/Cargo.toml \ +subset/Cargo.toml \ +dump-parser/Cargo.toml +" + +old=$1 +new=$2 + +if [ -z "${old}" ] || [ -z "${new}" ] +then + echo "please run: $0 " + exit 1 +fi + +if [ "$(git status --porcelain=v1 2>/dev/null | wc -l)" -ne 0 ] +then + git status + echo "There are unsaved changes in the repository, press CTRL-C to abort now or return to continue." + read -r answer +fi + +echo -n "Release process from starting from '${old}' -> '${new}', do you want to continue? [y/N] " +read -r answer + + +case "${answer}" in + Y*|y*) + ;; + *) + echo "Aborting" + exit 0 + ;; +esac; + +echo "==> ${answer}" + +echo -n "Updating TOML files:" +for toml in ${TOML_FILES} +do + echo -n " ${toml}" + sed -e "s/^version = \"${old}\"$/version = \"${new}\"/" -i.release "${toml}" +done +echo "." + +echo "Please review the following changes. (return to continue)" +read -r answer + +git diff + +echo "Do you want to Continue or Rollback? [c/R]" +read -r answer + +case "${answer}" in + C*|c*) + git checkout -b "release-v${new}" + git commit -sa -m "Release v${new}" + git push --set-upstream origin "release-v${new}" + ;; + *) + git checkout . + exit + ;; +esac; + +echo "Please open the following pull request we'll wait here continue when it is merged." +echo +echo " >> https://github.com/qovery/replibyte/pull/new/release-v${new} <<" +echo +echo "Once you continue we'll generate and push the release tag with the latest 'main'" +read -r answer + +echo "Generating release tag v${new}" + +git checkout main +git pull +git tag -a -m"Release v${new}" "v${new}" +git push --tags + +echo "Congrats release v${new} is done!" 
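The version this script bumps in the `Cargo.toml` files is also what the binary reports and what the migrator compares against at runtime: elsewhere in this patch, `utils::get_replibyte_version` resolves it at compile time. A condensed sketch of that helper and of the wiring added to `main.rs`, shown here out of context for illustration:

```rust
// replibyte/src/utils.rs (added by this patch): the version string is baked in
// at compile time, so bumping the Cargo.toml files is all release.sh needs to do.
pub fn get_replibyte_version() -> &'static str {
    env!("CARGO_PKG_VERSION")
}

// replibyte/src/main.rs then runs the registered migrations against the
// configured datastore before initializing it:
//
//     let migrator = Migrator::new(get_replibyte_version(), &datastore, migrations());
//     let _ = migrator.migrate()?;
//     let _ = datastore.init()?;
```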
\ No newline at end of file diff --git a/replibyte/Cargo.toml b/replibyte/Cargo.toml index de478b66..2c488479 100644 --- a/replibyte/Cargo.toml +++ b/replibyte/Cargo.toml @@ -1,6 +1,6 @@ [package] edition = "2021" -version = "0.1.0" +version = "0.7.3" name = "replibyte" authors = ["Qovery Team", "Fab", "Benny", "Contributos"] diff --git a/replibyte/src/cli.rs b/replibyte/src/cli.rs index 5026c534..b1901a21 100644 --- a/replibyte/src/cli.rs +++ b/replibyte/src/cli.rs @@ -4,7 +4,8 @@ use clap::{Args, Parser, Subcommand}; /// Replibyte is a tool to seed your databases with your production data while keeping sensitive data safe, just pass `-h` #[derive(Parser, Debug)] -#[clap(about, long_about = None)] +#[clap(version, about, long_about = None)] +#[clap(propagate_version = true)] pub struct CLI { /// Replibyte configuration file #[clap(short, long, parse(from_os_str), value_name = "configuration file")] @@ -19,7 +20,7 @@ pub struct CLI { /// sub commands #[derive(Subcommand, Debug)] pub enum SubCommand { - /// all backup commands + /// all dump commands #[clap(subcommand)] Dump(DumpCommand), /// all transformers command @@ -30,14 +31,14 @@ pub enum SubCommand { /// all dump commands #[derive(Subcommand, Debug)] pub enum DumpCommand { - /// list available backups + /// list available dumps List, - /// launch backup -- use `-h` to show all the options + /// launch dump -- use `-h` to show all the options Create(DumpCreateArgs), /// all restore commands #[clap(subcommand)] Restore(RestoreCommand), - /// delete a backup from the defined datastore + /// delete a dump from the defined datastore Delete(DumpDeleteArgs), } @@ -51,28 +52,28 @@ pub enum TransformerCommand { /// all restore commands #[derive(Subcommand, Debug)] pub enum RestoreCommand { - /// Restore backup inside a local Docker container + /// Restore dump inside a local Docker container Local(RestoreLocalArgs), - /// Restore backup inside the configured destination + /// Restore dump inside the configured destination Remote(RestoreArgs), } /// all restore commands #[derive(Args, Debug)] pub struct RestoreArgs { - /// restore backup -- set `latest` or `` - use `backup list` command to list all backups available - #[clap(short, long, value_name = "[latest | backup name]")] + /// restore dump -- set `latest` or `` - use `dump list` command to list all dumps available + #[clap(short, long, value_name = "[latest | dump name]")] pub value: String, /// stream output on stdout #[clap(short, long)] pub output: bool, } -/// restore backup in a local Docker container +/// restore dump in a local Docker container #[derive(Args, Debug)] pub struct RestoreLocalArgs { - /// restore backup -- set `latest` or `` - use `backup list` command to list all backups available - #[clap(short, long, value_name = "[latest | backup name]")] + /// restore dump -- set `latest` or `` - use `dump list` command to list all dumps available + #[clap(short, long, value_name = "[latest | dump name]")] pub value: String, /// stream output on stdout #[clap(short, long)] @@ -91,7 +92,7 @@ pub struct RestoreLocalArgs { pub image: Option, } -/// all backup run commands +/// all dump run commands #[derive(Args, Debug)] pub struct DumpCreateArgs { #[clap(name = "source_type", short, long, value_name = "[postgresql | mysql | mongodb]", possible_values = &["postgresql", "mysql", "mongodb"], requires = "input")] @@ -111,13 +112,13 @@ pub struct DumpCreateArgs { #[derive(Args, Debug)] #[clap(group = clap::ArgGroup::new("delete-mode").multiple(false))] pub struct DumpDeleteArgs { - 
/// Name of the backup to delete + /// Name of the dump to delete #[clap(group = "delete-mode")] pub dump: Option, - /// Remove all backups older than the specified number of days. Example: `14d` for deleting backups older than 14 days + /// Remove all dumps older than the specified number of days. Example: `14d` for deleting dumps older than 14 days #[clap(long, group = "delete-mode")] pub older_than: Option, - /// Keep only the last N backups + /// Keep only the last N dumps #[clap(long, group = "delete-mode")] pub keep_last: Option, } diff --git a/replibyte/src/commands/dump.rs b/replibyte/src/commands/dump.rs index 42018d21..3ddea1e6 100644 --- a/replibyte/src/commands/dump.rs +++ b/replibyte/src/commands/dump.rs @@ -35,28 +35,27 @@ use clap::CommandFactory; /// List all dumps pub fn list(datastore: &mut Box) -> Result<(), Error> { - let _ = datastore.init()?; let mut index_file = datastore.index_file()?; - if index_file.backups.is_empty() { + if index_file.dumps.is_empty() { println!(" no dumps available\n"); return Ok(()); } - index_file.backups.sort_by(|a, b| a.cmp(b).reverse()); + index_file.dumps.sort_by(|a, b| a.cmp(b).reverse()); let mut table = table(); table.set_titles(row!["name", "size", "when", "compressed", "encrypted"]); let formatter = Formatter::new(); let now = epoch_millis(); - for backup in index_file.backups { + for dump in index_file.dumps { table.add_row(row![ - backup.directory_name.as_str(), - to_human_readable_unit(backup.size), - formatter.convert(Duration::from_millis((now - backup.created_at) as u64)), - backup.compressed, - backup.encrypted, + dump.directory_name.as_str(), + to_human_readable_unit(dump.size), + formatter.convert(Duration::from_millis((now - dump.created_at) as u64)), + dump.compressed, + dump.encrypted, ]); } diff --git a/replibyte/src/datastore/local_disk.rs b/replibyte/src/datastore/local_disk.rs index a18615ce..4717eb24 100644 --- a/replibyte/src/datastore/local_disk.rs +++ b/replibyte/src/datastore/local_disk.rs @@ -1,15 +1,15 @@ use std::fs::{read, read_dir, remove_dir_all, write, DirBuilder, OpenOptions}; use std::io::{BufReader, Error, Read, Write}; +use std::path::Path; use log::{debug, error, info}; +use serde_json::Value; use crate::connector::Connector; use crate::types; use crate::utils::epoch_millis; -use super::{ - compress, decompress, decrypt, encrypt, Backup, Datastore, IndexFile, INDEX_FILE_NAME, -}; +use super::{compress, decompress, decrypt, encrypt, Datastore, Dump, IndexFile, INDEX_FILE_NAME}; pub struct LocalDisk { dir: String, @@ -32,7 +32,8 @@ impl LocalDisk { match self.index_file() { Ok(index_file) => Ok(index_file), Err(_) => { - let index_file = IndexFile { backups: vec![] }; + info!("creating a new index_file"); + let index_file = IndexFile::new(); let _ = self.write_index_file(&index_file)?; Ok(index_file) } @@ -42,17 +43,14 @@ impl LocalDisk { impl Connector for LocalDisk { fn init(&mut self) -> Result<(), Error> { - debug!("initializing local_disk datastore"); + debug!("initializing datastore"); self.create_index_file().map(|_| ()) } } impl Datastore for LocalDisk { fn index_file(&self) -> Result { - info!( - "reading index_file from local_disk datastore at: {}", - &self.dir - ); + debug!("reading index_file at: {}", &self.dir); let file = OpenOptions::new() .read(true) @@ -66,8 +64,22 @@ impl Datastore for LocalDisk { Ok(index_file) } + fn raw_index_file(&self) -> Result { + info!("reading raw index_file at: {}", &self.dir); + + let file = OpenOptions::new() + .read(true) + .open(format!("{}/{}", self.dir, 
INDEX_FILE_NAME))?; + + let reader = BufReader::new(file); + + let raw_index_file = serde_json::from_reader(reader).map_err(|err| Error::from(err))?; + + Ok(raw_index_file) + } + fn write_index_file(&self, index_file: &IndexFile) -> Result<(), Error> { - info!("writing index_file to local_disk datastore"); + info!("writing index_file"); let index_file_path = format!("{}/{}", self.dir, INDEX_FILE_NAME); debug!("opening index_file at {}", index_file_path); @@ -81,6 +93,21 @@ impl Datastore for LocalDisk { serde_json::to_writer(file, index_file).map_err(|err| Error::from(err)) } + fn write_raw_index_file(&self, raw_index_file: &Value) -> Result<(), Error> { + info!("writing raw index_file"); + let index_file_path = format!("{}/{}", self.dir, INDEX_FILE_NAME); + + debug!("opening index_file at {}", index_file_path); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&index_file_path)?; + + debug!("writing raw index_file at {}", index_file_path.as_str()); + serde_json::to_writer(file, raw_index_file).map_err(|err| Error::from(err)) + } + fn write(&self, file_part: u16, data: types::Bytes) -> Result<(), Error> { // compress data? let data = if self.compression_enabled() { @@ -117,7 +144,7 @@ impl Datastore for LocalDisk { // update index file let mut index_file = self.index_file()?; - let mut new_backup = Backup { + let mut new_dump = Dump { directory_name: self.dump_name.to_string(), size: 0, created_at: epoch_millis(), @@ -125,21 +152,21 @@ impl Datastore for LocalDisk { encrypted: self.encryption_key().is_some(), }; - // find or create Backup - let mut backup = index_file - .backups + // find or create Dump + let mut dump = index_file + .dumps .iter_mut() .find(|b| b.directory_name.as_str() == self.dump_name) - .unwrap_or(&mut new_backup); + .unwrap_or(&mut new_dump); - if backup.size == 0 { - // it means it's a new backup. - // We need to add it into the index_file.backups - new_backup.size = data_size; - index_file.backups.push(new_backup); + if dump.size == 0 { + // it means it's a new dump. + // We need to add it into the index_file.dumps + new_dump.size = data_size; + index_file.dumps.push(new_dump); } else { - // update total backup size - backup.size = backup.size + data_size; + // update total dump size + dump.size = dump.size + data_size; } // save index file @@ -152,16 +179,16 @@ impl Datastore for LocalDisk { data_callback: &mut dyn FnMut(types::Bytes), ) -> Result<(), Error> { let mut index_file = self.index_file()?; - let backup = index_file.find_backup(options)?; - let entries = read_dir(format!("{}/{}", self.dir, backup.directory_name))?; + let dump = index_file.find_dump(options)?; + let entries = read_dir(format!("{}/{}", self.dir, dump.directory_name))?; for entry in entries { let entry = entry?; let data = read(entry.path())?; // decrypt data? - let data = if backup.encrypted { - // It should be safe to unwrap here because the backup is marked as encrypted in the backup manifest + let data = if dump.encrypted { + // It should be safe to unwrap here because the dump is marked as encrypted in the dump manifest // so if there is no encryption key set at the datastore level we want to panic. let encryption_key = self.encryption_key.as_ref().unwrap(); decrypt(data, encryption_key.as_str())? @@ -170,7 +197,7 @@ impl Datastore for LocalDisk { }; // decompress data? - let data = if backup.compressed { + let data = if dump.compressed { decompress(data)? 
} else { data @@ -217,7 +244,7 @@ impl Datastore for LocalDisk { })?; // update the index_file. - index_file.backups.retain(|b| b.directory_name != name); + index_file.dumps.retain(|b| b.directory_name != name); self.write_index_file(&index_file) } @@ -225,31 +252,35 @@ impl Datastore for LocalDisk { #[cfg(test)] mod tests { - use std::path::Path; + use std::{fs::OpenOptions, path::Path}; use chrono::{Duration, Utc}; + use serde_json::json; use tempfile::tempdir; use crate::{ cli::DumpDeleteArgs, connector::Connector, - datastore::{Backup, Datastore, ReadOptions}, + datastore::{Datastore, Dump, ReadOptions, INDEX_FILE_NAME}, + migration::{ + rename_backups_to_dumps::RenameBackupsToDump, + update_version_number::UpdateVersionNumber, Migrator, + }, utils::epoch_millis, }; use super::LocalDisk; - // update_backup_date is a helper function that updates the date of a dump inside the index file. - fn update_backup_date(local_disk: &LocalDisk, dump_name: String, days_before_now: i64) { + // update_dump_date is a helper function that updates the date of a dump inside the index file. + fn update_dump_date(local_disk: &LocalDisk, dump_name: String, days_before_now: i64) { let mut index_file = local_disk.index_file().unwrap(); - let mut backup = index_file - .backups + let mut dump = index_file + .dumps .iter_mut() .find(|b| b.directory_name == dump_name) .unwrap(); - backup.created_at = - (Utc::now() - Duration::days(days_before_now)).timestamp_millis() as u128; + dump.created_at = (Utc::now() - Duration::days(days_before_now)).timestamp_millis() as u128; let _ = local_disk.write_index_file(&index_file); } @@ -276,9 +307,9 @@ mod tests { // index_file should contain 1 dump let mut index_file = local_disk.index_file().unwrap(); - assert_eq!(index_file.backups.len(), 1); + assert_eq!(index_file.dumps.len(), 1); - let dump = index_file.find_backup(&ReadOptions::Latest).unwrap(); + let dump = index_file.find_dump(&ReadOptions::Latest).unwrap(); // part 1 of dump should exists assert!(Path::new(&format!( @@ -308,10 +339,10 @@ mod tests { let mut index_file = local_disk.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); - index_file.backups.push(Backup { - directory_name: "backup-1".to_string(), + index_file.dumps.push(Dump { + directory_name: "dump-1".to_string(), size: 0, created_at: epoch_millis(), compressed: true, @@ -320,17 +351,17 @@ mod tests { assert!(local_disk.write_index_file(&index_file).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); } #[test] - fn test_backup_name() { + fn test_dump_name() { let dir = tempdir().expect("cannot create tempdir"); let mut local_disk = LocalDisk::new(dir.path().to_str().unwrap().to_string()); - local_disk.set_dump_name("custom-backup-name".to_string()); + local_disk.set_dump_name("custom-dump-name".to_string()); - assert_eq!(local_disk.dump_name, "custom-backup-name".to_string()) + assert_eq!(local_disk.dump_name, "custom-dump-name".to_string()) } #[test] @@ -342,20 +373,20 @@ mod tests { let _ = local_disk.init().expect("local_disk init failed"); assert!(local_disk.index_file().is_ok()); let index_file = local_disk.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); // create dump 1 local_disk.set_dump_name("dump-1".to_string()); let bytes: Vec = b"hello world from dump-1".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - 
assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); assert!(Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); // create dump 2 local_disk.set_dump_name("dump-2".to_string()); let bytes: Vec = b"hello world from dump-2".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 2); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 2); assert!(Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); // remove dump 1 @@ -366,7 +397,7 @@ mod tests { keep_last: None }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); assert!(!Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); // remove dump 2 @@ -377,7 +408,7 @@ mod tests { keep_last: None }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 0); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 0); assert!(!Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); } @@ -390,27 +421,27 @@ mod tests { let _ = local_disk.init().expect("local_disk init failed"); assert!(local_disk.index_file().is_ok()); let index_file = local_disk.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); // create dump 1 local_disk.set_dump_name("dump-1".to_string()); let bytes: Vec = b"hello world from dump-1".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); assert!(Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); // create dump 2 local_disk.set_dump_name("dump-2".to_string()); let bytes: Vec = b"hello world from dump-2".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 2); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 2); assert!(Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); // create dump 3 local_disk.set_dump_name("dump-3".to_string()); let bytes: Vec = b"hello world from dump-3".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 3); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 3); assert!(Path::new(&format!("{}/dump-3", dir.path().to_str().unwrap())).exists()); assert!(local_disk @@ -420,7 +451,7 @@ mod tests { keep_last: Some(2), }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 2); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 2); // only dump-1 must be deleted assert!(!Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); assert!(Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); @@ -433,7 +464,7 @@ mod tests { keep_last: Some(1), }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); // only dump-3 must exists assert!(!Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); assert!(!Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); @@ -449,29 +480,29 @@ mod tests { let _ = local_disk.init().expect("local_disk init failed"); assert!(local_disk.index_file().is_ok()); let index_file = local_disk.index_file().unwrap(); - 
assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); // create dump 1 local_disk.set_dump_name("dump-1".to_string()); let bytes: Vec = b"hello world from dump-1".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); assert!(Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); - update_backup_date(&local_disk, "dump-1".to_string(), 5); + update_dump_date(&local_disk, "dump-1".to_string(), 5); // create dump 2 local_disk.set_dump_name("dump-2".to_string()); let bytes: Vec = b"hello world from dump-2".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 2); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 2); assert!(Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); - update_backup_date(&local_disk, "dump-2".to_string(), 3); + update_dump_date(&local_disk, "dump-2".to_string(), 3); // create dump 3 local_disk.set_dump_name("dump-3".to_string()); let bytes: Vec = b"hello world from dump-3".to_vec(); assert!(local_disk.write(1, bytes).is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 3); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 3); assert!(Path::new(&format!("{}/dump-3", dir.path().to_str().unwrap())).exists()); // delete dump older than 6 days doesn't remove any dump @@ -482,7 +513,7 @@ mod tests { keep_last: None, }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 3); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 3); assert!(Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); assert!(Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); assert!(Path::new(&format!("{}/dump-3", dir.path().to_str().unwrap())).exists()); @@ -495,7 +526,7 @@ mod tests { keep_last: None, }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 2); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 2); assert!(!Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); assert!(Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); assert!(Path::new(&format!("{}/dump-3", dir.path().to_str().unwrap())).exists()); @@ -508,7 +539,7 @@ mod tests { keep_last: None, }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 1); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 1); assert!(!Path::new(&format!("{}/dump-1", dir.path().to_str().unwrap())).exists()); assert!(!Path::new(&format!("{}/dump-2", dir.path().to_str().unwrap())).exists()); assert!(Path::new(&format!("{}/dump-3", dir.path().to_str().unwrap())).exists()); @@ -521,7 +552,88 @@ mod tests { keep_last: None, }) .is_ok()); - assert_eq!(local_disk.index_file().unwrap().backups.len(), 0); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 0); assert!(!Path::new(&format!("{}/dump-3", dir.path().to_str().unwrap())).exists()); } + + #[test] + fn test_migrate_update_index_file_version_and_rename_backups_to_dumps() { + // arrange + let dir = tempdir().expect("cannot create tempdir"); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&format!( + "{}/{}", + dir.path().to_str().unwrap(), + INDEX_FILE_NAME + )) + .expect("cannot create test metadata.json"); + + // overwrite the index file + let value = json!({ + "backups": [ + { + "directory_name": 
"dump-1653170039392", + "size": 62279, + "created_at": 1234, + "compressed": true, + "encrypted": false + }, + { + "directory_name": "dump-1653170570014", + "size": 62283, + "created_at": 5678, + "compressed": true, + "encrypted": false + } + ] + }); + serde_json::to_writer(file, &value).expect("cannot write test metadata.json"); + + // create,init and migrate datastore + let mut local_disk: Box = + Box::new(LocalDisk::new(dir.path().to_str().unwrap().to_string())); + + let migrator = Migrator::new( + "0.7.3", + &local_disk, + vec![ + Box::new(UpdateVersionNumber::new("0.7.3")), + Box::new(RenameBackupsToDump::default()), + ], + ); + assert!(migrator.migrate().is_ok()); + + let _ = local_disk.init().expect("local_disk init failed"); + + // assert + assert!(local_disk.index_file().is_ok()); + assert_eq!( + local_disk.index_file().unwrap().v, + Some("0.7.3".to_string()) + ); + assert_eq!(local_disk.index_file().unwrap().dumps.len(), 2); + assert_eq!( + local_disk.index_file().unwrap().dumps.get(0), + Some(&Dump { + directory_name: "dump-1653170039392".to_string(), + size: 62279, + created_at: 1234, + compressed: true, + encrypted: false + }) + ); + assert_eq!( + local_disk.index_file().unwrap().dumps.get(1), + Some(&Dump { + directory_name: "dump-1653170570014".to_string(), + size: 62283, + created_at: 5678, + compressed: true, + encrypted: false + }) + ); + } } diff --git a/replibyte/src/datastore/mod.rs b/replibyte/src/datastore/mod.rs index 6b6e75fd..a846423a 100644 --- a/replibyte/src/datastore/mod.rs +++ b/replibyte/src/datastore/mod.rs @@ -1,6 +1,7 @@ use aes_gcm::aead::{Aead, NewAead}; use aes_gcm::{Aes256Gcm, Key, Nonce}; use chrono::{Duration, Utc}; +use serde_json::Value; use std::io::{Error, ErrorKind, Read, Write}; use flate2::read::ZlibDecoder; @@ -11,6 +12,7 @@ use serde::{Deserialize, Serialize}; use crate::cli::DumpDeleteArgs; use crate::connector::Connector; use crate::types::Bytes; +use crate::utils::get_replibyte_version; pub mod local_disk; pub mod s3; @@ -18,9 +20,11 @@ pub mod s3; const INDEX_FILE_NAME: &str = "metadata.json"; pub trait Datastore: Connector + Send + Sync { - /// Getting Index file with all the backups information + /// Getting Index file with all the dumps information fn index_file(&self) -> Result; + fn raw_index_file(&self) -> Result; fn write_index_file(&self, index_file: &IndexFile) -> Result<(), Error>; + fn write_raw_index_file(&self, raw_index_file: &Value) -> Result<(), Error>; fn write(&self, file_part: u16, data: Bytes) -> Result<(), Error>; fn read( &self, @@ -35,8 +39,8 @@ pub trait Datastore: Connector + Send + Sync { fn delete_by_name(&self, name: String) -> Result<(), Error>; fn delete(&self, args: &DumpDeleteArgs) -> Result<(), Error> { - if let Some(backup_name) = &args.dump { - return self.delete_by_name(backup_name.to_string()); + if let Some(dump_name) = &args.dump { + return self.delete_by_name(dump_name.to_string()); } if let Some(older_than) = &args.older_than { @@ -81,14 +85,14 @@ pub trait Datastore: Connector + Send + Sync { let threshold_date = Utc::now() - Duration::days(days); let threshold_date = threshold_date.timestamp_millis() as u128; - let backups_to_delete: Vec = index_file - .backups + let dumps_to_delete: Vec = index_file + .dumps .into_iter() .filter(|b| b.created_at.lt(&threshold_date)) .collect(); - for backup in backups_to_delete { - let dump_name = backup.directory_name; + for dump in dumps_to_delete { + let dump_name = dump.directory_name; self.delete_by_name(dump_name)? 
} @@ -99,12 +103,12 @@ pub trait Datastore: Connector + Send + Sync { let mut index_file = self.index_file()?; index_file - .backups + .dumps .sort_by(|a, b| b.created_at.cmp(&a.created_at)); - if let Some(backups) = index_file.backups.get(keep_last..) { - for backup in backups { - let dump_name = &backup.directory_name; + if let Some(dumps) = index_file.dumps.get(keep_last..) { + for dump in dumps { + let dump_name = &dump.directory_name; self.delete_by_name(dump_name.to_string())?; } } @@ -113,33 +117,41 @@ pub trait Datastore: Connector + Send + Sync { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct IndexFile { - pub backups: Vec, + pub v: Option, + pub dumps: Vec, } impl IndexFile { - pub fn find_backup(&mut self, options: &ReadOptions) -> Result<&Backup, Error> { + pub fn new() -> Self { + Self { + v: Some(get_replibyte_version().to_string()), + dumps: vec![], + } + } + + pub fn find_dump(&mut self, options: &ReadOptions) -> Result<&Dump, Error> { match options { ReadOptions::Latest => { - self.backups.sort_by(|a, b| a.created_at.cmp(&b.created_at)); + self.dumps.sort_by(|a, b| a.created_at.cmp(&b.created_at)); - match self.backups.last() { - Some(backup) => Ok(backup), - None => return Err(Error::new(ErrorKind::Other, "No backups available.")), + match self.dumps.last() { + Some(dump) => Ok(dump), + None => return Err(Error::new(ErrorKind::Other, "No dumps available.")), } } ReadOptions::Dump { name } => { match self - .backups + .dumps .iter() - .find(|backup| backup.directory_name.as_str() == name.as_str()) + .find(|dump| dump.directory_name.as_str() == name.as_str()) { - Some(backup) => Ok(backup), + Some(dump) => Ok(dump), None => { return Err(Error::new( ErrorKind::Other, - format!("Can't find backup with name '{}'", name), + format!("Can't find dump with name '{}'", name), )); } } @@ -148,8 +160,8 @@ impl IndexFile { } } -#[derive(Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq)] -pub struct Backup { +#[derive(Debug, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq)] +pub struct Dump { pub directory_name: String, pub size: usize, pub created_at: u128, diff --git a/replibyte/src/datastore/s3.rs b/replibyte/src/datastore/s3.rs index 6d59856c..faddb4c6 100644 --- a/replibyte/src/datastore/s3.rs +++ b/replibyte/src/datastore/s3.rs @@ -12,12 +12,13 @@ use aws_sdk_s3::{Client, Endpoint as SdkEndpoint}; use aws_types::region::Region; use aws_types::Credentials; use log::{error, info}; +use serde_json::Value; use crate::config::{AwsCredentials, Endpoint}; use crate::connector::Connector; use crate::datastore::s3::S3Error::FailedObjectUpload; use crate::datastore::{ - compress, decompress, decrypt, encrypt, Backup, Datastore, IndexFile, ReadOptions, + compress, decompress, decrypt, encrypt, Datastore, Dump, IndexFile, ReadOptions, }; use crate::runtime::block_on; use crate::types::Bytes; @@ -148,7 +149,7 @@ impl S3 { match self.index_file() { Ok(index_file) => Ok(index_file), Err(_) => { - let index_file = IndexFile { backups: vec![] }; + let index_file = IndexFile::new(); let _ = self.write_index_file(&index_file)?; Ok(index_file) } @@ -178,6 +179,13 @@ impl Datastore for S3 { Ok(index_file) } + fn raw_index_file(&self) -> Result { + let object = get_object(&self.client, self.bucket.as_str(), INDEX_FILE_NAME)?; + let index_file = serde_json::from_slice(object.as_slice())?; + + Ok(index_file) + } + fn write_index_file(&self, index_file: &IndexFile) -> Result<(), Error> { let index_file_json = serde_json::to_vec(index_file)?; @@ 
-190,6 +198,18 @@ impl Datastore for S3 { .map_err(|err| Error::from(err)) } + fn write_raw_index_file(&self, raw_index_file: &Value) -> Result<(), Error> { + let index_file_json = serde_json::to_vec(raw_index_file)?; + + create_object( + &self.client, + self.bucket.as_str(), + INDEX_FILE_NAME, + index_file_json, + ) + .map_err(|err| Error::from(err)) + } + fn write(&self, file_part: u16, data: Bytes) -> Result<(), Error> { write_objects( self, @@ -207,18 +227,18 @@ impl Datastore for S3 { mut data_callback: &mut dyn FnMut(Bytes), ) -> Result<(), Error> { let mut index_file = self.index_file()?; - let backup = index_file.find_backup(options)?; + let dump = index_file.find_dump(options)?; for object in list_objects( &self.client, self.bucket.as_str(), - Some(backup.directory_name.as_str()), + Some(dump.directory_name.as_str()), )? { let data = get_object(&self.client, self.bucket.as_str(), object.key().unwrap())?; // decrypt data? - let data = if backup.encrypted { - // It should be safe to unwrap here because the backup is marked as encrypted in the backup manifest + let data = if dump.encrypted { + // It should be safe to unwrap here because the dump is marked as encrypted in the dump manifest // so if there is no encryption key set at the datastore level we want to panic. let encryption_key = self.encryption_key.as_ref().unwrap(); decrypt(data, encryption_key.as_str())? @@ -227,7 +247,7 @@ impl Datastore for S3 { }; // decompress data? - let data = if backup.compressed { + let data = if dump.compressed { decompress(data)? } else { data @@ -266,7 +286,7 @@ impl Datastore for S3 { let _ = delete_directory(&self.client, bucket, &name).map_err(|err| Error::from(err))?; - index_file.backups.retain(|b| b.directory_name != name); + index_file.dumps.retain(|b| b.directory_name != name); self.write_index_file(&index_file) } @@ -303,7 +323,7 @@ fn write_objects( // update index file let mut index_file = datastore.index_file()?; - let mut new_backup = Backup { + let mut new_dump = Dump { directory_name: root_key.to_string(), size: 0, created_at: epoch_millis(), @@ -311,21 +331,21 @@ fn write_objects( encrypted: datastore.encryption_key().is_some(), }; - // find or create Backup - let mut backup = index_file - .backups + // find or create dump + let mut dump = index_file + .dumps .iter_mut() .find(|b| b.directory_name.as_str() == root_key) - .unwrap_or(&mut new_backup); + .unwrap_or(&mut new_dump); - if backup.size == 0 { - // it means it's a new backup. - // We need to add it into the index_file.backups - new_backup.size = data_size; - index_file.backups.push(new_backup); + if dump.size == 0 { + // it means it's a new dump. 
+ // We need to add it into the index_file.dumps + new_dump.size = data_size; + index_file.dumps.push(new_dump); } else { - // update total backup size - backup.size = backup.size + data_size; + // update total dump size + dump.size = dump.size + data_size; } // save index file @@ -429,7 +449,7 @@ fn create_bucket<'a, S: AsRef>( match result { Ok(_) => {} Err(err) => { - error!("{}", err); + error!("{}", err.to_string()); return Err(S3Error::FailedToCreateBucket { bucket }); } } @@ -476,7 +496,8 @@ fn create_object<'a>( .send(), ); - if let Err(_) = result { + if let Err(err) = result { + error!("{}", err.to_string()); return Err(S3Error::FailedObjectUpload { bucket, key }); } @@ -504,7 +525,7 @@ fn list_objects<'a>( let objects = match objects { Ok(objects) => objects, Err(err) => { - error!("{}", err); + error!("{}", err.to_string()); return Err(S3Error::FailedToListObjects { bucket }); } }; @@ -571,7 +592,7 @@ fn delete_directory<'a>( ) { Ok(_) => Ok(()), Err(err) => { - eprintln!("{}", err); + error!("{}", err.to_string()); Err(S3Error::FailedToDeleteDirectory { bucket, directory }) } } @@ -584,16 +605,21 @@ fn delete_directory<'a>( mod tests { use chrono::{Duration, Utc}; use fake::{Fake, Faker}; + use serde_json::json; use crate::cli::DumpDeleteArgs; use crate::config::{AwsCredentials, Endpoint}; use crate::connector::Connector; - use crate::datastore::s3::{create_object, delete_bucket, delete_object, get_object, S3Error}; - use crate::datastore::{Backup, Datastore}; + use crate::datastore::s3::{ + create_bucket, create_object, delete_bucket, delete_object, get_object, S3Error, + }; + use crate::datastore::{Datastore, Dump, INDEX_FILE_NAME}; + use crate::migration::rename_backups_to_dumps::RenameBackupsToDump; + use crate::migration::update_version_number::UpdateVersionNumber; + use crate::migration::Migrator; use crate::utils::epoch_millis; use crate::S3; - const BUCKET_NAME: &str = "replibyte-test"; const REGION: &str = "us-east-2"; const MINIO_ENDPOINT: &str = "http://localhost:9000"; const MINIO_CREDENTIALS: &str = "minioadmin"; @@ -618,7 +644,7 @@ mod tests { S3::aws( bucket.to_string(), - None, + Some(REGION.to_string()), None, Some(AwsCredentials { access_key_id, @@ -818,10 +844,10 @@ mod tests { let mut index_file = s3.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); - index_file.backups.push(Backup { - directory_name: "backup-1".to_string(), + index_file.dumps.push(Dump { + directory_name: "dump-1".to_string(), size: 0, created_at: epoch_millis(), compressed: true, @@ -830,23 +856,23 @@ mod tests { assert!(s3.write_index_file(&index_file).is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 1); + assert_eq!(s3.index_file().unwrap().dumps.len(), 1); assert!(delete_bucket(&s3.client, bucket.as_str(), true).is_ok()); } #[test] - fn test_backup_name() { + fn test_dump_name() { let bucket = aws_bucket(); let mut s3 = aws_s3(bucket.as_str()); - s3.set_dump_name("custom-backup-name".to_string()); + s3.set_dump_name("custom-dump-name".to_string()); - assert_eq!(s3.root_key, "custom-backup-name".to_string()) + assert_eq!(s3.root_key, "custom-dump-name".to_string()) } #[test] - fn test_s3_backup_delete_by_name() { + fn test_s3_dump_delete_by_name() { let bucket = aws_bucket(); let mut s3 = aws_s3(bucket.as_str()); @@ -856,19 +882,19 @@ mod tests { let mut index_file = s3.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); - // Add 2 backups in the manifest - 
index_file.backups.push(Backup { - directory_name: "backup-1".to_string(), + // Add 2 dumps in the manifest + index_file.dumps.push(Dump { + directory_name: "dump-1".to_string(), size: 0, created_at: epoch_millis(), compressed: true, encrypted: false, }); - index_file.backups.push(Backup { - directory_name: "backup-2".to_string(), + index_file.dumps.push(Dump { + directory_name: "dump-2".to_string(), size: 0, created_at: epoch_millis(), compressed: true, @@ -876,12 +902,12 @@ mod tests { }); assert!(s3.write_index_file(&index_file).is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 2); + assert_eq!(s3.index_file().unwrap().dumps.len(), 2); assert!(create_object( &s3.client, bucket.as_str(), - "backup-1/testing-key.dump", + "dump-1/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); @@ -889,36 +915,36 @@ mod tests { assert!(create_object( &s3.client, bucket.as_str(), - "backup-2/testing-key.dump", + "dump-2/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); assert!(s3 .delete(&DumpDeleteArgs { - dump: Some("backup-1".to_string()), + dump: Some("dump-1".to_string()), older_than: None, keep_last: None, }) .is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 1); - assert!(get_object(&s3.client, bucket.as_str(), "backup-1/testing-key.dump").is_err()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-2/testing-key.dump").is_ok()); + assert_eq!(s3.index_file().unwrap().dumps.len(), 1); + assert!(get_object(&s3.client, bucket.as_str(), "dump-1/testing-key.dump").is_err()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-2/testing-key.dump").is_ok()); assert!(s3 .delete(&DumpDeleteArgs { - dump: Some("backup-2".to_string()), + dump: Some("dump-2".to_string()), older_than: None, keep_last: None, }) .is_ok()); - assert!(s3.index_file().unwrap().backups.is_empty()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-2/testing-key.dump").is_err()); + assert!(s3.index_file().unwrap().dumps.is_empty()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-2/testing-key.dump").is_err()); } #[test] - fn test_s3_backup_delete_older_than() { + fn test_s3_dump_delete_older_than() { let bucket = aws_bucket(); let mut s3 = aws_s3(bucket.as_str()); @@ -928,20 +954,20 @@ mod tests { let mut index_file = s3.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); - // Add a backup from 5 days ago - index_file.backups.push(Backup { - directory_name: "backup-1".to_string(), + // Add a dump from 5 days ago + index_file.dumps.push(Dump { + directory_name: "dump-1".to_string(), size: 0, created_at: (Utc::now() - Duration::days(5)).timestamp_millis() as u128, compressed: true, encrypted: false, }); - // Add a backup from now - index_file.backups.push(Backup { - directory_name: "backup-2".to_string(), + // Add a dump from now + index_file.dumps.push(Dump { + directory_name: "dump-2".to_string(), size: 0, created_at: epoch_millis(), compressed: true, @@ -949,12 +975,12 @@ mod tests { }); assert!(s3.write_index_file(&index_file).is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 2); + assert_eq!(s3.index_file().unwrap().dumps.len(), 2); assert!(create_object( &s3.client, bucket.as_str(), - "backup-1/testing-key.dump", + "dump-1/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); @@ -962,7 +988,7 @@ mod tests { assert!(create_object( &s3.client, bucket.as_str(), - "backup-2/testing-key.dump", + "dump-2/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); @@ -975,9 +1001,9 @@ mod tests { }) 
.is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 2); - assert!(get_object(&s3.client, bucket.as_str(), "backup-1/testing-key.dump").is_ok()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-2/testing-key.dump").is_ok()); + assert_eq!(s3.index_file().unwrap().dumps.len(), 2); + assert!(get_object(&s3.client, bucket.as_str(), "dump-1/testing-key.dump").is_ok()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-2/testing-key.dump").is_ok()); assert!(s3 .delete(&DumpDeleteArgs { @@ -987,13 +1013,13 @@ mod tests { }) .is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 1); - assert!(get_object(&s3.client, bucket.as_str(), "backup-1/testing-key.dump").is_err()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-2/testing-key.dump").is_ok()); + assert_eq!(s3.index_file().unwrap().dumps.len(), 1); + assert!(get_object(&s3.client, bucket.as_str(), "dump-1/testing-key.dump").is_err()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-2/testing-key.dump").is_ok()); } #[test] - fn test_s3_backup_keep_last() { + fn test_s3_dump_keep_last() { let bucket = aws_bucket(); let mut s3 = aws_s3(bucket.as_str()); @@ -1003,26 +1029,26 @@ mod tests { let mut index_file = s3.index_file().unwrap(); - assert!(index_file.backups.is_empty()); + assert!(index_file.dumps.is_empty()); - index_file.backups.push(Backup { - directory_name: "backup-1".to_string(), + index_file.dumps.push(Dump { + directory_name: "dump-1".to_string(), size: 0, created_at: (Utc::now() - Duration::days(3)).timestamp_millis() as u128, compressed: true, encrypted: false, }); - index_file.backups.push(Backup { - directory_name: "backup-2".to_string(), + index_file.dumps.push(Dump { + directory_name: "dump-2".to_string(), size: 0, created_at: (Utc::now() - Duration::days(5)).timestamp_millis() as u128, compressed: true, encrypted: false, }); - index_file.backups.push(Backup { - directory_name: "backup-3".to_string(), + index_file.dumps.push(Dump { + directory_name: "dump-3".to_string(), size: 0, created_at: epoch_millis(), compressed: true, @@ -1030,12 +1056,12 @@ mod tests { }); assert!(s3.write_index_file(&index_file).is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 3); + assert_eq!(s3.index_file().unwrap().dumps.len(), 3); assert!(create_object( &s3.client, bucket.as_str(), - "backup-1/testing-key.dump", + "dump-1/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); @@ -1043,7 +1069,7 @@ mod tests { assert!(create_object( &s3.client, bucket.as_str(), - "backup-2/testing-key.dump", + "dump-2/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); @@ -1051,7 +1077,7 @@ mod tests { assert!(create_object( &s3.client, bucket.as_str(), - "backup-3/testing-key.dump", + "dump-3/testing-key.dump", b"hello w0rld".to_vec(), ) .is_ok()); @@ -1064,10 +1090,10 @@ mod tests { }) .is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 2); - assert!(get_object(&s3.client, bucket.as_str(), "backup-1/testing-key.dump").is_ok()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-2/testing-key.dump").is_err()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-3/testing-key.dump").is_ok()); + assert_eq!(s3.index_file().unwrap().dumps.len(), 2); + assert!(get_object(&s3.client, bucket.as_str(), "dump-1/testing-key.dump").is_ok()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-2/testing-key.dump").is_err()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-3/testing-key.dump").is_ok()); assert!(s3 .delete(&DumpDeleteArgs { @@ -1077,9 +1103,85 @@ mod 
tests { }) .is_ok()); - assert_eq!(s3.index_file().unwrap().backups.len(), 1); - assert!(get_object(&s3.client, bucket.as_str(), "backup-1/testing-key.dump").is_err()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-2/testing-key.dump").is_err()); - assert!(get_object(&s3.client, bucket.as_str(), "backup-3/testing-key.dump").is_ok()); + assert_eq!(s3.index_file().unwrap().dumps.len(), 1); + assert!(get_object(&s3.client, bucket.as_str(), "dump-1/testing-key.dump").is_err()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-2/testing-key.dump").is_err()); + assert!(get_object(&s3.client, bucket.as_str(), "dump-3/testing-key.dump").is_ok()); + } + + #[test] + fn test_migrate_add_index_file_version_and_rename_backups_to_dumps() { + let bucket = aws_bucket(); + let s3 = aws_s3(bucket.as_str()); + + // create a fake index file + let value = json!({ + "backups": [ + { + "directory_name": "dump-1653170039392", + "size": 62279, + "created_at": 1234, + "compressed": true, + "encrypted": false + }, + { + "directory_name": "dump-1653170570014", + "size": 62283, + "created_at": 5678, + "compressed": true, + "encrypted": false + } + ] + }); + + // create a test bucket + assert!(create_bucket(&s3.client, bucket.as_str(), Some(REGION.to_string())).is_ok()); + + // create a test metadata.json file + assert!(create_object( + &s3.client, + bucket.as_str(), + INDEX_FILE_NAME, + value.to_string().into_bytes() + ) + .is_ok()); + + let mut s3: Box = Box::new(s3); + let migrator = Migrator::new( + "0.7.3", + &s3, + vec![ + Box::new(UpdateVersionNumber::new("0.7.3")), + Box::new(RenameBackupsToDump::default()), + ], + ); + assert!(migrator.migrate().is_ok()); + + let _ = s3.init().expect("s3 init failed"); + + // assert + assert!(s3.index_file().is_ok()); + assert_eq!(s3.index_file().unwrap().v, Some("0.7.3".to_string())); + assert_eq!(s3.index_file().unwrap().dumps.len(), 2); + assert_eq!( + s3.index_file().unwrap().dumps.get(0), + Some(&Dump { + directory_name: "dump-1653170039392".to_string(), + size: 62279, + created_at: 1234, + compressed: true, + encrypted: false + }) + ); + assert_eq!( + s3.index_file().unwrap().dumps.get(1), + Some(&Dump { + directory_name: "dump-1653170570014".to_string(), + size: 62283, + created_at: 5678, + compressed: true, + encrypted: false + }) + ); } } diff --git a/replibyte/src/main.rs b/replibyte/src/main.rs index 22bc3763..47d0c973 100644 --- a/replibyte/src/main.rs +++ b/replibyte/src/main.rs @@ -10,6 +10,8 @@ use std::{env, thread}; use clap::Parser; use indicatif::{ProgressBar, ProgressStyle}; +use migration::{migrations, Migrator}; +use utils::get_replibyte_version; use crate::cli::{DumpCommand, RestoreCommand, SubCommand, TransformerCommand, CLI}; use crate::config::{Config, DatabaseSubsetConfig, DatastoreConfig}; @@ -27,6 +29,7 @@ mod config; mod connector; mod datastore; mod destination; +mod migration; mod runtime; mod source; mod tasks; @@ -129,6 +132,11 @@ fn run(config: Config, sub_commands: &SubCommand) -> anyhow::Result<()> { DatastoreConfig::LocalDisk(config) => Box::new(LocalDisk::new(config.dir()?)), }; + let migrator = Migrator::new(get_replibyte_version(), &datastore, migrations()); + let _ = migrator.migrate()?; + + let _ = datastore.init()?; + let (tx_pb, rx_pb) = mpsc::sync_channel::<(TransferredBytes, MaxBytes)>(1000); match sub_commands { diff --git a/replibyte/src/migration/mod.rs b/replibyte/src/migration/mod.rs new file mode 100644 index 00000000..c6f840ce --- /dev/null +++ b/replibyte/src/migration/mod.rs @@ -0,0 +1,246 @@ +use 
std::io::{Error, ErrorKind}; +use std::str::FromStr; + +use crate::datastore::Datastore; +use crate::migration::rename_backups_to_dumps::RenameBackupsToDump; +use crate::migration::update_version_number::UpdateVersionNumber; +use crate::utils::get_replibyte_version; + +pub mod rename_backups_to_dumps; +pub mod update_version_number; + +#[derive(Debug, PartialEq, PartialOrd)] +pub struct Version { + major: u8, + minor: u8, + patch: u8, +} + +impl FromStr for Version { + type Err = Error; + + fn from_str(v: &str) -> Result { + let numbers = v.split_terminator('.').collect::>(); + + match numbers.len() { + 3 => { + // unwrap is safe here as we know we have 3 items in vec. + let major = parse_str_to_u8(numbers.get(0).unwrap())?; + let minor = parse_str_to_u8(numbers.get(1).unwrap())?; + let patch = parse_str_to_u8(numbers.get(2).unwrap())?; + + Ok(Self { + major, + minor, + patch, + }) + } + _ => Err(Error::new( + ErrorKind::Other, + format!("migration: version number '{}' is invalid, must have 'major.minor.patch' format", v), + )), + } + } +} + +pub trait Migration { + /// minimal version for which the migration needs to be triggered. + fn minimal_version(&self) -> Version; + /// run the migration. + fn run(&self, datastore: &Box) -> Result<(), Error>; +} + +// All registered migrations +pub fn migrations() -> Vec> { + vec![ + Box::new(UpdateVersionNumber::new(get_replibyte_version())), + Box::new(RenameBackupsToDump::default()), + ] +} + +pub struct Migrator<'a> { + current_replibyte_version: &'a str, + datastore: &'a Box, + migrations: Vec>, +} + +impl<'a> Migrator<'a> { + pub fn new( + version: &'a str, + datastore: &'a Box, + migrations: Vec>, + ) -> Self { + Self { + current_replibyte_version: version, + datastore, + migrations, + } + } + + /// run all registered migrations when the minimal version is matched. + pub fn migrate(&self) -> Result<(), Error> { + for migration in &self.migrations { + if self.should_run_migration(migration) { + let _ = migration.run(&self.datastore)?; + } + } + + Ok(()) + } + + fn should_run_migration(&self, migration: &Box) -> bool { + let current_version = Version::from_str(self.current_replibyte_version).unwrap(); + + current_version >= migration.minimal_version() + } +} + +fn parse_str_to_u8(s: &str) -> Result { + s.parse::() + .map_err(|err| Error::new(ErrorKind::Other, err.to_string())) +} + +#[cfg(test)] +mod tests { + use std::{ + io::{Error, ErrorKind}, + str::FromStr, + }; + + use serde_json::json; + + use crate::connector::Connector; + use crate::datastore::{Datastore, IndexFile, ReadOptions}; + + use super::{Migration, Migrator, Version}; + + struct FakeMigration {} + impl Migration for FakeMigration { + fn minimal_version(&self) -> Version { + Version::from_str("0.7.2").unwrap() + } + + fn run(&self, _datastore: &Box) -> Result<(), std::io::Error> { + // trigger an error so we can assert against it + Err(Error::new(ErrorKind::Other, "should not run")) + } + } + + // an in memory datastore to test the migrator struct logic. 
+ struct InMemoryDatastore { + index_file: IndexFile, + } + + impl Connector for InMemoryDatastore { + fn init(&mut self) -> Result<(), Error> { + Ok(()) + } + } + + impl Datastore for InMemoryDatastore { + fn index_file(&self) -> Result { + Ok(IndexFile { + v: None, + dumps: vec![], + }) + } + + fn raw_index_file(&self) -> Result { + Ok(json!(self.index_file)) + } + + fn write_index_file(&self, _index_file: &IndexFile) -> Result<(), Error> { + unimplemented!() + } + + fn write_raw_index_file(&self, _raw_index_file: &serde_json::Value) -> Result<(), Error> { + unimplemented!() + } + + fn write(&self, _file_part: u16, _data: crate::types::Bytes) -> Result<(), Error> { + unimplemented!() + } + + fn read( + &self, + _options: &ReadOptions, + _data_callback: &mut dyn FnMut(crate::types::Bytes), + ) -> Result<(), Error> { + unimplemented!() + } + + fn compression_enabled(&self) -> bool { + true + } + + fn set_compression(&mut self, _enable: bool) { + unimplemented!() + } + + fn encryption_key(&self) -> &Option { + unimplemented!() + } + + fn set_encryption_key(&mut self, _key: String) { + unimplemented!() + } + + fn set_dump_name(&mut self, _name: String) { + unimplemented!() + } + + fn delete_by_name(&self, _name: String) -> Result<(), Error> { + unimplemented!() + } + } + + #[test] + fn str_to_version() { + let version = Version::from_str("0.7.2").unwrap(); + assert_eq!(version.major, 0); + assert_eq!(version.minor, 7); + assert_eq!(version.patch, 2); + + assert!(Version::from_str("0.7").is_err()); + } + + #[test] + fn compare_version() { + let old_version = Version::from_str("0.7.2").unwrap(); + let new_version = Version::from_str("0.7.3").unwrap(); + assert!(old_version < new_version); + + let old_version = Version::from_str("1.7.0").unwrap(); + let new_version = Version::from_str("1.7.1").unwrap(); + assert!(old_version < new_version); + + let old_version = Version::from_str("0.7.0").unwrap(); + let new_version = Version::from_str("1.0.0").unwrap(); + assert!(old_version < new_version); + } + + #[test] + fn test_migrator() { + let store: Box = Box::new(InMemoryDatastore { + index_file: IndexFile { + v: None, + dumps: vec![], + }, + }); + + let m = Migrator::new("0.7.3", &store, vec![Box::new(FakeMigration {})]); + // migrate returns an error as FakeMigration is run + assert!(m.migrate().is_err()); + + let store: Box = Box::new(InMemoryDatastore { + index_file: IndexFile { + v: None, + dumps: vec![], + }, + }); + + let m = Migrator::new("0.7.0", &store, vec![Box::new(FakeMigration {})]); + // migrate returns Ok as FakeMigration doesn't run + assert!(m.migrate().is_ok()); + } +} diff --git a/replibyte/src/migration/rename_backups_to_dumps.rs b/replibyte/src/migration/rename_backups_to_dumps.rs new file mode 100644 index 00000000..9c54e796 --- /dev/null +++ b/replibyte/src/migration/rename_backups_to_dumps.rs @@ -0,0 +1,95 @@ +use std::{ + io::{Error, ErrorKind}, + str::FromStr, +}; + +use log::info; +use serde_json::{json, Value}; + +use crate::datastore::Datastore; + +use super::{Migration, Version}; + +pub struct RenameBackupsToDump {} + +impl RenameBackupsToDump { + pub fn default() -> Self { + Self {} + } +} + +impl Migration for RenameBackupsToDump { + fn minimal_version(&self) -> Version { + Version::from_str("0.7.3").unwrap() + } + + fn run(&self, datastore: &Box) -> Result<(), Error> { + info!("migrate: rename backups to dumps"); + + let mut raw_index_file = datastore.raw_index_file()?; + let _ = rename_backups_to_dumps(&mut raw_index_file)?; + 
+        datastore.write_raw_index_file(&raw_index_file)
+    }
+}
+
+fn rename_backups_to_dumps(metadata_json: &mut Value) -> Result<(), Error> {
+    match metadata_json.as_object_mut() {
+        Some(metadata) => {
+            // we rename the `backups` key to `dumps`
+            if metadata.contains_key("backups") {
+                let backups = metadata.get("backups").unwrap_or(&json!([])).clone();
+                metadata.insert("dumps".to_string(), backups);
+                metadata.remove("backups");
+            }
+            Ok(())
+        }
+        None => Err(Error::new(
+            ErrorKind::Other,
+            "migrate: metadata.json is not an object",
+        )),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use crate::migration::rename_backups_to_dumps::rename_backups_to_dumps;
+
+    #[test]
+    fn test_rename_backup_to_dumps() {
+        let mut metadata_json = json!({"backups": []});
+        assert!(rename_backups_to_dumps(&mut metadata_json).is_ok());
+        assert!(metadata_json.get("backups").is_none());
+        assert!(metadata_json.get("dumps").is_some());
+        assert!(metadata_json.get("dumps").unwrap().is_array());
+
+        let mut metadata_json = json!({
+            "backups": [
+                {
+                    "directory_name":"dump-1653170039392",
+                    "size":62279,
+                    "created_at":1234,
+                    "compressed":true,
+                    "encrypted":false
+                }
+            ]
+        });
+        assert!(rename_backups_to_dumps(&mut metadata_json).is_ok());
+        assert!(metadata_json.get("backups").is_none());
+        assert!(metadata_json.get("dumps").is_some());
+        assert!(metadata_json.get("dumps").unwrap().is_array());
+        assert!(metadata_json
+            .get("dumps")
+            .unwrap()
+            .as_array()
+            .unwrap()
+            .contains(&json!({
+                "directory_name":"dump-1653170039392",
+                "size":62279,
+                "created_at":1234,
+                "compressed":true,
+                "encrypted":false
+            })));
+    }
+}
diff --git a/replibyte/src/migration/update_version_number.rs b/replibyte/src/migration/update_version_number.rs
new file mode 100644
index 00000000..fa4e5f48
--- /dev/null
+++ b/replibyte/src/migration/update_version_number.rs
@@ -0,0 +1,84 @@
+use std::{
+    io::{Error, ErrorKind},
+    str::FromStr,
+};
+
+use log::info;
+use serde_json::{json, Value};
+
+use crate::datastore::Datastore;
+
+use super::{Migration, Version};
+
+pub struct UpdateVersionNumber<'a> {
+    version: &'a str,
+}
+
+impl<'a> UpdateVersionNumber<'a> {
+    pub fn new(version: &'a str) -> Self {
+        Self { version }
+    }
+}
+
+impl<'a> Migration for UpdateVersionNumber<'a> {
+    fn minimal_version(&self) -> Version {
+        Version::from_str("0.7.3").unwrap()
+    }
+
+    fn run(&self, datastore: &Box<dyn Datastore>) -> Result<(), Error> {
+        info!("migrate: update version number");
+
+        let mut raw_index_file = datastore.raw_index_file()?;
+        let _ = update_version(&mut raw_index_file, self.version)?;
+        datastore.write_raw_index_file(&raw_index_file)
+    }
+}
+
+fn update_version(metadata_json: &mut Value, version: &str) -> Result<(), Error> {
+    match metadata_json.as_object_mut() {
+        Some(metadata) => {
+            metadata.insert("v".to_string(), json!(version));
+            Ok(())
+        }
+        None => Err(Error::new(
+            ErrorKind::Other,
+            "migrate: metadata.json is not an object",
+        )),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use crate::migration::update_version_number::update_version;
+
+    #[test]
+    fn test_update_version() {
+        let mut metadata_json = json!({"backups": []});
+
+        assert!(update_version(&mut metadata_json, "0.1.0").is_ok());
+        assert!(metadata_json.get("v").is_some());
+        assert_eq!(metadata_json.get("v").unwrap(), "0.1.0");
+
+        let mut metadata_json = json!({
+            "backups": [
+                {
+                    "directory_name":"dump-1653170039392",
+                    "size":62279,
+                    "created_at":1234,
+                    "compressed":true,
+                    "encrypted":false
+                }
+            ]
+        });
+        assert!(update_version(&mut metadata_json, "0.2.0").is_ok());
+        assert!(metadata_json.get("v").is_some());
+        assert_eq!(metadata_json.get("v").unwrap(), "0.2.0");
+
+        let mut metadata_json = json!({"v": "0.7.3", "backups": []});
+        assert!(update_version(&mut metadata_json, "0.7.4").is_ok());
+        assert!(metadata_json.get("v").is_some());
+        assert_eq!(metadata_json.get("v").unwrap(), "0.7.4");
+    }
+}
diff --git a/replibyte/src/tasks/full_dump.rs b/replibyte/src/tasks/full_dump.rs
index 9f2d8758..907d271a 100644
--- a/replibyte/src/tasks/full_dump.rs
+++ b/replibyte/src/tasks/full_dump.rs
@@ -44,9 +44,6 @@ where
         // initialize the source
         let _ = self.source.init()?;

-        // initialize the datastore
-        let _ = self.datastore.init()?;
-
         let (tx, rx) = mpsc::sync_channel::>(1);

         let datastore = self.datastore;
diff --git a/replibyte/src/tasks/full_restore.rs b/replibyte/src/tasks/full_restore.rs
index f1f097ef..e2a7dadf 100644
--- a/replibyte/src/tasks/full_restore.rs
+++ b/replibyte/src/tasks/full_restore.rs
@@ -45,18 +45,15 @@ where
         // initialize the destination
         let _ = self.destination.init()?;

-        // initialize the datastore
-        let _ = self.datastore.init()?;
-
         // bound to 1 to avoid eating too much memory if we download the dump faster than we ingest it
         let (tx, rx) = mpsc::sync_channel::>(1);

         let datastore = self.datastore;

         let mut index_file = datastore.index_file()?;
-        let backup = index_file.find_backup(&self.read_options)?;
+        let dump = index_file.find_dump(&self.read_options)?;

         // init progress
-        progress_callback(0, backup.size);
+        progress_callback(0, dump.size);

         let read_options = self.read_options.clone();

@@ -82,7 +79,7 @@ where
            Err(err) => panic!("{:?}", err), // FIXME what should I do here?
        };

-        progress_callback(data.len(), backup.size);
+        progress_callback(data.len(), dump.size);

        let _ = self.destination.write(data)?;
    }
@@ -90,7 +87,7 @@ where
        // wait for end of download execution
        let _ = join_handle.join(); // FIXME catch result here

-       progress_callback(backup.size, backup.size);
+       progress_callback(dump.size, dump.size);

        Ok(())
    }
diff --git a/replibyte/src/utils.rs b/replibyte/src/utils.rs
index 74077ce0..0ac0ec0e 100644
--- a/replibyte/src/utils.rs
+++ b/replibyte/src/utils.rs
@@ -76,3 +76,7 @@ pub fn wait_for_command(process: &mut Child) -> Result<(), Error> {
         )),
     }
 }
+
+pub fn get_replibyte_version() -> &'static str {
+    env!("CARGO_PKG_VERSION")
+}
diff --git a/subset/Cargo.toml b/subset/Cargo.toml
index 0892f86b..84ecb53f 100644
--- a/subset/Cargo.toml
+++ b/subset/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "subset"
-version = "0.1.0"
+version = "0.7.3"
 edition = "2021"

 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
diff --git a/website/docs/faq.md b/website/docs/faq.md
index c1a81144..9ab4feb3 100644
--- a/website/docs/faq.md
+++ b/website/docs/faq.md
@@ -41,9 +41,9 @@ and
 replibyte -c conf.yaml backup run -s postgres -f dump.sql
 ```

-### How RepliByte can list the backups? Is there an API?
+### How can RepliByte list the dumps? Is there an API?

-There is no API, RepliByte is fully stateless and store the backup list into the datastore (E.g. S3) via an metadata file.
+There is no API; RepliByte is fully stateless and stores the dump list in the datastore (E.g. S3) via a metadata file.

 ### How can I contact you?

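For review context, here is a small illustration — not part of the patch — of what the two migrations above do to a datastore's `metadata.json`. It re-implements the `backups` → `dumps` rename and the `v` stamp inline, mirroring the `rename_backups_to_dumps` and `update_version` helpers exercised by the unit tests; the standalone `main` and the inlined logic are a sketch for illustration only.

```rust
// Illustration only: mirrors what RenameBackupsToDump and UpdateVersionNumber
// do to the datastore's metadata.json (assumed shape taken from the tests above).
use serde_json::json;

fn main() {
    // metadata.json as written by an older RepliByte release
    let mut metadata = json!({
        "backups": [
            { "directory_name": "dump-1653170039392", "size": 62279,
              "created_at": 1234, "compressed": true, "encrypted": false }
        ]
    });

    let obj = metadata
        .as_object_mut()
        .expect("metadata.json must be a JSON object");

    // RenameBackupsToDump: move the `backups` array under the `dumps` key.
    if let Some(backups) = obj.remove("backups") {
        obj.insert("dumps".to_string(), backups);
    }

    // UpdateVersionNumber: stamp the current binary version into `v`.
    obj.insert("v".to_string(), json!("0.7.3"));

    // => { "dumps": [ ... ], "v": "0.7.3" }
    println!("{}", serde_json::to_string_pretty(&metadata).unwrap());
}
```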
diff --git a/website/docs/guides/deploy-replibyte/container.md b/website/docs/guides/deploy-replibyte/container.md
index d32aafe2..925f0dc4 100644
--- a/website/docs/guides/deploy-replibyte/container.md
+++ b/website/docs/guides/deploy-replibyte/container.md
@@ -147,4 +147,4 @@ docker run -e S3_ACCESS_KEY_ID=XXX \

 ---

-Do you have any questions? Feel free to join the channel #replibyte on [our Discord server](https://discord.qovery.com).
+Do you have any questions? Feel free to join the channel #replibyte on [our Discord server](https://discord.qovery.com).
\ No newline at end of file
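End to end, the `Migrator` added by this patch is meant to run once against the datastore before a command executes. The `main.rs` hunk is not included in this excerpt, so the wiring below is only a hypothetical sketch built from the APIs shown above (`Migrator::new`, `migrations()`, `get_replibyte_version()`); the `run_migrations` helper name is invented for illustration.

```rust
// Hypothetical wiring (not the actual main.rs change): run every registered
// migration against the datastore before executing the requested command.
use crate::datastore::Datastore;
use crate::migration::{migrations, Migrator};
use crate::utils::get_replibyte_version;

fn run_migrations(datastore: &Box<dyn Datastore>) -> Result<(), std::io::Error> {
    // Migrator::migrate() only runs a migration when the binary version
    // is >= that migration's minimal_version().
    Migrator::new(get_replibyte_version(), datastore, migrations()).migrate()
}
```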