Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support persisting manifest efficiently #1596

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions horaedb/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions horaedb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ itertools = "0.3"
lazy_static = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
log = "0.4"

# This profile optimizes for good runtime performance.
[profile.release]
Expand Down
1 change: 1 addition & 0 deletions horaedb/metric_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
macros = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true, features = ["object_store"] }
Expand Down
254 changes: 236 additions & 18 deletions horaedb/metric_engine/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use futures::StreamExt;
use log::{error, info};
use object_store::{path::Path, PutPayload};
use prost::Message;
use tokio::sync::RwLock;
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
RwLock,
},
time,
};

use crate::{
sst::{FileId, FileMeta, SstFile},
Expand All @@ -29,13 +39,16 @@ use crate::{

pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const SST_PREFIX: &str = "sst";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These files should not be named "sst"; that term is used in LSM trees and stands for "sorted string table". Here these files are delta files based on the snapshot.


pub struct Manifest {
path: String,
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
sender: Sender<SstsMergeTask>,

payload: RwLock<Payload>,
payload: Arc<RwLock<Payload>>,
}

pub struct Payload {
Expand Down Expand Up @@ -69,8 +82,29 @@ impl From<Payload> for pb_types::Manifest {
}

impl Manifest {
pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
pub async fn try_new(
path: String,
store: ObjectStoreRef,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
let sst_path = Path::from(format!("{path}/{SST_PREFIX}"));

let (tx, rx) = mpsc::channel(merge_options.channel_size);
let mut ssts_merge = SstsMerge::try_new(
snapshot_path.clone(),
sst_path.clone(),
store.clone(),
rx,
merge_options,
)
.await?;

// Start merge ssts task
tokio::spawn(async move {
ssts_merge.run().await;
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When start up, we also need to read delta files, if we only read snapshot, then part of sst won't be queried later?

let payload = match store.get(&snapshot_path).await {
Ok(v) => {
let bytes = v
Expand All @@ -90,43 +124,42 @@ impl Manifest {
}
}
};
let payload = Arc::new(RwLock::new(payload));

Ok(Self {
path,
snapshot_path,
sst_path,
store,
payload: RwLock::new(payload),
payload,
sender: tx,
})
}

// TODO: Now this functions is poorly implemented, we concat new_sst to
// snapshot, and upload it back in a whole.
// In more efficient way, we can create a new diff file, and do compaction in
// background to merge them to `snapshot`.
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
let mut payload = self.payload.write().await;
let mut tmp_ssts = payload.files.clone();
let new_sst_path = Path::from(format!("{}/{id}", self.sst_path));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are too many pending delta files, we should block the write here.

let new_sst = SstFile { id, meta };
tmp_ssts.push(new_sst.clone());
let pb_manifest = pb_types::Manifest {
files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
};

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
pb_manifest
let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
let mut buf: Vec<u8> = Vec::with_capacity(new_sst_payload.encoded_len());
new_sst_payload
.encode(&mut buf)
.context("failed to encode manifest")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the snapshot
// 1. Persist the sst file
self.store
.put(&self.snapshot_path, put_payload)
.put(&new_sst_path, put_payload)
.await
.context("Failed to update manifest")?;

// 2. Update cached payload
let mut payload = self.payload.write().await;
payload.files.push(new_sst);

// 3. Schedule ssts merge
self.sender(SstsMergeTask::MergeSsts(1)).await;

Ok(())
}

Expand All @@ -140,4 +173,189 @@ impl Manifest {
.cloned()
.collect()
}

async fn sender(&self, task: SstsMergeTask) {
if let Err(err) = self.sender.send(task).await {
error!("Failed to send merge ssts task, err: {:?}", err);
}
}
}

/// Tasks consumed by the background merge loop (`SstsMerge::run`).
enum SstsMergeTask {
    // Merge all pending delta files immediately, ignoring the threshold.
    ForceMergeSsts,
    // `n` new delta files were written; a merge runs once the configured
    // `merge_threshold` of pending files has accumulated.
    MergeSsts(usize),
}

/// Options controlling the background delta-file merge task.
///
/// Fields are public so callers constructing the engine can tune them;
/// `Default` provides reasonable values. (Previously all fields were
/// private, making the public struct impossible to customize from outside.)
pub struct SstsMergeOptions {
    /// Capacity of the channel used to send merge tasks to the background
    /// task.
    pub channel_size: usize,
    /// Interval, in seconds, at which the merge loop wakes up when no task
    /// arrives.
    pub max_interval_seconds: usize,
    /// Number of pending delta files that triggers a merge.
    pub merge_threshold: usize,
}

impl Default for SstsMergeOptions {
fn default() -> Self {
Self {
channel_size: 100,
max_interval_seconds: 5,
merge_threshold: 50,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10-20 is a reasonable choice.

}
}
}

/// Background worker that folds manifest delta files into the snapshot.
struct SstsMerge {
    // Location of the snapshot file.
    snapshot_path: Path,
    // Directory containing the pending delta files.
    sst_path: Path,
    store: ObjectStoreRef,
    // Task queue fed by `Manifest`.
    receiver: Receiver<SstsMergeTask>,
    // Number of delta files reported since the last successful merge.
    sst_num: usize,
    merge_options: SstsMergeOptions,
}

impl SstsMerge {
async fn try_new(
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
rx: Receiver<SstsMergeTask>,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let ssts_merge = Self {
snapshot_path,
sst_path,
store,
receiver: rx,
sst_num: 0,
merge_options,
};
// Merge all ssts when start
ssts_merge.merge_ssts().await?;

Ok(ssts_merge)
}

async fn run(&mut self) {
let interval = time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
let threshold = self.merge_options.merge_threshold;

loop {
match time::timeout(interval, self.receiver.recv()).await {
Ok(Some(SstsMergeTask::ForceMergeSsts)) => match self.merge_ssts().await {
Ok(_) => {
self.sst_num = 0;
}
Err(err) => {
error!("Failed to force merge ssts, err: {:?}", err);
}
},
Ok(Some(SstsMergeTask::MergeSsts(num))) => {
self.sst_num += num;
if self.sst_num >= threshold {
match self.merge_ssts().await {
Ok(_) => {
self.sst_num = 0;
}
Err(err) => {
error!("Failed to merge ssts, err: {:?}", err);
}
}
}
}
Ok(None) => {
// The channel is disconnected.
info!("Channel disconnected, merge ssts task exit");
break;
}
Err(_) => {
info!("Timeout receive merge ssts task");
}
}
}
}

async fn merge_ssts(&self) -> Result<()> {
let meta_infos = self
.store
.list(Some(&self.sst_path))
.collect::<Vec<_>>()
.await;
if meta_infos.is_empty() {
return Ok(());
}

let mut paths = Vec::with_capacity(meta_infos.len());
for meta_info in meta_infos {
let path = meta_info
.context("failed to get path of manifest sst")?
.location;
paths.push(path);
}

let mut sst_files = Vec::with_capacity(paths.len());
for path in &paths {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could fetch those files concurrently, one after one is too slow.

let bytes = self
.store
.get(path)
.await
.context("failed to read sst file")?
.bytes()
.await
.context("failed to read sst file")?;
let pb_sst = pb_types::SstFile::decode(bytes).context("failed to decode sst file")?;
sst_files.push(SstFile::try_from(pb_sst)?);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to dedup files before merge, since there may exists old delta files, which are failed to delete in last merge.

}

let mut payload = match self.store.get(&self.snapshot_path).await {
Ok(v) => {
let bytes = v
.bytes()
.await
.context("failed to read manifest snapshot")?;
let pb_payload = pb_types::Manifest::decode(bytes)
.context("failed to decode manifest snapshot")?;
Payload::try_from(pb_payload)?
}
Err(err) => {
if err.to_string().contains("not found") {
Payload { files: vec![] }
} else {
let context = format!(
"Failed to get manifest snapshot, path:{}",
self.snapshot_path
);
return Err(AnyhowError::new(err).context(context).into());
}
}
};

payload.files.extend(sst_files.clone());
let pb_manifest = pb_types::Manifest {
files: payload
.files
.into_iter()
.map(|f| f.into())
.collect::<Vec<_>>(),
};

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
pb_manifest
.encode(&mut buf)
.context("failed to encode manifest")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the snapshot
self.store
.put(&self.snapshot_path, put_payload)
.await
.context("Failed to update manifest")?;

// 2. Delete the old sst files
for path in &paths {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete concurrently

self.store
.delete(path)
.await
.context("failed to delete sst file")?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add path name to log

}

Ok(())
}
}
13 changes: 10 additions & 3 deletions horaedb/metric_engine/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use parquet::{
};

use crate::{
manifest::Manifest,
manifest::{Manifest, SstsMergeOptions},
read::DefaultParquetFileReaderFactory,
sst::{allocate_id, FileId, FileMeta},
types::{ObjectStoreRef, TimeRange, Timestamp, WriteOptions, WriteResult},
Expand Down Expand Up @@ -112,10 +112,15 @@ impl CloudObjectStorage {
num_primary_key: usize,
timestamp_index: usize,
write_options: WriteOptions,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let manifest_prefix = crate::manifest::PREFIX_PATH;
let manifest =
Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?;
let manifest = Manifest::try_new(
format!("{root_path}/{manifest_prefix}"),
store.clone(),
merge_options,
)
.await?;
let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
let write_props = Self::build_write_props(write_options, num_primary_key);
Ok(Self {
Expand Down Expand Up @@ -326,6 +331,7 @@ mod tests {
use object_store::local::LocalFileSystem;

use super::*;
use crate::manifest::SstsMergeOptions;

#[tokio::test]
async fn test_sort_batch() {
Expand All @@ -344,6 +350,7 @@ mod tests {
1,
1,
WriteOptions::default(),
SstsMergeOptions::default(),
)
.await
.unwrap();
Expand Down
Loading