17 changes: 9 additions & 8 deletions guide/samples/tests/storage/striped.rs
@@ -125,12 +125,13 @@ async fn download(
let start = std::time::Instant::now();

// ANCHOR: compute-stripes
let limit = stripe_size as i64;
let count = metadata.size / limit;
let size = metadata.size as u64;
let limit = stripe_size as u64;
let count = size / limit;
let mut stripes = (0..count)
.map(|i| write_stripe(client.clone(), &file, i * limit, limit, &metadata))
.collect::<Vec<_>>();
if metadata.size % limit != 0 {
if size % limit != 0 {
stripes.push(write_stripe(
client.clone(),
&file,
@@ -164,15 +165,16 @@ async fn download(
async fn write_stripe(
client: Storage,
file: &tokio::fs::File,
offset: i64,
limit: i64,
offset: u64,
limit: u64,
metadata: &Object,
) -> anyhow::Result<()> {
use google_cloud_storage::model_ext::ReadRange;
use tokio::io::AsyncSeekExt;
// ANCHOR_END: write-stripe-function
// ANCHOR: write-stripe-seek
let mut writer = file.try_clone().await?;
writer.seek(std::io::SeekFrom::Start(offset as u64)).await?;
writer.seek(std::io::SeekFrom::Start(offset)).await?;
// ANCHOR_END: write-stripe-seek
// ANCHOR: write-stripe-reader
use google_cloud_storage::ReadObjectResponse;
@@ -183,8 +185,7 @@ async fn write_stripe(
.set_generation(metadata.generation)
// ANCHOR_END: write-stripe-reader-generation
// ANCHOR: write-stripe-reader-range
.set_read_offset(offset)
.set_read_limit(limit)
.set_read_range(ReadRange::segment(offset, limit))
// ANCHOR_END: write-stripe-reader-range
// ANCHOR: write-stripe-reader
.send()
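The change above moves the stripe arithmetic to u64 and hands each stripe to write_stripe as an (offset, limit) pair, which write_stripe then turns into a single ReadRange::segment(offset, limit) request. As a standalone illustration of that arithmetic only, the sketch below produces the same pairs, including the shorter trailing stripe when the object size is not a multiple of the stripe size. The stripe_ranges helper and its test are hypothetical names for this sketch, not part of the guide sample.

// Illustration only: reproduce the stripe arithmetic from the sample above.
// `stripe_ranges` is a hypothetical helper, not part of the guide sample.
fn stripe_ranges(size: u64, stripe_size: u64) -> Vec<(u64, u64)> {
    let count = size / stripe_size;
    let mut ranges: Vec<(u64, u64)> = (0..count)
        .map(|i| (i * stripe_size, stripe_size))
        .collect();
    if size % stripe_size != 0 {
        // The object does not divide evenly; add the shorter trailing stripe.
        ranges.push((count * stripe_size, size % stripe_size));
    }
    ranges
}

#[test]
fn stripes_cover_whole_object() {
    // A 10-byte object in 4-byte stripes: two full stripes plus a 2-byte tail.
    assert_eq!(stripe_ranges(10, 4), vec![(0, 4), (4, 4), (8, 2)]);
}

Each (offset, limit) pair corresponds to one write_stripe call, and therefore one ReadRange::segment read, in the sample above.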
86 changes: 86 additions & 0 deletions src/integration-tests/src/storage.rs
@@ -781,6 +781,92 @@ async fn abort_upload_buffered(client: storage::client::Storage, bucket_name: &s
Ok(())
}

pub async fn ranged_reads(
builder: storage::builder::storage::ClientBuilder,
bucket_name: &str,
) -> Result<()> {
// Enable a basic subscriber. Useful to troubleshoot problems and visually
// verify tracing is doing something.
#[cfg(feature = "log-integration-tests")]
let _guard = {
use tracing_subscriber::fmt::format::FmtSpan;
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_thread_ids(true)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.finish();

tracing::subscriber::set_default(subscriber)
};

tracing::info!("ranged reads test, using bucket {bucket_name}");
let client = builder.build().await?;
const VEXING: &str = "how vexingly quick daft zebras jump";

let object = client
.write_object(bucket_name, "ranged_reads", VEXING)
.set_if_generation_match(0)
.send_unbuffered()
.await?;
tracing::info!("created object {object:?}");

use storage::model_ext::ReadRange;
let want = VEXING.as_bytes();
tracing::info!("running with ReadRange::head");
let response = client
.read_object(&object.bucket, &object.name)
.set_generation(object.generation)
.set_read_range(ReadRange::head(4))
.send()
.await?;
let got = read_all(response).await?;
assert_eq!(&got, &want[0..4]);

tracing::info!("running with ReadRange::tail");
let response = client
.read_object(&object.bucket, &object.name)
.set_generation(object.generation)
.set_read_range(ReadRange::tail(4))
.send()
.await?;
let got = read_all(response).await?;
assert_eq!(&got, &want[(VEXING.len() - 4)..]);

tracing::info!("running with ReadRange::offset");
let response = client
.read_object(&object.bucket, &object.name)
.set_generation(object.generation)
.set_read_range(ReadRange::offset(4))
.send()
.await?;
let got = read_all(response).await?;
assert_eq!(&got, &want[4..]);

tracing::info!("running with ReadRange::segment");
let response = client
.read_object(&object.bucket, &object.name)
.set_generation(object.generation)
.set_read_range(ReadRange::segment(4, 4))
.send()
.await?;
let got = read_all(response).await?;
assert_eq!(&got, &want[4..8]);

tracing::info!("DONE");
Ok(())
}

async fn read_all<R>(mut response: R) -> Result<Vec<u8>>
where
R: storage::ReadObjectResponse,
{
let mut contents = Vec::new();
while let Some(b) = response.next().await.transpose()? {
contents.extend_from_slice(&b);
}
Ok(contents)
}

pub async fn checksums(
builder: storage::builder::storage::ClientBuilder,
bucket_name: &str,
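For readers following the assertions in ranged_reads above: on the 35-byte test string, the four ReadRange constructors are expected to select the byte ranges sketched below. This is only a local-slicing mental model of the expected results, not the library's implementation, and it makes no client calls.

// Mirror the expected results of the ranged_reads assertions with plain
// slices of the same 35-byte string. Illustration only; no network calls.
fn main() {
    let data = "how vexingly quick daft zebras jump".as_bytes();
    let len = data.len(); // 35 bytes

    let head = &data[..4];       // ReadRange::head(4): the first 4 bytes
    let tail = &data[len - 4..]; // ReadRange::tail(4): the last 4 bytes
    let offset = &data[4..];     // ReadRange::offset(4): byte 4 to the end
    let segment = &data[4..8];   // ReadRange::segment(4, 4): bytes [4, 8)

    assert_eq!(head, b"how ");
    assert_eq!(tail, b"jump");
    assert_eq!(offset.len(), len - 4);
    assert!(offset.starts_with(b"vexingly"));
    assert_eq!(segment, b"vexi");
    println!("ranges match the integration test's expectations");
}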
13 changes: 13 additions & 0 deletions src/integration-tests/tests/driver.rs
@@ -210,6 +210,19 @@ mod driver {
result
}

#[test_case(Storage::builder(); "default")]
#[tokio::test]
async fn run_storage_ranged_reads(
builder: storage::builder::storage::ClientBuilder,
) -> integration_tests::Result<()> {
let (control, bucket) = integration_tests::storage::create_test_bucket().await?;
let result = integration_tests::storage::ranged_reads(builder, &bucket.name)
.await
.map_err(integration_tests::report_error);
let _ = storage_samples::cleanup_bucket(control, bucket.name).await;
result
}

#[test_case(Storage::builder(); "default")]
#[tokio::test]
async fn run_storage_object_names(
12 changes: 0 additions & 12 deletions src/storage/src/error.rs
@@ -100,18 +100,6 @@ pub enum KeyAes256Error {
InvalidLength,
}

/// Represents an error that can occur when invalid range is specified.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum RangeError {
/// The provided read limit was negative.
#[error("read limit was negative, expected non-negative value.")]
NegativeLimit,
/// A negative offset was provided with a read limit.
#[error("negative read offsets cannot be used with read limits.")]
NegativeOffsetWithLimit,
}

/// Represents an error that can occur when reading response data.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]