Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/storage/src/storage/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ use std::sync::Arc;
/// [Application Default Credentials]: https://cloud.google.com/docs/authentication#adc
#[derive(Clone, Debug)]
pub struct Storage {
inner: std::sync::Arc<StorageInner>,
stub: std::sync::Arc<crate::storage::transport::Storage>,
options: RequestOptions,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -163,7 +164,13 @@ impl Storage {
O: Into<String>,
T: Into<Payload<P>>,
{
WriteObject::new(self.inner.clone(), bucket, object, payload)
WriteObject::new(
self.stub.clone(),
bucket,
object,
payload,
self.options.clone(),
)
}

/// Reads the contents of an object.
Expand Down Expand Up @@ -193,7 +200,7 @@ impl Storage {
B: Into<String>,
O: Into<String>,
{
ReadObject::new(self.inner.clone(), bucket, object)
ReadObject::new(self.stub.clone(), bucket, object, self.options.clone())
}

pub(crate) fn new(builder: ClientBuilder) -> gax::client_builder::Result<Self> {
Expand Down Expand Up @@ -222,7 +229,9 @@ impl Storage {
builder.credentials = Some(cred);
builder.endpoint = Some(endpoint);
let inner = Arc::new(StorageInner::new(client, builder));
Ok(Self { inner })
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
Ok(Self { stub, options })
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/src/storage/perform_upload/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ mod tests {
#[tokio::test]
async fn test_percent_encoding_object_name(want: &str) -> Result {
let inner = test_inner_client(test_builder());
let builder = WriteObject::new(inner.clone(), "projects/_/buckets/bucket", want, "hello");
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(stub, "projects/_/buckets/bucket", want, "hello", options);
let request = perform_upload(inner, builder)
.start_resumable_upload_request()
.await?
Expand Down
23 changes: 18 additions & 5 deletions src/storage/src/storage/perform_upload/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ pub(crate) fn perform_upload<T>(
#[tokio::test]
async fn start_resumable_upload() -> Result {
let inner = test_inner_client(test_builder());
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
);
let mut request = perform_upload(inner, builder)
.start_resumable_upload_request()
Expand All @@ -82,11 +85,14 @@ async fn start_resumable_upload_headers() -> Result {
let (key, key_base64, _, key_sha256_base64) = create_key_helper();

let inner = test_inner_client(test_builder());
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
)
.set_key(KeyAes256::new(&key)?);
let request = perform_upload(inner, builder)
Expand Down Expand Up @@ -118,7 +124,9 @@ async fn start_resumable_upload_headers() -> Result {
#[tokio::test]
async fn start_resumable_upload_bad_bucket() -> Result {
let inner = test_inner_client(test_builder());
let builder = WriteObject::new(inner.clone(), "malformed", "object", "hello");
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(stub, "malformed", "object", "hello", options);
let _ = perform_upload(inner, builder)
.start_resumable_upload_request()
.await
Expand All @@ -130,7 +138,9 @@ async fn start_resumable_upload_bad_bucket() -> Result {
async fn start_resumable_upload_metadata_in_request() -> Result {
use crate::model::ObjectAccessControl;
let inner = test_inner_client(test_builder());
let builder = WriteObject::new(inner.clone(), "projects/_/buckets/bucket", "object", "")
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(stub, "projects/_/buckets/bucket", "object", "", options)
.set_if_generation_match(10)
.set_if_generation_not_match(20)
.set_if_metageneration_match(30)
Expand Down Expand Up @@ -213,11 +223,14 @@ async fn start_resumable_upload_credentials() -> Result {
let inner = test_inner_client(
test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
);
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
);
let _ = perform_upload(inner, builder)
.start_resumable_upload_request()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,14 @@ fn response_body() -> Value {
async fn upload_object_bytes() -> Result {
const PAYLOAD: &str = "hello";
let inner = test_inner_client(test_builder());
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
PAYLOAD,
options,
);
let request = perform_upload(inner, builder)
.single_shot_builder(SizeHint::with_exact(PAYLOAD.len() as u64))
Expand All @@ -256,11 +259,14 @@ async fn upload_object_bytes() -> Result {
#[tokio::test]
async fn upload_object_metadata() -> Result {
let inner = test_inner_client(test_builder());
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
)
.set_metadata([("k0", "v0"), ("k1", "v1")]);
let request = perform_upload(inner, builder)
Expand Down Expand Up @@ -289,7 +295,9 @@ async fn upload_object_stream() -> Result {
.map(|x| bytes::Bytes::from_static(x.as_bytes())),
);
let inner = test_inner_client(test_builder());
let builder = WriteObject::new(inner.clone(), "projects/_/buckets/bucket", "object", stream);
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(stub, "projects/_/buckets/bucket", "object", stream, options);
let request = perform_upload(inner, builder)
.single_shot_builder(SizeHint::new())
.await?
Expand All @@ -310,11 +318,14 @@ async fn upload_object_error_credentials() -> Result {
let inner = test_inner_client(
test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
);
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
);
let _ = perform_upload(inner, builder)
.single_shot_builder(SizeHint::new())
Expand All @@ -327,7 +338,9 @@ async fn upload_object_error_credentials() -> Result {
#[tokio::test]
async fn upload_object_bad_bucket() -> Result {
let inner = test_inner_client(test_builder());
let builder = WriteObject::new(inner.clone(), "malformed", "object", "hello");
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(stub, "malformed", "object", "hello", options);
let _ = perform_upload(inner, builder)
.single_shot_builder(SizeHint::new())
.await
Expand All @@ -341,11 +354,14 @@ async fn upload_object_headers() -> Result {
let (key, key_base64, _, key_sha256_base64) = create_key_helper();

let inner = test_inner_client(test_builder());
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner.clone());
let builder = WriteObject::new(
inner.clone(),
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
)
.set_key(KeyAes256::new(&key)?);
let request = perform_upload(inner, builder)
Expand Down Expand Up @@ -429,10 +445,18 @@ async fn retry_transient_not_idempotent() -> Result {

let inner =
test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
let err = WriteObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
.send_unbuffered()
.await
.expect_err("expected error as request is not idempotent");
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
let err = WriteObject::new(
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
)
.send_unbuffered()
.await
.expect_err("expected error as request is not idempotent");
assert_eq!(err.http_status_code(), Some(503), "{err:?}");

Ok(())
Expand All @@ -456,10 +480,18 @@ async fn retry_transient_override_idempotency() -> Result {

let inner =
test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
let got = WriteObject::new(inner, "projects/_/buckets/bucket", "object", "hello")
.with_idempotency(true)
.send_unbuffered()
.await?;
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
let got = WriteObject::new(
stub,
"projects/_/buckets/bucket",
"object",
"hello",
options,
)
.with_idempotency(true)
.send_unbuffered()
.await?;
let want = Object::from(serde_json::from_value::<v1::Object>(response_body())?);
assert_eq!(got, want);

Expand All @@ -484,11 +516,14 @@ async fn retry_transient_failures_then_success() -> Result {

let inner =
test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
let got = WriteObject::new(
inner,
stub,
"projects/_/buckets/test-bucket",
"test-object",
"hello",
options,
)
.set_if_generation_match(0)
.send_unbuffered()
Expand Down Expand Up @@ -517,11 +552,14 @@ async fn retry_transient_failures_then_permanent() -> Result {

let inner =
test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
let err = WriteObject::new(
inner,
stub,
"projects/_/buckets/test-bucket",
"test-object",
"hello",
options,
)
.set_if_generation_match(0)
.send_unbuffered()
Expand Down Expand Up @@ -550,11 +588,14 @@ async fn retry_transient_failures_exhausted() -> Result {

let inner =
test_inner_client(test_builder().with_endpoint(format!("http://{}", server.addr())));
let options = inner.options.clone();
let stub = crate::storage::transport::Storage::new(inner);
let err = WriteObject::new(
inner,
stub,
"projects/_/buckets/test-bucket",
"test-object",
"hello",
options,
)
.set_if_generation_match(0)
.with_retry_policy(crate::retry_policy::RetryableErrors.with_attempt_limit(3))
Expand Down
Loading
Loading