From 3dd103a8aed8c389a099b6f6232e27ae6385aa8f Mon Sep 17 00:00:00 2001 From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com> Date: Sat, 21 Oct 2023 06:32:24 -0400 Subject: [PATCH] Add ObjectStore::put_opts so we can write tags (#45) * Add put_opts method for tagging S3 objects * make tags public * Fix condition --- object_store/src/aws/client.rs | 24 ++++++++++++++++++++++-- object_store/src/aws/mod.rs | 26 ++++++++++++++++++++++++-- object_store/src/lib.rs | 27 +++++++++++++++++++++++++++ object_store/src/local.rs | 6 +++++- 4 files changed, 78 insertions(+), 5 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 0c2493651000..67b5dab0a8c4 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -17,7 +17,9 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt}; -use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET}; +use crate::aws::{ + AwsCredentialProvider, STORE, STRICT_ENCODE_SET, STRICT_PATH_ENCODE_SET, +}; use crate::client::get::GetClient; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; @@ -33,11 +35,12 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; use itertools::Itertools; -use percent_encoding::{utf8_percent_encode, PercentEncode}; +use percent_encoding::{percent_encode, utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response}; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; +use std::collections::HashMap; use std::sync::Arc; /// A specialized `Error` for object store-related errors @@ -217,6 +220,8 @@ pub(crate) struct S3Client { client: ReqwestClient, } +const TAGGING_HEADER: &str = "x-amz-tagging"; + impl S3Client { pub fn new(config: S3Config) -> Result<Self> { let client = config.client_options.client()?; @@ -238,6 
+243,7 @@ impl S3Client { path: &Path, bytes: Option<Bytes>, query: &T, + tags: Option<&HashMap<String, String>>, ) -> Result<Response> { let credential = self.get_credential().await?; let url = self.config.path_url(path); @@ -260,6 +266,20 @@ impl S3Client { builder = builder.header(CONTENT_TYPE, value); } + if let Some(tags) = tags { + let tags = tags + .iter() + .map(|(key, value)| { + let key = + percent_encode(key.as_bytes(), &STRICT_ENCODE_SET).to_string(); + let value = + percent_encode(value.as_bytes(), &STRICT_ENCODE_SET).to_string(); + format!("{key}={value}") + }) + .join("&"); + builder = builder.header(TAGGING_HEADER, tags); + } + let response = builder .query(query) .with_aws_sigv4( diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 8a486f986792..fe3b41115034 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -59,7 +59,7 @@ use crate::config::ConfigValue; use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}; use crate::{ ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, Result, RetryConfig, + ObjectStore, Path, PutOptions, Result, RetryConfig, }; mod checksum; @@ -211,7 +211,28 @@ impl AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.client.put_request(location, Some(bytes), &()).await?; + self.client + .put_request(location, Some(bytes), &(), None) + .await?; + Ok(()) + } + + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + options: PutOptions, + ) -> Result<()> { + if options.tags.is_empty() { + self.client + .put_request(location, Some(bytes), &(), None) + .await?; + } else { + self.client + .put_request(location, Some(bytes), &(), Some(&options.tags)) + .await?; + } + Ok(()) } @@ -323,6 +344,7 @@ impl CloudMultiPartUploadImpl for S3MultiPartUpload { &self.location, Some(buf.into()), &[("partNumber", &part), ("uploadId", &self.upload_id)], + None, ) 
.await?; diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 864cabc4a8c0..f2eff41dd496 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -246,6 +246,7 @@ mod client; #[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))] pub use client::{backoff::BackoffConfig, retry::RetryConfig, CredentialProvider}; +use std::collections::HashMap; #[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))] mod config; @@ -291,6 +292,25 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// should be able to observe a partially written object async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>; + /// Save the provided bytes to the specified location + /// + /// The operation is guaranteed to be atomic, it will either successfully + /// write the entirety of `bytes` to `location`, or fail. No clients + /// should be able to observe a partially written object + /// + /// If the specified `options` include key-value metadata, this will be stored + /// along with the object depending on the capabilities of the underlying implementation. + /// + /// For example, when using an AWS S3 `ObjectStore` the `tags` will be saved as object tags in S3 + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + _options: PutOptions, + ) -> Result<()> { + self.put(location, bytes).await + } + /// Get a multi-part upload that allows writing data in chunks /// /// Most cloud-based uploads will buffer and upload parts in parallel. 
@@ -681,6 +701,13 @@ pub struct GetOptions { pub range: Option<Range<usize>>, } +/// Options for a put request, such as tags +#[derive(Debug, Default)] +pub struct PutOptions { + /// Key/Value metadata associated with the object + pub tags: HashMap<String, String>, +} + impl GetOptions { /// Returns an error if the modification conditions on this request are not satisfied fn check_modified( diff --git a/object_store/src/local.rs b/object_store/src/local.rs index ffff6a5739d5..a0933cc6177d 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -863,7 +863,11 @@ impl AsyncWrite for LocalUpload { } } -pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> { +pub(crate) fn read_range( + file: &mut File, + path: &PathBuf, + range: Range<usize>, +) -> Result<Bytes> { let to_read = range.end - range.start; file.seek(SeekFrom::Start(range.start as u64)) .context(SeekSnafu { path })?;