diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index 7fdb18b8f..4fccb4039 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -615,10 +615,24 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo user_agent.key_value("mp-cache-ttl", &ttl.to_string()); } + // This is a weird looking number! We really want our first request size to be 1MiB, + // which is a common IO size. But Linux's readahead will try to read an extra 128k on + // top of a 1MiB read, which we'd have to wait for a second request to service. Because + // FUSE doesn't know the difference between regular reads and readahead reads, it will + // send us a READ request for that 128k, so we'll have to block waiting for it even if + // the application doesn't want it. This is all in the noise for sequential IO, but + // waiting for the readahead hurts random IO. So we add 128k to the first request size + // to avoid the latency hit of the second request. + // + // Note that the CRT does not respect this value right now; it always returns chunks of part size, + // but this is the first window size we prefer. + let initial_read_window_size = 1024 * 1024 + 128 * 1024; let mut client_config = S3ClientConfig::new() .auth_config(auth_config) .throughput_target_gbps(throughput_target_gbps) .part_size(args.part_size as usize) + .read_backpressure(true) + .initial_read_window(initial_read_window_size) .user_agent(user_agent); if args.requester_pays { client_config = client_config.request_payer("requester"); diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 46c3f0915..b03563cd2 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -1349,6 +1349,8 @@ mod tests { let bucket = "bucket"; let client = Arc::new(MockClient::new(MockClientConfig { bucket: bucket.to_owned(), + enable_backpressure: true, + initial_read_window_size: 1024 * 1024, ..Default::default() })); // Create "dir1" in the client to avoid creating it locally diff --git a/mountpoint-s3/src/fs/error.rs b/mountpoint-s3/src/fs/error.rs index d01c53512..68554ac8b 100644 --- a/mountpoint-s3/src/fs/error.rs +++ b/mountpoint-s3/src/fs/error.rs @@ -131,7 +131,9 @@ impl From> fo PrefetchReadError::Integrity(e) => err!(libc::EIO, source:e, "integrity error"), PrefetchReadError::GetRequestFailed(_) | PrefetchReadError::GetRequestTerminatedUnexpectedly - | PrefetchReadError::GetRequestReturnedWrongOffset { .. } => { + | PrefetchReadError::GetRequestReturnedWrongOffset { .. } + | PrefetchReadError::BackpressurePreconditionFailed + | PrefetchReadError::ReadWindowIncrement => { err!(libc::EIO, source:err, "get request failed") } } diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 486a1ceed..ca4565410 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -7,6 +7,7 @@ //! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a //! non-sequential read, we abandon the prefetching and start again with the minimum request size.
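For illustration only (not part of this diff): a minimal sketch of how the initial read window and the scaling described above are meant to interact, assuming the defaults mentioned in the comments (1 MiB + 128 KiB initial window, multiplier 8, 2 GiB maximum) and mirroring the arithmetic of `BackpressureController::try_scaling_up` introduced below. The function and variable names here are hypothetical.

    // Hypothetical sketch of read window growth; not part of the diff.
    fn grow_read_window(current: usize, multiplier: usize, max: usize) -> usize {
        // Multiply the current window by the configured multiplier, capped at the maximum.
        (current * multiplier).min(max)
    }

    fn main() {
        // 1 MiB plus 128 KiB to absorb Linux readahead, as explained in the cli.rs comment above.
        let mut window: usize = 1024 * 1024 + 128 * 1024;
        let multiplier: usize = 8; // default sequential_prefetch_multiplier
        let max: usize = 2 * 1024 * 1024 * 1024; // default max_read_window_size (2 GiB)
        for _ in 0..4 {
            window = grow_read_window(window, multiplier, max);
            println!("read window grows to {window} bytes");
        }
    }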
+mod backpressure_controller; mod caching_stream; mod part; mod part_queue; @@ -14,7 +15,6 @@ mod part_stream; mod seek_window; mod task; -use std::collections::VecDeque; use std::fmt::Debug; use std::time::Duration; @@ -24,6 +24,7 @@ use metrics::{counter, histogram}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::ObjectClient; +use task::RequestTaskConfig; use thiserror::Error; use tracing::trace; @@ -79,6 +80,12 @@ pub enum PrefetchReadError { #[error("integrity check failed")] Integrity(#[from] IntegrityError), + + #[error("backpressure must be enabled with non-zero initial read window")] + BackpressurePreconditionFailed, + + #[error("read window increment failed")] + ReadWindowIncrement, } pub type DefaultPrefetcher = Prefetcher>; @@ -110,36 +117,24 @@ where #[derive(Debug, Clone, Copy)] pub struct PrefetcherConfig { - /// Size of the first request in a prefetch run - pub first_request_size: usize, - /// Maximum size of a single prefetch request - pub max_request_size: usize, + /// Maximum size of the read window + max_read_window_size: usize, /// Factor to increase the request size by whenever the reader continues making sequential reads - pub sequential_prefetch_multiplier: usize, + sequential_prefetch_multiplier: usize, /// Timeout to wait for a part to become available - pub read_timeout: Duration, + read_timeout: Duration, /// The maximum amount of unavailable data the prefetcher will tolerate during a seek operation /// before resetting and starting a new S3 request. - pub max_forward_seek_wait_distance: u64, + max_forward_seek_wait_distance: u64, /// The maximum distance the prefetcher will seek backwards before resetting and starting a new /// S3 request. We keep this much data in memory in addition to any inflight requests. - pub max_backward_seek_distance: u64, + max_backward_seek_distance: u64, } impl Default for PrefetcherConfig { fn default() -> Self { - #[allow(clippy::identity_op)] Self { - // This is a weird looking number! We really want our first request size to be 1MiB, - // which is a common IO size. But Linux's readahead will try to read an extra 128k on on - // top of a 1MiB read, which we'd have to wait for a second request to service. Because - // FUSE doesn't know the difference between regular reads and readahead reads, it will - // send us a READ request for that 128k, so we'll have to block waiting for it even if - // the application doesn't want it. This is all in the noise for sequential IO, but - // waiting for the readahead hurts random IO. So we add 128k to the first request size - // to avoid the latency hit of the second request. - first_request_size: 1 * 1024 * 1024 + 128 * 1024, - max_request_size: 2 * 1024 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, read_timeout: Duration::from_secs(60), // We want these large enough to tolerate a single out-of-order Linux readahead, which @@ -147,11 +142,44 @@ impl Default for PrefetcherConfig { // making a guess about where the optimal cut-off point is before it would be faster to // just start a new request instead. 
max_forward_seek_wait_distance: 16 * 1024 * 1024, - max_backward_seek_distance: 1 * 1024 * 1024, + max_backward_seek_distance: 1024 * 1024, } } } +impl PrefetcherConfig { + /// Create a prefetcher config with default values + pub fn new() -> Self { + // Initialise the builder with default values + Self::default() + } + + pub fn max_read_window_size(mut self, max_read_window_size: usize) -> Self { + self.max_read_window_size = max_read_window_size; + self + } + + pub fn sequential_prefetch_multiplier(mut self, sequential_prefetch_multiplier: usize) -> Self { + self.sequential_prefetch_multiplier = sequential_prefetch_multiplier; + self + } + + pub fn read_timeout(mut self, read_timeout: Duration) -> Self { + self.read_timeout = read_timeout; + self + } + + pub fn max_forward_seek_wait_distance(mut self, max_forward_seek_wait_distance: u64) -> Self { + self.max_forward_seek_wait_distance = max_forward_seek_wait_distance; + self + } + + pub fn max_backward_seek_distance(mut self, max_backward_seek_distance: u64) -> Self { + self.max_backward_seek_distance = max_backward_seek_distance; + self + } +} + /// A [Prefetcher] creates and manages prefetching GetObject requests to objects. #[derive(Debug)] pub struct Prefetcher { @@ -206,11 +234,7 @@ pub struct PrefetchGetObject { client: Arc, part_stream: Arc, config: PrefetcherConfig, - // Invariant: the offset of the first byte in this task's part queue is always - // self.next_sequential_read_offset. - current_task: Option>, - // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: VecDeque>, + backpressure_task: Option>, // Invariant: the offset of the last byte in this window is always // self.next_sequential_read_offset - 1. backward_seek_window: SeekWindow, @@ -221,7 +245,6 @@ pub struct PrefetchGetObject { /// Start offset for sequential read, used for calculating contiguous read metric sequential_read_start_offset: u64, next_sequential_read_offset: u64, - next_request_size: usize, next_request_offset: u64, size: u64, } @@ -273,13 +296,11 @@ where client, part_stream, config, - current_task: None, - future_tasks: Default::default(), + backpressure_task: None, backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), preferred_part_size: 128 * 1024, sequential_read_start_offset: 0, next_sequential_read_offset: 0, - next_request_size: config.first_request_size, next_request_offset: 0, bucket: bucket.to_owned(), object_id: ObjectId::new(key.to_owned(), etag), @@ -329,12 +350,13 @@ where } assert_eq!(self.next_sequential_read_offset, offset); - self.prepare_requests(); + if self.backpressure_task.is_none() { + self.backpressure_task = Some(self.spawn_read_backpressure_request()?); + } let mut response = ChecksummedBytes::default(); while to_read > 0 { - let Some(current_task) = self.current_task.as_mut() else { - // If [prepare_requests] didn't spawn a request, we've reached the end of the object. + let Some(current_task) = self.backpressure_task.as_mut() else { trace!(offset, length, "read beyond object size"); break; }; @@ -347,8 +369,6 @@ where .unwrap(); self.next_sequential_read_offset += part_bytes.len() as u64; - self.prepare_requests(); - // If we can complete the read with just a single buffer, early return to avoid copying // into a new buffer. This should be the common case as long as part size is larger than // read size, which it almost always is for real S3 clients and FUSE. 
@@ -364,75 +384,45 @@ where Ok(response) } - /// Runs on every read to prepare and spawn any requests our prefetching logic requires - fn prepare_requests(&mut self) { - let current_task = self.current_task.as_ref(); - if current_task.map(|task| task.remaining() == 0).unwrap_or(true) { - // There's no current task, or the current task is finished. Prepare the next request. - if let Some(next_task) = self.future_tasks.pop_front() { - self.current_task = Some(next_task); - return; - } - self.current_task = self.spawn_next_request(); - } else if current_task - .map(|task| { - // Don't trigger prefetch if we're in a fake task created by backward streaming - task.is_streaming() && task.remaining() <= task.total_size() / 2 - }) - .unwrap_or(false) - && self.future_tasks.is_empty() - { - // The current task is nearing completion, so pre-spawn the next request in anticipation - // of it completing. - if let Some(task) = self.spawn_next_request() { - self.future_tasks.push_back(task); - } - } - } + /// Spawn a backpressure GetObject request which has a range from the current offset to the end of the file. + /// We will be using the flow-control window to control how much data we want to download into the prefetcher. + fn spawn_read_backpressure_request( + &mut self, + ) -> Result, PrefetchReadError> { + let start = self.next_sequential_read_offset; + let object_size = self.size as usize; + let range = RequestRange::new(object_size, start, object_size); + + // The prefetcher now relies on the backpressure mechanism, so it must be enabled + let initial_read_window_size = match self.client.initial_read_window_size() { + Some(value) => value, + None => return Err(PrefetchReadError::BackpressurePreconditionFailed), + }; - /// Spawn the next required request - fn spawn_next_request(&mut self) -> Option> { - let start = self.next_request_offset; - if start >= self.size { - return None; + // Make sure that we don't get blocked from the beginning + if initial_read_window_size == 0 { + return Err(PrefetchReadError::BackpressurePreconditionFailed); } - let range = RequestRange::new(self.size as usize, start, self.next_request_size); - let task = self.part_stream.spawn_get_object_request( - &self.client, - &self.bucket, - self.object_id.key(), - self.object_id.etag().clone(), + let config = RequestTaskConfig { + bucket: self.bucket.clone(), + key: self.object_id.key().to_owned(), + if_match: self.object_id.etag().clone(), range, - self.preferred_part_size, - ); - - // [read] will reset these if the reader stops making sequential requests - self.next_request_offset += task.total_size() as u64; - self.next_request_size = self.get_next_request_size(task.total_size()); - - Some(task) - } - - /// Suggest next request size. - /// The next request size is the current request size multiplied by sequential prefetch multiplier. - fn get_next_request_size(&self, request_size: usize) -> usize { - // TODO: this logic doesn't work well right now in the case where part_size < - // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly - // shrinking the request size until it reaches 1. But this isn't a configuration we - // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a - // prefetcher with multiplier 1 is not very good).
- (request_size * self.config.sequential_prefetch_multiplier).min(self.config.max_request_size) + preferred_part_size: self.preferred_part_size, + initial_read_window_size, + max_read_window_size: self.config.max_read_window_size, + read_window_size_multiplier: self.config.sequential_prefetch_multiplier, + }; + Ok(self.part_stream.spawn_get_object_request(&self.client, config)) } /// Reset this prefetch request to a new offset, clearing any existing tasks queued. fn reset_prefetch_to_offset(&mut self, offset: u64) { - self.current_task = None; - self.future_tasks.drain(..); + self.backpressure_task = None; self.backward_seek_window.clear(); self.sequential_read_start_offset = offset; self.next_sequential_read_offset = offset; - self.next_request_size = self.config.first_request_size; self.next_request_offset = offset; } @@ -445,7 +435,7 @@ where if offset > self.next_sequential_read_offset { self.try_seek_forward(offset).await } else { - self.try_seek_backward(offset) + self.try_seek_backward(offset).await } } @@ -454,43 +444,22 @@ where let total_seek_distance = offset - self.next_sequential_read_offset; histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64); - let Some(current_task) = self.current_task.as_mut() else { + let Some(task) = self.backpressure_task.as_mut() else { // Can't seek if there's no requests in flight at all return Ok(false); }; - // Jump ahead to the right request - if offset >= current_task.end_offset() { - self.next_sequential_read_offset = current_task.end_offset(); - self.current_task = None; - while let Some(next_request) = self.future_tasks.pop_front() { - if next_request.end_offset() > offset { - self.current_task = Some(next_request); - break; - } else { - self.next_sequential_read_offset = next_request.end_offset(); - } - } - if self.current_task.is_none() { - // No inflight task containing the target offset. - trace!(current_offset=?self.next_sequential_read_offset, requested_offset=?offset, "seek failed: not enough inflight data"); - return Ok(false); - } - // We could try harder to preserve the backwards seek buffer if we're near the - // request boundary, but it's probably not worth the trouble. - self.backward_seek_window.clear(); + // Not enough data in the read window to serve the forward seek + if offset >= task.read_window_range() { + return Ok(false); } - // At this point it's guaranteed by the previous if-block that `offset` is in the range of `self.current_task` - let current_task = self - .current_task - .as_mut() - .expect("a request existed that covered this seek offset"); // If we have enough bytes already downloaded (`available`) to skip straight to this read, then do // it. Otherwise, we're willing to wait for the bytes to download only if they're coming "soon", where // soon is defined as up to `max_forward_seek_wait_distance` bytes ahead of the available offset. 
- let available_offset = current_task.available_offset(); - if offset >= available_offset.saturating_add(self.config.max_forward_seek_wait_distance) { + let available_offset = task.available_offset(); + let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance); + if offset >= available_soon_offset { trace!( requested_offset = offset, available_offset = available_offset, @@ -500,7 +469,7 @@ where } let mut seek_distance = offset - self.next_sequential_read_offset; while seek_distance > 0 { - let part = current_task.read(seek_distance as usize).await?; + let part = task.read(seek_distance as usize).await?; seek_distance -= part.len() as u64; self.next_sequential_read_offset += part.len() as u64; self.backward_seek_window.push(part); @@ -508,7 +477,7 @@ where Ok(true) } - fn try_seek_backward(&mut self, offset: u64) -> Result> { + async fn try_seek_backward(&mut self, offset: u64) -> Result> { assert!(offset < self.next_sequential_read_offset); let backwards_length_needed = self.next_sequential_read_offset - offset; histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64); @@ -517,16 +486,13 @@ where trace!("seek failed: not enough data in backwards seek window"); return Ok(false); }; - // We're going to create a new fake "request" that contains the parts we read out of the - // window. That sounds a bit hacky, but it keeps all the read logic simple rather than - // needing separate paths for backwards seeks vs others. - let request = RequestTask::from_parts(parts, offset); - if let Some(current_task) = self.current_task.take() { - self.future_tasks.push_front(current_task); + if let Some(task) = self.backpressure_task.as_mut() { + task.push_front(parts).await?; + self.next_sequential_read_offset = offset; + Ok(true) + } else { + Ok(false) } - self.current_task = Some(request); - self.next_sequential_read_offset = offset; - Ok(true) } } @@ -552,12 +518,11 @@ mod tests { #![allow(clippy::identity_op)] use crate::data_cache::InMemoryDataCache; - use crate::prefetch::part_stream::ClientPartStream; use super::caching_stream::CachingPartStream; use super::*; use futures::executor::{block_on, ThreadPool}; - use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; + use mountpoint_s3_client::error::GetObjectError; use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap}; use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; use proptest::proptest; @@ -571,9 +536,9 @@ mod tests { #[derive(Debug, Arbitrary)] struct TestConfig { #[proptest(strategy = "16usize..1*1024*1024")] - first_request_size: usize, + initial_read_window_size: usize, #[proptest(strategy = "16usize..1*1024*1024")] - max_request_size: usize, + max_read_window_size: usize, #[proptest(strategy = "1usize..8usize")] sequential_prefetch_multiplier: usize, #[proptest(strategy = "16usize..2*1024*1024")] @@ -582,6 +547,8 @@ mod tests { max_forward_seek_wait_distance: u64, #[proptest(strategy = "1u64..4*1024*1024")] max_backward_seek_distance: u64, + #[proptest(strategy = "16usize..1*1024*1024")] + cache_block_size: usize, } fn default_stream() -> ClientPartStream { @@ -604,6 +571,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = 
Arc::new(MockClient::new(config)); @@ -612,14 +581,12 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, - sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, - read_timeout: Duration::from_secs(5), - max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance, - max_backward_seek_distance: test_config.max_backward_seek_distance, - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier) + .read_timeout(Duration::from_secs(5)) + .max_forward_seek_wait_distance(test_config.max_forward_seek_wait_distance) + .max_backward_seek_distance(test_config.max_backward_seek_distance); let prefetcher = Prefetcher::new(part_stream, prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", size, etag); @@ -645,12 +612,13 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 1024 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config); } @@ -662,12 +630,13 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 64 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 16 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -679,17 +648,96 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 64 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 256 * 1024 * 1024 + 111, 1024 * 1024, config); } + fn fail_with_backpressure_precondition_test( + part_stream: Stream, + test_config: TestConfig, + client_config: MockClientConfig, + ) where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let client = MockClient::new(client_config); + let read_size = 1 * MB; + let object_size = 8 * MB; + let object = MockObject::ramp(0xaa, object_size, ETag::for_tests()); + let etag = object.etag(); + + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier); + + let prefetcher = Prefetcher::new(part_stream, prefetcher_config); + let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", object_size as u64, etag); + let result = block_on(request.read(0, read_size)); + assert!(matches!(result, 
Err(PrefetchReadError::BackpressurePreconditionFailed))); + } + + #[test_case(default_stream())] + #[test_case(caching_stream(1 * MB))] + fn fail_with_backpressure_not_enabled(part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let test_config = TestConfig { + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, + sequential_prefetch_multiplier: 8, + client_part_size: 8 * 1024 * 1024, + max_forward_seek_wait_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, + }; + + // backpressure is not enabled for the client + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: test_config.client_part_size, + enable_backpressure: false, + ..Default::default() + }; + + fail_with_backpressure_precondition_test(part_stream, test_config, config); + } + + #[test_case(default_stream())] + #[test_case(caching_stream(1 * MB))] + fn fail_with_backpressure_zero_read_window(part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let test_config = TestConfig { + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, + sequential_prefetch_multiplier: 8, + client_part_size: 8 * 1024 * 1024, + max_forward_seek_wait_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, + }; + + // backpressure is enabled but initial read window size is zero + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: 0, + ..Default::default() + }; + + fail_with_backpressure_precondition_test(part_stream, test_config, config); + } + fn fail_sequential_read_test( part_stream: Stream, size: u64, @@ -700,6 +748,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = MockClient::new(config); @@ -710,12 +760,9 @@ mod tests { let client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new()); - let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, - sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier); let prefetcher = Prefetcher::new(part_stream, prefetcher_config); let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", size, etag); @@ -748,23 +795,25 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 1024 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; let mut get_failures = HashMap::new(); - get_failures.insert( - 2, - Err(ObjectClientError::ClientError(MockClientError( - err_value.to_owned().into(), - ))), - ); + // We only have one request with backpressure, 
so we are going to inject the failure at + // 2nd read from that request stream. + get_failures.insert(1, Ok((2, MockClientError(err_value.to_owned().into())))); + + // Object needs to be bigger than a part size in order to trigger the failure + // because the CRT data returns in chunks of part size. + let object_size = config.client_part_size + 111; - fail_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config, get_failures); + fail_sequential_read_test(part_stream, object_size as u64, 1024 * 1024, config, get_failures); } proptest! { @@ -788,18 +837,17 @@ mod tests { fn proptest_sequential_read_with_cache( size in 1u64..1 * 1024 * 1024, read_size in 1usize..1 * 1024 * 1024, - block_size in 16usize..1 * 1024 * 1024, config: TestConfig, ) { - run_sequential_read_test(caching_stream(block_size), size, read_size, config); + run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config); } #[test] fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, - block_size in 16usize..1 * 1024 * 1024, config: TestConfig) { + config: TestConfig) { // Pick read size smaller than the object size let read_size = (size as usize / read_factor).max(1); - run_sequential_read_test(caching_stream(block_size), size, read_size, config); + run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config); } } @@ -808,12 +856,13 @@ mod tests { let object_size = 854966; let read_size = 161647; let config = TestConfig { - first_request_size: 484941, - max_request_size: 81509, + initial_read_window_size: 484941, + max_read_window_size: 81509, sequential_prefetch_multiplier: 1, client_part_size: 181682, max_forward_seek_wait_distance: 1, max_backward_seek_distance: 18668, + cache_block_size: 1 * MB, }; run_sequential_read_test(default_stream(), object_size, read_size, config); } @@ -827,6 +876,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -835,14 +886,11 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, - sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, - max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance, - max_backward_seek_distance: test_config.max_backward_seek_distance, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(test_config.max_read_window_size) + .sequential_prefetch_multiplier(test_config.sequential_prefetch_multiplier) + .max_forward_seek_wait_distance(test_config.max_forward_seek_wait_distance) + .max_backward_seek_distance(test_config.max_backward_seek_distance); let prefetcher = Prefetcher::new(part_stream, prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, etag); @@ -895,11 +943,10 @@ mod tests { #[test] fn proptest_random_read_with_cache( reads in random_read_strategy(1 * 1024 * 1024), - block_size in 16usize..1 * 1024 * 1024, config: TestConfig, ) { let (object_size, reads) = reads; - run_random_read_test(caching_stream(block_size), object_size, reads, config); + run_random_read_test(caching_stream(config.cache_block_size), 
object_size, reads, config); } } @@ -908,12 +955,13 @@ mod tests { let object_size = 724314; let reads = vec![(0, 516883)]; let config = TestConfig { - first_request_size: 3684779, - max_request_size: 2147621, + initial_read_window_size: 3684779, + max_read_window_size: 2147621, sequential_prefetch_multiplier: 4, client_part_size: 516882, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -923,12 +971,13 @@ mod tests { let object_size = 755678; let reads = vec![(0, 278499), (311250, 1)]; let config = TestConfig { - first_request_size: 556997, - max_request_size: 105938, + initial_read_window_size: 556997, + max_read_window_size: 105938, sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -938,12 +987,13 @@ mod tests { let object_size = 755678; let reads = vec![(0, 236766), (291204, 1), (280930, 36002)]; let config = TestConfig { - first_request_size: 556997, - max_request_size: 105938, + initial_read_window_size: 556997, + max_read_window_size: 105938, sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_wait_distance: 2260662, max_backward_seek_distance: 2369799, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -953,12 +1003,13 @@ mod tests { let object_size = 14201; let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)]; let config = TestConfig { - first_request_size: 457999, - max_request_size: 863511, + initial_read_window_size: 457999, + max_read_window_size: 863511, sequential_prefetch_multiplier: 5, client_part_size: 1972409, max_forward_seek_wait_distance: 2810651, max_backward_seek_distance: 3531090, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -971,6 +1022,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: PART_SIZE, + enable_backpressure: true, + initial_read_window_size: OBJECT_SIZE, ..Default::default() }; @@ -1003,10 +1056,7 @@ mod tests { )); // For simplicity, prefetch the whole object in one request. 
- let prefetcher_config = PrefetcherConfig { - first_request_size: OBJECT_SIZE, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new(); let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); block_on(async { let mut request = prefetcher.prefetch(client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); @@ -1045,6 +1095,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size: FIRST_REQUEST_SIZE, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1053,10 +1105,7 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: FIRST_REQUEST_SIZE, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new(); let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); @@ -1079,11 +1128,12 @@ mod tests { #[test_case(125, 110; "read in second request")] fn test_backward_seek(first_read_size: usize, part_size: usize) { const OBJECT_SIZE: usize = 200; - const FIRST_REQUEST_SIZE: usize = 100; let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size: part_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1092,10 +1142,7 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: FIRST_REQUEST_SIZE, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new(); let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); // Try every possible seek from first_read_size @@ -1131,16 +1178,18 @@ mod tests { fn sequential_read_stress_helper() { let mut rng = shuttle::rand::thread_rng(); let object_size = rng.gen_range(1u64..1 * 1024 * 1024); - let first_request_size = rng.gen_range(16usize..1 * 1024 * 1024); - let max_request_size = rng.gen_range(16usize..1 * 1024 * 1024); + let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024); let sequential_prefetch_multiplier = rng.gen_range(2usize..16); let part_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); + let initial_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1149,14 +1198,11 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size, - max_request_size, - sequential_prefetch_multiplier, - max_forward_seek_wait_distance, - max_backward_seek_distance, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(max_read_window_size) + .sequential_prefetch_multiplier(sequential_prefetch_multiplier) + .max_forward_seek_wait_distance(max_forward_seek_wait_distance) + .max_backward_seek_distance(max_backward_seek_distance); let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); @@ -1184,20 +1230,22 @@ mod tests { fn random_read_stress_helper() { let mut rng = 
shuttle::rand::thread_rng(); - let first_request_size = rng.gen_range(16usize..32 * 1024); - let max_request_size = rng.gen_range(16usize..32 * 1024); - // Try to prevent testing very small reads of very large objects, which are easy to OOM - // under Shuttle (lots of concurrent tasks) - let max_object_size = first_request_size.min(max_request_size) * 20; - let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64); + let max_read_window_size = rng.gen_range(16usize..32 * 1024); let sequential_prefetch_multiplier = rng.gen_range(2usize..16); let part_size = rng.gen_range(16usize..128 * 1024); + let initial_read_window_size = rng.gen_range(16usize..128 * 1024); let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024); let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024); + // Try to prevent testing very small reads of very large objects, which are easy to OOM + // under Shuttle (lots of concurrent tasks) + let max_object_size = initial_read_window_size.min(max_read_window_size) * 20; + let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64); let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1206,14 +1254,11 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size, - max_request_size, - sequential_prefetch_multiplier, - max_forward_seek_wait_distance, - max_backward_seek_distance, - ..Default::default() - }; + let prefetcher_config = PrefetcherConfig::new() + .max_read_window_size(max_read_window_size) + .sequential_prefetch_multiplier(sequential_prefetch_multiplier) + .max_forward_seek_wait_distance(max_forward_seek_wait_distance) + .max_backward_seek_distance(max_backward_seek_distance); let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs new file mode 100644 index 000000000..6183e064e --- /dev/null +++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs @@ -0,0 +1,151 @@ +use async_channel::{unbounded, Receiver, Sender}; +use tracing::trace; + +use super::PrefetchReadError; + +#[derive(Debug)] +pub struct BackpressureController { + read_window_updater: Sender, + preferred_read_window_size: usize, + max_read_window_size: usize, + read_window_size_multiplier: usize, + read_window_range: u64, + last_request_offset: u64, +} + +#[derive(Debug)] +pub struct BackpressureLimiter { + read_window_incrementing_queue: Receiver, + read_window_range: u64, +} + +/// Creates a [BackpressureController] and its related [BackpressureLimiter]. +/// We use a pair of these to provide feedback to the backpressure stream. +/// +/// [BackpressureLimiter] is used on the producer side of the object stream, that is, any +/// [super::part_stream::ObjectPartStream] that supports backpressure. The producer can call +/// `wait_for_read_window_increment` to wait for feedback from the consumer. This method +/// could block when the producer requires its read window to be incremented before it can +/// produce more data. +/// +/// [BackpressureController] will be given to the consumer side of the object stream.
+/// It can be used anywhere to set the preferred read window size for the stream and to tell the +/// producer when its read window should be increased. +pub fn new_backpressure_controller( + preferred_read_window_size: usize, + max_read_window_size: usize, + read_window_size_multiplier: usize, + read_window_range: u64, + last_request_offset: u64, +) -> (BackpressureController, BackpressureLimiter) { + let (read_window_updater, read_window_incrementing_queue) = unbounded(); + let controller = BackpressureController { + read_window_updater, + preferred_read_window_size, + max_read_window_size, + read_window_size_multiplier, + read_window_range, + last_request_offset, + }; + let limiter = BackpressureLimiter { + read_window_incrementing_queue, + read_window_range, + }; + (controller, limiter) +} + +impl BackpressureController { + pub fn set_preferred_read_window_size(&mut self, preferred_read_window_size: usize) { + self.preferred_read_window_size = preferred_read_window_size; + } + + pub fn read_window_range(&self) -> u64 { + self.read_window_range + } + + // Try scaling up the preferred read window size with a multiplier configured at initialization. + pub fn try_scaling_up(&mut self) { + if self.preferred_read_window_size < self.max_read_window_size { + let new_read_window_size = + (self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size); + trace!( + current_size = self.preferred_read_window_size, + new_size = new_read_window_size, + "scaling up preferred read window" + ); + self.preferred_read_window_size = new_read_window_size; + } + } + + /// Signal the stream producer about the next offset we might want to read from the stream. The backpressure controller + /// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`. + pub async fn next_offset_hint(&mut self, next_offset: u64) -> Result<(), PrefetchReadError> { + let mut remaining_window = self.read_window_range.saturating_sub(next_offset) as usize; + // Increment the read window only if the remaining window reaches some threshold, i.e. only half of it is left. + while remaining_window < (self.preferred_read_window_size / 2) + && self.read_window_range < self.last_request_offset + { + let to_increase = next_offset + .saturating_add(self.preferred_read_window_size as u64) + .saturating_sub(self.read_window_range) as usize; + trace!( + next_offset, + read_window_range = self.read_window_range, + preferred_read_window_size = self.preferred_read_window_size, + to_increase, + "incrementing read window" + ); + self.increment_read_window(to_increase).await?; + self.read_window_range += to_increase as u64; + remaining_window = self.read_window_range.saturating_sub(next_offset) as usize; + } + Ok(()) + } + + // Send an increment read window request to the stream producer + async fn increment_read_window(&self, len: usize) -> Result<(), PrefetchReadError> { + // This should not block since the channel is unbounded + self.read_window_updater + .send(len) + .await + .map_err(|_| PrefetchReadError::ReadWindowIncrement) + } +} + +impl BackpressureLimiter { + pub fn read_window_range(&self) -> u64 { + self.read_window_range + } + + /// Checks if there is enough read window to put the next item with a given offset into the stream. + /// It blocks until it receives enough read window incrementing requests to serve the next part. + /// + /// Returns the new read window offset.
+ pub async fn wait_for_read_window_increment( + &mut self, + offset: u64, + ) -> Result, PrefetchReadError> { + if self.read_window_range > offset { + if let Ok(len) = self.read_window_incrementing_queue.try_recv() { + self.read_window_range += len as u64; + Ok(Some(self.read_window_range)) + } else { + Ok(None) + } + } else { + // Block until we have enough read window to read the next chunk + while self.read_window_range <= offset { + trace!( + offset, + read_window_range = self.read_window_range, + "blocking for read window increment" + ); + let recv = self.read_window_incrementing_queue.recv().await; + match recv { + Ok(len) => self.read_window_range += len as u64, + Err(_) => return Err(PrefetchReadError::ReadWindowIncrement), + } + } + Ok(Some(self.read_window_range)) + } + } +} diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index ff5a9e5e4..7937290dd 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -4,18 +4,22 @@ use std::{ops::Range, sync::Arc}; use bytes::Bytes; use futures::task::{Spawn, SpawnExt}; use futures::{pin_mut, StreamExt}; -use mountpoint_s3_client::{types::ETag, ObjectClient}; +use mountpoint_s3_client::{types::ETag, types::GetObjectRequest, ObjectClient}; use tracing::{debug_span, trace, warn, Instrument}; use crate::checksums::ChecksummedBytes; use crate::data_cache::{BlockIndex, DataCache}; use crate::object::ObjectId; +use crate::prefetch::backpressure_controller::new_backpressure_controller; use crate::prefetch::part::Part; use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer}; use crate::prefetch::part_stream::{ObjectPartStream, RequestRange}; use crate::prefetch::task::RequestTask; use crate::prefetch::PrefetchReadError; +use super::backpressure_controller::BackpressureLimiter; +use super::task::RequestTaskConfig; + /// [ObjectPartStream] implementation which maintains a [DataCache] for the object data /// retrieved by an [ObjectClient]. 
#[derive(Debug)] @@ -41,39 +45,41 @@ where fn spawn_get_object_request( &self, client: &Client, - bucket: &str, - key: &str, - if_match: ETag, - range: RequestRange, - _preferred_part_size: usize, + config: RequestTaskConfig, ) -> RequestTask<::ClientError> where Client: ObjectClient + Clone + Send + Sync + 'static, { - let range = range.align(self.cache.block_size(), false); - - let start = range.start(); - let size = range.len(); + let initial_read_window_range = config.range.start() + config.initial_read_window_size as u64; let (part_queue, part_queue_producer) = unbounded_part_queue(); - trace!(?range, "spawning request"); + let (backpressure_controller, backpressure_limiter) = new_backpressure_controller( + config.initial_read_window_size, + config.max_read_window_size, + config.read_window_size_multiplier, + initial_read_window_range, + config.range.end(), + ); + trace!(range=?config.range, "spawning request"); let request_task = { let request = CachingRequest::new( client.clone(), self.cache.clone(), - bucket.to_owned(), - key.to_owned(), - if_match, + config.bucket, + config.key, + config.if_match, part_queue_producer, ); - let span = debug_span!("prefetch", ?range); - request.get_from_cache(range).instrument(span) + let span = debug_span!("prefetch", range=?config.range); + request + .get_from_cache(config.range, initial_read_window_range, backpressure_limiter) + .instrument(span) }; let task_handle = self.runtime.spawn_with_handle(request_task).unwrap(); - RequestTask::from_handle(task_handle, size, start, part_queue) + RequestTask::from_handle(task_handle, config.range, part_queue, backpressure_controller) } } @@ -109,7 +115,12 @@ where } } - async fn get_from_cache(self, range: RequestRange) { + async fn get_from_cache( + mut self, + range: RequestRange, + initial_read_window_range: u64, + mut backpressure_limiter: BackpressureLimiter, + ) { let cache_key = &self.cache_key; let block_size = self.cache.block_size(); let block_range = self.block_indices_for_byte_range(&range); @@ -120,13 +131,20 @@ where // request, but since this stream is already behind the prefetcher, the delay is // already likely negligible. let mut block_offset = block_range.start * block_size; + for block_index in block_range.clone() { match self.cache.get_block(cache_key, block_index, block_offset) { Ok(Some(block)) => { trace!(?cache_key, ?range, block_index, "cache hit"); + let part = self.make_part(block, block_index, block_offset, &range); self.part_queue_producer.push(Ok(part)); block_offset += block_size; + + if let Err(e) = backpressure_limiter.wait_for_read_window_increment(block_offset).await { + self.part_queue_producer.push(Err(e)); + break; + } continue; } Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"), @@ -142,14 +160,25 @@ where metrics::counter!("prefetch.blocks_served_from_cache").increment(block_index - block_range.start); metrics::counter!("prefetch.blocks_requested_to_client").increment(block_range.end - block_index); return self - .get_from_client(range.trim_start(block_offset), block_index..block_range.end) + .get_from_client( + range.trim_start(block_offset), + block_index..block_range.end, + initial_read_window_range, + backpressure_limiter, + ) .await; } // We served the whole range from cache. 
metrics::counter!("prefetch.blocks_served_from_cache").increment(block_range.end - block_range.start); } - async fn get_from_client(&self, range: RequestRange, block_range: Range) { + async fn get_from_client( + &mut self, + range: RequestRange, + block_range: Range, + initial_read_window_range: u64, + mut backpressure_limiter: BackpressureLimiter, + ) { let key = self.cache_key.key(); let block_size = self.cache.block_size(); assert!(block_size > 0); @@ -157,6 +186,9 @@ where // Always request a range aligned with block boundaries (or to the end of the object). let block_aligned_byte_range = (block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64); + let request_len = (block_aligned_byte_range.end - block_aligned_byte_range.start) as usize; + let block_aligned_byte_range = + RequestRange::new(range.object_size(), block_aligned_byte_range.start, request_len); trace!( ?key, @@ -164,97 +196,259 @@ where original_range =? range, "fetching data from client" ); - let get_object_result = match self - .client - .get_object( - &self.bucket, - key, - Some(block_aligned_byte_range), - Some(self.cache_key.etag().clone()), - ) - .await - { - Ok(get_object_result) => get_object_result, - Err(e) => { - warn!(key, error=?e, "GetObject request failed"); - self.part_queue_producer - .push(Err(PrefetchReadError::GetRequestFailed(e))); - return; - } - }; - pin_mut!(get_object_result); let mut block_index = block_range.start; let mut block_offset = block_range.start * block_size; let mut buffer = ChecksummedBytes::default(); - loop { - assert!( - buffer.len() < block_size as usize, - "buffer should be flushed when we get a full block" - ); - match get_object_result.next().await { - Some(Ok((offset, body))) => { - trace!(offset, length = body.len(), "received GetObject part"); - metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); - - let expected_offset = block_offset + buffer.len() as u64; - if offset != expected_offset { - warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); - self.part_queue_producer - .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { - offset, - expected_offset, - })); - break; - } - // Split the body into blocks. - let mut body: Bytes = body.into(); - while !body.is_empty() { - let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); - let chunk = body.split_to(remaining); - if let Err(e) = buffer.extend(chunk.into()) { - warn!(key, error=?e, "integrity check for body part failed"); - self.part_queue_producer.push(Err(e.into())); - return; - } - if buffer.len() < block_size as usize { + // Start by issuing the first request that has a range up to initial read window range. + // This is an optimization to lower time to first bytes, see more details in ClientPartStream about why this is needed. 
+ let first_req_range = block_aligned_byte_range.trim_end(initial_read_window_range); + if !first_req_range.is_empty() { + let first_request = match self + .client + .get_object( + &self.bucket, + key, + Some(first_req_range.into()), + Some(self.cache_key.etag().clone()), + ) + .await + { + Ok(get_object_result) => get_object_result, + Err(e) => { + warn!(key, error=?e, "GetObject request failed"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestFailed(e))); + return; + } + }; + + pin_mut!(first_request); + // We want the first request to complete without any blocking + first_request.as_mut().increment_read_window(first_req_range.len()); + loop { + assert!( + buffer.len() < block_size as usize, + "buffer should be flushed when we get a full block" + ); + match first_request.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); + + let expected_offset = block_offset + buffer.len() as u64; + if offset != expected_offset { + warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { + offset, + expected_offset, + })); + break; } - // We have a full block: write it to the cache, send it to the queue, and flush the buffer. - self.update_cache(block_index, block_offset, &buffer); + let mut body: Bytes = body.into(); + + // Return some bytes to the part queue even before we can fill an entire caching block because we want to + // start the feedback loop for the flow-control window. + // We need to do this because the read window might be enough to fetch "some data" from S3 but not the entire block. + // For example, consider that we got a file system read request with range 2MB to 4MB and we have to start + // reading from block_offset=0 and block_size=5MB. The first read window might have a range up to 4MB which is enough + // to serve the read request, but if the prefetcher is not able to read anything it cannot tell this stream to + // move its read window. + let part_range = range.trim_start(offset).trim_end(offset + body.len() as u64); + let trim_start = (part_range.start().saturating_sub(offset)) as usize; + let trim_end = (part_range.end().saturating_sub(offset)) as usize; + // Put to the part queue only if returned data is in the actual request range. + if trim_end > trim_start { + let checksum_bytes = ChecksummedBytes::new(body.clone()); + let bytes = checksum_bytes.slice(trim_start..trim_end); + + let part = Part::new(self.cache_key.clone(), part_range.start(), bytes); + self.part_queue_producer.push(Ok(part)); + } + + // Now we can fill the caching blocks. + while !body.is_empty() { + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); + let chunk = body.split_to(remaining); + if let Err(e) = buffer.extend(chunk.into()) { + warn!(key, error=?e, "integrity check for body part failed"); + self.part_queue_producer.push(Err(e.into())); + return; + } + if buffer.len() < block_size as usize { + break; + } + + // We have a full block: write it to the cache, send it to the queue, and flush the buffer.
+ self.update_cache(block_index, block_offset, &buffer); + block_index += 1; + block_offset += block_size; + buffer = ChecksummedBytes::default(); + } + } + Some(Err(e)) => { + warn!(key, error=?e, "GetObject body part failed"); self.part_queue_producer - .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); - block_index += 1; - block_offset += block_size; - buffer = ChecksummedBytes::default(); + .push(Err(PrefetchReadError::GetRequestFailed(e))); + break; } + None => break, } - Some(Err(e)) => { - warn!(key, error=?e, "GetObject body part failed"); + } + } + + // After the first request is completed we will create the second request for the rest of the stream, + // but only if there is something left to be fetched. + let range = block_aligned_byte_range.trim_start(initial_read_window_range); + if !range.is_empty() { + // Blocks until there is enough read window and record the new read window offset before creating the request + let next_read_window_offset = match backpressure_limiter.wait_for_read_window_increment(range.start()).await + { + Ok(next) => next, + Err(e) => { + self.part_queue_producer.push(Err(e)); + return; + } + }; + + let get_object_request = match self + .client + .get_object( + &self.bucket, + key, + Some(range.into()), + Some(self.cache_key.etag().clone()), + ) + .await + { + Ok(get_object_request) => get_object_request, + Err(e) => { + warn!(key, error=?e, "GetObject request failed"); self.part_queue_producer .push(Err(PrefetchReadError::GetRequestFailed(e))); - break; + return; } - None => { - if !buffer.is_empty() { - // If we still have data in the buffer, this must be the last block for this object, - // which can be smaller than block_size (and ends at the end of the object). - assert_eq!( - block_offset as usize + buffer.len(), - range.object_size(), - "a partial block is only allowed at the end of the object" - ); - // Write the last block to the cache. - self.update_cache(block_index, block_offset, &buffer); + }; + + pin_mut!(get_object_request); + + // Increment read window if we got a request from the backpressure limiter earlier + if let Some(next_read_window_offset) = next_read_window_offset { + let diff = + next_read_window_offset.saturating_sub(get_object_request.as_ref().read_window_range()) as usize; + get_object_request.as_mut().increment_read_window(diff); + } + + // Current read window might be larger than the client's read window as some bytes were served from the cache, so we need to match them. + // for example, current read window of the caching stream might be 100MB while the first read window of the client is 8MB. 
+ let read_window_size_diff = backpressure_limiter + .read_window_range() + .saturating_sub(get_object_request.as_ref().read_window_range()) + as usize; + get_object_request.as_mut().increment_read_window(read_window_size_diff); + + loop { + assert!( + buffer.len() < block_size as usize, + "buffer should be flushed when we get a full block" + ); + match get_object_request.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64); + + let expected_offset = block_offset + buffer.len() as u64; + if offset != expected_offset { + warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { + offset, + expected_offset, + })); + break; + } + + let mut body: Bytes = body.into(); + + // Same with what we do for the first request, return some bytes to the part queue even before we can fill + // an entire caching block because we want to start the feedback loop for the flow-control window. + let part_range = range.trim_start(offset).trim_end(offset + body.len() as u64); + let trim_start = (part_range.start().saturating_sub(offset)) as usize; + let trim_end = (part_range.end().saturating_sub(offset)) as usize; + // Put to the part queue only if returned data is in the actual request range. + if trim_end > trim_start { + let checksum_bytes = ChecksummedBytes::new(body.clone()); + let bytes = checksum_bytes.slice(trim_start..trim_end); + + let part = Part::new(self.cache_key.clone(), part_range.start(), bytes); + self.part_queue_producer.push(Ok(part)); + } + + // Now we can fill the caching blocks. + while !body.is_empty() { + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); + let chunk = body.split_to(remaining); + if let Err(e) = buffer.extend(chunk.into()) { + warn!(key, error=?e, "integrity check for body part failed"); + self.part_queue_producer.push(Err(e.into())); + return; + } + if buffer.len() < block_size as usize { + break; + } + + // We have a full block: write it to the cache, send it to the queue, and flush the buffer. + self.update_cache(block_index, block_offset, &buffer); + block_index += 1; + block_offset += block_size; + buffer = ChecksummedBytes::default(); + } + + let next_offset = block_offset + buffer.len() as u64; + match backpressure_limiter.wait_for_read_window_increment(next_offset).await { + Ok(next) => { + if let Some(next_read_window_offset) = next { + let diff = next_read_window_offset + .saturating_sub(get_object_request.as_ref().read_window_range()) + as usize; + get_object_request.as_mut().increment_read_window(diff); + } + } + Err(e) => { + self.part_queue_producer.push(Err(e)); + break; + } + } + } + Some(Err(e)) => { + warn!(key, error=?e, "GetObject body part failed"); self.part_queue_producer - .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); + .push(Err(PrefetchReadError::GetRequestFailed(e))); + break; + } + None => { + break; } - break; } } } + + if !buffer.is_empty() { + // If we still have data in the buffer, this must be the last block for this object, + // which can be smaller than block_size (and ends at the end of the object). + assert_eq!( + block_offset as usize + buffer.len(), + range.object_size(), + "a partial block is only allowed at the end of the object" + ); + // Write the last block to the cache. 
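For reference, a small sketch (not part of this change) of the block layout that assertion relies on: every cache block is exactly block_size bytes except possibly the last one, which ends at the end of the object:

// Illustrative only: the sizes of the cache blocks an object is split into.
fn block_sizes(object_size: u64, block_size: u64) -> Vec<u64> {
    let mut sizes = Vec::new();
    let mut offset = 0;
    while offset < object_size {
        // Only the final block may be shorter than block_size.
        sizes.push(block_size.min(object_size - offset));
        offset += block_size;
    }
    sizes
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // A 5MiB object with 2MiB blocks: two full blocks and one 1MiB tail block.
    assert_eq!(block_sizes(5 * MIB, 2 * MIB), vec![2 * MIB, 2 * MIB, MIB]);
}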
+ self.update_cache(block_index, block_offset, &buffer); + self.part_queue_producer + .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); + } trace!("request finished"); } @@ -350,11 +544,18 @@ mod tests { let etag = object.etag(); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -367,7 +568,17 @@ mod tests { let first_read_count = { // First request (from client) let get_object_counter = mock_client.new_counter(Operation::GetObject); - let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + let config = RequestTaskConfig { + bucket: bucket.to_owned(), + key: key.to_owned(), + if_match: etag.clone(), + range, + preferred_part_size: 0, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, + }; + let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -376,7 +587,17 @@ mod tests { let second_read_count = { // Second request (from cache) let get_object_counter = mock_client.new_counter(Operation::GetObject); - let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + let config = RequestTaskConfig { + bucket: bucket.to_owned(), + key: key.to_owned(), + if_match: etag, + range, + preferred_part_size: 0, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, + }; + let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -395,11 +616,18 @@ mod tests { let etag = object.etag(); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -410,8 +638,17 @@ mod tests { for offset in [0, 512 * KB, 1 * MB, 4 * MB, 9 * MB] { for preferred_size in [1 * KB, 512 * KB, 4 * MB, 12 * MB, 16 * MB] { - let range = RequestRange::new(object_size, offset as u64, preferred_size); - let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + let config = RequestTaskConfig { + bucket: bucket.to_owned(), + key: key.to_owned(), + if_match: etag.clone(), + range: RequestRange::new(object_size, offset as u64, preferred_size), + preferred_part_size: 0, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, + }; + let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); } } diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs index 8542ed173..45e09375b 100644 --- a/mountpoint-s3/src/prefetch/part.rs +++ 
b/mountpoint-s3/src/prefetch/part.rs @@ -21,6 +21,17 @@ impl Part { } } + pub fn extend(&mut self, other: &Part) -> Result<(), PartMismatchError> { + let expected_offset = self.offset + self.checksummed_bytes.len() as u64; + other.check(&self.id, expected_offset)?; + self.checksummed_bytes + .extend(other.clone().checksummed_bytes) + .map_err(|_| PartMismatchError::Id { + actual: self.id.clone(), + requested: other.id.clone(), + }) + } + pub fn into_bytes(self, id: &ObjectId, offset: u64) -> Result { self.check(id, offset).map(|_| self.checksummed_bytes) } @@ -38,6 +49,10 @@ impl Part { } } + pub(super) fn offset(&self) -> u64 { + self.offset + } + pub(super) fn len(&self) -> usize { self.checksummed_bytes.len() } @@ -71,3 +86,107 @@ pub enum PartMismatchError { #[error("wrong part offset: actual={actual}, requested={requested}")] Offset { actual: u64, requested: u64 }, } + +#[cfg(test)] +mod tests { + use mountpoint_s3_client::types::ETag; + + use crate::{checksums::ChecksummedBytes, object::ObjectId, prefetch::part::PartMismatchError}; + + use super::Part; + + #[test] + fn test_append() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_part_len = 512; + let second_offset = first_offset + first_part_len as u64; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(object_id.clone(), second_offset, checksummed_bytes); + + first.extend(&second).expect("should be able to extend"); + assert_eq!(first_part_len + second_part_len, first.len()); + first.check(&object_id, first_offset).expect("the part should be valid"); + } + + #[test] + fn test_append_with_mismatch_object_id() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_object_id = ObjectId::new("other".to_owned(), ETag::for_tests()); + let second_part_len = 512; + let second_offset = first_offset; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(second_object_id.clone(), second_offset, checksummed_bytes); + + let result = first.extend(&second); + assert!(matches!( + result, + Err(PartMismatchError::Id { + actual: _, + requested: _ + }) + )); + } + + #[test] + fn test_append_with_mismatch_offset() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_part_len = 512; + let second_offset 
= first_offset; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(object_id.clone(), second_offset, checksummed_bytes); + + let result = first.extend(&second); + assert!(matches!( + result, + Err(PartMismatchError::Offset { + actual: _, + requested: _ + }) + )); + } +} diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs index e680babb1..a7b5753ea 100644 --- a/mountpoint-s3/src/prefetch/part_queue.rs +++ b/mountpoint-s3/src/prefetch/part_queue.rs @@ -1,11 +1,12 @@ use std::time::Instant; +use metrics::atomics::AtomicU64; use tracing::trace; use crate::prefetch::part::Part; use crate::prefetch::PrefetchReadError; use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender}; -use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::{Arc, AsyncMutex}; /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want @@ -16,7 +17,7 @@ pub struct PartQueue { receiver: Receiver>>, failed: AtomicBool, /// The total number of bytes sent to the underlying queue of `self.receiver` - bytes_received: Arc, + bytes_received: Arc, } /// Producer side of the queue of [Part]s. @@ -24,13 +25,13 @@ pub struct PartQueue { pub struct PartQueueProducer { sender: Sender>>, /// The total number of bytes sent to `self.sender` - bytes_sent: Arc, + bytes_sent: Arc, } /// Creates an unbounded [PartQueue] and its related [PartQueueProducer]. pub fn unbounded_part_queue() -> (PartQueue, PartQueueProducer) { let (sender, receiver) = unbounded(); - let bytes_counter = Arc::new(AtomicUsize::new(0)); + let bytes_counter = Arc::new(AtomicU64::new(0)); let part_queue = PartQueue { current_part: AsyncMutex::new(None), receiver, @@ -93,7 +94,25 @@ impl PartQueue { Ok(part) } - pub fn bytes_received(&self) -> usize { + /// Push a new [Part] onto the front of the queue + /// which actually just concatenate it with the current part + pub async fn push_front(&self, mut part: Part) { + let mut current_part = self.current_part.lock().await; + + assert!( + !self.failed.load(Ordering::SeqCst), + "cannot use a PartQueue after failure" + ); + + if let Some(current_part) = current_part.as_mut() { + part.extend(current_part).unwrap(); + *current_part = part; + } else { + *current_part = Some(part); + } + } + + pub fn bytes_received(&self) -> u64 { self.bytes_received.load(Ordering::SeqCst) } } @@ -101,7 +120,7 @@ impl PartQueue { impl PartQueueProducer { /// Push a new [Part] onto the back of the queue pub fn push(&self, part: Result>) { - let part_len = part.as_ref().map_or(0, |part| part.len()); + let part_len = part.as_ref().map_or(0, |part| part.len()) as u64; // Unbounded channel will never actually block let send_result = self.sender.send_blocking(part); diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs index 43dcdd0d4..e3650b34b 100644 --- a/mountpoint-s3/src/prefetch/part_stream.rs +++ b/mountpoint-s3/src/prefetch/part_stream.rs @@ -3,16 +3,19 @@ use std::{fmt::Debug, ops::Range}; use bytes::Bytes; use futures::task::SpawnExt; use futures::{pin_mut, task::Spawn, StreamExt}; -use mountpoint_s3_client::{types::ETag, ObjectClient}; +use mountpoint_s3_client::{types::GetObjectRequest, ObjectClient}; use tracing::{debug_span, error, trace, Instrument}; use 
crate::checksums::ChecksummedBytes; use crate::object::ObjectId; +use crate::prefetch::backpressure_controller::new_backpressure_controller; use crate::prefetch::part::Part; use crate::prefetch::part_queue::unbounded_part_queue; use crate::prefetch::task::RequestTask; use crate::prefetch::PrefetchReadError; +use super::task::RequestTaskConfig; + /// A generic interface to retrieve data from objects in a S3-like store. pub trait ObjectPartStream { /// Spawns a request to get the content of an object. The object data will be retrieved in fixed size @@ -21,11 +24,7 @@ pub trait ObjectPartStream { fn spawn_get_object_request( &self, client: &Client, - bucket: &str, - key: &str, - if_match: ETag, - range: RequestRange, - preferred_part_size: usize, + config: RequestTaskConfig, ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static; @@ -166,32 +165,45 @@ where fn spawn_get_object_request( &self, client: &Client, - bucket: &str, - key: &str, - if_match: ETag, - range: RequestRange, - preferred_part_size: usize, + config: RequestTaskConfig, ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static, { - assert!(preferred_part_size > 0); - let request_range = range.align(client.part_size().unwrap_or(8 * 1024 * 1024) as u64, true); - let start = request_range.start(); - let size = request_range.len(); + assert!(config.preferred_part_size > 0); + + let initial_read_window_range = config.range.start() + config.initial_read_window_size as u64; let (part_queue, part_queue_producer) = unbounded_part_queue(); - trace!(range=?request_range, "spawning request"); + let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller( + config.initial_read_window_size, + config.max_read_window_size, + config.read_window_size_multiplier, + initial_read_window_range, + config.range.end(), + ); + trace!(range=?config.range, "spawning request"); let request_task = { let client = client.clone(); - let bucket = bucket.to_owned(); - let id = ObjectId::new(key.to_owned(), if_match); - let span = debug_span!("prefetch", range=?request_range); + let bucket = config.bucket; + let id = ObjectId::new(config.key, config.if_match); + let span = debug_span!("prefetch", range=?config.range); async move { - let get_object_result = match client - .get_object(&bucket, id.key(), Some(request_range.into()), Some(id.etag().clone())) + // Normally, initial read window size should be very small (~1MB) so that we can serve the first read request as soon as possible, + // but right now the CRT only returns data in chunks of part size (default to 8MB) even if initial read window is smaller than that. + // This makes time to first byte much higher than expected. + // + // To workaround this issue, we instead create two requests for the part stream where the first request has the range exactly equal to + // the initial read window size to force the CRT to return data immediately, and the second request for the rest of the stream. + // From this, our first read window range must be 2x of the initial read window size because we make two requests, each with the same + // initial read window size. 
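A minimal sketch (not part of this change) of how the request range is split into those two requests; Range<u64> stands in for RequestRange, whose trim_end/trim_start clamp to the range bounds much like the min below:

use std::ops::Range;

fn split_for_backpressure(range: Range<u64>, initial_read_window_size: u64) -> (Range<u64>, Range<u64>) {
    let initial_read_window_end = (range.start + initial_read_window_size).min(range.end);
    // First request: exactly the initial window, so the CRT hands data back immediately.
    let first = range.start..initial_read_window_end;
    // Second request: the rest of the stream, throttled by the backpressure limiter.
    let rest = initial_read_window_end..range.end;
    (first, rest)
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    let (first, rest) = split_for_backpressure(0..100 * MIB, MIB);
    assert_eq!(first, 0..MIB);
    assert_eq!(rest, MIB..100 * MIB);
    // A small object never produces a second request.
    let (first, rest) = split_for_backpressure(0..512, MIB);
    assert_eq!(first, 0..512);
    assert!(rest.is_empty());
}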
+                //
+                // Let's start by issuing the first request with a range trimmed to initial read window size
+                let first_req_range = config.range.trim_end(initial_read_window_range);
+                let first_request = match client
+                    .get_object(&bucket, id.key(), Some(first_req_range.into()), Some(id.etag().clone()))
+                    .await
                 {
                     Ok(get_object_result) => get_object_result,
@@ -202,9 +214,11 @@ where
                     }
                 };
 
-                pin_mut!(get_object_result);
+                pin_mut!(first_request);
+                // Make sure there is enough read window because we don't want to block this request at all
+                first_request.as_mut().increment_read_window(first_req_range.len());
                 loop {
-                    match get_object_result.next().await {
+                    match first_request.next().await {
                         Some(Ok((offset, body))) => {
                             trace!(offset, length = body.len(), "received GetObject part");
                             metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64);
@@ -213,7 +227,7 @@ where
                             let mut body: Bytes = body.into();
                             let mut curr_offset = offset;
                             loop {
-                                let chunk_size = preferred_part_size.min(body.len());
+                                let chunk_size = config.preferred_part_size.min(body.len());
                                 if chunk_size == 0 {
                                     break;
                                 }
@@ -234,6 +248,91 @@ where
                         None => break,
                     }
                 }
+
+                // After the first request is completed we will create the second request for the rest of the stream,
+                // but only if there is something left to be fetched.
+                let range = config.range.trim_start(initial_read_window_range);
+                if !range.is_empty() {
+                    // First, block until there is enough read window and record the new read window offset before creating the request
+                    let next_read_window_offset =
+                        match backpressure_limiter.wait_for_read_window_increment(range.start()).await {
+                            Ok(next) => next,
+                            Err(e) => {
+                                part_queue_producer.push(Err(e));
+                                return;
+                            }
+                        };
+
+                    let request = match client
+                        .get_object(&bucket, id.key(), Some(range.into()), Some(id.etag().clone()))
+                        .await
+                    {
+                        Ok(get_object_result) => get_object_result,
+                        Err(e) => {
+                            error!(key=id.key(), error=?e, "GetObject request failed");
+                            part_queue_producer.push(Err(PrefetchReadError::GetRequestFailed(e)));
+                            return;
+                        }
+                    };
+
+                    pin_mut!(request);
+                    // Increment read window if we got a request from the backpressure limiter earlier
+                    if let Some(next_read_window_offset) = next_read_window_offset {
+                        let diff =
+                            next_read_window_offset.saturating_sub(request.as_ref().read_window_range()) as usize;
+                        request.as_mut().increment_read_window(diff);
+                    }
+
+                    loop {
+                        match request.next().await {
+                            Some(Ok((offset, body))) => {
+                                trace!(offset, length = body.len(), "received GetObject part");
+                                metrics::counter!("s3.client.total_bytes", "type" => "read")
+                                    .increment(body.len() as u64);
+                                // pre-split the body into multiple parts as suggested by preferred part size
+                                // in order to avoid validating checksum on large parts at read.
+                                let mut body: Bytes = body.into();
+                                let mut curr_offset = offset;
+                                loop {
+                                    let chunk_size = config.preferred_part_size.min(body.len());
+                                    if chunk_size == 0 {
+                                        break;
+                                    }
+                                    let chunk = body.split_to(chunk_size);
+                                    // S3 doesn't provide checksum for us if the request range is not aligned to
+                                    // object part boundaries, so we're computing our own checksum here.
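The pre-splitting above can be pictured with this small sketch (not part of this change); slice::chunks stands in for the repeated Bytes::split_to calls:

// Illustrative only: split a received body into preferred_part_size chunks so that a later
// read only has to re-validate the checksum of a small part rather than a whole 8MiB body.
fn pre_split(body: &[u8], preferred_part_size: usize) -> Vec<&[u8]> {
    assert!(preferred_part_size > 0);
    body.chunks(preferred_part_size).collect()
}

fn main() {
    let body = vec![0u8; 8 * 1024 * 1024 + 123]; // one CRT-sized chunk plus a little extra
    let parts = pre_split(&body, 128 * 1024);
    assert_eq!(parts.len(), 65); // 64 full 128KiB parts and one 123-byte tail
    assert_eq!(parts.last().unwrap().len(), 123);
}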
+                                    let checksum_bytes = ChecksummedBytes::new(chunk);
+                                    let part = Part::new(id.clone(), curr_offset, checksum_bytes);
+                                    curr_offset += part.len() as u64;
+                                    part_queue_producer.push(Ok(part));
+                                }
+
+                                // Block for a read window increment if the current window is not enough to read the next offset
+                                let next_offset = curr_offset;
+                                match backpressure_limiter.wait_for_read_window_increment(next_offset).await {
+                                    Ok(next) => {
+                                        if let Some(next_read_window_offset) = next {
+                                            let diff = next_read_window_offset
+                                                .saturating_sub(request.as_ref().read_window_range())
+                                                as usize;
+                                            request.as_mut().increment_read_window(diff);
+                                        }
+                                    }
+                                    Err(e) => {
+                                        part_queue_producer.push(Err(e));
+                                        break;
+                                    }
+                                }
+                            }
+                            Some(Err(e)) => {
+                                error!(key=id.key(), error=?e, "GetObject body part failed");
+                                part_queue_producer.push(Err(PrefetchReadError::GetRequestFailed(e)));
+                                break;
+                            }
+                            None => break,
+                        }
+                    }
+                }
                 trace!("request finished");
             }
             .instrument(span)
@@ -241,7 +340,7 @@ where
         let task_handle = self.runtime.spawn_with_handle(request_task).unwrap();
 
-        RequestTask::from_handle(task_handle, size, start, part_queue)
+        RequestTask::from_handle(task_handle, config.range, part_queue, backpressure_controller)
     }
 }
diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs
index c55ebf532..551ac214d 100644
--- a/mountpoint-s3/src/prefetch/task.rs
+++ b/mountpoint-s3/src/prefetch/task.rs
@@ -1,65 +1,98 @@
 use futures::future::RemoteHandle;
+use mountpoint_s3_client::types::ETag;
 
 use crate::prefetch::part::Part;
-use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue};
+use crate::prefetch::part_queue::PartQueue;
 use crate::prefetch::PrefetchReadError;
 
+use super::backpressure_controller::BackpressureController;
+use super::part_stream::RequestRange;
+
+pub struct RequestTaskConfig {
+    pub(super) bucket: String,
+    pub(super) key: String,
+    pub(super) if_match: ETag,
+    pub(super) range: RequestRange,
+    pub(super) preferred_part_size: usize,
+    pub(super) initial_read_window_size: usize,
+    pub(super) max_read_window_size: usize,
+    pub(super) read_window_size_multiplier: usize,
+}
+
 /// A single GetObject request submitted to the S3 client
 #[derive(Debug)]
 pub struct RequestTask {
     /// Handle on the task/future. The future is cancelled when handle is dropped.
This is None if /// the request is fake (created by seeking backwards in the stream) - task_handle: Option>, + _task_handle: RemoteHandle<()>, remaining: usize, - start_offset: u64, - total_size: usize, + range: RequestRange, part_queue: PartQueue, + backpressure_controller: BackpressureController, } impl RequestTask { - pub fn from_handle(task_handle: RemoteHandle<()>, size: usize, offset: u64, part_queue: PartQueue) -> Self { + pub fn from_handle( + task_handle: RemoteHandle<()>, + range: RequestRange, + part_queue: PartQueue, + backpressure_controller: BackpressureController, + ) -> Self { Self { - task_handle: Some(task_handle), - remaining: size, - start_offset: offset, - total_size: size, + _task_handle: task_handle, + remaining: range.len(), + range, part_queue, + backpressure_controller, } } - pub fn from_parts(parts: impl IntoIterator, offset: u64) -> Self { - let mut size = 0; - let (part_queue, part_queue_producer) = unbounded_part_queue(); - for part in parts { - size += part.len(); - part_queue_producer.push(Ok(part)); - } - Self { - task_handle: None, - remaining: size, - start_offset: offset, - total_size: size, - part_queue, + // Push a given list of parts in front of the part queue + pub async fn push_front(&mut self, mut parts: Vec) -> Result<(), PrefetchReadError> { + // Merge all parts into one single part. + // This could result in a really big part, but we normally use this only for backward seek + // so its size should not be bigger than the prefetcher's `max_backward_seek_distance`. + let part = parts.iter_mut().reduce(|acc, e| { + acc.extend(e).unwrap(); + acc + }); + if let Some(part) = part { + self.remaining += part.len(); + self.part_queue.push_front(part.clone()).await; } + Ok(()) } pub async fn read(&mut self, length: usize) -> Result> { + tracing::trace!(length, "read"); let part = self.part_queue.read(length).await?; debug_assert!(part.len() <= self.remaining); self.remaining -= part.len(); + + // We read some data out of the part queue so the read window should be moved + let next_offset = part.offset() + part.len() as u64; + self.backpressure_controller.next_offset_hint(next_offset).await?; + + let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize; + // If the part queue is empty it means we are reading faster than the task could prefetch, + // so we should use larger window for the task. + if remaining_in_queue == 0 { + self.backpressure_controller.try_scaling_up(); + } + Ok(part) } pub fn start_offset(&self) -> u64 { - self.start_offset + self.range.start() } pub fn end_offset(&self) -> u64 { - self.start_offset + self.total_size as u64 + self.range.end() } pub fn total_size(&self) -> usize { - self.total_size + self.range.len() } pub fn remaining(&self) -> usize { @@ -68,12 +101,10 @@ impl RequestTask { /// Maximum offset which data is known to be already in the `self.part_queue` pub fn available_offset(&self) -> u64 { - self.start_offset + self.part_queue.bytes_received() as u64 + self.start_offset() + self.part_queue.bytes_received() } - /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and - /// shouldn't be counted for prefetcher progress. 
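A minimal sketch (not part of this change) of the scaling heuristic that read relies on. backpressure_controller.rs is not included in this diff, so the exact behaviour of try_scaling_up is an assumption here: grow the preferred window by the configured multiplier, capped at the maximum, whenever the reader drains the part queue:

// Illustrative only: one plausible shape for the window growth triggered by try_scaling_up.
fn scaled_window(current: usize, multiplier: usize, max: usize) -> usize {
    (current * multiplier).min(max)
}

fn main() {
    const MIB: usize = 1024 * 1024;
    let (multiplier, max) = (2, 64 * MIB);
    let mut window = MIB;
    // Each time the part queue runs dry the window doubles: 1, 2, 4, ... up to the 64MiB cap.
    for _ in 0..8 {
        window = scaled_window(window, multiplier, max);
    }
    assert_eq!(window, 64 * MIB);
}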
- pub fn is_streaming(&self) -> bool { - self.task_handle.is_some() + pub fn read_window_range(&self) -> u64 { + self.backpressure_controller.read_window_range() } } diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index aa8290fe8..87399531f 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -49,6 +49,7 @@ pub type TestClientBox = Box; pub struct TestSessionConfig { pub part_size: usize, + pub initial_read_window_size: usize, pub filesystem_config: S3FilesystemConfig, pub prefetcher_config: PrefetcherConfig, pub auth_config: S3ClientAuthConfig, @@ -56,8 +57,10 @@ pub struct TestSessionConfig { impl Default for TestSessionConfig { fn default() -> Self { + let part_size = 8 * 1024 * 1024; Self { - part_size: 8 * 1024 * 1024, + part_size, + initial_read_window_size: part_size, filesystem_config: Default::default(), prefetcher_config: Default::default(), auth_config: Default::default(), @@ -125,6 +128,8 @@ pub mod mock_session { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: test_config.part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(client_config)); @@ -162,6 +167,8 @@ pub mod mock_session { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: test_config.part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(client_config)); @@ -284,7 +291,9 @@ pub mod s3_session { let client_config = S3ClientConfig::default() .part_size(test_config.part_size) .endpoint_config(EndpointConfig::new(®ion)) - .auth_config(test_config.auth_config); + .auth_config(test_config.auth_config) + .read_backpressure(true) + .initial_read_window(test_config.initial_read_window_size); let client = S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); @@ -316,7 +325,9 @@ pub mod s3_session { let client_config = S3ClientConfig::default() .part_size(test_config.part_size) - .endpoint_config(EndpointConfig::new(®ion)); + .endpoint_config(EndpointConfig::new(®ion)) + .read_backpressure(true) + .initial_read_window(test_config.initial_read_window_size); let client = S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index cfcb03059..52969e433 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -35,6 +35,8 @@ pub fn make_test_filesystem( let client_config = MockClientConfig { bucket: bucket.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index aafade057..b7b606bcd 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -733,6 +733,8 @@ async fn test_upload_aborted_on_write_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; @@ -810,6 +812,8 @@ async fn test_upload_aborted_on_fsync_failure() { let client_config = 
MockClientConfig {
         bucket: BUCKET_NAME.to_string(),
         part_size: 1024 * 1024,
+        enable_backpressure: true,
+        initial_read_window_size: 256 * 1024,
         ..Default::default()
     };
@@ -872,6 +876,8 @@ async fn test_upload_aborted_on_release_failure() {
     let client_config = MockClientConfig {
         bucket: BUCKET_NAME.to_string(),
         part_size: 1024 * 1024,
+        enable_backpressure: true,
+        initial_read_window_size: 256 * 1024,
         ..Default::default()
     };
@@ -1487,7 +1493,9 @@ async fn test_readdir_rewind_with_local_files_only() {
 async fn test_lookup_404_not_an_error() {
     let name = "test_lookup_404_not_an_error";
     let (bucket, prefix) = get_test_bucket_and_prefix(name);
-    let client_config = S3ClientConfig::default().endpoint_config(EndpointConfig::new(&get_test_region()));
+    let client_config = S3ClientConfig::default()
+        .endpoint_config(EndpointConfig::new(&get_test_region()))
+        .read_backpressure(true);
     let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client");
     let fs = make_test_filesystem_with_client(
         client,
@@ -1521,7 +1529,8 @@ async fn test_lookup_forbidden() {
     let auth_config = get_crt_client_auth_config(get_scoped_down_credentials(&policy).await);
     let client_config = S3ClientConfig::default()
         .auth_config(auth_config)
-        .endpoint_config(EndpointConfig::new(&get_test_region()));
+        .endpoint_config(EndpointConfig::new(&get_test_region()))
+        .read_backpressure(true);
     let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client");
 
     // create an empty file
diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
index c7ece49af..4f47571d4 100644
--- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
+++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
@@ -1,6 +1,5 @@
 use fuser::BackgroundSession;
 use mountpoint_s3::data_cache::InMemoryDataCache;
-use mountpoint_s3::prefetch::PrefetcherConfig;
 use std::fs::{File, OpenOptions};
 use std::io::Read;
 use tempfile::TempDir;
@@ -65,30 +64,32 @@ fn read_test_mock_with_cache(object_size: usize) {
     );
 }
 
-/// test for checking either prefetching fails or read original object when object is mutated during read.
-/// Prefetching of next request occurs when more than half of the current request is being read.
-/// So, when we read the first block, it prefetches the requests ti require to fulfill and the next request
-/// depending on size of last request.
-/// If object is mutated, E-Tag for the new prefetch request will change and hence the request will fail giving IO error.
-fn prefetch_test_etag(creator_fn: F, prefix: &str, request_size: usize, read_size: usize)
-where
+/// Test that checks whether prefetching fails or still reads the original object when the object is mutated during a read.
+/// Prefetching of the next read window occurs when more than half of the current window has been read.
+/// When we read the first block, the prefetcher fetches data with a window size large enough to fulfill the request,
+/// then increases the window size as needed.
+/// If the object is mutated, reading a part from the next read window will fail with a precondition (ETag) error.
+fn prefetch_test_etag( + creator_fn: F, + prefix: &str, + part_size: usize, + initial_read_window_size: usize, + read_size: usize, +) where F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox), { - const OBJECT_SIZE: usize = 1024 * 1024; - - let prefetcher_config = PrefetcherConfig { - first_request_size: request_size, - ..Default::default() - }; - + // Object needs to be larger than part size because the CRT returns data in chunks of part size, + // we would not be able to see the failures if it's smaller. + let object_size = part_size * 2; let (mount_point, _session, mut test_client) = creator_fn( prefix, TestSessionConfig { - prefetcher_config, + part_size, + initial_read_window_size, ..Default::default() }, ); - let original_data_buf = vec![0u8; OBJECT_SIZE]; + let original_data_buf = vec![0u8; object_size]; test_client.put_object("dir/hello.txt", &original_data_buf).unwrap(); @@ -104,7 +105,7 @@ where .expect("Should be able to read file to buf"); // changed the value of data buf to distinguish it from previous data of the object. - let final_data_buf = vec![255u8; OBJECT_SIZE]; + let final_data_buf = vec![255u8; object_size]; test_client.put_object("dir/hello.txt", &final_data_buf).unwrap(); let mut dest_buf = vec![0u8; read_size]; @@ -149,11 +150,13 @@ where #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { +fn prefetch_test_etag_mock(initial_read_window_size: usize, read_size: usize) { + let part_size = 256 * 1024; prefetch_test_etag( fuse::mock_session::new, "prefetch_test_etag_mock", - request_size, + part_size, + initial_read_window_size, read_size, ); } @@ -162,11 +165,13 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { +fn prefetch_test_etag_mock_with_cache(initial_read_window_size: usize, read_size: usize) { + let part_size = 256 * 1024; prefetch_test_etag( fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_mock", - request_size, + part_size, + initial_read_window_size, read_size, ); } @@ -174,22 +179,31 @@ fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { #[cfg(feature = "s3_tests")] #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] -#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_s3(request_size: usize, read_size: usize) { - prefetch_test_etag(fuse::s3_session::new, 
"prefetch_test_etag_s3", request_size, read_size); +fn prefetch_test_etag_s3(initial_read_window_size: usize, read_size: usize) { + let part_size = 8 * 1024 * 1024; + prefetch_test_etag( + fuse::s3_session::new, + "prefetch_test_etag_s3", + part_size, + initial_read_window_size, + read_size, + ); } #[cfg(feature = "s3_tests")] #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] -#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) { +fn prefetch_test_etag_s3_with_cache(initial_read_window_size: usize, read_size: usize) { + let part_size = 8 * 1024 * 1024; prefetch_test_etag( fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_s3", - request_size, + part_size, + initial_read_window_size, read_size, ); } diff --git a/mountpoint-s3/tests/mock_s3_tests.rs b/mountpoint-s3/tests/mock_s3_tests.rs index cdf17bb0d..4aaa95dfa 100644 --- a/mountpoint-s3/tests/mock_s3_tests.rs +++ b/mountpoint-s3/tests/mock_s3_tests.rs @@ -134,6 +134,7 @@ fn create_fs_with_mock_s3(bucket: &str) -> (TestS3Filesystem, MockS let client_config = S3ClientConfig::default() .endpoint_config(endpoint_config) .auth_config(S3ClientAuthConfig::NoSigning) + .read_backpressure(true) .max_attempts(NonZeroUsize::new(3).unwrap()); // retry S3 request 3 times (which equals the existing default) let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); (