Skip to content

Commit

Permalink
Add integration test for the shared cache
Browse files Browse the repository at this point in the history
Signed-off-by: Vlad Volodkin <vlaad@amazon.com>
  • Loading branch information
Vlad Volodkin committed Oct 28, 2024
1 parent e72d7ac commit 878e9b9
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 1 deletion.
43 changes: 42 additions & 1 deletion mountpoint-s3/tests/common/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub mod s3_session {
use mountpoint_s3_client::types::{Checksum, PutObjectTrailingChecksums};
use mountpoint_s3_client::S3CrtClient;

use crate::common::s3::{get_test_bucket_and_prefix, get_test_region, get_test_sdk_client};
use crate::common::s3::{get_standard_bucket, get_test_bucket_and_prefix, get_test_region, get_test_sdk_client};

/// Create a FUSE mount backed by a real S3 client
pub fn new(test_name: &str, test_config: TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) {
Expand Down Expand Up @@ -345,6 +345,47 @@ pub mod s3_session {
}
}

/// Create a FUSE mount backed by a real S3 client with a cache.
/// Note that the mount uses an S3 Standard bucket (from `S3_BUCKET_NAME`) as the source bucket.
pub fn new_with_cache_factory<Cache, CacheFactory>(
    prefix: String,
    cache_factory: CacheFactory,
    test_config: TestSessionConfig,
    block_size: u64,
) -> (TempDir, BackgroundSession, TestClientBox)
where
    Cache: DataCache + Send + Sync + 'static,
    CacheFactory: FnOnce(S3CrtClient, u64) -> Cache,
{
    let mount_dir = tempfile::tempdir().unwrap();

    let bucket = get_standard_bucket();
    let region = get_test_region();

    // Backpressure must be enabled so the prefetcher can control the read window.
    let client = S3CrtClient::new(
        S3ClientConfig::default()
            .part_size(test_config.part_size)
            .endpoint_config(EndpointConfig::new(&region))
            .read_backpressure(true)
            .initial_read_window(test_config.initial_read_window_size),
    )
    .unwrap();

    // Hand the factory its own handle to the client so `client` itself
    // can still be moved into the FUSE session below.
    let cache = cache_factory(client.clone(), block_size);
    let prefetcher = caching_prefetch(cache, client.event_loop_group(), test_config.prefetcher_config);

    let session = create_fuse_session(
        client,
        prefetcher,
        &bucket,
        &prefix,
        mount_dir.path(),
        test_config.filesystem_config,
    );
    let test_client = create_test_client(&region, &bucket, &prefix);

    (mount_dir, session, test_client)
}

fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox {
let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await });
let test_client = SDKTestClient {
Expand Down
10 changes: 10 additions & 0 deletions mountpoint-s3/tests/common/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
(bucket, prefix)
}

/// Name of the S3 Express One Zone bucket used as the shared cache,
/// taken from the `S3_EXPRESS_ONE_ZONE_BUCKET_NAME` environment variable.
#[cfg(feature = "s3express_tests")]
pub fn get_express_cache_bucket() -> String {
    let bucket = std::env::var("S3_EXPRESS_ONE_ZONE_BUCKET_NAME");
    bucket.expect("Set S3_EXPRESS_ONE_ZONE_BUCKET_NAME to run integration tests")
}

/// Name of the S3 Standard bucket used as the source bucket for integration
/// tests, taken from the `S3_BUCKET_NAME` environment variable.
pub fn get_standard_bucket() -> String {
    let bucket = std::env::var("S3_BUCKET_NAME");
    bucket.expect("Set S3_BUCKET_NAME to run integration tests")
}

/// Name of a bucket the test credentials are not permitted to access,
/// taken from the `S3_FORBIDDEN_BUCKET_NAME` environment variable.
pub fn get_test_bucket_forbidden() -> String {
    let bucket = std::env::var("S3_FORBIDDEN_BUCKET_NAME");
    bucket.expect("Set S3_FORBIDDEN_BUCKET_NAME to run integration tests")
}
Expand Down
119 changes: 119 additions & 0 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::common::fuse::{self, TestSessionConfig};
use crate::common::s3::{get_express_cache_bucket, get_test_bucket_and_prefix};
use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache};
use mountpoint_s3_client::S3CrtClient;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::fs;
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;
use test_case::test_case;

// Write/read round trip through a mount whose cache is backed by an
// S3 Express One Zone bucket (the "shared cache").
#[test_case("key", 100, 1024; "simple")]
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
    cache_write_read_base(
        key_suffix,
        key_size,
        object_size,
        express_cache_factory,
        "express_cache_write_read",
    )
}

// Write/read round trip through a mount whose cache lives on local disk.
#[test_case("key", 100, 1024; "simple")]
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
    // Root the on-disk cache in a fresh temp directory that lives for the
    // whole test (dropping it removes the cache contents).
    let cache_dir = tempfile::tempdir().unwrap();
    let factory = disk_cache_factory(cache_dir.path().to_owned());
    cache_write_read_base(key_suffix, key_size, object_size, factory, "disk_cache_write_read");
}

/// Shared body for the cache write/read tests.
///
/// Mounts an S3 Standard bucket with the cache produced by `cache_factory`,
/// writes a fresh object through the mount, reads it once (populating the
/// cache), deletes the object from the source bucket, and reads it again —
/// the second read can only be served by the cache.
fn cache_write_read_base<Cache, CacheFactory>(
    key_suffix: &str,
    key_size: usize,
    object_size: usize,
    cache_factory: CacheFactory,
    test_name: &str,
) where
    Cache: DataCache + Send + Sync + 'static,
    CacheFactory: FnOnce(S3CrtClient, u64) -> Cache,
{
    const BLOCK_SIZE: u64 = 512 * 1024;

    // Serve lookups from the metadata cache with a long TTL so the second
    // read does not consult the (by then empty) source bucket.
    let mut test_config = TestSessionConfig::default();
    let cache_config = &mut test_config.filesystem_config.cache_config;
    cache_config.serve_lookup_from_cache = true;
    cache_config.dir_ttl = Duration::from_secs(3600);
    cache_config.file_ttl = Duration::from_secs(3600);

    // Mount a bucket.
    let (_, prefix) = get_test_bucket_and_prefix(test_name);
    let (mount_point, _session, mut client) =
        fuse::s3_session::new_with_cache_factory(prefix.clone(), cache_factory, test_config, BLOCK_SIZE);

    // Write an object through the mount; nothing is cached yet.
    let key = get_object_key(&prefix, key_suffix, key_size);
    let path = mount_point.path().join(&key);
    let written = random_binary_data(object_size);
    fs::write(&path, &written).expect("write should succeed");

    // The first read is served from the source bucket and populates the cache.
    let read = fs::read(&path).expect("read should succeed");
    assert_eq!(read, written);

    // Cache population is async; 1 second should be enough for it to finish.
    sleep(Duration::from_secs(1));

    // Delete the object from the source bucket so data cannot be served from it.
    client.remove_object(&key).expect("remove must succeed");
    assert!(
        !client.contains_key(&key).expect("head object must succeed"),
        "object should not exist in the source bucket"
    );

    // The second read must therefore come from the cache.
    let read = fs::read(&path).expect("read from the cache should succeed");
    assert_eq!(read, written);
}

/// Build an `ExpressDataCache` that stores its blocks in the configured
/// S3 Express One Zone bucket.
fn express_cache_factory(client: S3CrtClient, block_size: u64) -> ExpressDataCache<S3CrtClient> {
    let bucket = get_express_cache_bucket();
    // NOTE(review): the bucket name is passed twice — confirm against
    // `ExpressDataCache::new` what the second occurrence denotes.
    ExpressDataCache::new(&bucket, client, &bucket, block_size)
}

/// Return a factory closure that builds a `DiskDataCache` rooted at `cache_dir`.
/// The S3 client argument is ignored — a disk cache needs no client.
fn disk_cache_factory(cache_dir: PathBuf) -> impl FnOnce(S3CrtClient, u64) -> DiskDataCache {
    move |_client, block_size| {
        DiskDataCache::new(
            cache_dir,
            DiskDataCacheConfig {
                block_size,
                limit: Default::default(),
            },
        )
    }
}

/// Produce `size_in_bytes` random bytes from a ChaCha RNG seeded with a
/// fresh random seed.
fn random_binary_data(size_in_bytes: usize) -> Vec<u8> {
    let seed = rand::thread_rng().gen();
    let mut data = vec![0u8; size_in_bytes];
    ChaChaRng::seed_from_u64(seed).fill_bytes(&mut data);
    data
}

/// Create a random object key such that the full key — `key_prefix` plus the
/// returned string — is at least `min_size_in_bytes` bytes long.
///
/// Only the part after the prefix is returned: a random numeric suffix
/// appended to `key_suffix`, then padded with '0's up to the requested size.
fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
    let random_suffix: u64 = rand::thread_rng().gen();
    // Part of the key after all the "/".
    let last_key_part = format!("{key_suffix}{random_suffix}");
    let unpadded_size = key_prefix.len() + last_key_part.len();
    let padding = "0".repeat(min_size_in_bytes.saturating_sub(unpadded_size));
    format!("{last_key_part}{padding}")
}
2 changes: 2 additions & 0 deletions mountpoint-s3/tests/fuse_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
mod cache_test;
mod consistency_test;
mod fork_test;
mod lookup_test;
Expand Down

0 comments on commit 878e9b9

Please sign in to comment.