Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: disk cache deduped get_ranges #1218

Merged
merged 5 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 73 additions & 26 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use chrono::{DateTime, Utc};
use crc::{Crc, CRC_32_ISCSI};
use futures::stream::BoxStream;
use hash_ext::SeaHasherBuilder;
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use lru::LruCache;
use notifier::notifier::{ExecutionGuard, RequestNotifiers};
use partitioned_lock::PartitionedMutex;
Expand Down Expand Up @@ -167,36 +167,83 @@ impl Manifest {
}
}

/// The encoder of the page file in the disk cache.
/// The writer of the page file in the disk cache.
///
/// Following the payload, a footer [`PageFileEncoder::MAGIC_FOOTER`] is
/// appended.
struct PageFileEncoder {
payload: Bytes,
struct PageFileWriter {
output: String,
tmp_file: String,
need_clean_tmpfile: bool,
}

impl PageFileEncoder {
impl Drop for PageFileWriter {
fn drop(&mut self) {
if self.need_clean_tmpfile {
if let Err(e) = std::fs::remove_file(&self.tmp_file) {
warn!(
"Disk cache remove page tmp file failed, file:{}, err:{e}",
&self.tmp_file
);
}
}
}
}

impl PageFileWriter {
const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e'];

async fn encode_and_persist<W>(&self, writer: &mut W, name: &str) -> Result<()>
where
W: AsyncWrite + std::marker::Unpin,
{
fn new(output: String) -> Self {
let tmp_file = Self::tmp_file(&output);

Self {
output,
tmp_file,
need_clean_tmpfile: true,
}
}

fn tmp_file(input: &str) -> String {
format!("{}.tmp", input)
}

async fn write_inner(&self, bytes: Bytes) -> Result<()> {
let tmp_file = &self.tmp_file;
let mut writer = File::create(tmp_file)
.await
.context(Io { file: tmp_file })?;
writer
.write_all(&self.payload[..])
.write_all(&bytes)
.await
.context(Io { file: name })?;
.context(Io { file: tmp_file })?;

writer
.write_all(&Self::MAGIC_FOOTER)
.await
.context(Io { file: name })?;
.context(Io { file: tmp_file })?;

writer.flush().await.context(Io { file: tmp_file })?;

writer.flush().await.context(Io { file: name })?;
tokio::fs::rename(tmp_file, &self.output)
.await
.context(Io { file: &self.output })?;

Ok(())
}

// When write bytes to file, the cache lock is released, so when one thread is
// reading, another thread may update it, so we write to tmp file first,
// then rename to expected filename to avoid other threads see partial
// content.
async fn write_and_flush(mut self, bytes: Bytes) -> Result<()> {
let write_result = self.write_inner(bytes).await;
if write_result.is_ok() {
self.need_clean_tmpfile = false;
}

write_result
}

#[inline]
fn encoded_size(payload_len: usize) -> usize {
payload_len + Self::MAGIC_FOOTER.len()
Expand Down Expand Up @@ -262,7 +309,7 @@ impl DiskCache {

async fn insert_data(&self, filename: String, value: Bytes) {
let page_meta = {
let file_size = PageFileEncoder::encoded_size(value.len());
let file_size = PageFileWriter::encoded_size(value.len());
PageMeta { file_size }
};
let evicted_file = self.insert_page_meta(filename.clone(), page_meta);
Expand Down Expand Up @@ -357,18 +404,16 @@ impl DiskCache {
}

async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> {
let file_path = std::path::Path::new(&self.root_dir)
let dest_filepath = std::path::Path::new(&self.root_dir)
.join(filename)
.into_os_string()
.into_string()
.unwrap();

let mut file = File::create(&file_path)
.await
.context(Io { file: &file_path })?;
let writer = PageFileWriter::new(dest_filepath);
writer.write_and_flush(payload).await?;

let encoding = PageFileEncoder { payload };
encoding.encode_and_persist(&mut file, filename).await
Ok(())
}

/// Read the bytes from the cached file.
Expand All @@ -381,7 +426,7 @@ impl DiskCache {
range: &Range<usize>,
expect_file_size: usize,
) -> std::io::Result<ReadBytesResult> {
if PageFileEncoder::encoded_size(range.len()) > expect_file_size {
if PageFileWriter::encoded_size(range.len()) > expect_file_size {
return Ok(ReadBytesResult::OutOfRange);
}

Expand Down Expand Up @@ -681,7 +726,7 @@ impl DiskCacheStore {
}
.fail(),
) {
error!("Failed to send notifier error result, err:{e:?}.");
warn!("Failed to send notifier error result, err:{e:?}.");
}
}
}
Expand All @@ -698,8 +743,10 @@ impl DiskCacheStore {
{
self.cache.insert_data(cache_key, bytes.clone()).await;
for notifier in notifiers {
if let Err(e) = notifier.send(Ok(bytes.clone())) {
error!("Failed to send notifier success result, err:{e:?}.");
if notifier.send(Ok(bytes.clone())).is_err() {
// The error contains sent bytes, which maybe very large,
// so we don't log error.
warn!("Failed to send notifier success result");
}
}
}
Expand Down Expand Up @@ -940,6 +987,7 @@ mod test {
use upstream::local::LocalFileSystem;

use super::*;
use crate::test_util::MemoryStore;

struct StoreWithCacheDir {
inner: DiskCacheStore,
Expand All @@ -951,8 +999,7 @@ mod test {
cap: usize,
partition_bits: usize,
) -> StoreWithCacheDir {
let local_path = tempdir().unwrap();
let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
let local_store = Arc::new(MemoryStore::default());

let cache_dir = tempdir().unwrap();
let store = DiskCacheStore::try_new(
Expand Down
2 changes: 2 additions & 0 deletions components/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ pub mod multipart;
pub mod obkv;
pub mod prefix;
pub mod s3;
#[cfg(test)]
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
pub mod test_util;

pub type ObjectStoreRef = Arc<dyn ObjectStore>;
172 changes: 172 additions & 0 deletions components/object_store/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, fmt::Display, ops::Range, sync::RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{self, BoxStream};
use tokio::io::AsyncWrite;
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};

#[derive(Debug)]
struct StoreError {
path: Path,
msg: String,
}

impl Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StoreError")
.field("path", &self.path)
.field("msg", &self.msg)
.finish()
}
}

impl std::error::Error for StoreError {}

/// A memory based object store implementation, mainly used for testing.
#[derive(Debug, Default)]
pub struct MemoryStore {
files: RwLock<HashMap<Path, Bytes>>,
get_range_counts: RwLock<HashMap<Path, usize>>,
}

impl Display for MemoryStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemoryStore")
.field("counts", &self.get_counts())
.finish()
}
}

impl MemoryStore {
pub fn get_counts(&self) -> HashMap<Path, usize> {
let counts = self.get_range_counts.read().unwrap();
counts.clone().into_iter().collect()
}
}

#[async_trait]
impl ObjectStore for MemoryStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let mut files = self.files.write().unwrap();
files.insert(location.clone(), bytes);
Ok(())
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
let bs = bs.clone();
Ok(GetResult::Stream(Box::pin(stream::once(
async move { Ok(bs) },
))))
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "get",
source,
})
}
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
{
let mut counts = self.get_range_counts.write().unwrap();
counts
.entry(location.clone())
.and_modify(|c| *c += 1)
.or_insert(1);
}

let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
Ok(bs.slice(range))
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "get_range",
source,
})
}
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let files = self.files.read().unwrap();

if let Some(bs) = files.get(location) {
Ok(ObjectMeta {
location: location.clone(),
size: bs.len(),
last_modified: Default::default(),
})
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "head",
source,
})
}
}

async fn put_multipart(
&self,
_location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
unimplemented!()
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
unimplemented!()
}

async fn delete(&self, _location: &Path) -> Result<()> {
unimplemented!()
}

async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
unimplemented!()
}

async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
unimplemented!()
}

async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}
}
Loading