Skip to content

Commit d9e7af3

Browse files
committed
refactor!: rename & rename_if_not_exists => rename_opts
Change the `ObjectStore` core trait to have a single, extensible rename operation. This helps #385 and #297. Also adds extensions similar to apache/arrow-rs#7170 and apache/arrow-rs#7213 . Also see #548 -- which did something similar for `copy`.
1 parent 521f1dc commit d9e7af3

File tree

6 files changed

+174
-70
lines changed

6 files changed

+174
-70
lines changed

src/chunked.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use futures::stream::BoxStream;
2929
use crate::path::Path;
3030
use crate::{
3131
CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
32-
ObjectStore, PutMultipartOptions, PutOptions, PutResult,
32+
ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions,
3333
};
3434
use crate::{PutPayload, Result};
3535

@@ -170,12 +170,8 @@ impl ObjectStore for ChunkedStore {
170170
self.inner.copy_opts(from, to, options).await
171171
}
172172

173-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
174-
self.inner.rename(from, to).await
175-
}
176-
177-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
178-
self.inner.rename_if_not_exists(from, to).await
173+
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
174+
self.inner.rename_opts(from, to, options).await
179175
}
180176
}
181177

src/lib.rs

Lines changed: 116 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,19 +1035,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
10351035
///
10361036
/// By default, this is implemented as a copy and then delete source. It may not
10371037
/// check when deleting source that it was the same object that was originally copied.
1038-
///
1039-
/// If there exists an object at the destination, it will be overwritten.
1040-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1041-
self.copy(from, to).await?;
1042-
self.delete(from).await
1043-
}
1044-
1045-
/// Move an object from one path to another in the same object store.
1046-
///
1047-
/// Will return an error if the destination already has an object.
1048-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1049-
self.copy_if_not_exists(from, to).await?;
1050-
self.delete(from).await
1038+
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
1039+
let RenameOptions {
1040+
target_mode,
1041+
extensions,
1042+
} = options;
1043+
let copy_mode = match target_mode {
1044+
RenameTargetMode::Overwrite => CopyMode::Overwrite,
1045+
RenameTargetMode::Create => CopyMode::Create,
1046+
};
1047+
let copy_options = CopyOptions {
1048+
mode: copy_mode,
1049+
extensions,
1050+
};
1051+
self.copy_opts(from, to, copy_options).await?;
1052+
self.delete(from).await?;
1053+
Ok(())
10511054
}
10521055
}
10531056

@@ -1116,12 +1119,13 @@ macro_rules! as_ref_impl {
11161119
self.as_ref().copy_opts(from, to, options).await
11171120
}
11181121

1119-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1120-
self.as_ref().rename(from, to).await
1121-
}
1122-
1123-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1124-
self.as_ref().rename_if_not_exists(from, to).await
1122+
async fn rename_opts(
1123+
&self,
1124+
from: &Path,
1125+
to: &Path,
1126+
options: RenameOptions,
1127+
) -> Result<()> {
1128+
self.as_ref().rename_opts(from, to, options).await
11251129
}
11261130
}
11271131
};
@@ -1238,6 +1242,19 @@ pub trait ObjectStoreExt: ObjectStore {
12381242
/// If atomic operations are not supported by the underlying object storage (like S3)
12391243
/// it will return an error.
12401244
fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1245+
1246+
/// Move an object from one path to another in the same object store.
1247+
///
1248+
/// By default, this is implemented as a copy and then delete source. It may not
1249+
/// check when deleting source that it was the same object that was originally copied.
1250+
///
1251+
/// If there exists an object at the destination, it will be overwritten.
1252+
fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1253+
1254+
/// Move an object from one path to another in the same object store.
1255+
///
1256+
/// Will return an error if the destination already has an object.
1257+
fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
12411258
}
12421259

12431260
impl<T> ObjectStoreExt for T
@@ -1277,6 +1294,16 @@ where
12771294
let options = CopyOptions::new().with_mode(CopyMode::Create);
12781295
self.copy_opts(from, to, options).await
12791296
}
1297+
1298+
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1299+
let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
1300+
self.rename_opts(from, to, options).await
1301+
}
1302+
1303+
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1304+
let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
1305+
self.rename_opts(from, to, options).await
1306+
}
12801307
}
12811308

12821309
/// Result of a list call that includes objects, prefixes (directories) and a
@@ -1828,6 +1855,76 @@ impl PartialEq<Self> for CopyOptions {
18281855

18291856
impl Eq for CopyOptions {}
18301857

1858+
/// Configure preconditions for the target of rename operation.
1859+
///
1860+
/// Note though that the source location may or not be deleted at the same time in an atomic operation. There is
1861+
/// currently NO flag to control the atomicity of "delete source at the same time as creating the target".
1862+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1863+
pub enum RenameTargetMode {
1864+
/// Perform a write operation on the target, overwriting any object present at the provided path.
1865+
#[default]
1866+
Overwrite,
1867+
/// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an
1868+
/// object already exists at the provided path.
1869+
Create,
1870+
}
1871+
1872+
/// Options for a rename request
1873+
#[derive(Debug, Clone, Default)]
1874+
pub struct RenameOptions {
1875+
/// Configure the [`RenameTargetMode`] for this operation
1876+
pub target_mode: RenameTargetMode,
1877+
/// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
1878+
/// that need to pass context-specific information (like tracing spans) via trait methods.
1879+
///
1880+
/// These extensions are ignored entirely by backends offered through this crate.
1881+
///
1882+
/// They are also excluded from [`PartialEq`] and [`Eq`].
1883+
pub extensions: Extensions,
1884+
}
1885+
1886+
impl RenameOptions {
1887+
/// Create a new [`RenameOptions`]
1888+
pub fn new() -> Self {
1889+
Self::default()
1890+
}
1891+
1892+
/// Sets the `target_mode=.
1893+
///
1894+
/// See [`RenameOptions::target_mode`].
1895+
#[must_use]
1896+
pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
1897+
self.target_mode = target_mode;
1898+
self
1899+
}
1900+
1901+
/// Sets the `extensions`.
1902+
///
1903+
/// See [`RenameOptions::extensions`].
1904+
#[must_use]
1905+
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1906+
self.extensions = extensions;
1907+
self
1908+
}
1909+
}
1910+
1911+
impl PartialEq<Self> for RenameOptions {
1912+
fn eq(&self, other: &Self) -> bool {
1913+
let Self {
1914+
target_mode,
1915+
extensions: _,
1916+
} = self;
1917+
let Self {
1918+
target_mode: target_mode_other,
1919+
extensions: _,
1920+
} = other;
1921+
1922+
target_mode == target_mode_other
1923+
}
1924+
}
1925+
1926+
impl Eq for RenameOptions {}
1927+
18311928
/// A specialized `Result` for object store-related errors
18321929
pub type Result<T, E = Error> = std::result::Result<T, E>;
18331930

src/limit.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
2020
use crate::{
2121
BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
22-
ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
23-
StreamExt, UploadPart,
22+
ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult,
23+
RenameOptions, Result, StreamExt, UploadPart,
2424
};
2525
use async_trait::async_trait;
2626
use bytes::Bytes;
@@ -156,14 +156,9 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
156156
self.inner.copy_opts(from, to, options).await
157157
}
158158

159-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
159+
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
160160
let _permit = self.semaphore.acquire().await.unwrap();
161-
self.inner.rename(from, to).await
162-
}
163-
164-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
165-
let _permit = self.semaphore.acquire().await.unwrap();
166-
self.inner.rename_if_not_exists(from, to).await
161+
self.inner.rename_opts(from, to, options).await
167162
}
168163
}
169164

src/local.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::{
4040
path::{Path, absolute_path_to_url},
4141
util::InvalidGetRange,
4242
};
43-
use crate::{CopyMode, CopyOptions};
43+
use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
4444

4545
/// A specialized `Error` for filesystem object store-related errors
4646
#[derive(Debug, thiserror::Error)]
@@ -610,24 +610,52 @@ impl ObjectStore for LocalFileSystem {
610610
}
611611
}
612612

613-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
614-
let from = self.path_to_filesystem(from)?;
615-
let to = self.path_to_filesystem(to)?;
616-
maybe_spawn_blocking(move || {
617-
loop {
618-
match std::fs::rename(&from, &to) {
619-
Ok(_) => return Ok(()),
620-
Err(source) => match source.kind() {
621-
ErrorKind::NotFound => match from.exists() {
622-
true => create_parent_dirs(&to, source)?,
623-
false => return Err(Error::NotFound { path: from, source }.into()),
624-
},
625-
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
613+
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
614+
let RenameOptions {
615+
target_mode,
616+
extensions,
617+
} = options;
618+
619+
match target_mode {
620+
// optimized implementation
621+
RenameTargetMode::Overwrite => {
622+
let from = self.path_to_filesystem(from)?;
623+
let to = self.path_to_filesystem(to)?;
624+
maybe_spawn_blocking(move || {
625+
loop {
626+
match std::fs::rename(&from, &to) {
627+
Ok(_) => return Ok(()),
628+
Err(source) => match source.kind() {
629+
ErrorKind::NotFound => match from.exists() {
630+
true => create_parent_dirs(&to, source)?,
631+
false => {
632+
return Err(Error::NotFound { path: from, source }.into());
633+
}
634+
},
635+
_ => {
636+
return Err(Error::UnableToCopyFile { from, to, source }.into());
637+
}
638+
},
639+
}
640+
}
641+
})
642+
.await
643+
}
644+
// fall-back to copy & delete
645+
RenameTargetMode::Create => {
646+
self.copy_opts(
647+
from,
648+
to,
649+
CopyOptions {
650+
mode: CopyMode::Create,
651+
extensions,
626652
},
627-
}
653+
)
654+
.await?;
655+
self.delete(from).await?;
656+
Ok(())
628657
}
629-
})
630-
.await
658+
}
631659
}
632660
}
633661

src/prefix.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::ops::Range;
2323
use crate::path::Path;
2424
use crate::{
2525
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
26-
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
26+
PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result,
2727
};
2828

2929
/// Store wrapper that applies a constant prefix to all paths handled by the store.
@@ -188,16 +188,10 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
188188
self.inner.copy_opts(&full_from, &full_to, options).await
189189
}
190190

191-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
191+
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
192192
let full_from = self.full_path(from);
193193
let full_to = self.full_path(to);
194-
self.inner.rename(&full_from, &full_to).await
195-
}
196-
197-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
198-
let full_from = self.full_path(from);
199-
let full_to = self.full_path(to);
200-
self.inner.rename_if_not_exists(&full_from, &full_to).await
194+
self.inner.rename_opts(&full_from, &full_to, options).await
201195
}
202196
}
203197

src/throttle.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::ops::Range;
2121
use std::{convert::TryInto, sync::Arc};
2222

2323
use crate::multipart::{MultipartStore, PartId};
24-
use crate::{CopyOptions, GetOptions, UploadPart};
24+
use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart};
2525
use crate::{
2626
GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
2727
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
@@ -261,16 +261,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
261261
self.inner.copy_opts(from, to, options).await
262262
}
263263

264-
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
264+
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
265265
sleep(self.config().wait_put_per_call).await;
266266

267-
self.inner.rename(from, to).await
268-
}
269-
270-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
271-
sleep(self.config().wait_put_per_call).await;
272-
273-
self.inner.rename_if_not_exists(from, to).await
267+
self.inner.rename_opts(from, to, options).await
274268
}
275269
}
276270

0 commit comments

Comments
 (0)