Skip to content

Commit

Permalink
Add ObjectStore::put_opts so we can write tags (#45)
Browse files Browse the repository at this point in the history
* Add put_opt method for tagging S3 objects

* make tags public

* Fix condition
  • Loading branch information
thinkharderdev authored Oct 21, 2023
1 parent cbaf98c commit 3dd103a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
24 changes: 22 additions & 2 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
use crate::aws::{
AwsCredentialProvider, STORE, STRICT_ENCODE_SET, STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
Expand All @@ -33,11 +35,12 @@ use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use itertools::Itertools;
use percent_encoding::{utf8_percent_encode, PercentEncode};
use percent_encoding::{percent_encode, utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;

/// A specialized `Error` for object store-related errors
Expand Down Expand Up @@ -217,6 +220,8 @@ pub(crate) struct S3Client {
client: ReqwestClient,
}

const TAGGING_HEADER: &str = "x-amz-tagging";

impl S3Client {
pub fn new(config: S3Config) -> Result<Self> {
let client = config.client_options.client()?;
Expand All @@ -238,6 +243,7 @@ impl S3Client {
path: &Path,
bytes: Option<Bytes>,
query: &T,
tags: Option<&HashMap<String, String>>,
) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
Expand All @@ -260,6 +266,20 @@ impl S3Client {
builder = builder.header(CONTENT_TYPE, value);
}

if let Some(tags) = tags {
let tags = tags
.iter()
.map(|(key, value)| {
let key =
percent_encode(key.as_bytes(), &STRICT_ENCODE_SET).to_string();
let value =
percent_encode(value.as_bytes(), &STRICT_ENCODE_SET).to_string();
format!("{key}={value}")
})
.join("&");
builder = builder.header(TAGGING_HEADER, tags);
}

let response = builder
.query(query)
.with_aws_sigv4(
Expand Down
26 changes: 24 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::{
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Path, Result, RetryConfig,
ObjectStore, Path, PutOptions, Result, RetryConfig,
};

mod checksum;
Expand Down Expand Up @@ -211,7 +211,28 @@ impl AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.client.put_request(location, Some(bytes), &()).await?;
self.client
.put_request(location, Some(bytes), &(), None)
.await?;
Ok(())
}

async fn put_opts(
&self,
location: &Path,
bytes: Bytes,
options: PutOptions,
) -> Result<()> {
if options.tags.is_empty() {
self.client
.put_request(location, Some(bytes), &(), None)
.await?;
} else {
self.client
.put_request(location, Some(bytes), &(), Some(&options.tags))
.await?;
}

Ok(())
}

Expand Down Expand Up @@ -323,6 +344,7 @@ impl CloudMultiPartUploadImpl for S3MultiPartUpload {
&self.location,
Some(buf.into()),
&[("partNumber", &part), ("uploadId", &self.upload_id)],
None,
)
.await?;

Expand Down
27 changes: 27 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod client;

#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
pub use client::{backoff::BackoffConfig, retry::RetryConfig, CredentialProvider};
use std::collections::HashMap;

#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
mod config;
Expand Down Expand Up @@ -291,6 +292,25 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// should be able to observe a partially written object
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;

/// Save the provided bytes to the specified location
///
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `bytes` to `location`, or fail. No clients
/// should be able to observe a partially written object
///
/// If the specified `options` include key-value metadata, this will be stored
/// along with the object depending on the capabilities of the underlying implementation.
///
/// For example, when using an AWS S3 `ObjectStore` the `tags` will be saved as object tags in S3
async fn put_opts(
&self,
location: &Path,
bytes: Bytes,
_options: PutOptions,
) -> Result<()> {
self.put(location, bytes).await
}

/// Get a multi-part upload that allows writing data in chunks
///
/// Most cloud-based uploads will buffer and upload parts in parallel.
Expand Down Expand Up @@ -681,6 +701,13 @@ pub struct GetOptions {
pub range: Option<Range<usize>>,
}

/// Options for a put request, such as tags
#[derive(Debug, Default)]
pub struct PutOptions {
/// Key/Value metadata associated with the object
pub tags: HashMap<String, String>,
}

impl GetOptions {
/// Returns an error if the modification conditions on this request are not satisfied
fn check_modified(
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,11 @@ impl AsyncWrite for LocalUpload {
}
}

pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
pub(crate) fn read_range(
file: &mut File,
path: &PathBuf,
range: Range<usize>,
) -> Result<Bytes> {
let to_read = range.end - range.start;
file.seek(SeekFrom::Start(range.start as u64))
.context(SeekSnafu { path })?;
Expand Down

0 comments on commit 3dd103a

Please sign in to comment.