Skip to content

Commit

Permalink
fix: avoid error when disk cache miss (apache#790)
Browse files Browse the repository at this point in the history
* fix: avoid error when disk cache miss

* fix: ignore error when recover and update
  • Loading branch information
ShiKaiWi authored Mar 30, 2023
1 parent f3fbd9e commit 2c997f9
Showing 1 changed file with 95 additions and 13 deletions.
108 changes: 95 additions & 13 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! An ObjectStore implementation with disk as cache.
//! The disk cache is a read-through cache, with page as its minimal cache
Expand Down Expand Up @@ -129,8 +129,11 @@ impl DiskCache {
}
}

/// Update the cache.
///
/// The returned value denotes whether succeed.
// TODO: We now hold lock when doing IO, possible to release it?
async fn update_cache(&self, key: String, value: Option<Bytes>) -> Result<()> {
async fn update_cache(&self, key: String, value: Option<Bytes>) -> bool {
let mut cache = self.cache.lock().await;
debug!(
"Disk cache update, key:{}, len:{}, cap:{}.",
Expand All @@ -139,6 +142,7 @@ impl DiskCache {
self.cap
);

// TODO: remove a batch of files to avoid IO during the following update cache.
if cache.len() >= self.cap {
let (filename, _) = cache.pop_lru().unwrap();
let file_path = std::path::Path::new(&self.root_dir)
Expand All @@ -153,30 +157,45 @@ impl DiskCache {
}
}

// Persist the value if needed
if let Some(value) = value {
self.persist_bytes(&key, value).await?;
if let Err(e) = self.persist_bytes(&key, value).await {
error!("Failed to persist cache, key:{}, err:{}.", key, e);
return false;
}
}

// Update the key
cache.push(key, ());

Ok(())
true
}

async fn insert(&self, key: String, value: Bytes) -> Result<()> {
async fn insert(&self, key: String, value: Bytes) -> bool {
self.update_cache(key, Some(value)).await
}

async fn recover(&self, filename: String) -> Result<()> {
async fn recover(&self, filename: String) -> bool {
self.update_cache(filename, None).await
}

async fn get(&self, key: &str) -> Result<Option<Bytes>> {
async fn get(&self, key: &str) -> Option<Bytes> {
let mut cache = self.cache.lock().await;
if cache.get(key).is_some() {
// TODO: release lock when doing IO
return self.read_bytes(key).await.map(Some);
match self.read_bytes(key).await {
Ok(v) => Some(v),
Err(e) => {
error!(
"Read disk cache failed but ignored, key:{}, err:{}.",
key, e
);
None
}
}
} else {
None
}

Ok(None)
}

async fn persist_bytes(&self, filename: &str, value: Bytes) -> Result<()> {
Expand Down Expand Up @@ -349,7 +368,7 @@ impl DiskCacheStore {
info!("Disk cache recover_cache, filename:{}.", &filename);

if filename != MANIFEST_FILE {
cache.recover(filename).await?;
cache.recover(filename).await;
}
}

Expand Down Expand Up @@ -437,7 +456,7 @@ impl ObjectStore for DiskCacheStore {
let mut missing_ranges = Vec::new();
for range in aligned_ranges {
let cache_key = Self::cache_key(location, &range);
if let Some(bytes) = self.cache.get(&cache_key).await? {
if let Some(bytes) = self.cache.get(&cache_key).await {
ranged_bytes.insert(range.start, bytes);
} else {
missing_ranges.push(range);
Expand All @@ -447,8 +466,9 @@ impl ObjectStore for DiskCacheStore {
for range in missing_ranges {
let range_start = range.start;
let cache_key = Self::cache_key(location, &range);
// TODO: we should use get_ranges here.
let bytes = self.underlying_store.get_range(location, range).await?;
self.cache.insert(cache_key, bytes.clone()).await?;
self.cache.insert(cache_key, bytes.clone()).await;
ranged_bytes.insert(range_start, bytes);
}

Expand Down Expand Up @@ -782,4 +802,66 @@ mod test {
assert_eq!(actual, expect);
}
}

#[tokio::test]
// Regression test for apache#790: a corrupted (unreadable) on-disk cache entry
// must not surface an error to the reader — the store should log, fall back to
// the underlying store, and re-populate the cache with good data.
async fn corrupted_disk_cache() {
    let StoreWithCacheDir {
        inner: store,
        cache_dir,
    } = prepare_store(16, 1024).await;
    let test_file_name = "corrupted_disk_cache_file";
    let test_file_path = Path::from(test_file_name);
    let test_file_bytes = Bytes::from("corrupted_disk_cache_file_data");

    // Put data into store and get it to let the cache load the data.
    store
        .put(&test_file_path, test_file_bytes.clone())
        .await
        .unwrap();

    // The data should be in the cache.
    let got_bytes = store
        .get_range(&test_file_path, 0..test_file_bytes.len())
        .await
        .unwrap();
    assert_eq!(got_bytes, test_file_bytes);

    // Corrupt files in the cache dir: overwrite every cache file belonging to
    // the test object so a later cached read cannot decode them.
    let mut cache_read_dir = tokio::fs::read_dir(cache_dir.as_ref()).await.unwrap();
    while let Some(entry) = cache_read_dir.next_entry().await.unwrap() {
        let path_buf = entry.path();
        let path = path_buf.to_str().unwrap();
        if path.contains(test_file_name) {
            let mut file = tokio::fs::OpenOptions::new()
                .write(true)
                .open(path)
                .await
                .unwrap();
            file.write_all(b"corrupted").await.unwrap();
        }
    }

    // Reading again must still succeed: the corrupted cache entry is ignored
    // and the data is served from the underlying store instead of erroring.
    let got_bytes = store
        .get_range(&test_file_path, 0..test_file_bytes.len())
        .await
        .unwrap();
    assert_eq!(got_bytes, test_file_bytes);
    // The cache should be updated: the fallback read re-inserts the entry, so
    // no cache file for the test object still holds the corrupted payload.
    let mut cache_read_dir = tokio::fs::read_dir(cache_dir.as_ref()).await.unwrap();
    while let Some(entry) = cache_read_dir.next_entry().await.unwrap() {
        let path_buf = entry.path();
        let path = path_buf.to_str().unwrap();
        if path.contains(test_file_name) {
            let mut file = tokio::fs::OpenOptions::new()
                .read(true)
                .open(path)
                .await
                .unwrap();
            let mut buffer = Vec::new();
            file.read_to_end(&mut buffer).await.unwrap();
            assert_ne!(buffer, b"corrupted");
        }
    }
}
}

0 comments on commit 2c997f9

Please sign in to comment.