diff --git a/Cargo.lock b/Cargo.lock index 66815f7a3a..0a2d3a0f76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0" +dependencies = [ + "fastrand 2.1.0", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -1208,9 +1220,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.5.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "bytes_ext" @@ -1713,9 +1725,9 @@ checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" [[package]] name = "crc32c" -version = "0.6.3" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" dependencies = [ "rustc_version", ] @@ -2381,6 +2393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -2406,6 +2419,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -2548,6 +2570,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + [[package]] name = "filedescriptor" version = "0.8.2" @@ -2577,6 +2605,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flagset" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" + [[package]] name = "flatbuffers" version = "23.1.21" @@ -2777,7 +2811,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" dependencies = [ - "fastrand", + "fastrand 1.9.0", "futures-core", "futures-io", "memchr", @@ -2878,8 +2912,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if 1.0.0", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -3069,6 +3105,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "horaectl" version 
= "2.0.0" @@ -3078,7 +3123,7 @@ dependencies = [ "clap", "lazy_static", "prettytable", - "reqwest 0.11.24", + "reqwest 0.12.4", "serde", "shell-words", "tokio", @@ -3143,7 +3188,7 @@ dependencies = [ "async-trait", "horaedb-client", "local-ip-address", - "reqwest 0.11.24", + "reqwest 0.12.4", "serde", "sqlness", "tokio", @@ -4111,7 +4156,7 @@ dependencies = [ "logger", "macros", "prost 0.11.8", - "reqwest 0.11.24", + "reqwest 0.12.4", "serde", "serde_json", "snafu 0.6.10", @@ -4620,22 +4665,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" dependencies = [ "async-trait", - "base64 0.22.1", "bytes", "chrono", "futures 0.3.28", "humantime 2.1.0", - "hyper 1.3.1", "itertools 0.12.0", - "md-5", "parking_lot 0.12.1", "percent-encoding", - "quick-xml 0.31.0", - "rand 0.8.5", - "reqwest 0.12.4", - "ring 0.17.7", - "serde", - "serde_json", "snafu 0.7.4", "tokio", "tracing", @@ -4662,11 +4698,14 @@ dependencies = [ "macros", "notifier", "object_store 0.10.1", + "object_store_opendal", + "opendal", "partitioned_lock", "prometheus 0.12.0", "prometheus-static-metric", "prost 0.11.8", "rand 0.8.5", + "reqwest 0.12.4", "runtime", "serde", "serde_json", @@ -4680,6 +4719,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "object_store_opendal" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7e5902fc99e9fb9e32c93f6a67dc5cc0772dc0fb348e2ef4ce258b03666d034" +dependencies = [ + "async-trait", + "bytes", + "flagset", + "futures 0.3.28", + "futures-util", + "object_store 0.10.1", + "opendal", + "pin-project", + "tokio", +] + [[package]] name = "obkv-table-client-rs" version = "0.1.0" @@ -4739,6 +4795,36 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "opendal" +version = "0.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494" +dependencies = [ + "anyhow", + "async-trait", + "backon", + "base64 0.22.1", + "bytes", + "chrono", + "crc32c", + "flagset", + "futures 0.3.28", + "getrandom", + "http 1.1.0", + "log", + "md-5", + "once_cell", + "percent-encoding", + "quick-xml 0.36.1", + "reqsign", + "reqwest 0.12.4", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "opensrv-mysql" version = "0.1.0" @@ -4753,12 +4839,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - [[package]] name = "ordered-float" version = "2.10.0" @@ -4768,6 +4848,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.0", +] + [[package]] name = "overload" version = "0.1.1" @@ -5101,22 +5191,22 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.12" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ 
"pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.12" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.48", ] [[package]] @@ -5768,9 +5858,19 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.31.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quick-xml" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" dependencies = [ "memchr", "serde", @@ -6015,6 +6115,35 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reqsign" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "chrono", + "form_urlencoded", + "getrandom", + "hex", + "hmac", + "home", + "http 1.1.0", + "log", + "once_cell", + "percent-encoding", + "quick-xml 0.35.0", + "rand 0.8.5", + "reqwest 0.12.4", + "rust-ini", + "serde", + "serde_json", + "sha1", + "sha2", +] + [[package]] name = "reqwest" version = "0.11.24" @@ -6081,7 +6210,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.22.2", - "rustls-native-certs", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", @@ -6097,6 +6225,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots 0.26.3", "winreg 0.52.0", ] @@ -6241,6 +6370,17 @@ dependencies = [ "time 0.1.43", ] +[[package]] +name = "rust-ini" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" +dependencies = [ + "cfg-if 1.0.0", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rust-sdk-test" version = "2.0.0" @@ -6346,19 +6486,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" -dependencies = [ - "openssl-probe", - "rustls-pemfile 2.1.2", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-pemfile" version = "0.2.1" @@ -6464,15 +6591,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71" -[[package]] -name = "schannel" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "scheduled-thread-pool" version = "0.2.7" @@ -6529,29 +6647,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "security-framework" -version = "2.10.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" -dependencies = [ - "bitflags 1.3.2", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "semver" version = "1.0.17" @@ -7311,7 +7406,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" dependencies = [ "cfg-if 1.0.0", - "fastrand", + "fastrand 1.9.0", "redox_syscall 0.3.5", "rustix", "windows-sys 0.45.0", @@ -7921,6 +8016,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "triomphe" version = "0.1.8" @@ -8063,6 +8164,7 @@ checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom", "rand 0.8.5", + "serde", "uuid-macro-internal", ] @@ -8326,6 +8428,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.0" diff --git a/Cargo.toml b/Cargo.toml index 4563acf5c2..c0e62ebf8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,9 +160,10 @@ query_frontend = { path = "src/query_frontend" } rand = "0.8.5" regex = "1" remote_engine_client = { path = "src/remote_engine_client" } -reqwest = { version = "0.11", default-features = false, features = [ +reqwest = { version = "0.12.4", default-features = false, features = [ "rustls-tls", "json", + "http2", ] } router = { path = "src/router" } runtime = { path = "src/components/runtime" } @@ -200,14 +201,14 @@ zstd = { version = "0.12", default-features = false } # This profile optimizes for good runtime performance. [profile.release] # reference: https://doc.rust-lang.org/rustc/codegen-options/index.html#codegen-units -codegen-units = 1 -debug = true +codegen-units = 1 +debug = true overflow-checks = true # This profile is used to produce a smaller (no symbols) binary with a little bit poorer performance, # but with a faster speed and low memory consumption required by compiling. [profile.release-slim] -inherits = "release" +inherits = "release" codegen-units = 16 -debug = false -strip = true +debug = false +strip = true diff --git a/Makefile b/Makefile index 6569ebf0ac..4480eede75 100644 --- a/Makefile +++ b/Makefile @@ -126,7 +126,7 @@ dev-setup: echo "Installing dependencies using Homebrew..." HOMEBREW_NO_AUTO_UPDATE=1 brew install git openssl protobuf cmake pre-commit cargo install cargo-udeps - cargo install cargo-sort + cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked else ifeq ($(shell uname), Linux) dev-setup: echo "Detecting Linux system..." 
@@ -137,7 +137,7 @@ dev-setup: sudo apt-get update; \ sudo apt install -y git gcc g++ libssl-dev pkg-config protobuf-compiler cmake pre-commit; \ cargo install cargo-udeps; \ - cargo install cargo-sort; \ + cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked; \ else \ echo "Error: Unsupported Linux distribution. Exiting..."; \ exit 1; \ fi diff --git a/src/analytic_engine/src/manifest/details.rs b/src/analytic_engine/src/manifest/details.rs index 7df80a4f9f..e49c3d82cc 100644 --- a/src/analytic_engine/src/manifest/details.rs +++ b/src/analytic_engine/src/manifest/details.rs @@ -653,7 +653,7 @@ mod tests { column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID, }; use futures::future::BoxFuture; - use object_store::LocalFileSystem; + use object_store::local_file; use runtime::Runtime; use table_engine::table::{SchemaId, TableId, TableSeqGenerator}; use wal::rocksdb_impl::manager::Builder as WalBuilder; @@ -836,7 +836,9 @@ mod tests { .build() .unwrap(); - let object_store = LocalFileSystem::new_with_prefix(&self.dir).unwrap(); + let local_path = self.dir.to_string_lossy().to_string(); + let object_store = local_file::try_new_with_default(local_path).unwrap(); + ManifestImpl::open( self.options.clone(), Arc::new(manifest_wal), diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs index be0d9354b0..ee16772985 100644 --- a/src/analytic_engine/src/setup.rs +++ b/src/analytic_engine/src/setup.rs @@ -25,10 +25,11 @@ use object_store::{ aliyun, config::{ObjectStoreOptions, StorageOptions}, disk_cache::DiskCacheStore, + local_file, mem_cache::{MemCache, MemCacheStore}, metrics::StoreWithMetrics, prefix::StoreWithPrefix, - s3, LocalFileSystem, ObjectStoreRef, + s3, ObjectStoreRef, }; use snafu::{ResultExt, Snafu}; use table_engine::engine::{EngineRuntimes, TableEngineRef}; @@ -61,6 +62,9 @@ pub enum Error { source: object_store::ObjectStoreError, }, + #[snafu(display("Failed to access object store via OpenDAL, err:{}", source))] + OpenDal { source: object_store::OpenDalError }, + #[snafu(display("Failed to create dir for {}, err:{}", path, source))] CreateDir { path: String, @@ -192,27 +196,32 @@ fn open_storage( ) -> Pin> + Send>> { Box::pin(async move { let mut store = match opts.object_store { - ObjectStoreOptions::Local(local_opts) => { + ObjectStoreOptions::Local(mut local_opts) => { let data_path = Path::new(&local_opts.data_dir); - let sst_path = data_path.join(STORE_DIR_NAME); + let sst_path = data_path + .join(STORE_DIR_NAME) + .to_string_lossy() + .into_owned(); tokio::fs::create_dir_all(&sst_path) .await .context(CreateDir { - path: sst_path.to_string_lossy().into_owned(), + path: sst_path.clone(), })?; - let store = LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?; + local_opts.data_dir = sst_path; + + let store: ObjectStoreRef = + Arc::new(local_file::try_new(&local_opts).context(OpenDal)?); Arc::new(store) as _ } ObjectStoreOptions::Aliyun(aliyun_opts) => { - let oss: ObjectStoreRef = - Arc::new(aliyun::try_new(&aliyun_opts).context(OpenObjectStore)?); - let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, oss); + let store: ObjectStoreRef = + Arc::new(aliyun::try_new(&aliyun_opts).context(OpenDal)?); + let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, store); Arc::new(store_with_prefix.context(OpenObjectStore)?)
as _ } ObjectStoreOptions::S3(s3_option) => { - let oss: ObjectStoreRef = - Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?); - let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, oss); + let store: ObjectStoreRef = Arc::new(s3::try_new(&s3_option).context(OpenDal)?); + let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, store); Arc::new(store_with_prefix.context(OpenObjectStore)?) as _ } }; diff --git a/src/analytic_engine/src/sst/meta_data/cache.rs b/src/analytic_engine/src/sst/meta_data/cache.rs index 8ddaf48743..d90e71b089 100644 --- a/src/analytic_engine/src/sst/meta_data/cache.rs +++ b/src/analytic_engine/src/sst/meta_data/cache.rs @@ -180,7 +180,7 @@ mod tests { schema::Builder as CustomSchemaBuilder, time::{TimeRange, Timestamp}, }; - use object_store::{LocalFileSystem, ObjectStoreRef}; + use object_store::{local_file, ObjectStoreRef}; use parquet::{arrow::ArrowWriter, file::footer}; use parquet_ext::ParquetMetaData; @@ -329,7 +329,9 @@ mod tests { parquet_filter: None, column_values: None, }; - let store = Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap()); + + let local_path = temp_dir.as_ref().to_string_lossy().to_string(); + let store = Arc::new(local_file::try_new_with_default(local_path).unwrap()); write_parquet_file_with_metadata( store.clone(), parquet_file_path.as_path(), diff --git a/src/analytic_engine/src/sst/parquet/writer.rs b/src/analytic_engine/src/sst/parquet/writer.rs index 5fe669c5cb..732753b773 100644 --- a/src/analytic_engine/src/sst/parquet/writer.rs +++ b/src/analytic_engine/src/sst/parquet/writer.rs @@ -28,7 +28,10 @@ use datafusion::parquet::basic::Compression; use futures::StreamExt; use generic_error::BoxError; use logger::{debug, error}; -use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, WriteMultipartRef}; +use object_store::{ + multi_part::{MultiUploadRef, MultiUploadWriter}, + ObjectStore, ObjectStoreRef, Path, +}; use snafu::{OptionExt, ResultExt}; use tokio::io::AsyncWrite; @@ -417,7 +420,7 @@ async fn write_metadata( Ok(buf_size) } -async fn multi_upload_abort(aborter: WriteMultipartRef) { +async fn multi_upload_abort(aborter: MultiUploadRef) { // The uploading file will be leaked if failed to abort. A repair command // will be provided to clean up the leaked files. 
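+ // `aborter` is the shared `MultiUploadRef` handle to the in-flight multipart + // upload (a mutex-guarded `ConcurrentMultipartUpload`, see multi_part.rs), + // hence the lock taken below before aborting.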
if let Err(e) = aborter.lock().await.abort().await { @@ -589,7 +592,7 @@ mod tests { time::{TimeRange, Timestamp}, }; use futures::stream; - use object_store::LocalFileSystem; + use object_store::local_file; use runtime::{self, Runtime}; use table_engine::predicate::Predicate; use tempfile::tempdir; @@ -613,7 +616,7 @@ mod tests { fn test_parquet_build_and_read() { test_util::init_log_for_test(); - let runtime = Arc::new(runtime::Builder::default().build().unwrap()); + let runtime = Arc::new(runtime::Builder::default().enable_all().build().unwrap()); parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]); parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3, 3, 2]); parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 4]); @@ -635,9 +638,8 @@ mod tests { column_stats: Default::default(), }; - let dir = tempdir().unwrap(); - let root = dir.path(); - let store: ObjectStoreRef = Arc::new(LocalFileSystem::new_with_prefix(root).unwrap()); + let root = tempdir().unwrap().as_ref().to_string_lossy().to_string(); + let store: ObjectStoreRef = Arc::new(local_file::try_new_with_default(root).unwrap()); let store_picker: ObjectStorePickerRef = Arc::new(store); let sst_file_path = Path::from("data.par"); diff --git a/src/analytic_engine/src/tests/util.rs b/src/analytic_engine/src/tests/util.rs index 7eab3c1be2..8fe0710624 100644 --- a/src/analytic_engine/src/tests/util.rs +++ b/src/analytic_engine/src/tests/util.rs @@ -510,6 +510,8 @@ impl Builder { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }, wal: WalConfig { @@ -588,6 +590,8 @@ impl Default for RocksDBEngineBuildContext { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }, wal: WalConfig { @@ -621,6 +625,8 @@ impl Clone for RocksDBEngineBuildContext { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }; @@ -685,6 +691,8 @@ impl Default for MemoryEngineBuildContext { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }, wal: WalConfig { diff --git a/src/benchmarks/src/merge_memtable_bench.rs b/src/benchmarks/src/merge_memtable_bench.rs index 7c9d9ba4ab..abf5f8f45a 100644 --- a/src/benchmarks/src/merge_memtable_bench.rs +++ b/src/benchmarks/src/merge_memtable_bench.rs @@ -45,7 +45,7 @@ use common_types::{ projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, time::TimeRange, }; use logger::info; -use object_store::{LocalFileSystem, ObjectStoreRef}; +use object_store::{local_file, ObjectStoreRef}; use runtime::Runtime; use table_engine::{predicate::Predicate, table::TableId}; @@ -69,7 +69,8 @@ impl MergeMemTableBench { pub fn new(config: MergeMemTableBenchConfig) -> Self { assert!(!config.sst_file_ids.is_empty()); - let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _; + let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _; + let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); let space_id = config.space_id; let table_id 
= config.table_id; diff --git a/src/benchmarks/src/merge_sst_bench.rs b/src/benchmarks/src/merge_sst_bench.rs index 0e7280f845..9a949438a9 100644 --- a/src/benchmarks/src/merge_sst_bench.rs +++ b/src/benchmarks/src/merge_sst_bench.rs @@ -38,7 +38,7 @@ use analytic_engine::{ }; use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema}; use logger::info; -use object_store::{LocalFileSystem, ObjectStoreRef}; +use object_store::{local_file, ObjectStoreRef}; use runtime::Runtime; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc::{self, UnboundedReceiver}; @@ -65,7 +65,8 @@ impl MergeSstBench { pub fn new(config: MergeSstBenchConfig) -> Self { assert!(!config.sst_file_ids.is_empty()); - let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _; + let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _; + let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); let space_id = config.space_id; let table_id = config.table_id; diff --git a/src/benchmarks/src/parquet_bench.rs b/src/benchmarks/src/parquet_bench.rs index 9ff1860891..5bec32bac4 100644 --- a/src/benchmarks/src/parquet_bench.rs +++ b/src/benchmarks/src/parquet_bench.rs @@ -23,7 +23,7 @@ use analytic_engine::sst::meta_data::cache::MetaCacheRef; use common_types::schema::Schema; use futures::StreamExt; use logger::info; -use object_store::{LocalFileSystem, ObjectStoreRef, Path}; +use object_store::{local_file, ObjectStoreRef, Path}; use parquet::arrow::{ arrow_reader::ParquetRecordBatchReaderBuilder, ParquetRecordBatchStreamBuilder, }; @@ -46,7 +46,7 @@ pub struct ParquetBench { impl ParquetBench { pub fn new(config: SstBenchConfig) -> Self { - let store = Arc::new(LocalFileSystem::new_with_prefix(&config.store_path).unwrap()) as _; + let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _; let runtime = util::new_runtime(config.runtime_thread_num); diff --git a/src/benchmarks/src/scan_memtable_bench.rs b/src/benchmarks/src/scan_memtable_bench.rs index a51ceeb09d..edd1ad3284 100644 --- a/src/benchmarks/src/scan_memtable_bench.rs +++ b/src/benchmarks/src/scan_memtable_bench.rs @@ -33,7 +33,7 @@ use common_types::{ time::TimeRange, }; use logger::info; -use object_store::{LocalFileSystem, Path}; +use object_store::{local_file, Path}; use crate::{config::ScanMemTableBenchConfig, util}; @@ -45,7 +45,7 @@ pub struct ScanMemTableBench { impl ScanMemTableBench { pub fn new(config: ScanMemTableBenchConfig) -> Self { - let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _; + let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _; let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); let meta_cache: Option = None; diff --git a/src/benchmarks/src/sst_bench.rs b/src/benchmarks/src/sst_bench.rs index 2577c0a19f..29afdd7f69 100644 --- a/src/benchmarks/src/sst_bench.rs +++ b/src/benchmarks/src/sst_bench.rs @@ -31,7 +31,7 @@ use common_types::{ schema::Schema, }; use logger::info; -use object_store::{LocalFileSystem, ObjectStoreRef, Path}; +use object_store::{local_file, ObjectStoreRef, Path}; use runtime::Runtime; use crate::{config::SstBenchConfig, util}; @@ -50,7 +50,7 @@ impl SstBench { pub fn new(config: SstBenchConfig) -> Self { let runtime = Arc::new(util::new_runtime(config.runtime_thread_num)); - let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _; + let store 
= Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _; let sst_path = Path::from(config.sst_file_name.clone()); let meta_cache: Option = config .sst_meta_cache_cap diff --git a/src/benchmarks/src/sst_tools.rs b/src/benchmarks/src/sst_tools.rs index 664f89b0b1..4e27492943 100644 --- a/src/benchmarks/src/sst_tools.rs +++ b/src/benchmarks/src/sst_tools.rs @@ -48,7 +48,7 @@ use common_types::{ }; use generic_error::BoxError; use logger::info; -use object_store::{LocalFileSystem, ObjectStoreRef, Path}; +use object_store::{local_file, ObjectStoreRef, Path}; use runtime::Runtime; use serde::Deserialize; use table_engine::{predicate::Predicate, table::TableId}; @@ -81,7 +81,7 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa ); let store: ObjectStoreRef = - Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()); + Arc::new(local_file::try_new_with_default(config.store_path).unwrap()); let store_picker: ObjectStorePickerRef = Arc::new(store); let sst_file_path = Path::from(config.sst_file_name); @@ -115,7 +115,7 @@ pub struct RebuildSstConfig { pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { info!("Start rebuild sst, config:{:?}", config); - let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _; + let store = Arc::new(local_file::try_new_with_default(config.store_path.clone()).unwrap()) as _; let input_path = Path::from(config.input_file_name); let parquet_metadata = util::parquet_metadata(&store, &input_path).await; @@ -210,7 +210,8 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let space_id = config.space_id; let table_id = config.table_id; - let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _; + let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _; + let (tx, _rx) = mpsc::unbounded_channel(); let purge_queue = FilePurgeQueue::new(space_id, table_id, tx); diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs index cb6d8de967..a7f86f0866 100644 --- a/src/benchmarks/src/util.rs +++ b/src/benchmarks/src/util.rs @@ -308,6 +308,8 @@ impl Builder { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }, wal: WalConfig { @@ -386,6 +388,8 @@ impl Default for RocksDBEngineBuildContext { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }, wal: WalConfig { @@ -419,6 +423,8 @@ impl Clone for RocksDBEngineBuildContext { disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), + max_retries: 3, + timeout: Default::default(), }), }; diff --git a/src/components/object_store/Cargo.toml b/src/components/object_store/Cargo.toml index f9221e1d30..926e85f935 100644 --- a/src/components/object_store/Cargo.toml +++ b/src/components/object_store/Cargo.toml @@ -45,11 +45,18 @@ logger = { workspace = true } lru = { workspace = true } macros = { workspace = true } notifier = { workspace = true } +object_store_opendal = "0.46.0" +opendal = { version = "0.49.0", features = [ + "services-oss", + "services-s3", + "services-fs", +] } partitioned_lock = { workspace = true } prometheus = { workspace = true } prometheus-static-metric = 
{ workspace = true } prost = { workspace = true } rand = { workspace = true } +reqwest = { workspace = true } runtime = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -59,7 +66,7 @@ table_kv = { workspace = true } time_ext = { workspace = true } tokio = { workspace = true } twox-hash = "1.6" -upstream = { package = "object_store", version = "0.10.1", features = [ "aws" ] } +upstream = { package = "object_store", version = "0.10.1" } uuid = { version = "1.3.3", features = ["v4"] } [dev-dependencies] diff --git a/src/components/object_store/src/aliyun.rs b/src/components/object_store/src/aliyun.rs index f2432d5151..736c8755ea 100644 --- a/src/components/object_store/src/aliyun.rs +++ b/src/components/object_store/src/aliyun.rs @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. -use upstream::{ - aws::{AmazonS3, AmazonS3Builder}, - ClientOptions, RetryConfig, +use object_store_opendal::OpendalStore; +use opendal::{ + layers::{RetryLayer, TimeoutLayer}, + raw::HttpClient, + services::Oss, + Operator, Result, }; use crate::config::AliyunOptions; @@ -34,36 +37,35 @@ fn normalize_endpoint(endpoint: &str, bucket: &str) -> String { } } -pub fn try_new(aliyun_opts: &AliyunOptions) -> upstream::Result { - let cli_opt = ClientOptions::new() - .with_allow_http(true) - .with_pool_max_idle_per_host(aliyun_opts.http.pool_max_idle_per_host) - .with_http2_keep_alive_timeout(aliyun_opts.http.keep_alive_timeout.0) - .with_http2_keep_alive_while_idle() - .with_http2_keep_alive_interval(aliyun_opts.http.keep_alive_interval.0) - .with_timeout(aliyun_opts.http.timeout.0); - let retry_config = RetryConfig { - max_retries: aliyun_opts.retry.max_retries, - retry_timeout: aliyun_opts.retry.retry_timeout.0, - ..Default::default() - }; +pub fn try_new(aliyun_opts: &AliyunOptions) -> Result { + let http_builder = reqwest::ClientBuilder::new() + .pool_max_idle_per_host(aliyun_opts.http.pool_max_idle_per_host) + .http2_keep_alive_timeout(aliyun_opts.http.keep_alive_timeout.0) + .http2_keep_alive_while_idle(true) + .http2_keep_alive_interval(aliyun_opts.http.keep_alive_interval.0) + .timeout(aliyun_opts.http.timeout.0); + let http_client = HttpClient::build(http_builder)?; let endpoint = &aliyun_opts.endpoint; let bucket = &aliyun_opts.bucket; let endpoint = normalize_endpoint(endpoint, bucket); - AmazonS3Builder::new() - .with_virtual_hosted_style_request(true) - // region is not used when virtual_hosted_style is true, - // but is required, so dummy is used here - // https://github.com/apache/arrow-rs/issues/3827 - .with_region("dummy") - .with_access_key_id(&aliyun_opts.key_id) - .with_secret_access_key(&aliyun_opts.key_secret) - .with_endpoint(endpoint) - .with_bucket_name(bucket) - .with_client_options(cli_opt) - .with_retry(retry_config) - .build() + + let builder = Oss::default() + .access_key_id(&aliyun_opts.key_id) + .access_key_secret(&aliyun_opts.key_secret) + .endpoint(&endpoint) + .bucket(bucket) + .http_client(http_client); + let op = Operator::new(builder)? 
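+ // `timeout` bounds single-file metadata operations (e.g. stat, delete), while + // `io_timeout` bounds data reads and writes; RetryLayer is added after + // TimeoutLayer, so each retry attempt should get a fresh timeout window.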
+ .layer( + TimeoutLayer::new() + .with_timeout(aliyun_opts.timeout.timeout.0) + .with_io_timeout(aliyun_opts.timeout.io_timeout.0), + ) + .layer(RetryLayer::new().with_max_times(aliyun_opts.max_retries)) + .finish(); + + Ok(OpendalStore::new(op)) } #[cfg(test)] diff --git a/src/components/object_store/src/config.rs b/src/components/object_store/src/config.rs index d0ecbfb079..072b9159f9 100644 --- a/src/components/object_store/src/config.rs +++ b/src/components/object_store/src/config.rs @@ -49,9 +49,7 @@ impl Default for StorageOptions { disk_cache_capacity: ReadableSize::gb(0), disk_cache_page_size: ReadableSize::mb(2), disk_cache_partition_bits: 4, - object_store: ObjectStoreOptions::Local(LocalOptions { - data_dir: root_path, - }), + object_store: ObjectStoreOptions::Local(LocalOptions::new_with_default(root_path)), } } } @@ -68,6 +66,20 @@ pub enum ObjectStoreOptions { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct LocalOptions { pub data_dir: String, + #[serde(default = "default_max_retries")] + pub max_retries: usize, + #[serde(default)] + pub timeout: TimeoutOptions, +} + +impl LocalOptions { + pub fn new_with_default(data_dir: String) -> Self { + Self { + data_dir, + max_retries: default_max_retries(), + timeout: Default::default(), + } + } } #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AliyunOptions { pub key_id: String, pub key_secret: String, pub endpoint: String, pub bucket: String, pub prefix: String, + #[serde(default = "default_max_retries")] + pub max_retries: usize, #[serde(default)] pub http: HttpOptions, #[serde(default)] - pub retry: RetryOptions, + pub timeout: TimeoutOptions, } #[derive(Debug, Clone, Deserialize, Serialize)] pub struct S3Options { pub region: String, pub key_id: String, pub key_secret: String, pub endpoint: String, pub bucket: String, pub prefix: String, + #[serde(default = "default_max_retries")] + pub max_retries: usize, #[serde(default)] pub http: HttpOptions, #[serde(default)] - pub retry: RetryOptions, + pub timeout: TimeoutOptions, } #[derive(Debug, Clone, Deserialize, Serialize)] pub struct HttpOptions { @@ -117,16 +133,25 @@ impl Default for HttpOptions { } #[derive(Debug, Clone, Deserialize, Serialize)] -pub struct RetryOptions { - pub max_retries: usize, - pub retry_timeout: ReadableDuration, +pub struct TimeoutOptions { + // Non-IO operations such as stat and delete act on a single file's metadata; we bound + // them with `timeout`. + pub timeout: ReadableDuration, + // IO operations such as read and write move the data itself; we bound them with + // `io_timeout`.
+ pub io_timeout: ReadableDuration, } -impl Default for RetryOptions { +impl Default for TimeoutOptions { fn default() -> Self { Self { - max_retries: 3, - retry_timeout: ReadableDuration::from(Duration::from_secs(3 * 60)), + timeout: ReadableDuration::from(Duration::from_secs(10)), + io_timeout: ReadableDuration::from(Duration::from_secs(10)), } } } + +#[inline] +fn default_max_retries() -> usize { + 3 +} diff --git a/src/components/object_store/src/disk_cache.rs b/src/components/object_store/src/disk_cache.rs index 33ab7776b5..a89fd42898 100644 --- a/src/components/object_store/src/disk_cache.rs +++ b/src/components/object_store/src/disk_cache.rs @@ -1033,10 +1033,9 @@ impl ObjectStore for DiskCacheStore { mod test { use runtime::{Builder, RuntimeRef}; use tempfile::{tempdir, TempDir}; - use upstream::local::LocalFileSystem; use super::*; - use crate::test_util::MemoryStore; + use crate::{local_file, test_util::MemoryStore}; struct StoreWithCacheDir { inner: DiskCacheStore, @@ -1334,9 +1333,10 @@ mod test { let page_size = 8; let first_create_time = { let _store = { - let local_path = tempdir().unwrap(); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + Arc::new(local_file::try_new_with_default(local_path).unwrap()); + DiskCacheStore::try_new( cache_root_dir.clone(), 160, @@ -1361,9 +1361,9 @@ mod test { // open again { let _store = { - let local_path = tempdir().unwrap(); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + Arc::new(local_file::try_new_with_default(local_path).unwrap()); DiskCacheStore::try_new( cache_root_dir.clone(), 160, @@ -1387,9 +1387,8 @@ mod test { // open again, but with different page_size { - let local_path = tempdir().unwrap(); - let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); + let local_store = Arc::new(local_file::try_new_with_default(local_path).unwrap()); let store = DiskCacheStore::try_new( cache_dir.as_ref().to_string_lossy().to_string(), 160, @@ -1407,7 +1406,7 @@ mod test { #[test] fn test_disk_cache_recovery() { - let rt = Arc::new(Builder::default().build().unwrap()); + let rt = Arc::new(Builder::default().enable_all().build().unwrap()); rt.block_on(async { let cache_dir = tempdir().unwrap(); let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string(); @@ -1415,9 +1414,9 @@ mod test { let location = Path::from("recovery.sst"); { let store = { - let local_path = tempdir().unwrap(); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + Arc::new(local_file::try_new_with_default(local_path).unwrap()); DiskCacheStore::try_new( cache_root_dir.clone(), 10240, @@ -1448,9 +1447,9 @@ mod test { // recover { let store = { - let local_path = tempdir().unwrap(); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); let local_store = - Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + Arc::new(local_file::try_new_with_default(local_path).unwrap()); DiskCacheStore::try_new( cache_root_dir.clone(), 160, diff --git a/src/components/object_store/src/lib.rs b/src/components/object_store/src/lib.rs index 350ccfa0b6..4627dbae77 100644 --- 
a/src/components/object_store/src/lib.rs +++ b/src/components/object_store/src/lib.rs @@ -19,25 +19,22 @@ use std::sync::Arc; -pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter}; -use tokio::sync::Mutex; +pub use opendal::Error as OpenDalError; pub use upstream::{ - local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error, GetResult, ListResult, - ObjectMeta, ObjectStore, PutPayloadMut, + path::Path, Error as ObjectStoreError, GetResult, ListResult, ObjectMeta, ObjectStore, + PutPayloadMut, }; pub mod aliyun; pub mod config; pub mod disk_cache; +pub mod local_file; pub mod mem_cache; pub mod metrics; -mod multi_part; +pub mod multi_part; pub mod prefix; pub mod s3; #[cfg(test)] pub mod test_util; pub type ObjectStoreRef = Arc; - -// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe -pub type WriteMultipartRef = Arc>; diff --git a/src/components/object_store/src/local_file.rs b/src/components/object_store/src/local_file.rs new file mode 100644 index 0000000000..4070b00489 --- /dev/null +++ b/src/components/object_store/src/local_file.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use object_store_opendal::OpendalStore; +use opendal::{ + layers::{RetryLayer, TimeoutLayer}, + services::Fs, + Operator, Result, +}; + +use crate::config::LocalOptions; + +pub fn try_new(local_opts: &LocalOptions) -> Result { + let builder = Fs::default().root(&local_opts.data_dir); + let op = Operator::new(builder)? 
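+ // Give the local Fs backend the same timeout/retry layering as the OSS and + // S3 backends, so failure behavior stays uniform across all object stores.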
+ .layer( + TimeoutLayer::new() + .with_timeout(local_opts.timeout.timeout.0) + .with_io_timeout(local_opts.timeout.io_timeout.0), + ) + .layer(RetryLayer::new().with_max_times(local_opts.max_retries)) + .finish(); + + Ok(OpendalStore::new(op)) +} + +pub fn try_new_with_default(data_dir: String) -> Result { + let local_opts = LocalOptions::new_with_default(data_dir); + try_new(&local_opts) +} diff --git a/src/components/object_store/src/mem_cache.rs b/src/components/object_store/src/mem_cache.rs index 0fa8a91294..9e40fb8e5c 100644 --- a/src/components/object_store/src/mem_cache.rs +++ b/src/components/object_store/src/mem_cache.rs @@ -294,13 +294,13 @@ impl ObjectStore for MemCacheStore { #[cfg(test)] mod test { use tempfile::tempdir; - use upstream::local::LocalFileSystem; use super::*; + use crate::local_file; fn prepare_store(bits: usize, mem_cap: usize) -> MemCacheStore { - let local_path = tempdir().unwrap(); - let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); + let local_store = Arc::new(local_file::try_new_with_default(local_path).unwrap()); let mem_cache = Arc::new(MemCache::try_new(bits, NonZeroUsize::new(mem_cap).unwrap()).unwrap()); diff --git a/src/components/object_store/src/multi_part.rs b/src/components/object_store/src/multi_part.rs index 871ffe2a13..fb5b9dd9fb 100644 --- a/src/components/object_store/src/multi_part.rs +++ b/src/components/object_store/src/multi_part.rs @@ -28,7 +28,13 @@ use tokio::{io::AsyncWrite, sync::Mutex, task::JoinSet}; pub use upstream::PutPayloadMut; use upstream::{path::Path, Error, MultipartUpload, PutPayload, PutResult}; -use crate::{ObjectStoreRef, WriteMultipartRef}; +use crate::ObjectStoreRef; + +// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe +pub type MultiUploadRef = Arc>; + +const CHUNK_SIZE: usize = 5 * 1024 * 1024; +const MAX_CONCURRENCY: usize = 10; #[derive(Debug)] pub struct ConcurrentMultipartUpload { @@ -113,15 +119,12 @@ impl ConcurrentMultipartUpload { } pub struct MultiUploadWriter { - pub multi_upload: WriteMultipartRef, + pub multi_upload: MultiUploadRef, upload_task: Option>>, flush_task: Option>>, completion_task: Option>>, } -const CHUNK_SIZE: usize = 5 * 1024 * 1024; -const MAX_CONCURRENCY: usize = 10; - impl<'a> MultiUploadWriter { pub async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) -> Result { let upload_writer = object_store.put_multipart(location).await?; @@ -141,7 +144,7 @@ impl<'a> MultiUploadWriter { Ok(multi_upload) } - pub fn aborter(&self) -> WriteMultipartRef { + pub fn aborter(&self) -> MultiUploadRef { self.multi_upload.clone() } } diff --git a/src/components/object_store/src/prefix.rs b/src/components/object_store/src/prefix.rs index 187b10a8d5..24233eebf9 100644 --- a/src/components/object_store/src/prefix.rs +++ b/src/components/object_store/src/prefix.rs @@ -238,9 +238,9 @@ mod tests { use chrono::{DateTime, Utc}; use futures::{stream, stream::StreamExt}; use tempfile::tempdir; - use upstream::local::LocalFileSystem; use super::*; + use crate::local_file; #[derive(Debug, Clone)] struct PathPrefixChecker { @@ -423,8 +423,8 @@ mod tests { ("/0/1/", "100/101.sst", "0/1/100/101.sst"), ]; - let local_path = tempdir().unwrap(); - let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); + let local_path = tempdir().unwrap().as_ref().to_string_lossy().to_string(); + let local_store = 
Arc::new(local_file::try_new_with_default(local_path).unwrap()); for (prefix, filename, expect_loc) in cases.clone() { let prefix_store = StoreWithPrefix::new(prefix.to_string(), local_store.clone()).unwrap(); diff --git a/src/components/object_store/src/s3.rs b/src/components/object_store/src/s3.rs index fdbce02773..2b81521f81 100644 --- a/src/components/object_store/src/s3.rs +++ b/src/components/object_store/src/s3.rs @@ -15,34 +15,40 @@ // specific language governing permissions and limitations // under the License. -use upstream::{ - aws::{AmazonS3, AmazonS3Builder}, - ClientOptions, RetryConfig, +use object_store_opendal::OpendalStore; +use opendal::{ + layers::{RetryLayer, TimeoutLayer}, + raw::HttpClient, + services::S3, + Operator, Result, }; use crate::config::S3Options; -pub fn try_new(s3_option: &S3Options) -> upstream::Result { - let cli_opt = ClientOptions::new() - .with_allow_http(true) - .with_pool_max_idle_per_host(s3_option.http.pool_max_idle_per_host) - .with_http2_keep_alive_timeout(s3_option.http.keep_alive_timeout.0) - .with_http2_keep_alive_while_idle() - .with_http2_keep_alive_interval(s3_option.http.keep_alive_interval.0) - .with_timeout(s3_option.http.timeout.0); - let retry_config = RetryConfig { - max_retries: s3_option.retry.max_retries, - retry_timeout: s3_option.retry.retry_timeout.0, - ..Default::default() - }; +pub fn try_new(s3_option: &S3Options) -> Result { + let http_builder = reqwest::ClientBuilder::new() + .pool_max_idle_per_host(s3_option.http.pool_max_idle_per_host) + .http2_keep_alive_timeout(s3_option.http.keep_alive_timeout.0) + .http2_keep_alive_while_idle(true) + .http2_keep_alive_interval(s3_option.http.keep_alive_interval.0) + .timeout(s3_option.http.timeout.0); + let http_client = HttpClient::build(http_builder)?; - AmazonS3Builder::new() - .with_region(&s3_option.region) - .with_access_key_id(&s3_option.key_id) - .with_secret_access_key(&s3_option.key_secret) - .with_endpoint(&s3_option.endpoint) - .with_bucket_name(&s3_option.bucket) - .with_client_options(cli_opt) - .with_retry(retry_config) - .build() + let builder = S3::default() + .region(&s3_option.region) + .access_key_id(&s3_option.key_id) + .secret_access_key(&s3_option.key_secret) + .endpoint(&s3_option.endpoint) + .bucket(&s3_option.bucket) + .http_client(http_client); + let op = Operator::new(builder)? 
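+ // Same layering as the OSS and local-file backends: per-operation timeouts + // first, then bounded retries on top.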
+ .layer( + TimeoutLayer::new() + .with_timeout(s3_option.timeout.timeout.0) + .with_io_timeout(s3_option.timeout.io_timeout.0), + ) + .layer(RetryLayer::new().with_max_times(s3_option.max_retries)) + .finish(); + + Ok(OpendalStore::new(op)) } diff --git a/src/tools/src/bin/sst-convert.rs b/src/tools/src/bin/sst-convert.rs index c4c9935cb3..7c7856be0c 100644 --- a/src/tools/src/bin/sst-convert.rs +++ b/src/tools/src/bin/sst-convert.rs @@ -37,7 +37,7 @@ use common_types::{ request_id::RequestId, }; use generic_error::BoxError; -use object_store::{LocalFileSystem, Path}; +use object_store::{config::LocalOptions, local_file, Path}; use runtime::Runtime; use table_engine::predicate::Predicate; use tools::sst_util; @@ -91,7 +91,12 @@ fn main() { } async fn run(args: Args, runtime: Arc) -> Result<()> { - let storage = LocalFileSystem::new_with_prefix(args.store_path).expect("invalid path"); + let local_opts = LocalOptions { + data_dir: args.store_path, + max_retries: 3, + timeout: Default::default(), + }; + let storage = local_file::try_new(&local_opts).expect("invalid path"); let store = Arc::new(storage) as _; let input_path = Path::from(args.input); let sst_meta = sst_util::meta_from_sst(&store, &input_path).await; diff --git a/src/tools/src/bin/sst-metadata.rs b/src/tools/src/bin/sst-metadata.rs index bf6599608b..b48ca929e9 100644 --- a/src/tools/src/bin/sst-metadata.rs +++ b/src/tools/src/bin/sst-metadata.rs @@ -23,7 +23,7 @@ use analytic_engine::sst::{meta_data::cache::MetaData, parquet::async_reader::Ch use anyhow::{Context, Result}; use clap::Parser; use futures::StreamExt; -use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path}; +use object_store::{config::LocalOptions, local_file, ObjectMeta, ObjectStoreRef, Path}; use parquet_ext::{meta_data::fetch_parquet_metadata, reader::ObjectStoreReader}; use runtime::Runtime; use time_ext::format_as_ymdhms; @@ -141,7 +141,12 @@ fn main() { async fn run(args: Args) -> Result<()> { let handle = Handle::current(); - let storage = LocalFileSystem::new_with_prefix(&args.dir)?; + let local_opts = LocalOptions { + data_dir: args.dir, + max_retries: 3, + timeout: Default::default(), + }; + let storage = local_file::try_new(&local_opts)?; let storage: ObjectStoreRef = Arc::new(storage); let mut join_set = JoinSet::new();