Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object store 0.10.2 debug #54

Open
wants to merge 19 commits into
base: object-store-0.10.2-cx1
Choose a base branch
from
3 changes: 2 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ impl S3Client {
data: PutPayload,
) -> Result<PartId> {
let part = (part_idx + 1).to_string();
let size = data.content_length();

let response = self
.request(Method::PUT, path)
Expand All @@ -571,7 +572,7 @@ impl S3Client {
.await?;

let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
Ok(PartId { content_id })
Ok(PartId { content_id, size })
}

pub async fn complete_multipart(
Expand Down
69 changes: 68 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::cmp::Ordering;
use std::{sync::Arc, time::Duration};
use url::Url;

Expand Down Expand Up @@ -334,21 +335,34 @@ struct UploadState {
#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let len = data.content_length();
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
println!(
"uploading part: {}, location: {:?}, size: {}, upload_id: {}",
idx, state.location, len, state.upload_id
);
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.await?;
state.parts.put(idx, part);
println!(
"uploaded part: {}, location: {:?}, upload_id: {}, size: {}",
idx, state.location, state.upload_id, len
);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
let parts = self.state.parts.finish_sorted_by(self.part_idx, sort)?;
println!(
"completing multipart upload, upload_id: {}, part_id: {}, location: {:?}, parts: {:?}",
self.state.upload_id, self.part_idx, self.state.location, parts
);

self.state
.client
Expand All @@ -357,6 +371,10 @@ impl MultipartUpload for S3MultiPartUpload {
}

async fn abort(&mut self) -> Result<()> {
println!(
"aborting multipart upload, upload_id: {}, part_id: {}, location: {:?}",
self.state.upload_id, self.part_idx, self.state.location
);
self.state
.client
.request(Method::DELETE, &self.state.location)
Expand Down Expand Up @@ -406,6 +424,15 @@ impl MultipartStore for AmazonS3 {
}
}

// smaller parts last, everything else by index
fn sort(a: &(usize, PartId), b: &(usize, PartId)) -> Ordering {
if a.1.size == b.1.size {
a.0.cmp(&b.0)
} else {
b.1.size.cmp(&a.1.size)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -605,4 +632,44 @@ mod tests {
store.delete(location).await.unwrap();
}
}

#[test]
fn test_sort() {
let mut parts = [
(
1,
PartId {
content_id: "1".to_string(),
size: 100,
},
),
(
2,
PartId {
content_id: "2".to_string(),
size: 50,
},
),
(
3,
PartId {
content_id: "3".to_string(),
size: 100,
},
),
(
4,
PartId {
content_id: "4".to_string(),
size: 100,
},
),
];
parts.sort_unstable_by(sort);

assert_eq!(parts[0].1.content_id, "1");
assert_eq!(parts[1].1.content_id, "3");
assert_eq!(parts[2].1.content_id, "4");
assert_eq!(parts[3].1.content_id, "2");
}
}
3 changes: 2 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,15 @@ impl AzureClient {
) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);
let size = payload.content_length();

self.put_request(path, payload)
.query(&[("comp", "block"), ("blockid", &block_id)])
.idempotent(true)
.send()
.await?;

Ok(PartId { content_id })
Ok(PartId { content_id, size })
}

/// PUT a block list <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
Expand Down
18 changes: 17 additions & 1 deletion object_store/src/client/parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp::Ordering;

use crate::multipart::PartId;
use parking_lot::Mutex;

Expand All @@ -35,14 +37,28 @@ impl Parts {
///
/// `expected` is the number of parts expected in the final result
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<PartId>> {
self.finish_sorted_by(expected, |(idx_a, _), (idx_b, _)| idx_a.cmp(idx_b))
}

/// Produce the final list of [`PartId`] ordered by sorting function
///
/// `expected` is the number of parts expected in the final result
pub(crate) fn finish_sorted_by<F>(
&self,
expected: usize,
sort_by: F,
) -> crate::Result<Vec<PartId>>
where
F: FnMut(&(usize, PartId), &(usize, PartId)) -> Ordering,
{
let mut parts = self.0.lock();
if parts.len() != expected {
return Err(crate::Error::Generic {
store: "Parts",
source: "Missing part".to_string().into(),
});
}
parts.sort_unstable_by_key(|(idx, _)| *idx);
parts.sort_unstable_by(sort_by);
Ok(parts.drain(..).map(|(_, v)| v).collect())
}
}
2 changes: 2 additions & 0 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl GoogleCloudStorageClient {
("partNumber", &format!("{}", part_idx + 1)),
("uploadId", upload_id),
];
let size = data.content_length();
let result = self
.request(Method::PUT, path)
.with_payload(data)
Expand All @@ -419,6 +420,7 @@ impl GoogleCloudStorageClient {

Ok(PartId {
content_id: result.e_tag.unwrap(),
size,
})
}

Expand Down
2 changes: 2 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,14 @@ impl MultipartStore for InMemory {
) -> Result<PartId> {
let mut storage = self.storage.write();
let upload = storage.upload_mut(id)?;
let size = payload.content_length();
if part_idx <= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(payload.into());
Ok(PartId {
content_id: Default::default(),
size,
})
}

Expand Down
2 changes: 2 additions & 0 deletions object_store/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::{MultipartId, PutPayload, PutResult, Result};
pub struct PartId {
/// Id of this part
pub content_id: String,
/// Size of this part
pub size: usize,
fsdvh marked this conversation as resolved.
Show resolved Hide resolved
}

/// A low-level interface for interacting with multipart upload APIs
Expand Down