support persisting manifest efficiently
baojinri committed Nov 18, 2024
1 parent e65c504 commit 96179be
Showing 5 changed files with 258 additions and 24 deletions.
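
In short: instead of re-encoding and uploading the whole snapshot on every new file, `add_file` now persists each new sst as a small delta record under `manifest/sst/<file_id>`, and a background task merges accumulated deltas into the `snapshot` file. Based on the paths built in manifest.rs below, the on-disk layout is roughly:

    manifest/
    ├── snapshot      (full pb_types::Manifest, rewritten only when deltas are merged)
    └── sst/
        └── <file_id> (one encoded pb_types::SstFile per add_file call)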
1 change: 1 addition & 0 deletions horaedb/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions horaedb/Cargo.toml
@@ -45,6 +45,7 @@ itertools = "0.3"
lazy_static = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
log = "0.4"

# This profile optimizes for good runtime performance.
[profile.release]
1 change: 1 addition & 0 deletions horaedb/metric_engine/Cargo.toml
@@ -39,6 +39,7 @@ datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
macros = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true, features = ["object_store"] }
253 changes: 236 additions & 17 deletions horaedb/metric_engine/src/manifest.rs
@@ -15,11 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use futures::StreamExt;
use log::{error, info};
use object_store::{path::Path, PutPayload};
use prost::Message;
use tokio::sync::RwLock;
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
RwLock,
},
time,
};

use crate::{
sst::{FileId, FileMeta, SstFile},
@@ -29,13 +29,16 @@ use crate::{

pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const SST_PREFIX: &str = "sst";

pub struct Manifest {
path: String,
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
sender: Sender<SstsMergeTask>,

payload: RwLock<Payload>,
payload: Arc<RwLock<Payload>>,
}

pub struct Payload {
Expand Down Expand Up @@ -69,8 +82,29 @@ impl From<Payload> for pb_types::Manifest {
}

impl Manifest {
pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
pub async fn try_new(
path: String,
store: ObjectStoreRef,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
let sst_path = Path::from(format!("{path}/{SST_PREFIX}"));

let (tx, rx) = mpsc::channel(merge_options.channel_size);
let mut merge_ssts = SstsMerge::try_new(
snapshot_path.clone(),
sst_path.clone(),
store.clone(),
rx,
merge_options,
)
.await?;

// Start merge ssts task
tokio::spawn(async move {
merge_ssts.run().await;
});

let payload = match store.get(&snapshot_path).await {
Ok(v) => {
let bytes = v
@@ -90,43 +124,42 @@ impl Manifest {
}
}
};
let payload = Arc::new(RwLock::new(payload));

Ok(Self {
path,
snapshot_path,
sst_path,
store,
payload: RwLock::new(payload),
payload,
sender: tx,
})
}

// TODO: Now this function is poorly implemented: we concat new_sst to the
// snapshot and upload it back as a whole.
// A more efficient way is to create a new diff file, and do compaction in
// the background to merge them into `snapshot`.
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
let mut payload = self.payload.write().await;
let mut tmp_ssts = payload.files.clone();
let new_sst_path = Path::from(format!("{}/{id}", self.sst_path));
let new_sst = SstFile { id, meta };
tmp_ssts.push(new_sst.clone());
let pb_manifest = pb_types::Manifest {
files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
};

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
pb_manifest
let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
let mut buf: Vec<u8> = Vec::with_capacity(new_sst_payload.encoded_len());
new_sst_payload
.encode(&mut buf)
.context("failed to encode manifest")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the new sst file
self.store
.put(&self.snapshot_path, put_payload)
.put(&new_sst_path, put_payload)
.await
.context("Failed to update manifest")?;

// 2. Update cached payload
let mut payload = self.payload.write().await;
payload.files.push(new_sst);

// 3. Schedule ssts merge
self.sender(SstsMergeTask::MergeSsts(1)).await;

Ok(())
}
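
For context, here is a minimal sketch of the new write path as it reads above; `store`, `file_id`, and `file_meta` are assumed to exist, and the root path is hypothetical:

    let manifest = Manifest::try_new(
        "data/manifest".to_string(),
        store.clone(),
        SstsMergeOptions::default(),
    )
    .await?;
    // Uploads one small pb_types::SstFile record to `data/manifest/sst/<id>`
    // and notifies the background merge task, instead of rewriting the snapshot.
    manifest.add_file(file_id, file_meta).await?;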

@@ -140,4 +173,190 @@ impl Manifest {
.cloned()
.collect()
}

async fn sender(&self, task: SstsMergeTask) {
if let Err(err) = self.sender.send(task).await {
error!("Failed to send merge ssts task, err: {:?}", err);
}
}
}

enum SstsMergeTask {
ForceMergeSsts,
MergeSsts(usize),
}

pub struct SstsMergeOptions {
channel_size: usize,
max_interval_seconds: usize,
merge_threshold: usize,
}

impl Default for SstsMergeOptions {
fn default() -> Self {
Self {
channel_size: 100,
max_interval_seconds: 5,
merge_threshold: 50,
}
}
}
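
Read together with `run()` below, the defaults mean: a merge is triggered once roughly 50 delta files accumulate, the queue between writers and the merge task holds 100 pending tasks, and an idle `recv()` wakes up every 5 seconds. A hypothetical in-crate override (the fields are private, so callers outside the crate would stick with `SstsMergeOptions::default()`) could look like:

    let merge_options = SstsMergeOptions {
        channel_size: 64,         // capacity of the mpsc channel feeding SstsMerge
        max_interval_seconds: 10, // upper bound on each recv() wait in run()
        merge_threshold: 20,      // delta-file count that triggers a merge
    };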

struct SstsMerge {
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
receiver: Receiver<SstsMergeTask>,
sst_num: RwLock<usize>,
merge_options: SstsMergeOptions,
}

impl SstsMerge {
async fn try_new(
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
rx: Receiver<SstsMergeTask>,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let ssts_merge = Self {
snapshot_path,
sst_path,
store,
receiver: rx,
sst_num: RwLock::new(0),
merge_options,
};
// Merge all ssts on startup
ssts_merge.merge_ssts().await?;

Ok(ssts_merge)
}

async fn run(&mut self) {
let interval = time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
let threshold = self.merge_options.merge_threshold;

loop {
match time::timeout(interval, self.receiver.recv()).await {
Ok(Some(SstsMergeTask::ForceMergeSsts)) => match self.merge_ssts().await {
Ok(_) => {
*self.sst_num.write().await = 0;
}
Err(err) => {
error!("Failed to force merge ssts, err: {:?}", err);
}
},
Ok(Some(SstsMergeTask::MergeSsts(num))) => {
let mut sst_num = self.sst_num.write().await;
*sst_num += num;
if *sst_num >= threshold {
match self.merge_ssts().await {
Ok(_) => {
*sst_num = 0;
}
Err(err) => {
error!("Failed to merge ssts, err: {:?}", err);
}
}
}
}
Ok(None) => {
// The channel is disconnected.
info!("Channel disconnected, merge ssts task exit");
break;
}
Err(_) => {
info!("Timeout receive merge ssts task");
}
}
}
}

async fn merge_ssts(&self) -> Result<()> {
let meta_infos = self
.store
.list(Some(&self.sst_path))
.collect::<Vec<_>>()
.await;
if meta_infos.is_empty() {
return Ok(());
}

let mut paths = Vec::with_capacity(meta_infos.len());
for meta_info in meta_infos {
let path = meta_info
.context("failed to get path of manifest sst")?
.location;
paths.push(path);
}

let mut sst_files = Vec::with_capacity(paths.len());
for path in &paths {
let bytes = self
.store
.get(path)
.await
.context("failed to read sst file")?
.bytes()
.await
.context("failed to read sst file")?;
let pb_sst = pb_types::SstFile::decode(bytes).context("failed to decode sst file")?;
sst_files.push(SstFile::try_from(pb_sst)?);
}

let mut payload = match self.store.get(&self.snapshot_path).await {
Ok(v) => {
let bytes = v
.bytes()
.await
.context("failed to read manifest snapshot")?;
let pb_payload = pb_types::Manifest::decode(bytes)
.context("failed to decode manifest snapshot")?;
Payload::try_from(pb_payload)?
}
Err(err) => {
if err.to_string().contains("not found") {
Payload { files: vec![] }
} else {
let context = format!(
"Failed to get manifest snapshot, path:{}",
self.snapshot_path
);
return Err(AnyhowError::new(err).context(context).into());
}
}
};

payload.files.extend(sst_files);
let pb_manifest = pb_types::Manifest {
files: payload
.files
.into_iter()
.map(|f| f.into())
.collect::<Vec<_>>(),
};

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
pb_manifest
.encode(&mut buf)
.context("failed to encode manifest")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the snapshot
self.store
.put(&self.snapshot_path, put_payload)
.await
.context("Failed to update manifest")?;

// 2. Delete the old sst files
for path in &paths {
self.store
.delete(path)
.await
.context("failed to delete sst file")?;
}

Ok(())
}
}
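
Note the ordering in `merge_ssts`: the merged snapshot is persisted before the old delta files are deleted, so a crash between those two steps can leave delta files that get appended again on the next merge, but it should never lose an sst record.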
26 changes: 19 additions & 7 deletions horaedb/metric_engine/src/storage.rs
@@ -45,7 +45,7 @@ use parquet::{
};

use crate::{
manifest::Manifest,
manifest::{Manifest, SstsMergeOptions},
read::DefaultParquetFileReaderFactory,
sst::{allocate_id, FileId, FileMeta},
types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
@@ -109,10 +109,15 @@ impl CloudObjectStorage {
arrow_schema: SchemaRef,
num_primary_key: usize,
timestamp_index: usize,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let manifest_prefix = crate::manifest::PREFIX_PATH;
let manifest =
Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?;
let manifest = Manifest::try_new(
format!("{root_path}/{manifest_prefix}"),
store.clone(),
merge_options,
)
.await?;
let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
Ok(Self {
path: root_path,
@@ -275,6 +280,7 @@ mod tests {
use object_store::local::LocalFileSystem;

use super::*;
use crate::manifest::SstsMergeOptions;

#[tokio::test]
async fn test_sort_batch() {
@@ -286,10 +292,16 @@ ]));
]));

let store = Arc::new(LocalFileSystem::new());
let storage =
CloudObjectStorage::try_new("/tmp/storage".to_string(), store, schema.clone(), 1, 1)
.await
.unwrap();
let storage = CloudObjectStorage::try_new(
"/tmp/storage".to_string(),
store,
schema.clone(),
1,
1,
SstsMergeOptions::default(),
)
.await
.unwrap();

let batch = RecordBatch::try_new(
schema.clone(),
