Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/storage/src/model_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,11 @@ enum Range {
Segment { offset: u64, limit: u64 },
}

/// Represents the parameters of a `WriteObject` request
/// Represents the parameters of a `WriteObject` request for use in mocks
#[derive(Debug, PartialEq)]
#[non_exhaustive]
// TODO(#2041) - make public
#[allow(dead_code)]
pub(crate) struct WriteObjectRequest {
pub struct WriteObjectRequest {
pub spec: crate::model::WriteObjectSpec,
pub params: Option<crate::model::CommonObjectRequestParams>,
}
Expand Down
18 changes: 14 additions & 4 deletions src/storage/src/storage/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ use std::sync::Arc;
/// [Private Google Access with VPC Service Controls]: https://cloud.google.com/vpc-service-controls/docs/private-connectivity
/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
#[derive(Clone, Debug)]
pub struct Storage {
stub: std::sync::Arc<crate::storage::transport::Storage>,
pub struct Storage<S = crate::storage::transport::Storage>
where
S: crate::storage::stub::Storage + 'static,
{
stub: std::sync::Arc<S>,
options: RequestOptions,
}

Expand All @@ -112,7 +115,12 @@ impl Storage {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
}

impl<S> Storage<S>
where
S: crate::storage::stub::Storage + 'static,
{
/// Write an object using a local buffer.
///
/// If the data source does **not** implement [Seek] the client library must
Expand Down Expand Up @@ -158,7 +166,7 @@ impl Storage {
/// * `payload` - the object data.
///
/// [Seek]: crate::streaming_source::Seek
pub fn write_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> WriteObject<P>
pub fn write_object<B, O, T, P>(&self, bucket: B, object: O, payload: T) -> WriteObject<P, S>
where
B: Into<String>,
O: Into<String>,
Expand Down Expand Up @@ -195,14 +203,16 @@ impl Storage {
/// * `bucket` - the bucket name containing the object. In
/// `projects/_/buckets/{bucket_id}` format.
/// * `object` - the object name.
pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject
pub fn read_object<B, O>(&self, bucket: B, object: O) -> ReadObject<S>
where
B: Into<String>,
O: Into<String>,
{
ReadObject::new(self.stub.clone(), bucket, object, self.options.clone())
}
}

impl Storage {
pub(crate) fn new(builder: ClientBuilder) -> gax::client_builder::Result<Self> {
use gax::client_builder::Error;
let client = reqwest::Client::builder()
Expand Down
15 changes: 10 additions & 5 deletions src/storage/src/storage/read_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,21 @@ use serde_with::DeserializeAs;
/// }
/// ```
#[derive(Clone, Debug)]
pub struct ReadObject {
stub: std::sync::Arc<crate::storage::transport::Storage>,
pub struct ReadObject<S = crate::storage::transport::Storage>
where
S: crate::storage::stub::Storage + 'static,
{
stub: std::sync::Arc<S>,
request: crate::model::ReadObjectRequest,
options: RequestOptions,
}

impl ReadObject {
impl<S> ReadObject<S>
where
S: crate::storage::stub::Storage + 'static,
{
pub(crate) fn new<B, O>(
stub: std::sync::Arc<crate::storage::transport::Storage>,
stub: std::sync::Arc<S>,
bucket: B,
object: O,
options: RequestOptions,
Expand Down Expand Up @@ -357,7 +363,6 @@ impl ReadObject {

/// Sends the request.
pub async fn send(self) -> Result<ReadObjectResponse> {
use crate::storage::stub::Storage;
self.stub.read_object(self.request, self.options).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/storage/request_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use gax::{
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug)]
pub(crate) struct RequestOptions {
pub struct RequestOptions {
pub(crate) retry_policy: Arc<dyn RetryPolicy>,
pub(crate) backoff_policy: Arc<dyn BackoffPolicy>,
pub(crate) retry_throttler: SharedRetryThrottler,
Expand Down
29 changes: 19 additions & 10 deletions src/storage/src/storage/write_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,20 @@ use crate::storage::request_options::RequestOptions;
/// Ok(())
/// }
/// ```
pub struct WriteObject<T> {
stub: std::sync::Arc<crate::storage::transport::Storage>,
pub struct WriteObject<T, S = crate::storage::transport::Storage>
where
S: crate::storage::stub::Storage + 'static,
{
stub: std::sync::Arc<S>,
pub(crate) request: crate::model_ext::WriteObjectRequest,
pub(crate) payload: Payload<T>,
pub(crate) options: RequestOptions,
}

impl<T> WriteObject<T> {
impl<T, S> WriteObject<T, S>
where
S: crate::storage::stub::Storage + 'static,
{
/// Set a [request precondition] on the object generation to match.
///
/// With this precondition the request fails if the current object
Expand Down Expand Up @@ -892,7 +898,7 @@ impl<T> WriteObject<T> {
}

pub(crate) fn new<B, O, P>(
stub: std::sync::Arc<crate::storage::transport::Storage>,
stub: std::sync::Arc<S>,
bucket: B,
object: O,
payload: P,
Expand All @@ -918,11 +924,12 @@ impl<T> WriteObject<T> {
}
}

impl<T> WriteObject<T>
impl<T, S> WriteObject<T, S>
where
T: StreamingSource + Seek + Send + Sync + 'static,
<T as StreamingSource>::Error: std::error::Error + Send + Sync + 'static,
<T as Seek>::Error: std::error::Error + Send + Sync + 'static,
S: crate::storage::stub::Storage + 'static,
{
/// A simple upload from a buffer.
///
Expand All @@ -938,7 +945,6 @@ where
/// # Ok(()) }
/// ```
pub async fn send_unbuffered(self) -> Result<Object> {
use crate::storage::stub::Storage;
self.stub
.write_object_unbuffered(self.payload, self.request, self.options)
.await
Expand Down Expand Up @@ -976,7 +982,7 @@ where
/// send the checksums at the end of the upload with this API.
///
/// [JSON API]: https://cloud.google.com/storage/docs/json_api
pub async fn precompute_checksums(mut self) -> Result<WriteObject<T>> {
pub async fn precompute_checksums(mut self) -> Result<Self> {
let mut offset = 0_u64;
self.payload.seek(offset).await.map_err(Error::ser)?;
while let Some(n) = self.payload.next().await.transpose().map_err(Error::ser)? {
Expand All @@ -995,10 +1001,11 @@ where
}
}

impl<T> WriteObject<T>
impl<T, S> WriteObject<T, S>
where
T: StreamingSource + Send + Sync + 'static,
T::Error: std::error::Error + Send + Sync + 'static,
S: crate::storage::stub::Storage + 'static,
{
/// Upload an object from a streaming source without rewinds.
///
Expand All @@ -1014,15 +1021,17 @@ where
/// # Ok(()) }
/// ```
pub async fn send_buffered(self) -> crate::Result<Object> {
use crate::storage::stub::Storage;
self.stub
.write_object_buffered(self.payload, self.request, self.options)
.await
}
}

// We need `Debug` to use `expect_err()` in `Result<WriteObject, ...>`.
impl<T> std::fmt::Debug for WriteObject<T> {
impl<T, S> std::fmt::Debug for WriteObject<T, S>
where
S: crate::storage::stub::Storage + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WriteObject")
.field("stub", &self.stub)
Expand Down
Loading