Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ impl PaginatedListStore for AmazonS3 {
mod tests {
use super::*;
use crate::ClientOptions;
use crate::ObjectStoreExt;
use crate::client::SpawnedReqwestConnector;
use crate::client::get::GetClient;
use crate::client::retry::RetryContext;
Expand Down
1 change: 1 addition & 0 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ impl PaginatedListStore for MicrosoftAzure {
#[cfg(test)]
mod tests {
use super::*;
use crate::ObjectStoreExt;
use crate::integration::*;
use crate::tests::*;
use bytes::Bytes;
Expand Down
8 changes: 4 additions & 4 deletions src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader {

/// An async buffered writer compatible with the tokio IO traits
///
/// This writer adaptively uses [`ObjectStore::put`] or
/// This writer adaptively uses [`ObjectStore::put_opts`] or
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
/// been written.
///
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
/// streamed using [`ObjectStore::put_multipart`]
pub struct BufWriter {
capacity: usize,
Expand All @@ -242,7 +242,7 @@ enum BufWriterState {
Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
/// Write to a multipart upload
Write(Option<WriteMultipart>),
/// [`ObjectStore::put`]
/// [`ObjectStore::put_opts`]
Flush(BoxFuture<'static, crate::Result<()>>),
}

Expand Down Expand Up @@ -489,7 +489,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
use crate::{Attribute, GetOptions};
use crate::{Attribute, GetOptions, ObjectStoreExt};
use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

Expand Down
1 change: 1 addition & 0 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl ObjectStore for ChunkedStore {
mod tests {
use futures::StreamExt;

use crate::ObjectStoreExt;
#[cfg(feature = "fs")]
use crate::integration::*;
#[cfg(feature = "fs")]
Expand Down
1 change: 1 addition & 0 deletions src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl PaginatedListStore for GoogleCloudStorage {
mod test {
use credential::DEFAULT_GCS_BASE_URL;

use crate::ObjectStoreExt;
use crate::integration::*;
use crate::tests::*;

Expand Down
2 changes: 1 addition & 1 deletion src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::multipart::MultipartStore;
use crate::path::Path;
use crate::{
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart,
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
Expand Down
68 changes: 47 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@
//!
//! # Put Object
//!
//! Use the [`ObjectStore::put`] method to atomically write data.
//! Use the [`ObjectStoreExt::put`] method to atomically write data.
//!
//! ```ignore-wasm32
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, PutPayload};
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
//! # use std::sync::Arc;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
Expand Down Expand Up @@ -364,7 +364,7 @@
//!
//! ```ignore-wasm32
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, PutPayloadMut};
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -613,19 +613,24 @@ pub type DynObjectStore = dyn ObjectStore;
pub type MultipartId = String;

/// Universal API to multiple object store services.
///
/// For more convience methods, check [`ObjectStoreExt`].
///
/// # Contract
/// This trait is meant as a contract between object store implementations
/// (e.g. providers, wrappers) and the `object_store` crate itself and is
/// intended to be the minimum API required for an object store.
///
/// The [`ObjectStoreExt`] acts as an API/contract between `object_store`
/// and the store users and provides additional methods that may be simpler to use but overlap
/// in functionality with `ObjectStore`
#[async_trait]
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location
/// Save the provided `payload` to `location` with the given options
///
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.put_opts(location, payload, PutOptions::default())
.await
}

/// Save the provided `payload` to `location` with the given options
async fn put_opts(
&self,
location: &Path,
Expand All @@ -635,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Perform a multipart upload
///
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
/// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
///
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
Expand All @@ -646,7 +651,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

/// Perform a multipart upload with options
///
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
/// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
///
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
Expand All @@ -665,7 +670,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
/// # use object_store::{path::Path, ObjectStore};
/// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
/// async fn get_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
Expand Down Expand Up @@ -699,7 +704,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
/// # use object_store::{path::Path, ObjectStore, GetOptions};
/// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
/// async fn get_opts_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
Expand Down Expand Up @@ -756,7 +761,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
/// # use object_store::{path::Path, ObjectStore, GetOptions};
/// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
/// async fn get_opts_range_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
Expand Down Expand Up @@ -824,7 +829,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// ```ignore-wasm32
/// # use object_store::local::LocalFileSystem;
/// # use tempfile::tempdir;
/// # use object_store::{path::Path, ObjectStore};
/// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
/// async fn get_range_example() {
/// let tmp = tempdir().unwrap();
/// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
Expand Down Expand Up @@ -892,7 +897,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let root = tempfile::TempDir::new().unwrap();
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
/// # use object_store::{ObjectStore, ObjectMeta};
/// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
/// # use object_store::path::Path;
/// # use futures::{StreamExt, TryStreamExt};
/// #
Expand Down Expand Up @@ -1103,10 +1108,6 @@ macro_rules! as_ref_impl {
($type:ty) => {
#[async_trait]
impl ObjectStore for $type {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.as_ref().put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -1201,6 +1202,31 @@ macro_rules! as_ref_impl {
as_ref_impl!(Arc<dyn ObjectStore>);
as_ref_impl!(Box<dyn ObjectStore>);

/// Extension trait for [`ObjectStore`] with convinience functions.
///
/// See "contract" section within the [`ObjectStore`] documentation for more reasoning.
///
/// # Implementation
/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
pub trait ObjectStoreExt: ObjectStore {
/// Save the provided bytes to the specified location
///
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
}

impl<T> ObjectStoreExt for T
where
T: ObjectStore + ?Sized,
{
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.put_opts(location, payload, PutOptions::default())
.await
}
}

/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
Expand Down
5 changes: 0 additions & 5 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {

#[async_trait]
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down
2 changes: 1 addition & 1 deletion src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ mod tests {
#[cfg(target_family = "unix")]
use tempfile::NamedTempFile;

use crate::integration::*;
use crate::{ObjectStoreExt, integration::*};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl MultipartUpload for InMemoryUpload {

#[cfg(test)]
mod tests {
use crate::integration::*;
use crate::{ObjectStoreExt, integration::*};

use super::*;

Expand Down
7 changes: 1 addition & 6 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ fn strip_meta(prefix: &Path, meta: ObjectMeta) -> ObjectMeta {

#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
let full_path = self.full_path(location);
self.inner.put(&full_path, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -238,8 +233,8 @@ mod tests {
use std::slice;

use super::*;
use crate::integration::*;
use crate::local::LocalFileSystem;
use crate::{ObjectStoreExt, integration::*};

use tempfile::TempDir;

Expand Down
8 changes: 2 additions & 6 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub struct ThrottleConfig {
/// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
pub wait_list_with_delimiter_per_entry: Duration,

/// Sleep duration for every call to [`put`](ThrottledStore::put).
/// Sleep duration for every call to [`put_opts`](ThrottledStore::put_opts).
///
/// Sleeping is done before the underlying store is called and independently of the success of
/// the operation.
Expand Down Expand Up @@ -148,11 +148,6 @@ impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {

#[async_trait]
impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
sleep(self.config().wait_put_per_call).await;
self.inner.put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -419,6 +414,7 @@ mod tests {
use super::*;
#[cfg(target_os = "linux")]
use crate::GetResultPayload;
use crate::ObjectStoreExt;
use crate::{integration::*, memory::InMemory};
use futures::TryStreamExt;
use tokio::time::Duration;
Expand Down