Skip to content

Commit a5ad3dd

Browse files
Add backup + restore commands (need to be implemented)
Signed-off-by: Valerian Saliou <valerian@valeriansaliou.name>
1 parent 4f3f802 commit a5ad3dd

File tree

4 files changed

+95
-15
lines changed

4 files changed

+95
-15
lines changed

PROTOCOL.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -169,15 +169,16 @@ _The Sonic Channel Control mode is used for administration purposes. Once in thi
169169

170170
**➡️ Available commands:**
171171

172-
* `TRIGGER`: trigger an action (syntax: `TRIGGER [<action>]?`; time complexity: `O(1)`)
172+
* `TRIGGER`: trigger an action (syntax: `TRIGGER [<action>]? [<data>]?`; time complexity: `O(1)`)
173173
* `INFO`: get server information (syntax: `INFO`; time complexity: `O(1)`)
174174
* `PING`: ping server (syntax: `PING`; time complexity: `O(1)`)
175175
* `HELP`: show help (syntax: `HELP [<manual>]?`; time complexity: `O(1)`)
176176
* `QUIT`: stop connection (syntax: `QUIT`; time complexity: `O(1)`)
177177

178178
**⏩ Syntax terminology:**
179179

180-
* `<action>`: action to be triggered (available actions: `consolidate`);
180+
* `<action>`: action to be triggered (available actions: `consolidate`, `backup`, `restore`);
181+
* `<data>`: additional data to provide to the action (required for: `backup`, `restore`);
181182
* `<manual>`: help manual to be shown (available manuals: `commands`);
182183

183184
**⬇️ Control flow example (via `telnet`):**

src/channel/command.rs

+48-11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use hashbrown::HashMap;
88
use rand::distributions::Alphanumeric;
99
use rand::{thread_rng, Rng};
10+
use std::path::Path;
1011
use std::str::{self, SplitWhitespace};
1112
use std::vec::Vec;
1213

@@ -15,6 +16,7 @@ use super::statistics::ChannelStatistics;
1516
use crate::query::builder::{QueryBuilder, QueryBuilderResult};
1617
use crate::query::types::{QueryGenericLang, QuerySearchLimit, QuerySearchOffset};
1718
use crate::store::fst::StoreFSTPool;
19+
use crate::store::kv::StoreKVPool;
1820
use crate::store::operation::StoreOperationDispatch;
1921
use crate::APP_CONF;
2022

@@ -64,7 +66,8 @@ lazy_static! {
6466
vec!["PUSH", "POP", "COUNT", "FLUSHC", "FLUSHB", "FLUSHO", "PING", "HELP", "QUIT"];
6567
pub static ref COMMANDS_MODE_CONTROL: Vec<&'static str> =
6668
vec!["TRIGGER", "INFO", "PING", "HELP", "QUIT"];
67-
pub static ref CONTROL_TRIGGER_ACTIONS: Vec<&'static str> = vec!["consolidate"];
69+
pub static ref CONTROL_TRIGGER_ACTIONS: Vec<&'static str> =
70+
vec!["consolidate", "backup", "restore"];
6871
static ref MANUAL_MODE_SEARCH: HashMap<&'static str, &'static Vec<&'static str>> =
6972
[("commands", &*COMMANDS_MODE_SEARCH)]
7073
.iter()
@@ -776,26 +779,60 @@ impl ChannelCommandIngest {
776779

777780
impl ChannelCommandControl {
778781
pub fn dispatch_trigger(mut parts: SplitWhitespace) -> ChannelResult {
779-
match (parts.next(), parts.next()) {
780-
(None, _) => Ok(vec![ChannelCommandResponse::Result(format!(
782+
match (parts.next(), parts.next(), parts.next()) {
783+
(None, _, _) => Ok(vec![ChannelCommandResponse::Result(format!(
781784
"actions({})",
782785
CONTROL_TRIGGER_ACTIONS.join(", ")
783786
))]),
784-
(Some(action_key), next_part) => {
785-
if next_part.is_none() {
786-
let action_key_lower = action_key.to_lowercase();
787+
(Some(action_key), data_part, last_part) => {
788+
let action_key_lower = action_key.to_lowercase();
787789

788-
match action_key_lower.as_str() {
789-
"consolidate" => {
790+
match action_key_lower.as_str() {
791+
"consolidate" => {
792+
if data_part.is_none() {
790793
// Force a FST consolidate
791794
StoreFSTPool::consolidate(true);
792795

793796
Ok(vec![ChannelCommandResponse::Ok])
797+
} else {
798+
Err(ChannelCommandError::InvalidFormat("TRIGGER consolidate"))
794799
}
795-
_ => Err(ChannelCommandError::NotFound),
796800
}
797-
} else {
798-
Err(ChannelCommandError::InvalidFormat("TRIGGER [<action>]?"))
801+
"backup" => {
802+
match (data_part, last_part) {
803+
(Some(path), None) => {
804+
// Proceed KV + FST backup
805+
let path = Path::new(path);
806+
807+
if StoreKVPool::backup(path).is_ok()
808+
&& StoreFSTPool::backup(path).is_ok()
809+
{
810+
Ok(vec![ChannelCommandResponse::Ok])
811+
} else {
812+
Err(ChannelCommandError::InternalError)
813+
}
814+
}
815+
_ => Err(ChannelCommandError::InvalidFormat("TRIGGER backup <path>")),
816+
}
817+
}
818+
"restore" => {
819+
match (data_part, last_part) {
820+
(Some(path), None) => {
821+
// Proceed KV + FST restore
822+
let path = Path::new(path);
823+
824+
if StoreKVPool::restore(path).is_ok()
825+
&& StoreFSTPool::restore(path).is_ok()
826+
{
827+
Ok(vec![ChannelCommandResponse::Ok])
828+
} else {
829+
Err(ChannelCommandError::InternalError)
830+
}
831+
}
832+
_ => Err(ChannelCommandError::InvalidFormat("TRIGGER restore <path>")),
833+
}
834+
}
835+
_ => Err(ChannelCommandError::NotFound),
799836
}
800837
}
801838
}

src/store/fst.rs

+23-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::fmt;
1818
use std::fs::{self, File};
1919
use std::io::BufWriter;
2020
use std::iter::FromIterator;
21-
use std::path::PathBuf;
21+
use std::path::{Path, PathBuf};
2222
use std::str;
2323
use std::sync::{Arc, Mutex, RwLock};
2424
use std::thread;
@@ -137,6 +137,28 @@ impl StoreFSTPool {
137137
)
138138
}
139139

140+
pub fn backup(path: &Path) -> Result<(), ()> {
141+
debug!("backing up all fst stores to path: {:?}", path);
142+
143+
// TODO: read all FSTs one-by-one
144+
// TODO: stream each FST word to a .fst.bck file, for each fst
145+
146+
Ok(())
147+
}
148+
149+
pub fn restore(path: &Path) -> Result<(), ()> {
150+
debug!("restoring all fst stores from path: {:?}", path);
151+
152+
// TODO: <iteratively do this>:
153+
// -- TODO: iteratively lock; for each FST file being processed (access + write locks; \
154+
// same as destroy lock)
155+
// -- TODO: read all FST backups one-by-one
156+
// -- TODO: re-build actual ordered FST
157+
// -- TODO: close opened FST + nuke their consolidate (if they appear in the backup)
158+
159+
Ok(())
160+
}
161+
140162
pub fn consolidate(force: bool) {
141163
debug!("scanning for fst store pool items to consolidate");
142164

src/store/kv.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use rocksdb::{
1313
use std::fmt;
1414
use std::fs;
1515
use std::io::Cursor;
16-
use std::path::PathBuf;
16+
use std::path::{Path, PathBuf};
1717
use std::sync::{Arc, Mutex, RwLock};
1818
use std::time::SystemTime;
1919
use std::vec::Drain;
@@ -122,6 +122,26 @@ impl StoreKVPool {
122122
&*STORE_ACCESS_LOCK,
123123
)
124124
}
125+
126+
pub fn backup(path: &Path) -> Result<(), ()> {
127+
debug!("backing up all kv stores to path: {:?}", path);
128+
129+
// TODO
130+
131+
Ok(())
132+
}
133+
134+
pub fn restore(path: &Path) -> Result<(), ()> {
135+
debug!("restoring all kv stores from path: {:?}", path);
136+
137+
// TODO: <iteratively do this>:
138+
// -- TODO: lock for each KV dump being processed (access + write locks; same as destroy \
139+
// lock)
140+
// -- TODO: read all KV dumps one-by-one, and read by RocksDB manager
141+
// -- TODO: close opened KV (if they appear in the backup)
142+
143+
Ok(())
144+
}
125145
}
126146

127147
impl StoreGenericPool<StoreKVKey, StoreKV, StoreKVBuilder> for StoreKVPool {}

0 commit comments

Comments
 (0)