Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion guide/samples/tests/storage/quickstart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub async fn quickstart(project_id: &str, bucket_id: &str) -> anyhow::Result<()>
// ANCHOR_END: upload

// ANCHOR: download
use google_cloud_storage::read_object::ReadObjectResponse;
let mut reader = client.read_object(&bucket.name, "hello.txt").send().await?;
let mut contents = Vec::new();
while let Some(chunk) = reader.next().await.transpose()? {
Expand Down
1 change: 0 additions & 1 deletion guide/samples/tests/storage/striped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ async fn write_stripe(
writer.seek(std::io::SeekFrom::Start(offset)).await?;
// ANCHOR_END: write-stripe-seek
// ANCHOR: write-stripe-reader
use google_cloud_storage::read_object::ReadObjectResponse;
let mut reader = client
.read_object(&metadata.bucket, &metadata.name)
// ANCHOR_END: write-stripe-reader
Expand Down
5 changes: 1 addition & 4 deletions src/integration-tests/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,10 +856,7 @@ pub async fn ranged_reads(
Ok(())
}

async fn read_all<R>(mut response: R) -> Result<Vec<u8>>
where
R: ReadObjectResponse,
{
async fn read_all(mut response: ReadObjectResponse) -> Result<Vec<u8>> {
let mut contents = Vec::new();
while let Some(b) = response.next().await.transpose()? {
contents.extend_from_slice(&b);
Expand Down
1 change: 0 additions & 1 deletion src/storage/examples/src/objects/download_byte_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// [START storage_download_byte_range]
use google_cloud_storage::client::Storage;
use google_cloud_storage::model_ext::ReadRange;
use google_cloud_storage::read_object::ReadObjectResponse;

pub async fn sample(
client: &Storage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// [START storage_download_encrypted_file]
use google_cloud_storage::client::Storage;
use google_cloud_storage::model_ext::KeyAes256;
use google_cloud_storage::read_object::ReadObjectResponse;

pub async fn sample(
client: &Storage,
Expand Down
1 change: 0 additions & 1 deletion src/storage/examples/src/objects/stream_file_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

// [START storage_stream_file_download]
use google_cloud_storage::client::Storage;
use google_cloud_storage::read_object::ReadObjectResponse;

pub async fn sample(client: &Storage, bucket_id: &str) -> anyhow::Result<()> {
const NAME: &str = "object-to-download.txt";
Expand Down
53 changes: 43 additions & 10 deletions src/storage/src/read_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@ use crate::model_ext::ObjectHighlights;
#[cfg(feature = "unstable-stream")]
use futures::Stream;

mod sealed {
pub trait ReadObjectResponse {}
/// The result of a `ReadObject` request.
///
/// Objects can be large, and must be returned as a stream of bytes. This struct
/// also provides an accessor to retrieve the object's metadata.
#[derive(Debug)]
pub struct ReadObjectResponse {
inner: Box<dyn dynamic::ReadObjectResponse + Send>,
}

impl<T> sealed::ReadObjectResponse for T where T: ReadObjectResponse {}
impl ReadObjectResponse {
pub(crate) fn new<T>(inner: Box<T>) -> Self
where
T: dynamic::ReadObjectResponse + Send + 'static,
{
Self { inner }
}

/// A trait representing the interface to read an object
pub trait ReadObjectResponse: sealed::ReadObjectResponse + std::fmt::Debug {
/// Get the highlights of the object metadata included in the
/// response.
///
Expand All @@ -36,7 +45,6 @@ pub trait ReadObjectResponse: sealed::ReadObjectResponse + std::fmt::Debug {
/// ```
/// # use google_cloud_storage::client::Storage;
/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
/// use google_cloud_storage::read_object::ReadObjectResponse;
/// let object = client
/// .read_object("projects/_/buckets/my-bucket", "my-object")
/// .send()
Expand All @@ -48,7 +56,9 @@ pub trait ReadObjectResponse: sealed::ReadObjectResponse + std::fmt::Debug {
/// println!("object content encoding={}", object.content_encoding);
/// # Ok(()) }
/// ```
fn object(&self) -> ObjectHighlights;
pub fn object(&self) -> ObjectHighlights {
self.inner.object()
}

/// Stream the next bytes of the object.
///
Expand All @@ -58,7 +68,6 @@ pub trait ReadObjectResponse: sealed::ReadObjectResponse + std::fmt::Debug {
/// ```
/// # use google_cloud_storage::client::Storage;
/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
/// use google_cloud_storage::read_object::ReadObjectResponse;
/// let mut resp = client
/// .read_object("projects/_/buckets/my-bucket", "my-object")
/// .send()
Expand All @@ -68,10 +77,34 @@ pub trait ReadObjectResponse: sealed::ReadObjectResponse + std::fmt::Debug {
/// }
/// # Ok(()) }
/// ```
fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes>>> + Send;
pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
self.inner.next().await
}

#[cfg(feature = "unstable-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
/// Convert the response to a [Stream].
fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin;
pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
use futures::stream::unfold;
Box::pin(unfold(Some(self), move |state| async move {
if let Some(mut this) = state {
if let Some(chunk) = this.next().await {
return Some((chunk, Some(this)));
}
};
None
}))
}
}

pub(crate) mod dynamic {
use crate::Result;
use crate::model_ext::ObjectHighlights;

/// A trait representing the interface to read an object
#[async_trait::async_trait]
pub trait ReadObjectResponse: std::fmt::Debug {
fn object(&self) -> ObjectHighlights;
async fn next(&mut self) -> Option<Result<bytes::Bytes>>;
}
}
1 change: 0 additions & 1 deletion src/storage/src/storage/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ impl Storage {
/// ```
/// # use google_cloud_storage::client::Storage;
/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
/// use google_cloud_storage::read_object::ReadObjectResponse;
/// let mut resp = client
/// .read_object("projects/_/buckets/my-bucket", "my-object")
/// .send()
Expand Down
49 changes: 13 additions & 36 deletions src/storage/src/storage/read_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ use crate::read_object::ReadObjectResponse;
use crate::read_resume_policy::ReadResumePolicy;
use crate::storage::checksum::details::{Checksum, Crc32c, Md5, validate};
use base64::Engine;
#[cfg(feature = "unstable-stream")]
use futures::Stream;
use serde_with::DeserializeAs;

/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
///
/// # Example: accumulate the contents of an object into a vector
/// ```
/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
/// use google_cloud_storage::read_object::ReadObjectResponse;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
/// let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
/// let mut reader = builder.send().await?;
Expand All @@ -48,7 +45,6 @@ use serde_with::DeserializeAs;
/// ```
/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
/// use google_cloud_storage::model_ext::ReadRange;
/// use google_cloud_storage::read_object::ReadObjectResponse;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
/// const MIB: u64 = 1024 * 1024;
/// let mut contents = Vec::new();
Expand Down Expand Up @@ -107,7 +103,6 @@ impl ReadObject {
/// ```
/// # use google_cloud_storage::client::Storage;
/// # async fn sample(client: &Storage) -> anyhow::Result<()> {
/// use google_cloud_storage::read_object::ReadObjectResponse;
/// let builder = client
/// .read_object("projects/_/buckets/my-bucket", "my-object")
/// .compute_md5();
Expand Down Expand Up @@ -361,9 +356,10 @@ impl ReadObject {
}

/// Sends the request.
pub async fn send(self) -> Result<impl ReadObjectResponse> {
pub async fn send(self) -> Result<ReadObjectResponse> {
let read = self.clone().read().await?;
ReadObjectResponseImpl::new(self, read)
let inner = ReadObjectResponseImpl::new(self, read)?;
Ok(ReadObjectResponse::new(Box::new(inner)))
}

async fn read(self) -> Result<reqwest::Response> {
Expand Down Expand Up @@ -570,41 +566,21 @@ impl ReadObjectResponseImpl {
}
}

impl ReadObjectResponse for ReadObjectResponseImpl {
#[async_trait::async_trait]
impl crate::read_object::dynamic::ReadObjectResponse for ReadObjectResponseImpl {
fn object(&self) -> ObjectHighlights {
self.highlights.clone()
}

// A type-checking cycle is detected with `async fn` when its return type
// depends on an opaque type that is defined within the function body.
// Writing out `impl Future` breaks this cycle, allowing the compiler to
// resolve the return type and proceed.
#[allow(clippy::manual_async_fn)]
fn next(&mut self) -> impl Future<Output = Option<Result<bytes::Bytes>>> + Send {
async move {
match self.next_attempt().await {
None => None,
Some(Ok(b)) => Some(Ok(b)),
// Recursive async requires pin:
// https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
Some(Err(e)) => Box::pin(self.resume(e)).await,
}
async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
match self.next_attempt().await {
None => None,
Some(Ok(b)) => Some(Ok(b)),
// Recursive async requires pin:
// https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
Some(Err(e)) => Box::pin(self.resume(e)).await,
}
}

#[cfg(feature = "unstable-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
use futures::stream::unfold;
Box::pin(unfold(Some(self), move |state| async move {
if let Some(mut this) = state {
if let Some(chunk) = this.next().await {
return Some((chunk, Some(this)));
}
};
None
}))
}
}

impl ReadObjectResponseImpl {
Expand Down Expand Up @@ -641,6 +617,7 @@ impl ReadObjectResponseImpl {
}

async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
use crate::read_object::dynamic::ReadObjectResponse;
use crate::read_resume_policy::{ResumeQuery, ResumeResult};

// The existing read is no longer valid.
Expand Down
1 change: 0 additions & 1 deletion src/storage/src/storage/read_object/resume_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

use crate::model_ext::ReadRange;
use crate::{
read_object::ReadObjectResponse,
read_resume_policy::{ReadResumePolicyExt, Recommended},
storage::client::tests::{
MockBackoffPolicy, MockReadResumePolicy, MockRetryPolicy, MockRetryThrottler, test_builder,
Expand Down
1 change: 0 additions & 1 deletion src/w1r3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use google_cloud_gax::retry_policy::{RetryPolicy, RetryPolicyExt};
use google_cloud_storage::Result as StorageResult;
use google_cloud_storage::client::{Storage, StorageControl};
use google_cloud_storage::model::Object;
use google_cloud_storage::read_object::ReadObjectResponse;
use google_cloud_storage::retry_policy::RetryableErrors;
use humantime::parse_duration;
use instrumented_future::Instrumented;
Expand Down
Loading