From df32eaf7858a16f8fdc7e436d0e4257add461c87 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Mon, 15 Jul 2024 20:57:25 +0000 Subject: [PATCH] Re-implement the prefetcher using backpressure mechanism The prefetcher now uses only one GetObject request to fetch data in advance. This request has a range of entire object but use backpressure mechanism to control how much data it wants to fetch into the part queue instead of spawning up to two requests in parallel. This should make the throughput more stable because previously the two request tasks could compete with each other when fetching data from S3. Also, it will be easier to control how much data we want to store in the part queue. Signed-off-by: Monthon Klongklaew --- mountpoint-s3/src/cli.rs | 14 + mountpoint-s3/src/fs.rs | 2 + mountpoint-s3/src/fs/error.rs | 4 +- mountpoint-s3/src/prefetch.rs | 499 ++++++++++-------- .../src/prefetch/backpressure_controller.rs | 151 ++++++ mountpoint-s3/src/prefetch/caching_stream.rs | 429 +++++++++++---- mountpoint-s3/src/prefetch/part.rs | 119 +++++ mountpoint-s3/src/prefetch/part_queue.rs | 31 +- mountpoint-s3/src/prefetch/part_stream.rs | 149 +++++- mountpoint-s3/src/prefetch/task.rs | 91 ++-- mountpoint-s3/tests/common/fuse.rs | 17 +- mountpoint-s3/tests/common/mod.rs | 2 + mountpoint-s3/tests/fs.rs | 13 +- .../tests/fuse_tests/prefetch_test.rs | 70 ++- mountpoint-s3/tests/mock_s3_tests.rs | 1 + 15 files changed, 1174 insertions(+), 418 deletions(-) create mode 100644 mountpoint-s3/src/prefetch/backpressure_controller.rs diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index 7fdb18b8f..4fccb4039 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -615,10 +615,24 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo user_agent.key_value("mp-cache-ttl", &ttl.to_string()); } + // This is a weird looking number! We really want our first request size to be 1MiB, + // which is a common IO size. 
But Linux's readahead will try to read an extra 128k on + top of a 1MiB read, which we'd have to wait for a second request to service. Because + // FUSE doesn't know the difference between regular reads and readahead reads, it will + // send us a READ request for that 128k, so we'll have to block waiting for it even if + // the application doesn't want it. This is all in the noise for sequential IO, but + // waiting for the readahead hurts random IO. So we add 128k to the first request size + // to avoid the latency hit of the second request. + // + // Note the CRT does not respect this value right now, they always return chunks of part size + // but this is the first window size we prefer. + let initial_read_window_size = 1024 * 1024 + 128 * 1024; let mut client_config = S3ClientConfig::new() .auth_config(auth_config) .throughput_target_gbps(throughput_target_gbps) .part_size(args.part_size as usize) + .read_backpressure(true) + .initial_read_window(initial_read_window_size) .user_agent(user_agent); if args.requester_pays { client_config = client_config.request_payer("requester"); diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 46c3f0915..b03563cd2 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -1349,6 +1349,8 @@ mod tests { let bucket = "bucket"; let client = Arc::new(MockClient::new(MockClientConfig { bucket: bucket.to_owned(), + enable_backpressure: true, + initial_read_window_size: 1024 * 1024, ..Default::default() })); // Create "dir1" in the client to avoid creating it locally diff --git a/mountpoint-s3/src/fs/error.rs b/mountpoint-s3/src/fs/error.rs index d01c53512..68554ac8b 100644 --- a/mountpoint-s3/src/fs/error.rs +++ b/mountpoint-s3/src/fs/error.rs @@ -131,7 +131,9 @@ impl From> fo PrefetchReadError::Integrity(e) => err!(libc::EIO, source:e, "integrity error"), PrefetchReadError::GetRequestFailed(_) | PrefetchReadError::GetRequestTerminatedUnexpectedly - 
PrefetchReadError::GetRequestReturnedWrongOffset { .. } => { + | PrefetchReadError::GetRequestReturnedWrongOffset { .. } + | PrefetchReadError::BackpressurePreconditionFailed + | PrefetchReadError::ReadWindowIncrement => { err!(libc::EIO, source:err, "get request failed") } } diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 486a1ceed..ca4565410 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -7,6 +7,7 @@ //! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a //! non-sequential read, we abandon the prefetching and start again with the minimum request size. +mod backpressure_controller; mod caching_stream; mod part; mod part_queue; @@ -14,7 +15,6 @@ mod part_stream; mod seek_window; mod task; -use std::collections::VecDeque; use std::fmt::Debug; use std::time::Duration; @@ -24,6 +24,7 @@ use metrics::{counter, histogram}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::ObjectClient; +use task::RequestTaskConfig; use thiserror::Error; use tracing::trace; @@ -79,6 +80,12 @@ pub enum PrefetchReadError { #[error("integrity check failed")] Integrity(#[from] IntegrityError), + + #[error("backpressure must be enabled with non-zero initial read window")] + BackpressurePreconditionFailed, + + #[error("read window increment failed")] + ReadWindowIncrement, } pub type DefaultPrefetcher = Prefetcher>; @@ -110,36 +117,24 @@ where #[derive(Debug, Clone, Copy)] pub struct PrefetcherConfig { - /// Size of the first request in a prefetch run - pub first_request_size: usize, - /// Maximum size of a single prefetch request - pub max_request_size: usize, + /// Maximum size of the read window + max_read_window_size: usize, /// Factor to increase the request size by whenever the reader continues making sequential reads - pub sequential_prefetch_multiplier: usize, + sequential_prefetch_multiplier: 
usize, /// Timeout to wait for a part to become available - pub read_timeout: Duration, + read_timeout: Duration, /// The maximum amount of unavailable data the prefetcher will tolerate during a seek operation /// before resetting and starting a new S3 request. - pub max_forward_seek_wait_distance: u64, + max_forward_seek_wait_distance: u64, /// The maximum distance the prefetcher will seek backwards before resetting and starting a new /// S3 request. We keep this much data in memory in addition to any inflight requests. - pub max_backward_seek_distance: u64, + max_backward_seek_distance: u64, } impl Default for PrefetcherConfig { fn default() -> Self { - #[allow(clippy::identity_op)] Self { - // This is a weird looking number! We really want our first request size to be 1MiB, - // which is a common IO size. But Linux's readahead will try to read an extra 128k on on - // top of a 1MiB read, which we'd have to wait for a second request to service. Because - // FUSE doesn't know the difference between regular reads and readahead reads, it will - // send us a READ request for that 128k, so we'll have to block waiting for it even if - // the application doesn't want it. This is all in the noise for sequential IO, but - // waiting for the readahead hurts random IO. So we add 128k to the first request size - // to avoid the latency hit of the second request. - first_request_size: 1 * 1024 * 1024 + 128 * 1024, - max_request_size: 2 * 1024 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, read_timeout: Duration::from_secs(60), // We want these large enough to tolerate a single out-of-order Linux readahead, which @@ -147,11 +142,44 @@ impl Default for PrefetcherConfig { // making a guess about where the optimal cut-off point is before it would be faster to // just start a new request instead. 
max_forward_seek_wait_distance: 16 * 1024 * 1024, - max_backward_seek_distance: 1 * 1024 * 1024, + max_backward_seek_distance: 1024 * 1024, } } } +impl PrefetcherConfig { + /// Create a prefetcher config with default values + pub fn new() -> Self { + // Initialise the builder with default values + Self::default() + } + + pub fn max_read_window_size(mut self, max_read_window_size: usize) -> Self { + self.max_read_window_size = max_read_window_size; + self + } + + pub fn sequential_prefetch_multiplier(mut self, sequential_prefetch_multiplier: usize) -> Self { + self.sequential_prefetch_multiplier = sequential_prefetch_multiplier; + self + } + + pub fn read_timeout(mut self, read_timeout: Duration) -> Self { + self.read_timeout = read_timeout; + self + } + + pub fn max_forward_seek_wait_distance(mut self, max_forward_seek_wait_distance: u64) -> Self { + self.max_forward_seek_wait_distance = max_forward_seek_wait_distance; + self + } + + pub fn max_backward_seek_distance(mut self, max_backward_seek_distance: u64) -> Self { + self.max_backward_seek_distance = max_backward_seek_distance; + self + } +} + /// A [Prefetcher] creates and manages prefetching GetObject requests to objects. #[derive(Debug)] pub struct Prefetcher { @@ -206,11 +234,7 @@ pub struct PrefetchGetObject { client: Arc, part_stream: Arc, config: PrefetcherConfig, - // Invariant: the offset of the first byte in this task's part queue is always - // self.next_sequential_read_offset. - current_task: Option>, - // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: VecDeque>, + backpressure_task: Option>, // Invariant: the offset of the last byte in this window is always // self.next_sequential_read_offset - 1. 
backward_seek_window: SeekWindow, @@ -221,7 +245,6 @@ pub struct PrefetchGetObject { /// Start offset for sequential read, used for calculating contiguous read metric sequential_read_start_offset: u64, next_sequential_read_offset: u64, - next_request_size: usize, next_request_offset: u64, size: u64, } @@ -273,13 +296,11 @@ where client, part_stream, config, - current_task: None, - future_tasks: Default::default(), + backpressure_task: None, backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), preferred_part_size: 128 * 1024, sequential_read_start_offset: 0, next_sequential_read_offset: 0, - next_request_size: config.first_request_size, next_request_offset: 0, bucket: bucket.to_owned(), object_id: ObjectId::new(key.to_owned(), etag), @@ -329,12 +350,13 @@ where } assert_eq!(self.next_sequential_read_offset, offset); - self.prepare_requests(); + if self.backpressure_task.is_none() { + self.backpressure_task = Some(self.spawn_read_backpressure_request()?); + } let mut response = ChecksummedBytes::default(); while to_read > 0 { - let Some(current_task) = self.current_task.as_mut() else { - // If [prepare_requests] didn't spawn a request, we've reached the end of the object. + let Some(current_task) = self.backpressure_task.as_mut() else { trace!(offset, length, "read beyond object size"); break; }; @@ -347,8 +369,6 @@ where .unwrap(); self.next_sequential_read_offset += part_bytes.len() as u64; - self.prepare_requests(); - // If we can complete the read with just a single buffer, early return to avoid copying // into a new buffer. This should be the common case as long as part size is larger than // read size, which it almost always is for real S3 clients and FUSE. 
@@ -364,75 +384,45 @@ where Ok(response) } - /// Runs on every read to prepare and spawn any requests our prefetching logic requires - fn prepare_requests(&mut self) { - let current_task = self.current_task.as_ref(); - if current_task.map(|task| task.remaining() == 0).unwrap_or(true) { - // There's no current task, or the current task is finished. Prepare the next request. - if let Some(next_task) = self.future_tasks.pop_front() { - self.current_task = Some(next_task); - return; - } - self.current_task = self.spawn_next_request(); - } else if current_task - .map(|task| { - // Don't trigger prefetch if we're in a fake task created by backward streaming - task.is_streaming() && task.remaining() <= task.total_size() / 2 - }) - .unwrap_or(false) - && self.future_tasks.is_empty() - { - // The current task is nearing completion, so pre-spawn the next request in anticipation - // of it completing. - if let Some(task) = self.spawn_next_request() { - self.future_tasks.push_back(task); - } - } - } + /// Spawn a backpressure GetObject request which has a range from current offset to the end of the file. + /// We will be using flow-control window to control how much data we want to download into the prefetcher. 
+ fn spawn_read_backpressure_request( + &mut self, + ) -> Result, PrefetchReadError> { + let start = self.next_sequential_read_offset; + let object_size = self.size as usize; + let range = RequestRange::new(object_size, start, object_size); + + // The prefetcher now relies on backpressure mechanism so it must be enabled + let initial_read_window_size = match self.client.initial_read_window_size() { + Some(value) => value, + None => return Err(PrefetchReadError::BackpressurePreconditionFailed), + }; - /// Spawn the next required request - fn spawn_next_request(&mut self) -> Option> { - let start = self.next_request_offset; - if start >= self.size { - return None; + // Make sure that we don't get blocked from the beginning + if initial_read_window_size == 0 { + return Err(PrefetchReadError::BackpressurePreconditionFailed); } - let range = RequestRange::new(self.size as usize, start, self.next_request_size); - let task = self.part_stream.spawn_get_object_request( - &self.client, - &self.bucket, - self.object_id.key(), - self.object_id.etag().clone(), + let config = RequestTaskConfig { + bucket: self.bucket.clone(), + key: self.object_id.key().to_owned(), + if_match: self.object_id.etag().clone(), range, - self.preferred_part_size, - ); - - // [read] will reset these if the reader stops making sequential requests - self.next_request_offset += task.total_size() as u64; - self.next_request_size = self.get_next_request_size(task.total_size()); - - Some(task) - } - - /// Suggest next request size. - /// The next request size is the current request size multiplied by sequential prefetch multiplier. - fn get_next_request_size(&self, request_size: usize) -> usize { - // TODO: this logic doesn't work well right now in the case where part_size < - // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly - // shrinking the request size until it reaches 1. 
But this isn't a configuration we - // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a - // prefetcher with multiplier 1 is not very good). - (request_size * self.config.sequential_prefetch_multiplier).min(self.config.max_request_size) + preferred_part_size: self.preferred_part_size, + initial_read_window_size, + max_read_window_size: self.config.max_read_window_size, + read_window_size_multiplier: self.config.sequential_prefetch_multiplier, + }; + Ok(self.part_stream.spawn_get_object_request(&self.client, config)) } /// Reset this prefetch request to a new offset, clearing any existing tasks queued. fn reset_prefetch_to_offset(&mut self, offset: u64) { - self.current_task = None; - self.future_tasks.drain(..); + self.backpressure_task = None; self.backward_seek_window.clear(); self.sequential_read_start_offset = offset; self.next_sequential_read_offset = offset; - self.next_request_size = self.config.first_request_size; self.next_request_offset = offset; } @@ -445,7 +435,7 @@ where if offset > self.next_sequential_read_offset { self.try_seek_forward(offset).await } else { - self.try_seek_backward(offset) + self.try_seek_backward(offset).await } } @@ -454,43 +444,22 @@ where let total_seek_distance = offset - self.next_sequential_read_offset; histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64); - let Some(current_task) = self.current_task.as_mut() else { + let Some(task) = self.backpressure_task.as_mut() else { // Can't seek if there's no requests in flight at all return Ok(false); }; - // Jump ahead to the right request - if offset >= current_task.end_offset() { - self.next_sequential_read_offset = current_task.end_offset(); - self.current_task = None; - while let Some(next_request) = self.future_tasks.pop_front() { - if next_request.end_offset() > offset { - self.current_task = Some(next_request); - break; - } else { - self.next_sequential_read_offset = next_request.end_offset(); - } 
- } - if self.current_task.is_none() { - // No inflight task containing the target offset. - trace!(current_offset=?self.next_sequential_read_offset, requested_offset=?offset, "seek failed: not enough inflight data"); - return Ok(false); - } - // We could try harder to preserve the backwards seek buffer if we're near the - // request boundary, but it's probably not worth the trouble. - self.backward_seek_window.clear(); + // Not enough data in the read window to serve the forward seek + if offset >= task.read_window_range() { + return Ok(false); } - // At this point it's guaranteed by the previous if-block that `offset` is in the range of `self.current_task` - let current_task = self - .current_task - .as_mut() - .expect("a request existed that covered this seek offset"); // If we have enough bytes already downloaded (`available`) to skip straight to this read, then do // it. Otherwise, we're willing to wait for the bytes to download only if they're coming "soon", where // soon is defined as up to `max_forward_seek_wait_distance` bytes ahead of the available offset. 
- let available_offset = current_task.available_offset(); - if offset >= available_offset.saturating_add(self.config.max_forward_seek_wait_distance) { + let available_offset = task.available_offset(); + let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance); + if offset >= available_soon_offset { trace!( requested_offset = offset, available_offset = available_offset, @@ -500,7 +469,7 @@ where } let mut seek_distance = offset - self.next_sequential_read_offset; while seek_distance > 0 { - let part = current_task.read(seek_distance as usize).await?; + let part = task.read(seek_distance as usize).await?; seek_distance -= part.len() as u64; self.next_sequential_read_offset += part.len() as u64; self.backward_seek_window.push(part); @@ -508,7 +477,7 @@ where Ok(true) } - fn try_seek_backward(&mut self, offset: u64) -> Result> { + async fn try_seek_backward(&mut self, offset: u64) -> Result> { assert!(offset < self.next_sequential_read_offset); let backwards_length_needed = self.next_sequential_read_offset - offset; histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64); @@ -517,16 +486,13 @@ where trace!("seek failed: not enough data in backwards seek window"); return Ok(false); }; - // We're going to create a new fake "request" that contains the parts we read out of the - // window. That sounds a bit hacky, but it keeps all the read logic simple rather than - // needing separate paths for backwards seeks vs others. 
- let request = RequestTask::from_parts(parts, offset); - if let Some(current_task) = self.current_task.take() { - self.future_tasks.push_front(current_task); + if let Some(task) = self.backpressure_task.as_mut() { + task.push_front(parts).await?; + self.next_sequential_read_offset = offset; + Ok(true) + } else { + Ok(false) } - self.current_task = Some(request); - self.next_sequential_read_offset = offset; - Ok(true) } } @@ -552,12 +518,11 @@ mod tests { #![allow(clippy::identity_op)] use crate::data_cache::InMemoryDataCache; - use crate::prefetch::part_stream::ClientPartStream; use super::caching_stream::CachingPartStream; use super::*; use futures::executor::{block_on, ThreadPool}; - use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; + use mountpoint_s3_client::error::GetObjectError; use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap}; use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; use proptest::proptest; @@ -571,9 +536,9 @@ mod tests { #[derive(Debug, Arbitrary)] struct TestConfig { #[proptest(strategy = "16usize..1*1024*1024")] - first_request_size: usize, + initial_read_window_size: usize, #[proptest(strategy = "16usize..1*1024*1024")] - max_request_size: usize, + max_read_window_size: usize, #[proptest(strategy = "1usize..8usize")] sequential_prefetch_multiplier: usize, #[proptest(strategy = "16usize..2*1024*1024")] @@ -582,6 +547,8 @@ mod tests { max_forward_seek_wait_distance: u64, #[proptest(strategy = "1u64..4*1024*1024")] max_backward_seek_distance: u64, + #[proptest(strategy = "16usize..1*1024*1024")] + cache_block_size: usize, } fn default_stream() -> ClientPartStream { @@ -604,6 +571,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let 
client = Arc::new(MockClient::new(config)); @@ -612,14 +581,12 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, - sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, - read_timeout: Duration::from_secs(5), - max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance, - max_backward_seek_distance: test_config.max_backward_seek_distance, - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier) + .read_timeout(Duration::from_secs(5)) + .max_forward_seek_wait_distance(test_config.max_forward_seek_wait_distance) + .max_backward_seek_distance(test_config.max_backward_seek_distance); let prefetcher = Prefetcher::new(part_stream, prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", size, etag); @@ -645,12 +612,13 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 1024 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config); } @@ -662,12 +630,13 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 64 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, 
max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 16 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -679,17 +648,96 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 64 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 256 * 1024 * 1024 + 111, 1024 * 1024, config); } + fn fail_with_backpressure_precondition_test( + part_stream: Stream, + test_config: TestConfig, + client_config: MockClientConfig, + ) where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let client = MockClient::new(client_config); + let read_size = 1 * MB; + let object_size = 8 * MB; + let object = MockObject::ramp(0xaa, object_size, ETag::for_tests()); + let etag = object.etag(); + + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier); + + let prefetcher = Prefetcher::new(part_stream, prefetcher_config); + let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", object_size as u64, etag); + let result = block_on(request.read(0, read_size)); + assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed))); + } + + #[test_case(default_stream())] + #[test_case(caching_stream(1 * MB))] + fn fail_with_backpressure_not_enabled(part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let test_config = TestConfig { + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, + sequential_prefetch_multiplier: 8, + client_part_size: 8 * 
1024 * 1024, + max_forward_seek_wait_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, + }; + + // backpressure is not enabled for the client + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: test_config.client_part_size, + enable_backpressure: false, + ..Default::default() + }; + + fail_with_backpressure_precondition_test(part_stream, test_config, config); + } + + #[test_case(default_stream())] + #[test_case(caching_stream(1 * MB))] + fn fail_with_backpressure_zero_read_window(part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let test_config = TestConfig { + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, + sequential_prefetch_multiplier: 8, + client_part_size: 8 * 1024 * 1024, + max_forward_seek_wait_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, + }; + + // backpressure is enabled but initial read window size is zero + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: 0, + ..Default::default() + }; + + fail_with_backpressure_precondition_test(part_stream, test_config, config); + } + fn fail_sequential_read_test( part_stream: Stream, size: u64, @@ -700,6 +748,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = MockClient::new(config); @@ -710,12 +760,9 @@ mod tests { let client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new()); - let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, - 
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier); let prefetcher = Prefetcher::new(part_stream, prefetcher_config); let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", size, etag); @@ -748,23 +795,25 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 1024 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; let mut get_failures = HashMap::new(); - get_failures.insert( - 2, - Err(ObjectClientError::ClientError(MockClientError( - err_value.to_owned().into(), - ))), - ); + // We only have one request with backpressure, so we are going to inject the failure at + // 2nd read from that request stream. + get_failures.insert(1, Ok((2, MockClientError(err_value.to_owned().into())))); + + // Object needs to be bigger than a part size in order to trigger the failure + // because the CRT data returns in chunks of part size. + let object_size = config.client_part_size + 111; - fail_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config, get_failures); + fail_sequential_read_test(part_stream, object_size as u64, 1024 * 1024, config, get_failures); } proptest! 
{ @@ -788,18 +837,17 @@ mod tests { fn proptest_sequential_read_with_cache( size in 1u64..1 * 1024 * 1024, read_size in 1usize..1 * 1024 * 1024, - block_size in 16usize..1 * 1024 * 1024, config: TestConfig, ) { - run_sequential_read_test(caching_stream(block_size), size, read_size, config); + run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config); } #[test] fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, - block_size in 16usize..1 * 1024 * 1024, config: TestConfig) { + config: TestConfig) { // Pick read size smaller than the object size let read_size = (size as usize / read_factor).max(1); - run_sequential_read_test(caching_stream(block_size), size, read_size, config); + run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config); } } @@ -808,12 +856,13 @@ mod tests { let object_size = 854966; let read_size = 161647; let config = TestConfig { - first_request_size: 484941, - max_request_size: 81509, + initial_read_window_size: 484941, + max_read_window_size: 81509, sequential_prefetch_multiplier: 1, client_part_size: 181682, max_forward_seek_wait_distance: 1, max_backward_seek_distance: 18668, + cache_block_size: 1 * MB, }; run_sequential_read_test(default_stream(), object_size, read_size, config); } @@ -827,6 +876,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -835,14 +886,11 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, - sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, - max_forward_seek_wait_distance: 
test_config.max_forward_seek_wait_distance, - max_backward_seek_distance: test_config.max_backward_seek_distance, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier) + .max_forward_seek_wait_distance(test_config.max_forward_seek_wait_distance) + .max_backward_seek_distance(test_config.max_backward_seek_distance); let prefetcher = Prefetcher::new(part_stream, prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, etag); @@ -895,11 +943,10 @@ mod tests { #[test] fn proptest_random_read_with_cache( reads in random_read_strategy(1 * 1024 * 1024), - block_size in 16usize..1 * 1024 * 1024, config: TestConfig, ) { let (object_size, reads) = reads; - run_random_read_test(caching_stream(block_size), object_size, reads, config); + run_random_read_test(caching_stream(config.cache_block_size), object_size, reads, config); } } @@ -908,12 +955,13 @@ mod tests { let object_size = 724314; let reads = vec![(0, 516883)]; let config = TestConfig { - first_request_size: 3684779, - max_request_size: 2147621, + initial_read_window_size: 3684779, + max_read_window_size: 2147621, sequential_prefetch_multiplier: 4, client_part_size: 516882, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -923,12 +971,13 @@ mod tests { let object_size = 755678; let reads = vec![(0, 278499), (311250, 1)]; let config = TestConfig { - first_request_size: 556997, - max_request_size: 105938, + initial_read_window_size: 556997, + max_read_window_size: 105938, sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; 
run_random_read_test(default_stream(), object_size, reads, config); } @@ -938,12 +987,13 @@ mod tests { let object_size = 755678; let reads = vec![(0, 236766), (291204, 1), (280930, 36002)]; let config = TestConfig { - first_request_size: 556997, - max_request_size: 105938, + initial_read_window_size: 556997, + max_read_window_size: 105938, sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_wait_distance: 2260662, max_backward_seek_distance: 2369799, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -953,12 +1003,13 @@ mod tests { let object_size = 14201; let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)]; let config = TestConfig { - first_request_size: 457999, - max_request_size: 863511, + initial_read_window_size: 457999, + max_read_window_size: 863511, sequential_prefetch_multiplier: 5, client_part_size: 1972409, max_forward_seek_wait_distance: 2810651, max_backward_seek_distance: 3531090, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -971,6 +1022,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: PART_SIZE, + enable_backpressure: true, + initial_read_window_size: OBJECT_SIZE, ..Default::default() }; @@ -1003,10 +1056,7 @@ mod tests { )); // For simplicity, prefetch the whole object in one request. 
- let prefetcher_config = PrefetcherConfig { - first_request_size: OBJECT_SIZE, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new(); let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); block_on(async { let mut request = prefetcher.prefetch(client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); @@ -1045,6 +1095,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size: FIRST_REQUEST_SIZE, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1053,10 +1105,7 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: FIRST_REQUEST_SIZE, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new(); let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); @@ -1079,11 +1128,12 @@ mod tests { #[test_case(125, 110; "read in second request")] fn test_backward_seek(first_read_size: usize, part_size: usize) { const OBJECT_SIZE: usize = 200; - const FIRST_REQUEST_SIZE: usize = 100; let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size: part_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1092,10 +1142,7 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: FIRST_REQUEST_SIZE, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new(); let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); // Try every possible seek from first_read_size @@ -1131,16 +1178,18 @@ mod tests { fn sequential_read_stress_helper() { let mut rng = shuttle::rand::thread_rng(); let object_size = rng.gen_range(1u64..1 * 1024 * 1024); - let first_request_size = rng.gen_range(16usize..1 * 1024 * 1024); - let max_request_size = 
rng.gen_range(16usize..1 * 1024 * 1024); + let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024); let sequential_prefetch_multiplier = rng.gen_range(2usize..16); let part_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); + let initial_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1149,14 +1198,11 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size, - max_request_size, - sequential_prefetch_multiplier, - max_forward_seek_wait_distance, - max_backward_seek_distance, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(max_read_window_size) + .sequential_prefetch_multiplier(sequential_prefetch_multiplier) + .max_forward_seek_wait_distance(max_forward_seek_wait_distance) + .max_backward_seek_distance(max_backward_seek_distance); let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); @@ -1184,20 +1230,22 @@ mod tests { fn random_read_stress_helper() { let mut rng = shuttle::rand::thread_rng(); - let first_request_size = rng.gen_range(16usize..32 * 1024); - let max_request_size = rng.gen_range(16usize..32 * 1024); - // Try to prevent testing very small reads of very large objects, which are easy to OOM - // under Shuttle (lots of concurrent tasks) - let max_object_size = first_request_size.min(max_request_size) * 20; - let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64); + 
let max_read_window_size = rng.gen_range(16usize..32 * 1024); let sequential_prefetch_multiplier = rng.gen_range(2usize..16); let part_size = rng.gen_range(16usize..128 * 1024); + let initial_read_window_size = rng.gen_range(16usize..128 * 1024); let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024); let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024); + // Try to prevent testing very small reads of very large objects, which are easy to OOM + // under Shuttle (lots of concurrent tasks) + let max_object_size = initial_read_window_size.min(max_read_window_size) * 20; + let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64); let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1206,14 +1254,11 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size, - max_request_size, - sequential_prefetch_multiplier, - max_forward_seek_wait_distance, - max_backward_seek_distance, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(max_read_window_size) + .sequential_prefetch_multiplier(sequential_prefetch_multiplier) + .max_forward_seek_wait_distance(max_forward_seek_wait_distance) + .max_backward_seek_distance(max_backward_seek_distance); let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs new file mode 100644 index 000000000..6183e064e --- /dev/null +++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs @@ -0,0 +1,151 @@ +use async_channel::{unbounded, Receiver, Sender}; +use tracing::trace; + +use 
super::PrefetchReadError; + +#[derive(Debug)] +pub struct BackpressureController { + read_window_updater: Sender, + preferred_read_window_size: usize, + max_read_window_size: usize, + read_window_size_multiplier: usize, + read_window_range: u64, + last_request_offset: u64, +} + +#[derive(Debug)] +pub struct BackpressureLimiter { + read_window_incrementing_queue: Receiver, + read_window_range: u64, +} + +/// Creates a [BackpressureController] and its related [BackpressureLimiter]. +/// We use a pair of these to for providing feedback to backpressure stream. +/// +/// [BackpressureLimiter] is used on producer side of the object stream, that is, any +/// [super::part_stream::ObjectPartStream] that support backpressure. The producer can call +/// `wait_for_read_window_increment` to wait for feedback from the consumer. This method +/// could block when they know that the producer requires read window incrementing. +/// +/// [BackpressureController] will be given to the consumer side of the object stream. +/// It can be used anywhere to set preferred read window size for the stream and tell the +/// producer when its read window should be increased. 
+pub fn new_backpressure_controller( + preferred_read_window_size: usize, + max_read_window_size: usize, + read_window_size_multiplier: usize, + read_window_range: u64, + last_request_offset: u64, +) -> (BackpressureController, BackpressureLimiter) { + let (read_window_updater, read_window_incrementing_queue) = unbounded(); + let controller = BackpressureController { + read_window_updater, + preferred_read_window_size, + max_read_window_size, + read_window_size_multiplier, + read_window_range, + last_request_offset, + }; + let limiter = BackpressureLimiter { + read_window_incrementing_queue, + read_window_range, + }; + (controller, limiter) +} + +impl BackpressureController { + pub fn set_preferred_read_window_size(&mut self, preferred_read_window_size: usize) { + self.preferred_read_window_size = preferred_read_window_size; + } + + pub fn read_window_range(&self) -> u64 { + self.read_window_range + } + + // Try scaling up preferred read window size with a multiplier configured at initialization. + pub fn try_scaling_up(&mut self) { + if self.preferred_read_window_size < self.max_read_window_size { + let new_read_window_size = + (self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size); + trace!( + current_size = self.preferred_read_window_size, + new_size = new_read_window_size, + "scaling up preferred read window" + ); + self.preferred_read_window_size = new_read_window_size; + } + } + + /// Signal the stream producer about the next offset we might want to read from the stream. The backpressure controller + /// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`. + pub async fn next_offset_hint(&mut self, next_offset: u64) -> Result<(), PrefetchReadError> { + let mut remaining_window = self.read_window_range.saturating_sub(next_offset) as usize; + // Increment the read window only if the remaining window reaches some threshold i.e. 
half of it left. + while remaining_window < (self.preferred_read_window_size / 2) + && self.read_window_range < self.last_request_offset + { + let to_increase = next_offset + .saturating_add(self.preferred_read_window_size as u64) + .saturating_sub(self.read_window_range) as usize; + trace!( + next_offset, + read_window_range = self.read_window_range, + preferred_read_window_size = self.preferred_read_window_size, + to_increase, + "incrementing read window" + ); + self.increment_read_window(to_increase).await?; + self.read_window_range += to_increase as u64; + remaining_window = self.read_window_range.saturating_sub(next_offset) as usize; + } + Ok(()) + } + + // Send an increment read window request to the stream producer + async fn increment_read_window(&self, len: usize) -> Result<(), PrefetchReadError> { + // This should not block since the channel is unbounded + self.read_window_updater + .send(len) + .await + .map_err(|_| PrefetchReadError::ReadWindowIncrement) + } +} + +impl BackpressureLimiter { + pub fn read_window_range(&self) -> u64 { + self.read_window_range + } + + /// Checks if there is enough read window to put the next item with a given offset to the stream. + /// It blocks until receiving enough incrementing read window requests to serve the next part. + /// + /// Returns the new read window offset. 
+ pub async fn wait_for_read_window_increment( + &mut self, + offset: u64, + ) -> Result, PrefetchReadError> { + if self.read_window_range > offset { + if let Ok(len) = self.read_window_incrementing_queue.try_recv() { + self.read_window_range += len as u64; + Ok(Some(self.read_window_range)) + } else { + Ok(None) + } + } else { + // Block until we have enough read window to read the next chunk + while self.read_window_range <= offset { + trace!( + offset, + read_window_range = self.read_window_range, + "blocking for read window increment" + ); + let recv = self.read_window_incrementing_queue.recv().await; + match recv { + Ok(len) => self.read_window_range += len as u64, + Err(_) => return Err(PrefetchReadError::ReadWindowIncrement), + } + } + Ok(Some(self.read_window_range)) + } + } +} diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index ff5a9e5e4..7937290dd 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -4,18 +4,22 @@ use std::{ops::Range, sync::Arc}; use bytes::Bytes; use futures::task::{Spawn, SpawnExt}; use futures::{pin_mut, StreamExt}; -use mountpoint_s3_client::{types::ETag, ObjectClient}; +use mountpoint_s3_client::{types::ETag, types::GetObjectRequest, ObjectClient}; use tracing::{debug_span, trace, warn, Instrument}; use crate::checksums::ChecksummedBytes; use crate::data_cache::{BlockIndex, DataCache}; use crate::object::ObjectId; +use crate::prefetch::backpressure_controller::new_backpressure_controller; use crate::prefetch::part::Part; use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer}; use crate::prefetch::part_stream::{ObjectPartStream, RequestRange}; use crate::prefetch::task::RequestTask; use crate::prefetch::PrefetchReadError; +use super::backpressure_controller::BackpressureLimiter; +use super::task::RequestTaskConfig; + /// [ObjectPartStream] implementation which maintains a [DataCache] for the object 
data /// retrieved by an [ObjectClient]. #[derive(Debug)] @@ -41,39 +45,41 @@ where fn spawn_get_object_request( &self, client: &Client, - bucket: &str, - key: &str, - if_match: ETag, - range: RequestRange, - _preferred_part_size: usize, + config: RequestTaskConfig, ) -> RequestTask<::ClientError> where Client: ObjectClient + Clone + Send + Sync + 'static, { - let range = range.align(self.cache.block_size(), false); - - let start = range.start(); - let size = range.len(); + let initial_read_window_range = config.range.start() + config.initial_read_window_size as u64; let (part_queue, part_queue_producer) = unbounded_part_queue(); - trace!(?range, "spawning request"); + let (backpressure_controller, backpressure_limiter) = new_backpressure_controller( + config.initial_read_window_size, + config.max_read_window_size, + config.read_window_size_multiplier, + initial_read_window_range, + config.range.end(), + ); + trace!(range=?config.range, "spawning request"); let request_task = { let request = CachingRequest::new( client.clone(), self.cache.clone(), - bucket.to_owned(), - key.to_owned(), - if_match, + config.bucket, + config.key, + config.if_match, part_queue_producer, ); - let span = debug_span!("prefetch", ?range); - request.get_from_cache(range).instrument(span) + let span = debug_span!("prefetch", range=?config.range); + request + .get_from_cache(config.range, initial_read_window_range, backpressure_limiter) + .instrument(span) }; let task_handle = self.runtime.spawn_with_handle(request_task).unwrap(); - RequestTask::from_handle(task_handle, size, start, part_queue) + RequestTask::from_handle(task_handle, config.range, part_queue, backpressure_controller) } } @@ -109,7 +115,12 @@ where } } - async fn get_from_cache(self, range: RequestRange) { + async fn get_from_cache( + mut self, + range: RequestRange, + initial_read_window_range: u64, + mut backpressure_limiter: BackpressureLimiter, + ) { let cache_key = &self.cache_key; let block_size = 
self.cache.block_size(); let block_range = self.block_indices_for_byte_range(&range); @@ -120,13 +131,20 @@ where // request, but since this stream is already behind the prefetcher, the delay is // already likely negligible. let mut block_offset = block_range.start * block_size; + for block_index in block_range.clone() { match self.cache.get_block(cache_key, block_index, block_offset) { Ok(Some(block)) => { trace!(?cache_key, ?range, block_index, "cache hit"); + let part = self.make_part(block, block_index, block_offset, &range); self.part_queue_producer.push(Ok(part)); block_offset += block_size; + + if let Err(e) = backpressure_limiter.wait_for_read_window_increment(block_offset).await { + self.part_queue_producer.push(Err(e)); + break; + } continue; } Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"), @@ -142,14 +160,25 @@ where metrics::counter!("prefetch.blocks_served_from_cache").increment(block_index - block_range.start); metrics::counter!("prefetch.blocks_requested_to_client").increment(block_range.end - block_index); return self - .get_from_client(range.trim_start(block_offset), block_index..block_range.end) + .get_from_client( + range.trim_start(block_offset), + block_index..block_range.end, + initial_read_window_range, + backpressure_limiter, + ) .await; } // We served the whole range from cache. metrics::counter!("prefetch.blocks_served_from_cache").increment(block_range.end - block_range.start); } - async fn get_from_client(&self, range: RequestRange, block_range: Range) { + async fn get_from_client( + &mut self, + range: RequestRange, + block_range: Range, + initial_read_window_range: u64, + mut backpressure_limiter: BackpressureLimiter, + ) { let key = self.cache_key.key(); let block_size = self.cache.block_size(); assert!(block_size > 0); @@ -157,6 +186,9 @@ where // Always request a range aligned with block boundaries (or to the end of the object). 
let block_aligned_byte_range = (block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64); + let request_len = (block_aligned_byte_range.end - block_aligned_byte_range.start) as usize; + let block_aligned_byte_range = + RequestRange::new(range.object_size(), block_aligned_byte_range.start, request_len); trace!( ?key, @@ -164,97 +196,259 @@ where original_range =? range, "fetching data from client" ); - let get_object_result = match self - .client - .get_object( - &self.bucket, - key, - Some(block_aligned_byte_range), - Some(self.cache_key.etag().clone()), - ) - .await - { - Ok(get_object_result) => get_object_result, - Err(e) => { - warn!(key, error=?e, "GetObject request failed"); - self.part_queue_producer - .push(Err(PrefetchReadError::GetRequestFailed(e))); - return; - } - }; - pin_mut!(get_object_result); let mut block_index = block_range.start; let mut block_offset = block_range.start * block_size; let mut buffer = ChecksummedBytes::default(); - loop { - assert!( - buffer.len() < block_size as usize, - "buffer should be flushed when we get a full block" - ); - match get_object_result.next().await { - Some(Ok((offset, body))) => { - trace!(offset, length = body.len(), "received GetObject part"); - metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); - - let expected_offset = block_offset + buffer.len() as u64; - if offset != expected_offset { - warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); - self.part_queue_producer - .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { - offset, - expected_offset, - })); - break; - } - // Split the body into blocks. 
- let mut body: Bytes = body.into(); - while !body.is_empty() { - let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); - let chunk = body.split_to(remaining); - if let Err(e) = buffer.extend(chunk.into()) { - warn!(key, error=?e, "integrity check for body part failed"); - self.part_queue_producer.push(Err(e.into())); - return; - } - if buffer.len() < block_size as usize { + // Start by issuing the first request that has a range up to initial read window range. + // This is an optimization to lower time to first bytes, see more details in ClientPartStream about why this is needed. + let first_req_range = block_aligned_byte_range.trim_end(initial_read_window_range); + if !first_req_range.is_empty() { + let first_request = match self + .client + .get_object( + &self.bucket, + key, + Some(first_req_range.into()), + Some(self.cache_key.etag().clone()), + ) + .await + { + Ok(get_object_result) => get_object_result, + Err(e) => { + warn!(key, error=?e, "GetObject request failed"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestFailed(e))); + return; + } + }; + + pin_mut!(first_request); + // We want the first request to complete without any blocking + first_request.as_mut().increment_read_window(first_req_range.len()); + loop { + assert!( + buffer.len() < block_size as usize, + "buffer should be flushed when we get a full block" + ); + match first_request.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); + + let expected_offset = block_offset + buffer.len() as u64; + if offset != expected_offset { + warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { + offset, + expected_offset, + })); break; } - // We have a full block: write it to the cache, send it to 
the queue, and flush the buffer. - self.update_cache(block_index, block_offset, &buffer); + let mut body: Bytes = body.into(); + + // Return some bytes to the part queue even before we can fill an entire caching block because we want to + // start the feedback loop for the flow-control window. + // We need to do this because the read window might be enough to fetch "some data" from S3 but not the entire block. + // For example, consider that we got a file system read request with range 2MB to 4MB and we have to start + // reading from block_offset=0 and block_size=5MB. The first read window might have a range up 4MB which is enough + // to serve the read request but if the prefetcher is not able to read anything it cannot tell this stream to + // move its read window. + let part_range = range.trim_start(offset).trim_end(offset + body.len() as u64); + let trim_start = (part_range.start().saturating_sub(offset)) as usize; + let trim_end = (part_range.end().saturating_sub(offset)) as usize; + // Put to the part queue only if returned data is in the actual request range. + if trim_end > trim_start { + let checksum_bytes = ChecksummedBytes::new(body.clone()); + let bytes = checksum_bytes.slice(trim_start..trim_end); + + let part = Part::new(self.cache_key.clone(), part_range.start(), bytes); + self.part_queue_producer.push(Ok(part)); + } + + // Now we can fill the caching blocks. + while !body.is_empty() { + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); + let chunk = body.split_to(remaining); + if let Err(e) = buffer.extend(chunk.into()) { + warn!(key, error=?e, "integrity check for body part failed"); + self.part_queue_producer.push(Err(e.into())); + return; + } + if buffer.len() < block_size as usize { + break; + } + + // We have a full block: write it to the cache, send it to the queue, and flush the buffer. 
+ self.update_cache(block_index, block_offset, &buffer); + block_index += 1; + block_offset += block_size; + buffer = ChecksummedBytes::default(); + } + } + Some(Err(e)) => { + warn!(key, error=?e, "GetObject body part failed"); self.part_queue_producer - .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); - block_index += 1; - block_offset += block_size; - buffer = ChecksummedBytes::default(); + .push(Err(PrefetchReadError::GetRequestFailed(e))); + break; } + None => break, } - Some(Err(e)) => { - warn!(key, error=?e, "GetObject body part failed"); + } + } + + // After the first request is completed we will create the second request for the rest of the stream, + // but only if there is something left to be fetched. + let range = block_aligned_byte_range.trim_start(initial_read_window_range); + if !range.is_empty() { + // Blocks until there is enough read window and record the new read window offset before creating the request + let next_read_window_offset = match backpressure_limiter.wait_for_read_window_increment(range.start()).await + { + Ok(next) => next, + Err(e) => { + self.part_queue_producer.push(Err(e)); + return; + } + }; + + let get_object_request = match self + .client + .get_object( + &self.bucket, + key, + Some(range.into()), + Some(self.cache_key.etag().clone()), + ) + .await + { + Ok(get_object_request) => get_object_request, + Err(e) => { + warn!(key, error=?e, "GetObject request failed"); self.part_queue_producer .push(Err(PrefetchReadError::GetRequestFailed(e))); - break; + return; } - None => { - if !buffer.is_empty() { - // If we still have data in the buffer, this must be the last block for this object, - // which can be smaller than block_size (and ends at the end of the object). - assert_eq!( - block_offset as usize + buffer.len(), - range.object_size(), - "a partial block is only allowed at the end of the object" - ); - // Write the last block to the cache. 
- self.update_cache(block_index, block_offset, &buffer); + }; + + pin_mut!(get_object_request); + + // Increment read window if we got a request from the backpressure limiter earlier + if let Some(next_read_window_offset) = next_read_window_offset { + let diff = + next_read_window_offset.saturating_sub(get_object_request.as_ref().read_window_range()) as usize; + get_object_request.as_mut().increment_read_window(diff); + } + + // Current read window might be larger than the client's read window as some bytes were served from the cache, so we need to match them. + // for example, current read window of the caching stream might be 100MB while the first read window of the client is 8MB. + let read_window_size_diff = backpressure_limiter + .read_window_range() + .saturating_sub(get_object_request.as_ref().read_window_range()) + as usize; + get_object_request.as_mut().increment_read_window(read_window_size_diff); + + loop { + assert!( + buffer.len() < block_size as usize, + "buffer should be flushed when we get a full block" + ); + match get_object_request.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); + + let expected_offset = block_offset + buffer.len() as u64; + if offset != expected_offset { + warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { + offset, + expected_offset, + })); + break; + } + + let mut body: Bytes = body.into(); + + // Same with what we do for the first request, return some bytes to the part queue even before we can fill + // an entire caching block because we want to start the feedback loop for the flow-control window. 
+ let part_range = range.trim_start(offset).trim_end(offset + body.len() as u64); + let trim_start = (part_range.start().saturating_sub(offset)) as usize; + let trim_end = (part_range.end().saturating_sub(offset)) as usize; + // Put to the part queue only if returned data is in the actual request range. + if trim_end > trim_start { + let checksum_bytes = ChecksummedBytes::new(body.clone()); + let bytes = checksum_bytes.slice(trim_start..trim_end); + + let part = Part::new(self.cache_key.clone(), part_range.start(), bytes); + self.part_queue_producer.push(Ok(part)); + } + + // Now we can fill the caching blocks. + while !body.is_empty() { + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); + let chunk = body.split_to(remaining); + if let Err(e) = buffer.extend(chunk.into()) { + warn!(key, error=?e, "integrity check for body part failed"); + self.part_queue_producer.push(Err(e.into())); + return; + } + if buffer.len() < block_size as usize { + break; + } + + // We have a full block: write it to the cache, send it to the queue, and flush the buffer. 
+ self.update_cache(block_index, block_offset, &buffer); + block_index += 1; + block_offset += block_size; + buffer = ChecksummedBytes::default(); + } + + let next_offset = block_offset + buffer.len() as u64; + match backpressure_limiter.wait_for_read_window_increment(next_offset).await { + Ok(next) => { + if let Some(next_read_window_offset) = next { + let diff = next_read_window_offset + .saturating_sub(get_object_request.as_ref().read_window_range()) + as usize; + get_object_request.as_mut().increment_read_window(diff); + } + } + Err(e) => { + self.part_queue_producer.push(Err(e)); + break; + } + } + } + Some(Err(e)) => { + warn!(key, error=?e, "GetObject body part failed"); self.part_queue_producer - .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); + .push(Err(PrefetchReadError::GetRequestFailed(e))); + break; + } + None => { + break; } - break; } } } + + if !buffer.is_empty() { + // If we still have data in the buffer, this must be the last block for this object, + // which can be smaller than block_size (and ends at the end of the object). + assert_eq!( + block_offset as usize + buffer.len(), + range.object_size(), + "a partial block is only allowed at the end of the object" + ); + // Write the last block to the cache. 
+ self.update_cache(block_index, block_offset, &buffer); + self.part_queue_producer + .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); + } trace!("request finished"); } @@ -350,11 +544,18 @@ mod tests { let etag = object.etag(); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -367,7 +568,17 @@ mod tests { let first_read_count = { // First request (from client) let get_object_counter = mock_client.new_counter(Operation::GetObject); - let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + let config = RequestTaskConfig { + bucket: bucket.to_owned(), + key: key.to_owned(), + if_match: etag.clone(), + range, + preferred_part_size: 0, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, + }; + let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -376,7 +587,17 @@ mod tests { let second_read_count = { // Second request (from cache) let get_object_counter = mock_client.new_counter(Operation::GetObject); - let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + let config = RequestTaskConfig { + bucket: bucket.to_owned(), + key: key.to_owned(), + if_match: etag, + range, + preferred_part_size: 0, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, + }; + let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, 
&object, request_task); get_object_counter.count() }; @@ -395,11 +616,18 @@ mod tests { let etag = object.etag(); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -410,8 +638,17 @@ mod tests { for offset in [0, 512 * KB, 1 * MB, 4 * MB, 9 * MB] { for preferred_size in [1 * KB, 512 * KB, 4 * MB, 12 * MB, 16 * MB] { - let range = RequestRange::new(object_size, offset as u64, preferred_size); - let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + let config = RequestTaskConfig { + bucket: bucket.to_owned(), + key: key.to_owned(), + if_match: etag.clone(), + range: RequestRange::new(object_size, offset as u64, preferred_size), + preferred_part_size: 0, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, + }; + let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); } } diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs index 8542ed173..45e09375b 100644 --- a/mountpoint-s3/src/prefetch/part.rs +++ b/mountpoint-s3/src/prefetch/part.rs @@ -21,6 +21,17 @@ impl Part { } } + pub fn extend(&mut self, other: &Part) -> Result<(), PartMismatchError> { + let expected_offset = self.offset + self.checksummed_bytes.len() as u64; + other.check(&self.id, expected_offset)?; + self.checksummed_bytes + .extend(other.clone().checksummed_bytes) + .map_err(|_| PartMismatchError::Id { + actual: self.id.clone(), + requested: other.id.clone(), + }) + } + pub fn 
into_bytes(self, id: &ObjectId, offset: u64) -> Result { self.check(id, offset).map(|_| self.checksummed_bytes) } @@ -38,6 +49,10 @@ impl Part { } } + pub(super) fn offset(&self) -> u64 { + self.offset + } + pub(super) fn len(&self) -> usize { self.checksummed_bytes.len() } @@ -71,3 +86,107 @@ pub enum PartMismatchError { #[error("wrong part offset: actual={actual}, requested={requested}")] Offset { actual: u64, requested: u64 }, } + +#[cfg(test)] +mod tests { + use mountpoint_s3_client::types::ETag; + + use crate::{checksums::ChecksummedBytes, object::ObjectId, prefetch::part::PartMismatchError}; + + use super::Part; + + #[test] + fn test_append() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_part_len = 512; + let second_offset = first_offset + first_part_len as u64; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(object_id.clone(), second_offset, checksummed_bytes); + + first.extend(&second).expect("should be able to extend"); + assert_eq!(first_part_len + second_part_len, first.len()); + first.check(&object_id, first_offset).expect("the part should be valid"); + } + + #[test] + fn test_append_with_mismatch_object_id() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = 
Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_object_id = ObjectId::new("other".to_owned(), ETag::for_tests()); + let second_part_len = 512; + let second_offset = first_offset; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(second_object_id.clone(), second_offset, checksummed_bytes); + + let result = first.extend(&second); + assert!(matches!( + result, + Err(PartMismatchError::Id { + actual: _, + requested: _ + }) + )); + } + + #[test] + fn test_append_with_mismatch_offset() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_part_len = 512; + let second_offset = first_offset; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(object_id.clone(), second_offset, checksummed_bytes); + + let result = first.extend(&second); + assert!(matches!( + result, + Err(PartMismatchError::Offset { + actual: _, + requested: _ + }) + )); + } +} diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs index e680babb1..a7b5753ea 100644 --- a/mountpoint-s3/src/prefetch/part_queue.rs +++ b/mountpoint-s3/src/prefetch/part_queue.rs @@ -1,11 +1,12 @@ use std::time::Instant; +use metrics::atomics::AtomicU64; use tracing::trace; use crate::prefetch::part::Part; use crate::prefetch::PrefetchReadError; use crate::sync::async_channel::{unbounded, Receiver, 
RecvError, Sender}; -use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::{Arc, AsyncMutex}; /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want @@ -16,7 +17,7 @@ pub struct PartQueue { receiver: Receiver>>, failed: AtomicBool, /// The total number of bytes sent to the underlying queue of `self.receiver` - bytes_received: Arc, + bytes_received: Arc, } /// Producer side of the queue of [Part]s. @@ -24,13 +25,13 @@ pub struct PartQueue { pub struct PartQueueProducer { sender: Sender>>, /// The total number of bytes sent to `self.sender` - bytes_sent: Arc, + bytes_sent: Arc, } /// Creates an unbounded [PartQueue] and its related [PartQueueProducer]. pub fn unbounded_part_queue() -> (PartQueue, PartQueueProducer) { let (sender, receiver) = unbounded(); - let bytes_counter = Arc::new(AtomicUsize::new(0)); + let bytes_counter = Arc::new(AtomicU64::new(0)); let part_queue = PartQueue { current_part: AsyncMutex::new(None), receiver, @@ -93,7 +94,25 @@ impl PartQueue { Ok(part) } - pub fn bytes_received(&self) -> usize { + /// Push a new [Part] onto the front of the queue + /// which actually just concatenate it with the current part + pub async fn push_front(&self, mut part: Part) { + let mut current_part = self.current_part.lock().await; + + assert!( + !self.failed.load(Ordering::SeqCst), + "cannot use a PartQueue after failure" + ); + + if let Some(current_part) = current_part.as_mut() { + part.extend(current_part).unwrap(); + *current_part = part; + } else { + *current_part = Some(part); + } + } + + pub fn bytes_received(&self) -> u64 { self.bytes_received.load(Ordering::SeqCst) } } @@ -101,7 +120,7 @@ impl PartQueue { impl PartQueueProducer { /// Push a new [Part] onto the back of the queue pub fn push(&self, part: Result>) { - let part_len = part.as_ref().map_or(0, |part| part.len()); + let part_len = part.as_ref().map_or(0, |part| part.len()) as 
u64; // Unbounded channel will never actually block let send_result = self.sender.send_blocking(part); diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs index 43dcdd0d4..e3650b34b 100644 --- a/mountpoint-s3/src/prefetch/part_stream.rs +++ b/mountpoint-s3/src/prefetch/part_stream.rs @@ -3,16 +3,19 @@ use std::{fmt::Debug, ops::Range}; use bytes::Bytes; use futures::task::SpawnExt; use futures::{pin_mut, task::Spawn, StreamExt}; -use mountpoint_s3_client::{types::ETag, ObjectClient}; +use mountpoint_s3_client::{types::GetObjectRequest, ObjectClient}; use tracing::{debug_span, error, trace, Instrument}; use crate::checksums::ChecksummedBytes; use crate::object::ObjectId; +use crate::prefetch::backpressure_controller::new_backpressure_controller; use crate::prefetch::part::Part; use crate::prefetch::part_queue::unbounded_part_queue; use crate::prefetch::task::RequestTask; use crate::prefetch::PrefetchReadError; +use super::task::RequestTaskConfig; + /// A generic interface to retrieve data from objects in a S3-like store. pub trait ObjectPartStream { /// Spawns a request to get the content of an object. 
The object data will be retrieved in fixed size @@ -21,11 +24,7 @@ pub trait ObjectPartStream { fn spawn_get_object_request( &self, client: &Client, - bucket: &str, - key: &str, - if_match: ETag, - range: RequestRange, - preferred_part_size: usize, + config: RequestTaskConfig, ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static; @@ -166,32 +165,45 @@ where fn spawn_get_object_request( &self, client: &Client, - bucket: &str, - key: &str, - if_match: ETag, - range: RequestRange, - preferred_part_size: usize, + config: RequestTaskConfig, ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static, { - assert!(preferred_part_size > 0); - let request_range = range.align(client.part_size().unwrap_or(8 * 1024 * 1024) as u64, true); - let start = request_range.start(); - let size = request_range.len(); + assert!(config.preferred_part_size > 0); + + let initial_read_window_range = config.range.start() + config.initial_read_window_size as u64; let (part_queue, part_queue_producer) = unbounded_part_queue(); - trace!(range=?request_range, "spawning request"); + let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller( + config.initial_read_window_size, + config.max_read_window_size, + config.read_window_size_multiplier, + initial_read_window_range, + config.range.end(), + ); + trace!(range=?config.range, "spawning request"); let request_task = { let client = client.clone(); - let bucket = bucket.to_owned(); - let id = ObjectId::new(key.to_owned(), if_match); - let span = debug_span!("prefetch", range=?request_range); + let bucket = config.bucket; + let id = ObjectId::new(config.key, config.if_match); + let span = debug_span!("prefetch", range=?config.range); async move { - let get_object_result = match client - .get_object(&bucket, id.key(), Some(request_range.into()), Some(id.etag().clone())) + // Normally, initial read window size should be very small (~1MB) so that we can serve the first read request as 
soon as possible, + // but right now the CRT only returns data in chunks of part size (default to 8MB) even if initial read window is smaller than that. + // This makes time to first byte much higher than expected. + // + // To workaround this issue, we instead create two requests for the part stream where the first request has the range exactly equal to + // the initial read window size to force the CRT to return data immediately, and the second request for the rest of the stream. + // From this, our first read window range must be 2x of the initial read window size because we make two requests, each with the same + // initial read window size. + // + // Let's start by issuing the first request with a range trimmed to initial read window size + let first_req_range = config.range.trim_end(initial_read_window_range); + let first_request = match client + .get_object(&bucket, id.key(), Some(first_req_range.into()), Some(id.etag().clone())) .await { Ok(get_object_result) => get_object_result, @@ -202,9 +214,11 @@ where } }; - pin_mut!(get_object_result); + pin_mut!(first_request); + // Make there is enough read window because we don't want to block this request at all + first_request.as_mut().increment_read_window(first_req_range.len()); loop { - match get_object_result.next().await { + match first_request.next().await { Some(Ok((offset, body))) => { trace!(offset, length = body.len(), "received GetObject part"); metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); @@ -213,7 +227,7 @@ where let mut body: Bytes = body.into(); let mut curr_offset = offset; loop { - let chunk_size = preferred_part_size.min(body.len()); + let chunk_size = config.preferred_part_size.min(body.len()); if chunk_size == 0 { break; } @@ -234,6 +248,91 @@ where None => break, } } + + // After the first request is completed we will create the second request for the rest of the stream, + // but only if there is something left to be fetched. 
+ let range = config.range.trim_start(initial_read_window_range); + if !range.is_empty() { + // First, blocks until there is enough read window and record the new read window offset before creating the request + let next_read_window_offset = + match backpressure_limiter.wait_for_read_window_increment(range.start()).await { + Ok(next) => next, + Err(e) => { + part_queue_producer.push(Err(e)); + return; + } + }; + + let request = match client + .get_object(&bucket, id.key(), Some(range.into()), Some(id.etag().clone())) + .await + { + Ok(get_object_result) => get_object_result, + Err(e) => { + error!(key=id.key(), error=?e, "GetObject request failed"); + part_queue_producer.push(Err(PrefetchReadError::GetRequestFailed(e))); + return; + } + }; + + pin_mut!(request); + // Increment read window if we got a request from the backpressure limiter earlier + if let Some(next_read_window_offset) = next_read_window_offset { + let diff = + next_read_window_offset.saturating_sub(request.as_ref().read_window_range()) as usize; + request.as_mut().increment_read_window(diff); + } + + loop { + match request.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + metrics::counter!("s3.client.total_bytes", "type" => "read") + .increment(body.len() as u64); + // pre-split the body into multiple parts as suggested by preferred part size + // in order to avoid validating checksum on large parts at read. + let mut body: Bytes = body.into(); + let mut curr_offset = offset; + loop { + let chunk_size = config.preferred_part_size.min(body.len()); + if chunk_size == 0 { + break; + } + let chunk = body.split_to(chunk_size); + // S3 doesn't provide checksum for us if the request range is not aligned to + // object part boundaries, so we're computing our own checksum here. 
+ let checksum_bytes = ChecksummedBytes::new(chunk); + let part = Part::new(id.clone(), curr_offset, checksum_bytes); + curr_offset += part.len() as u64; + part_queue_producer.push(Ok(part)); + } + + // Blocks if read window increment if it's not enough to read the next offset + let next_offset = curr_offset; + match backpressure_limiter.wait_for_read_window_increment(next_offset).await { + Ok(next) => { + if let Some(next_read_window_offset) = next { + let diff = next_read_window_offset + .saturating_sub(request.as_ref().read_window_range()) + as usize; + request.as_mut().increment_read_window(diff); + } + } + Err(e) => { + part_queue_producer.push(Err(e)); + break; + } + } + } + Some(Err(e)) => { + error!(key=id.key(), error=?e, "GetObject body part failed"); + part_queue_producer.push(Err(PrefetchReadError::GetRequestFailed(e))); + break; + } + None => break, + } + } + } trace!("request finished"); } .instrument(span) @@ -241,7 +340,7 @@ where let task_handle = self.runtime.spawn_with_handle(request_task).unwrap(); - RequestTask::from_handle(task_handle, size, start, part_queue) + RequestTask::from_handle(task_handle, config.range, part_queue, backpressure_controller) } } diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs index c55ebf532..551ac214d 100644 --- a/mountpoint-s3/src/prefetch/task.rs +++ b/mountpoint-s3/src/prefetch/task.rs @@ -1,65 +1,98 @@ use futures::future::RemoteHandle; +use mountpoint_s3_client::types::ETag; use crate::prefetch::part::Part; -use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue}; +use crate::prefetch::part_queue::PartQueue; use crate::prefetch::PrefetchReadError; +use super::backpressure_controller::BackpressureController; +use super::part_stream::RequestRange; + +pub struct RequestTaskConfig { + pub(super) bucket: String, + pub(super) key: String, + pub(super) if_match: ETag, + pub(super) range: RequestRange, + pub(super) preferred_part_size: usize, + pub(super) 
initial_read_window_size: usize, + pub(super) max_read_window_size: usize, + pub(super) read_window_size_multiplier: usize, +} + /// A single GetObject request submitted to the S3 client #[derive(Debug)] pub struct RequestTask { /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if /// the request is fake (created by seeking backwards in the stream) - task_handle: Option>, + _task_handle: RemoteHandle<()>, remaining: usize, - start_offset: u64, - total_size: usize, + range: RequestRange, part_queue: PartQueue, + backpressure_controller: BackpressureController, } impl RequestTask { - pub fn from_handle(task_handle: RemoteHandle<()>, size: usize, offset: u64, part_queue: PartQueue) -> Self { + pub fn from_handle( + task_handle: RemoteHandle<()>, + range: RequestRange, + part_queue: PartQueue, + backpressure_controller: BackpressureController, + ) -> Self { Self { - task_handle: Some(task_handle), - remaining: size, - start_offset: offset, - total_size: size, + _task_handle: task_handle, + remaining: range.len(), + range, part_queue, + backpressure_controller, } } - pub fn from_parts(parts: impl IntoIterator, offset: u64) -> Self { - let mut size = 0; - let (part_queue, part_queue_producer) = unbounded_part_queue(); - for part in parts { - size += part.len(); - part_queue_producer.push(Ok(part)); - } - Self { - task_handle: None, - remaining: size, - start_offset: offset, - total_size: size, - part_queue, + // Push a given list of parts in front of the part queue + pub async fn push_front(&mut self, mut parts: Vec) -> Result<(), PrefetchReadError> { + // Merge all parts into one single part. + // This could result in a really big part, but we normally use this only for backward seek + // so its size should not be bigger than the prefetcher's `max_backward_seek_distance`. 
+ let part = parts.iter_mut().reduce(|acc, e| { + acc.extend(e).unwrap(); + acc + }); + if let Some(part) = part { + self.remaining += part.len(); + self.part_queue.push_front(part.clone()).await; } + Ok(()) } pub async fn read(&mut self, length: usize) -> Result> { + tracing::trace!(length, "read"); let part = self.part_queue.read(length).await?; debug_assert!(part.len() <= self.remaining); self.remaining -= part.len(); + + // We read some data out of the part queue so the read window should be moved + let next_offset = part.offset() + part.len() as u64; + self.backpressure_controller.next_offset_hint(next_offset).await?; + + let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize; + // If the part queue is empty it means we are reading faster than the task could prefetch, + // so we should use larger window for the task. + if remaining_in_queue == 0 { + self.backpressure_controller.try_scaling_up(); + } + Ok(part) } pub fn start_offset(&self) -> u64 { - self.start_offset + self.range.start() } pub fn end_offset(&self) -> u64 { - self.start_offset + self.total_size as u64 + self.range.end() } pub fn total_size(&self) -> usize { - self.total_size + self.range.len() } pub fn remaining(&self) -> usize { @@ -68,12 +101,10 @@ impl RequestTask { /// Maximum offset which data is known to be already in the `self.part_queue` pub fn available_offset(&self) -> u64 { - self.start_offset + self.part_queue.bytes_received() as u64 + self.start_offset() + self.part_queue.bytes_received() } - /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and - /// shouldn't be counted for prefetcher progress. 
- pub fn is_streaming(&self) -> bool { - self.task_handle.is_some() + pub fn read_window_range(&self) -> u64 { + self.backpressure_controller.read_window_range() } } diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index aa8290fe8..87399531f 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -49,6 +49,7 @@ pub type TestClientBox = Box; pub struct TestSessionConfig { pub part_size: usize, + pub initial_read_window_size: usize, pub filesystem_config: S3FilesystemConfig, pub prefetcher_config: PrefetcherConfig, pub auth_config: S3ClientAuthConfig, @@ -56,8 +57,10 @@ pub struct TestSessionConfig { impl Default for TestSessionConfig { fn default() -> Self { + let part_size = 8 * 1024 * 1024; Self { - part_size: 8 * 1024 * 1024, + part_size, + initial_read_window_size: part_size, filesystem_config: Default::default(), prefetcher_config: Default::default(), auth_config: Default::default(), @@ -125,6 +128,8 @@ pub mod mock_session { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: test_config.part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(client_config)); @@ -162,6 +167,8 @@ pub mod mock_session { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: test_config.part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(client_config)); @@ -284,7 +291,9 @@ pub mod s3_session { let client_config = S3ClientConfig::default() .part_size(test_config.part_size) .endpoint_config(EndpointConfig::new(®ion)) - .auth_config(test_config.auth_config); + .auth_config(test_config.auth_config) + .read_backpressure(true) + .initial_read_window(test_config.initial_read_window_size); let client = 
S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); @@ -316,7 +325,9 @@ pub mod s3_session { let client_config = S3ClientConfig::default() .part_size(test_config.part_size) - .endpoint_config(EndpointConfig::new(®ion)); + .endpoint_config(EndpointConfig::new(®ion)) + .read_backpressure(true) + .initial_read_window(test_config.initial_read_window_size); let client = S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index cfcb03059..52969e433 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -35,6 +35,8 @@ pub fn make_test_filesystem( let client_config = MockClientConfig { bucket: bucket.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index aafade057..b7b606bcd 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -733,6 +733,8 @@ async fn test_upload_aborted_on_write_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; @@ -810,6 +812,8 @@ async fn test_upload_aborted_on_fsync_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; @@ -872,6 +876,8 @@ async fn test_upload_aborted_on_release_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, 
..Default::default() }; @@ -1487,7 +1493,9 @@ async fn test_readdir_rewind_with_local_files_only() { async fn test_lookup_404_not_an_error() { let name = "test_lookup_404_not_an_error"; let (bucket, prefix) = get_test_bucket_and_prefix(name); - let client_config = S3ClientConfig::default().endpoint_config(EndpointConfig::new(&get_test_region())); + let client_config = S3ClientConfig::default() + .endpoint_config(EndpointConfig::new(&get_test_region())) + .read_backpressure(true); let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); let fs = make_test_filesystem_with_client( client, @@ -1521,7 +1529,8 @@ async fn test_lookup_forbidden() { let auth_config = get_crt_client_auth_config(get_scoped_down_credentials(&policy).await); let client_config = S3ClientConfig::default() .auth_config(auth_config) - .endpoint_config(EndpointConfig::new(&get_test_region())); + .endpoint_config(EndpointConfig::new(&get_test_region())) + .read_backpressure(true); let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); // create an empty file diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs index c7ece49af..4f47571d4 100644 --- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs +++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs @@ -1,6 +1,5 @@ use fuser::BackgroundSession; use mountpoint_s3::data_cache::InMemoryDataCache; -use mountpoint_s3::prefetch::PrefetcherConfig; use std::fs::{File, OpenOptions}; use std::io::Read; use tempfile::TempDir; @@ -65,30 +64,32 @@ fn read_test_mock_with_cache(object_size: usize) { ); } -/// test for checking either prefetching fails or read original object when object is mutated during read. -/// Prefetching of next request occurs when more than half of the current request is being read. 
-/// So, when we read the first block, it prefetches the requests ti require to fulfill and the next request -/// depending on size of last request. -/// If object is mutated, E-Tag for the new prefetch request will change and hence the request will fail giving IO error. -fn prefetch_test_etag(creator_fn: F, prefix: &str, request_size: usize, read_size: usize) -where +/// Test for checking either prefetching fails or read original object when object is mutated during read. +/// Prefetching of next read window occurs when more than half of the current window is being read. +/// When we read the first block, it prefetches the data with a window size enough to fulfill the request +/// then increase the window size when needed. +/// If object is mutated, reading a part from the next read window would fail from pre-condition (ETag) error. +fn prefetch_test_etag( + creator_fn: F, + prefix: &str, + part_size: usize, + initial_read_window_size: usize, + read_size: usize, +) where F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), { - const OBJECT_SIZE: usize = 1024 * 1024; - - let prefetcher_config = PrefetcherConfig { - first_request_size: request_size, - ..Default::default() - }; - + // Object needs to be larger than part size because the CRT returns data in chunks of part size, + // we would not be able to see the failures if it's smaller. + let object_size = part_size * 2; let (mount_point, _session, mut test_client) = creator_fn( prefix, TestSessionConfig { - prefetcher_config, + part_size, + initial_read_window_size, ..Default::default() }, ); - let original_data_buf = vec![0u8; OBJECT_SIZE]; + let original_data_buf = vec![0u8; object_size]; test_client.put_object("dir/hello.txt", &original_data_buf).unwrap(); @@ -104,7 +105,7 @@ where .expect("Should be able to read file to buf"); // changed the value of data buf to distinguish it from previous data of the object. 
- let final_data_buf = vec![255u8; OBJECT_SIZE]; + let final_data_buf = vec![255u8; object_size]; test_client.put_object("dir/hello.txt", &final_data_buf).unwrap(); let mut dest_buf = vec![0u8; read_size]; @@ -149,11 +150,13 @@ where #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { +fn prefetch_test_etag_mock(initial_read_window_size: usize, read_size: usize) { + let part_size = 256 * 1024; prefetch_test_etag( fuse::mock_session::new, "prefetch_test_etag_mock", - request_size, + part_size, + initial_read_window_size, read_size, ); } @@ -162,11 +165,13 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { +fn prefetch_test_etag_mock_with_cache(initial_read_window_size: usize, read_size: usize) { + let part_size = 256 * 1024; prefetch_test_etag( fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_mock", - request_size, + part_size, + initial_read_window_size, read_size, ); } @@ -174,22 +179,31 @@ fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { #[cfg(feature = "s3_tests")] #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger 
than first block read size")] -#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_s3(request_size: usize, read_size: usize) { - prefetch_test_etag(fuse::s3_session::new, "prefetch_test_etag_s3", request_size, read_size); +fn prefetch_test_etag_s3(initial_read_window_size: usize, read_size: usize) { + let part_size = 8 * 1024 * 1024; + prefetch_test_etag( + fuse::s3_session::new, + "prefetch_test_etag_s3", + part_size, + initial_read_window_size, + read_size, + ); } #[cfg(feature = "s3_tests")] #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] -#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) { +fn prefetch_test_etag_s3_with_cache(initial_read_window_size: usize, read_size: usize) { + let part_size = 8 * 1024 * 1024; prefetch_test_etag( fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_s3", - request_size, + part_size, + initial_read_window_size, read_size, ); } diff --git a/mountpoint-s3/tests/mock_s3_tests.rs b/mountpoint-s3/tests/mock_s3_tests.rs index cdf17bb0d..4aaa95dfa 100644 --- a/mountpoint-s3/tests/mock_s3_tests.rs +++ b/mountpoint-s3/tests/mock_s3_tests.rs @@ -134,6 +134,7 @@ fn create_fs_with_mock_s3(bucket: &str) -> 
(TestS3Filesystem, MockS let client_config = S3ClientConfig::default() .endpoint_config(endpoint_config) .auth_config(S3ClientAuthConfig::NoSigning) + .read_backpressure(true) .max_attempts(NonZeroUsize::new(3).unwrap()); // retry S3 request 3 times (which equals the existing default) let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); (