From 9cf76bc2fdabd5a165cc2bf6923d0b35dbb79916 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Tue, 5 Nov 2024 17:32:59 -0500 Subject: [PATCH] Address review feedback --- .github/workflows/object_store.yml | 2 +- object_store/src/aws/client.rs | 12 +++++- object_store/src/aws/mod.rs | 61 +++++++++++++++++----------- object_store/src/aws/precondition.rs | 27 +++++++----- 4 files changed, 65 insertions(+), 37 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index c073e37da6f4..107b4acf78c3 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -164,7 +164,7 @@ jobs: - name: Run object_store tests (AWS native conditional put) run: cargo test --features=aws env: - AWS_CONDITIONAL_PUT: etag-create-only + AWS_CONDITIONAL_PUT: etag-put-if-not-exists AWS_COPY_IF_NOT_EXISTS: multipart - name: GCS Output diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 2e04683e7b30..a610e635178d 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -641,7 +641,7 @@ impl S3Client { PutPartPayload::Part(payload) => request.with_payload(payload), PutPartPayload::Copy(path) => request.header( "x-amz-copy-source", - &format!("{}/{}", self.config.bucket, path), + &format!("{}/{}", self.config.bucket, encode_path(path)), ), }; @@ -671,6 +671,16 @@ impl S3Client { Ok(PartId { content_id }) } + pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { + self.request(Method::DELETE, location) + .query(&[("uploadId", upload_id)]) + .with_encryption_headers() + .send() + .await?; + + Ok(()) + } + pub(crate) async fn complete_multipart( &self, location: &Path, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index acfa05395901..b238d90eb6d7 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -171,7 +171,7 @@ impl ObjectStore for AmazonS3 { (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), ( PutMode::Create, - Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagCreateOnly), + Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists), ) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, @@ -196,7 +196,7 @@ impl ObjectStore for AmazonS3 { source: "ETag required for conditional put".to_string().into(), })?; match put { - S3ConditionalPut::ETagCreateOnly => Err(Error::NotImplemented), + S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented), S3ConditionalPut::ETagMatch => { request.header(&IF_MATCH, etag.as_str()).do_put().await } @@ -302,27 +302,40 @@ impl ObjectStore for AmazonS3 { .client .create_multipart(to, PutMultipartOpts::default()) .await?; - let part_id = self - .client - .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) - .await?; - let res = match self - .client - .complete_multipart( - to, - &upload_id, - vec![part_id], - CompleteMultipartMode::Create, - ) - .await - { - Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { - path: to.to_string(), - source: Box::new(e), - }), - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - }; + + let res = async { + let part_id = self + .client + .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) + .await?; + match self + .client + .complete_multipart( + to, + &upload_id, + vec![part_id], + CompleteMultipartMode::Create, + ) + .await + { + Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { + path: to.to_string(), + source: Box::new(e), + }), + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + .await; + + // If the multipart upload failed, make a best effort attempt to + // clean it up. It's the caller's responsibility to add a + // lifecycle rule if guaranteed cleanup is required, as we + // cannot protect against an ill-timed process crash. + if res.is_err() { + let _ = self.client.abort_multipart(to, &upload_id).await; + } + return res; } Some(S3CopyIfNotExists::Dynamo(lock)) => { @@ -504,7 +517,7 @@ mod tests { copy_if_not_exists(&integration).await; } if let Some(conditional_put) = &config.conditional_put { - let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagCreateOnly); + let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists); put_opts(&integration, supports_update).await; } diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index 80f3c1a03615..e5058052790d 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -47,11 +47,17 @@ pub enum S3CopyIfNotExists { /// Encoded as `header-with-status:::` ignoring whitespace HeaderWithStatus(String, String, reqwest::StatusCode), /// Native Amazon S3 supports copy if not exists through a multipart upload - /// where the upload copies an existing object and is completed only if - /// the new object does not already exist. + /// where the upload copies an existing object and is completed only if the + /// new object does not already exist. /// - /// WARNING: When using this mode, `copy_if_not_exists` does not copy - /// tags or attributes from the source object. + /// WARNING: When using this mode, `copy_if_not_exists` does not copy tags + /// or attributes from the source object. + /// + /// WARNING: When using this mode, `copy_if_not_exists` makes only a best + /// effort attempt to clean up the multipart upload if the copy operation + /// fails. Consider using a lifecycle rule to automatically clean up + /// abandoned multipart uploads. See [the module + /// docs](super#multipart-uploads) for details. /// /// Encoded as `multipart` ignoring whitespace. Multipart, @@ -81,9 +87,8 @@ impl std::fmt::Display for S3CopyIfNotExists { impl S3CopyIfNotExists { fn from_str(s: &str) -> Option { - match s.trim() { - "multipart" => return Some(Self::Multipart), - _ => (), + if s.trim() == "multipart" { + return Some(Self::Multipart); }; let (variant, value) = s.split_once(':')?; @@ -139,10 +144,10 @@ pub enum S3ConditionalPut { /// This is the limited form of conditional put supported by Amazon S3 /// as of August 2024 ([announcement]). /// - /// Encoded as `etag-create-only` ignoring whitespace. + /// Encoded as `etag-put-if-not-exists` ignoring whitespace. /// /// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ - ETagCreateOnly, + ETagPutIfNotExists, /// The name of a DynamoDB table to use for coordination /// @@ -159,7 +164,7 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), - Self::ETagCreateOnly => write!(f, "etag-create-only"), + Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -169,7 +174,7 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), - "etag-create-only" => Some(Self::ETagCreateOnly), + "etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists), trimmed => match trimmed.split_once(':')? { ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None,