Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::stream::BoxStream;
use crate::path::Path;
use crate::{
CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutResult,
ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions,
};
use crate::{PutPayload, Result};

Expand Down Expand Up @@ -170,12 +170,8 @@ impl ObjectStore for ChunkedStore {
self.inner.copy_opts(from, to, options).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.rename(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.rename_if_not_exists(from, to).await
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
self.inner.rename_opts(from, to, options).await
}
}

Expand Down
135 changes: 116 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,19 +1035,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
///
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
///
/// If there exists an object at the destination, it will be overwritten.
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.copy(from, to).await?;
self.delete(from).await
}

/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.copy_if_not_exists(from, to).await?;
self.delete(from).await
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
let RenameOptions {
target_mode,
extensions,
} = options;
let copy_mode = match target_mode {
RenameTargetMode::Overwrite => CopyMode::Overwrite,
RenameTargetMode::Create => CopyMode::Create,
};
let copy_options = CopyOptions {
mode: copy_mode,
extensions,
};
self.copy_opts(from, to, copy_options).await?;
self.delete(from).await?;
Ok(())
}
}

Expand Down Expand Up @@ -1116,12 +1119,13 @@ macro_rules! as_ref_impl {
self.as_ref().copy_opts(from, to, options).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename_if_not_exists(from, to).await
async fn rename_opts(
&self,
from: &Path,
to: &Path,
options: RenameOptions,
) -> Result<()> {
self.as_ref().rename_opts(from, to, options).await
}
}
};
Expand Down Expand Up @@ -1238,6 +1242,19 @@ pub trait ObjectStoreExt: ObjectStore {
/// If atomic operations are not supported by the underlying object storage (like S3)
/// it will return an error.
fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;

/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
///
/// If there exists an object at the destination, it will be overwritten.
fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;

/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
}

impl<T> ObjectStoreExt for T
Expand Down Expand Up @@ -1277,6 +1294,16 @@ where
let options = CopyOptions::new().with_mode(CopyMode::Create);
self.copy_opts(from, to, options).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
self.rename_opts(from, to, options).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
self.rename_opts(from, to, options).await
}
}

/// Result of a list call that includes objects, prefixes (directories) and a
Expand Down Expand Up @@ -1828,6 +1855,76 @@ impl PartialEq<Self> for CopyOptions {

impl Eq for CopyOptions {}

/// Configure preconditions for the target of rename operation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some research about the names used in cloud providers

S3 uses the term If-None-Match for the "don't overwrite" method: https://docs.aws.amazon.com/AmazonS3/latest/API/API_RenameObject.html

From what I can tell, GCP doesn't support an atoimic rename/overwrite operation. If you want to overwrite you have to delete the target first : https://docs.cloud.google.com/storage/docs/copying-renaming-moving-objects

Azure Blob Storage calls it "replace if exists", specifically https://learn.microsoft.com/en-us/rest/api/storageservices/rename-file x-ms-file-rename-replace-if-exists

So TLDR I think these options are well named and reflect the available capabilities 👍

///
/// Note though that the source location may or not be deleted at the same time in an atomic operation. There is
/// currently NO flag to control the atomicity of "delete source at the same time as creating the target".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenameTargetMode {
/// Perform a write operation on the target, overwriting any object present at the provided path.
#[default]
Overwrite,
/// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an
/// object already exists at the provided path.
Create,
}

/// Options for a rename request
#[derive(Debug, Clone, Default)]
pub struct RenameOptions {
/// Configure the [`RenameTargetMode`] for this operation
pub target_mode: RenameTargetMode,
/// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
/// that need to pass context-specific information (like tracing spans) via trait methods.
///
/// These extensions are ignored entirely by backends offered through this crate.
///
/// They are also excluded from [`PartialEq`] and [`Eq`].
pub extensions: Extensions,
}

impl RenameOptions {
/// Create a new [`RenameOptions`]
pub fn new() -> Self {
Self::default()
}

/// Sets the `target_mode=.
///
/// See [`RenameOptions::target_mode`].
#[must_use]
pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
self.target_mode = target_mode;
self
}

/// Sets the `extensions`.
///
/// See [`RenameOptions::extensions`].
#[must_use]
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}
}

impl PartialEq<Self> for RenameOptions {
fn eq(&self, other: &Self) -> bool {
let Self {
target_mode,
extensions: _,
} = self;
let Self {
target_mode: target_mode_other,
extensions: _,
} = other;

target_mode == target_mode_other
}
}

impl Eq for RenameOptions {}

/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down
13 changes: 4 additions & 9 deletions src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

use crate::{
BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
StreamExt, UploadPart,
ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult,
RenameOptions, Result, StreamExt, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -156,14 +156,9 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.copy_opts(from, to, options).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.rename(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.rename_if_not_exists(from, to).await
self.inner.rename_opts(from, to, options).await
}
}

Expand Down
62 changes: 45 additions & 17 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
use crate::{CopyMode, CopyOptions};
use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};

/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -610,24 +610,52 @@ impl ObjectStore for LocalFileSystem {
}
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
maybe_spawn_blocking(move || {
loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => match from.exists() {
true => create_parent_dirs(&to, source)?,
false => return Err(Error::NotFound { path: from, source }.into()),
},
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
let RenameOptions {
target_mode,
extensions,
} = options;

match target_mode {
// optimized implementation
RenameTargetMode::Overwrite => {
let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
maybe_spawn_blocking(move || {
loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => match from.exists() {
true => create_parent_dirs(&to, source)?,
false => {
return Err(Error::NotFound { path: from, source }.into());
}
},
_ => {
return Err(Error::UnableToCopyFile { from, to, source }.into());
}
},
}
}
})
.await
}
// fall-back to copy & delete
RenameTargetMode::Create => {
self.copy_opts(
from,
to,
CopyOptions {
mode: CopyMode::Create,
extensions,
},
}
)
.await?;
self.delete(from).await?;
Ok(())
}
})
.await
}
}
}

Expand Down
12 changes: 3 additions & 9 deletions src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::ops::Range;
use crate::path::Path;
use crate::{
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
};

/// Store wrapper that applies a constant prefix to all paths handled by the store.
Expand Down Expand Up @@ -188,16 +188,10 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.copy_opts(&full_from, &full_to, options).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.rename(&full_from, &full_to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
self.inner.rename_if_not_exists(&full_from, &full_to).await
self.inner.rename_opts(&full_from, &full_to, options).await
}
}

Expand Down
12 changes: 3 additions & 9 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::ops::Range;
use std::{convert::TryInto, sync::Arc};

use crate::multipart::{MultipartStore, PartId};
use crate::{CopyOptions, GetOptions, UploadPart};
use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart};
use crate::{
GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
Expand Down Expand Up @@ -261,16 +261,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.copy_opts(from, to, options).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
sleep(self.config().wait_put_per_call).await;

self.inner.rename(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
sleep(self.config().wait_put_per_call).await;

self.inner.rename_if_not_exists(from, to).await
self.inner.rename_opts(from, to, options).await
}
}

Expand Down