Skip to content

Commit 01182a4

Browse files
committed
Increase the default multipart target size
1 parent 773bb6c commit 01182a4

File tree

6 files changed

+37
-25
lines changed

6 files changed

+37
-25
lines changed

quickwit/quickwit-storage/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub use self::object_storage::{
7474
#[cfg(feature = "gcs")]
7575
pub use self::opendal_storage::GoogleCloudStorageFactory;
7676
#[cfg(all(feature = "gcs", feature = "integration-testsuite"))]
77-
pub use self::opendal_storage::{DummyTokenLoader, new_emulated_google_cloud_storage};
77+
pub use self::opendal_storage::test_credentials;
7878
pub use self::ram_storage::{RamStorage, RamStorageBuilder};
7979
pub use self::split::{SplitPayload, SplitPayloadBuilder};
8080
#[cfg(any(test, feature = "testsuite"))]

quickwit/quickwit-storage/src/object_storage/policy.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ impl MultiPartPolicy {
6363
}
6464
}
6565

66-
// Default values from https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
67-
// The best default value may however differ depending on vendors.
6866
impl Default for MultiPartPolicy {
6967
fn default() -> Self {
7068
MultiPartPolicy {

quickwit/quickwit-storage/src/opendal_storage/base.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,19 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
2626
use crate::metrics::object_storage_get_slice_in_flight_guards;
2727
use crate::storage::SendableAsync;
2828
use crate::{
29-
BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind,
30-
StorageResolverError, StorageResult,
29+
BulkDeleteError, MultiPartPolicy, OwnedBytes, PutPayload, Storage, StorageError,
30+
StorageErrorKind, StorageResolverError, StorageResult,
3131
};
3232

3333
/// OpenDAL based storage implementation.
3434
/// # TODO
3535
///
3636
/// - Implement REQUEST_SEMAPHORE to control the concurrency.
3737
/// - Implement STORAGE_METRICS for metrics.
38-
/// - Add multipart_policy to control write at once or via multiple.
39-
#[derive(Clone)]
4038
pub struct OpendalStorage {
4139
uri: Uri,
4240
op: Operator,
41+
multipart_policy: MultiPartPolicy,
4342
}
4443

4544
impl fmt::Debug for OpendalStorage {
@@ -58,7 +57,16 @@ impl OpendalStorage {
5857
cfg: opendal::services::Gcs,
5958
) -> Result<Self, StorageResolverError> {
6059
let op = Operator::new(cfg)?.finish();
61-
Ok(Self { uri, op })
60+
Ok(Self {
61+
uri,
62+
op,
63+
// limits are the same as on S3
64+
multipart_policy: MultiPartPolicy::default(),
65+
})
66+
}
67+
68+
pub fn set_policy(&mut self, multipart_policy: MultiPartPolicy) {
69+
self.multipart_policy = multipart_policy;
6270
}
6371
}
6472

@@ -69,17 +77,14 @@ impl Storage for OpendalStorage {
6977
Ok(())
7078
}
7179

72-
/// # TODO
73-
///
74-
/// We can implement something like `multipart_policy` determine whether to use copy.
75-
/// If the payload is small enough, we can call `op.write()` at once.
7680
async fn put(&self, path: &Path, payload: Box<dyn PutPayload>) -> StorageResult<()> {
7781
let path = path.as_os_str().to_string_lossy();
7882
let mut payload_reader = payload.byte_stream().await?.into_async_read();
7983

8084
let mut storage_writer = self
8185
.op
8286
.writer_with(&path)
87+
.chunk(self.multipart_policy.part_num_bytes(payload.len()) as usize)
8388
.await?
8489
.into_futures_async_write()
8590
.compat_write();

quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,14 @@ impl StorageFactory for GoogleCloudStorageFactory {
5050
}
5151
}
5252

53+
/// Helpers to configure the GCP local test setup.
5354
#[cfg(feature = "integration-testsuite")]
54-
pub mod test_credentials {
55+
pub mod test_config_helpers {
5556
use super::*;
5657

58+
/// URL of the local GCP emulator.
59+
pub const LOCAL_GCP_EMULATOR_ENDPOINT: &str = "http://127.0.0.1:4443";
60+
5761
/// reqsign::GoogleTokenLoad implementation for testing.
5862
#[derive(Debug)]
5963
pub struct DummyTokenLoader;
@@ -78,8 +82,8 @@ pub mod test_credentials {
7882
let cfg = opendal::services::Gcs::default()
7983
.bucket(&bucket)
8084
.root(&root.to_string_lossy())
81-
.endpoint("http://127.0.0.1:4443")
82-
.customized_token_loader(Box::new(test_credentials::DummyTokenLoader));
85+
.endpoint(LOCAL_GCP_EMULATOR_ENDPOINT)
86+
.customized_token_loader(Box::new(test_config_helpers::DummyTokenLoader));
8387
let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?;
8488
Ok(store)
8589
}

quickwit/quickwit-storage/src/opendal_storage/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,4 @@ mod google_cloud_storage;
1919

2020
pub use google_cloud_storage::GoogleCloudStorageFactory;
2121
#[cfg(feature = "integration-testsuite")]
22-
pub use google_cloud_storage::test_credentials::{
23-
DummyTokenLoader, new_emulated_google_cloud_storage,
24-
};
22+
pub use google_cloud_storage::test_config_helpers;

quickwit/quickwit-storage/tests/google_cloud_storage.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,19 @@
1717

1818
#[cfg(all(feature = "integration-testsuite", feature = "gcs"))]
1919
#[cfg_attr(not(feature = "ci-test"), ignore)]
20-
mod tests {
20+
mod gcp_storage_test_suite {
2121
use std::str::FromStr;
2222

2323
use anyhow::Context;
2424
use quickwit_common::rand::append_random_suffix;
2525
use quickwit_common::setup_logging_for_tests;
2626
use quickwit_common::uri::Uri;
27-
use quickwit_storage::{DummyTokenLoader, new_emulated_google_cloud_storage};
27+
use quickwit_storage::test_config_helpers::{
28+
DummyTokenLoader, LOCAL_GCP_EMULATOR_ENDPOINT, new_emulated_google_cloud_storage,
29+
};
2830
use reqsign::GoogleTokenLoad;
2931

30-
pub async fn sign_gcs_requet(req: &mut reqwest::Request) -> anyhow::Result<()> {
32+
pub async fn sign_gcs_request(req: &mut reqwest::Request) -> anyhow::Result<()> {
3133
let client = reqwest::Client::new();
3234
let token = DummyTokenLoader
3335
.load(client.clone())
@@ -42,18 +44,16 @@ mod tests {
4244

4345
async fn create_gcs_bucket(bucket_name: &str) -> anyhow::Result<()> {
4446
let client = reqwest::Client::new();
45-
let url = "http://127.0.0.1:4443/storage/v1/b";
47+
let url = format!("{LOCAL_GCP_EMULATOR_ENDPOINT}/storage/v1/b");
4648
let mut request = client
4749
.post(url)
4850
.body(serde_json::to_vec(&serde_json::json!({
4951
"name": bucket_name,
50-
// "location": "us-central1",
51-
// "storageClass": "STANDARD",
5252
}))?)
5353
.header(reqwest::header::CONTENT_TYPE, "application/json")
5454
.build()?;
5555

56-
sign_gcs_requet(&mut request).await?;
56+
sign_gcs_request(&mut request).await?;
5757

5858
let response = client.execute(request).await?;
5959

@@ -90,6 +90,13 @@ mod tests {
9090
// supported in the emulated GCS server
9191
// (https://github.com/fsouza/fake-gcs-server/pull/1164)
9292

93+
// object_storage.set_policy(MultiPartPolicy {
94+
// target_part_num_bytes: 5 * 1_024 * 1_024,
95+
// max_num_parts: 10_000,
96+
// multipart_threshold_num_bytes: 10_000_000,
97+
// max_object_num_bytes: 5_000_000_000_000,
98+
// max_concurrent_uploads: 100,
99+
// });
93100
// quickwit_storage::storage_test_multi_part_upload(&mut object_storage)
94101
// .await
95102
// .context("test multipart upload failed")?;

0 commit comments

Comments
 (0)