handle indexes outside of the upload
fsdvh committed Oct 16, 2024
1 parent 5e2c81c commit 58c9e6e
Showing 10 changed files with 54 additions and 61 deletions.
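
This commit moves part bookkeeping out of the individual store implementations: put_part now receives the part index from the caller, and complete receives the number of parts, rather than each upload tracking an internal part_idx counter. A minimal caller-side sketch of the revised trait, based on the object_store/src/upload.rs hunk below (the helper function and its name are illustrative, not part of the commit):

use object_store::{MultipartUpload, PutPayload, PutResult, Result};

// Drive any MultipartUpload with caller-supplied indices: each payload is
// uploaded as part `idx`, and `complete` receives the total part count.
async fn upload_parts(
    mut upload: Box<dyn MultipartUpload>,
    parts: Vec<PutPayload>,
) -> Result<PutResult> {
    let num_parts = parts.len();
    for (idx, part) in parts.into_iter().enumerate() {
        // Await each part before starting the next; real callers may instead
        // poll several parts concurrently.
        upload.put_part(idx, part).await?;
    }
    upload.complete(num_parts).await
}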
20 changes: 8 additions & 12 deletions object_store/src/aws/mod.rs
@@ -216,7 +216,6 @@ impl ObjectStore for AmazonS3 {
let upload_id = self.client.create_multipart(location, opts).await?;

Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
@@ -320,7 +319,6 @@ impl ObjectStore for AmazonS3 {

#[derive(Debug)]
struct S3MultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
}

@@ -334,9 +332,7 @@ struct UploadState {

#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart {
let len = data.content_length();
let state = Arc::clone(&self.state);
println!(
@@ -357,11 +353,11 @@ impl MultipartUpload for S3MultiPartUpload {
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
async fn complete(&mut self, idx: usize) -> Result<PutResult> {
let parts = self.state.parts.finish(idx)?;
println!(
"completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}",
self.state.upload_id, self.part_idx, self.state.location, parts
self.state.upload_id, idx, self.state.location, parts
);

self.state
@@ -372,8 +368,8 @@ impl MultipartUpload for S3MultiPartUpload {

async fn abort(&mut self) -> Result<()> {
println!(
"aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}",
self.state.upload_id, self.part_idx, self.state.location
"aborting multipart upload, upload_id: {}, location: {:?}",
self.state.upload_id, self.state.location
);
self.state
.client
@@ -613,8 +609,8 @@ mod tests {
store.copy(&locations[0], &locations[1]).await.unwrap();

let mut upload = store.put_multipart(&locations[2]).await.unwrap();
upload.put_part(data.clone()).await.unwrap();
upload.complete().await.unwrap();
upload.put_part(1, data.clone()).await.unwrap();
upload.complete(1).await.unwrap();

for location in &locations {
let res = store
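
Against the S3 implementation above, a two-part upload under the revised signatures would look like the following sketch; the bucket name is a placeholder, credentials are assumed to come from the environment, and all parts except the last must meet S3's 5 MiB minimum part size:

use object_store::aws::AmazonS3Builder;
use object_store::{path::Path, MultipartUpload, ObjectStore, PutPayload};

// Sketch of a two-part upload with explicit part indices, assuming the
// revised put_part(idx, ..) / complete(num_parts) signatures from this commit.
async fn two_part_upload() -> object_store::Result<()> {
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("example-bucket") // placeholder bucket
        .build()?;
    let location = Path::from("uploads/example.bin");

    let mut upload = store.put_multipart(&location).await?;
    // First part padded to 5 MiB so the provider's minimum part size is met.
    upload.put_part(0, PutPayload::from(vec![0u8; 5 * 1024 * 1024])).await?;
    upload.put_part(1, PutPayload::from("trailing bytes")).await?;
    upload.complete(2).await?;
    Ok(())
}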
10 changes: 3 additions & 7 deletions object_store/src/azure/mod.rs
@@ -101,7 +101,6 @@ impl ObjectStore for MicrosoftAzure {
opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>> {
Ok(Box::new(AzureMultiPartUpload {
part_idx: 0,
opts,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
@@ -199,7 +198,6 @@ impl Signer for MicrosoftAzure {
/// abort -> No equivalent; blocks are simply dropped after 7 days
#[derive(Debug)]
struct AzureMultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
opts: PutMultipartOpts,
}
@@ -213,9 +211,7 @@ struct UploadState {

#[async_trait]
impl MultipartUpload for AzureMultiPartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart {
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state.client.put_block(&state.location, idx, data).await?;
@@ -224,8 +220,8 @@ impl MultipartUpload for AzureMultiPartUpload {
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
async fn complete(&mut self, idx: usize) -> Result<PutResult> {
let parts = self.state.parts.finish(idx)?;

self.state
.client
10 changes: 3 additions & 7 deletions object_store/src/gcp/mod.rs
@@ -102,7 +102,6 @@ impl GoogleCloudStorage {
#[derive(Debug)]
struct GCSMultipartUpload {
state: Arc<UploadState>,
part_idx: usize,
}

#[derive(Debug)]
@@ -115,9 +114,7 @@ struct UploadState {

#[async_trait]
impl MultipartUpload for GCSMultipartUpload {
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
fn put_part(&mut self, idx: usize, payload: PutPayload) -> UploadPart {
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state
@@ -129,8 +126,8 @@ impl MultipartUpload for GCSMultipartUpload {
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
async fn complete(&mut self, idx: usize) -> Result<PutResult> {
let parts = self.state.parts.finish(idx)?;

self.state
.client
@@ -165,7 +162,6 @@ impl ObjectStore for GoogleCloudStorage {
let upload_id = self.client.multipart_initiate(location, opts).await?;

Ok(Box::new(GCSMultipartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
path: location.clone(),
10 changes: 5 additions & 5 deletions object_store/src/integration.rs
@@ -497,8 +497,8 @@ pub async fn put_get_attributes(integration: &dyn ObjectStore) {
let opts = attributes.clone().into();
match integration.put_multipart_opts(&path, opts).await {
Ok(mut w) => {
w.put_part("foo".into()).await.unwrap();
w.complete().await.unwrap();
w.put_part(0, "foo".into()).await.unwrap();
w.complete(1).await.unwrap();

let r = integration.get(&path).await.unwrap();
assert_eq!(r.attributes, attributes);
@@ -755,7 +755,7 @@ pub async fn stream_get(storage: &DynObjectStore) {
let data = get_chunks(5 * 1024 * 1024, 3);
let bytes_expected = data.concat();
let mut upload = storage.put_multipart(&location).await.unwrap();
let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
let uploads = data.into_iter().map(|x| upload.put_part(0, x.into()));
futures::future::try_join_all(uploads).await.unwrap();

// Object should not yet exist in store
@@ -772,7 +772,7 @@ pub async fn stream_get(storage: &DynObjectStore) {
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);

upload.complete().await.unwrap();
upload.complete(1).await.unwrap();

let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
@@ -826,7 +826,7 @@ pub async fn stream_get(storage: &DynObjectStore) {
// We can abort an in-progress write
let mut upload = storage.put_multipart(&location).await.unwrap();
upload
.put_part(data.first().unwrap().clone().into())
.put_part(0, data.first().unwrap().clone().into())
.await
.unwrap();

4 changes: 2 additions & 2 deletions object_store/src/lib.rs
@@ -1391,8 +1391,8 @@ mod tests {
.await
.unwrap();

write.put_part("foo".into()).await.unwrap();
write.complete().await.unwrap();
write.put_part(0, "foo".into()).await.unwrap();
write.complete(1).await.unwrap();

let buf_path = Path::from("tag_test_buf");
let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
8 changes: 4 additions & 4 deletions object_store/src/limit.rs
@@ -251,18 +251,18 @@ impl LimitUpload {

#[async_trait]
impl MultipartUpload for LimitUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let upload = self.upload.put_part(data);
fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart {
let upload = self.upload.put_part(idx, data);
let s = Arc::clone(&self.semaphore);
Box::pin(async move {
let _permit = s.acquire().await.unwrap();
upload.await
})
}

async fn complete(&mut self) -> Result<PutResult> {
async fn complete(&mut self, num_parts: usize) -> Result<PutResult> {
let _permit = self.semaphore.acquire().await.unwrap();
self.upload.complete().await
self.upload.complete(num_parts).await
}

async fn abort(&mut self) -> Result<()> {
16 changes: 8 additions & 8 deletions object_store/src/local.rs
@@ -790,7 +790,7 @@ impl LocalUpload {

#[async_trait]
impl MultipartUpload for LocalUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
fn put_part(&mut self, _idx: usize, data: PutPayload) -> UploadPart {
let offset = self.offset;
self.offset += data.content_length() as u64;

@@ -809,7 +809,7 @@ impl MultipartUpload for LocalUpload {
.boxed()
}

async fn complete(&mut self) -> Result<PutResult> {
async fn complete(&mut self, _num_parts: usize) -> Result<PutResult> {
let src = self.src.take().context(AbortedSnafu)?;
let s = Arc::clone(&self.state);
maybe_spawn_blocking(move || {
@@ -1097,9 +1097,9 @@ mod tests {
// Can't use stream_get test as WriteMultipart uses a tokio JoinSet
let p = Path::from("manual_upload");
let mut upload = integration.put_multipart(&p).await.unwrap();
upload.put_part("123".into()).await.unwrap();
upload.put_part("45678".into()).await.unwrap();
let r = upload.complete().await.unwrap();
upload.put_part(0, "123".into()).await.unwrap();
upload.put_part(1, "45678".into()).await.unwrap();
let r = upload.complete(2).await.unwrap();

let get = integration.get(&p).await.unwrap();
assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
@@ -1406,10 +1406,10 @@ mod tests {

let data = PutPayload::from("arbitrary data");
let mut u1 = integration.put_multipart(&location).await.unwrap();
u1.put_part(data.clone()).await.unwrap();
u1.put_part(0, data.clone()).await.unwrap();

let mut u2 = integration.put_multipart(&location).await.unwrap();
u2.put_part(data).await.unwrap();
u2.put_part(1, data).await.unwrap();

let list = flatten_list_stream(&integration, None).await.unwrap();
assert_eq!(list.len(), 0);
@@ -1566,7 +1566,7 @@ mod not_wasm_tests {
let location = Path::from("some_file");
let data = PutPayload::from_static(b"hello");
let mut upload = integration.put_multipart(&location).await.unwrap();
upload.put_part(data).await.unwrap();
upload.put_part(0, data).await.unwrap();

let file_count = std::fs::read_dir(root.path()).unwrap().count();
assert_eq!(file_count, 1);
4 changes: 2 additions & 2 deletions object_store/src/memory.rs
@@ -501,12 +501,12 @@ struct InMemoryUpload {

#[async_trait]
impl MultipartUpload for InMemoryUpload {
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
fn put_part(&mut self, _idx: usize, payload: PutPayload) -> UploadPart {
self.parts.push(payload);
Box::pin(futures::future::ready(Ok(())))
}

async fn complete(&mut self) -> Result<PutResult> {
async fn complete(&mut self, _num_parts: usize) -> Result<PutResult> {
let cap = self.parts.iter().map(|x| x.content_length()).sum();
let mut buf = Vec::with_capacity(cap);
let parts = self.parts.iter().flatten();
8 changes: 4 additions & 4 deletions object_store/src/throttle.rs
@@ -377,17 +377,17 @@ struct ThrottledUpload {

#[async_trait]
impl MultipartUpload for ThrottledUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart {
let duration = self.sleep;
let put = self.upload.put_part(data);
let put = self.upload.put_part(idx, data);
Box::pin(async move {
sleep(duration).await;
put.await
})
}

async fn complete(&mut self) -> Result<PutResult> {
self.upload.complete().await
async fn complete(&mut self, num_parts: usize) -> Result<PutResult> {
self.upload.complete(num_parts).await
}

async fn abort(&mut self) -> Result<()> {
25 changes: 15 additions & 10 deletions object_store/src/upload.rs
@@ -64,15 +64,15 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
/// ```
///
/// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
fn put_part(&mut self, data: PutPayload) -> UploadPart;
fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart;

/// Complete the multipart upload
///
/// It is implementation defined behaviour if this method is called before polling
/// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion. Additionally,
/// it is implementation defined behaviour to call [`MultipartUpload::complete`]
/// on an already completed or aborted [`MultipartUpload`].
async fn complete(&mut self) -> Result<PutResult>;
async fn complete(&mut self, num_parts: usize) -> Result<PutResult>;

/// Abort the multipart upload
///
@@ -94,12 +94,12 @@

#[async_trait]
impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
(**self).put_part(data)
fn put_part(&mut self, idx: usize, data: PutPayload) -> UploadPart {
(**self).put_part(idx, data)
}

async fn complete(&mut self) -> Result<PutResult> {
(**self).complete().await
async fn complete(&mut self, num_parts: usize) -> Result<PutResult> {
(**self).complete(num_parts).await
}

async fn abort(&mut self) -> Result<()> {
@@ -118,6 +118,8 @@ impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
/// [`Sink`]: futures::sink::Sink
#[derive(Debug)]
pub struct WriteMultipart {
idx: usize,

upload: Box<dyn MultipartUpload>,

buffer: PutPayloadMut,
@@ -136,6 +138,7 @@ impl WriteMultipart {
/// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
Self {
idx: 0,
upload,
chunk_size,
buffer: PutPayloadMut::new(),
@@ -214,7 +217,9 @@ impl WriteMultipart {
}

pub(crate) fn put_part(&mut self, part: PutPayload) {
self.tasks.spawn(self.upload.put_part(part));
let idx = self.idx;
self.idx += 1;
self.tasks.spawn(self.upload.put_part(idx, part));
}

/// Abort this upload, attempting to clean up any successfully uploaded parts
@@ -236,7 +241,7 @@ impl WriteMultipart {

self.wait_for_capacity(0).await?;

match self.upload.complete().await {
match self.upload.complete(self.idx).await {
Err(e) => {
self.tasks.shutdown().await;
self.upload.abort().await?;
@@ -290,12 +295,12 @@ mod tests {

#[async_trait]
impl MultipartUpload for InstrumentedUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
fn put_part(&mut self, _idx: usize, data: PutPayload) -> UploadPart {
self.chunks.lock().push(data);
futures::future::ready(Ok(())).boxed()
}

async fn complete(&mut self) -> Result<PutResult> {
async fn complete(&mut self, _idx: usize) -> Result<PutResult> {
Ok(PutResult {
e_tag: None,
version: None,
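
Because WriteMultipart now owns the running idx counter (the new field added above) and passes the final count to complete, buffered callers are unaffected at the call site; a usage sketch, with an illustrative helper name:

use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};

// WriteMultipart assigns part indices itself and forwards the final count
// to complete, so callers only write bytes and then call finish.
async fn write_buffered(
    store: &dyn ObjectStore,
    path: &Path,
    data: &[u8],
) -> object_store::Result<()> {
    let upload = store.put_multipart(path).await?;
    let mut writer = WriteMultipart::new(upload);
    writer.write(data);
    writer.finish().await?;
    Ok(())
}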
