feat: use opendal to access underlying storage #1557

Merged · 4 commits · Aug 27, 2024
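This PR routes object storage access through Apache OpenDAL: call sites that previously constructed `object_store::LocalFileSystem` directly now use OpenDAL-backed constructors (`local_file::try_new`, `local_file::try_new_with_default`, `aliyun::try_new`, `s3::try_new`), all behind the same `ObjectStoreRef` interface. A minimal before/after sketch of the call-site change, using only names that appear in the diff below (the wrapper function itself is illustrative):

```rust
use std::{path::Path, sync::Arc};

use object_store::{local_file, ObjectStoreRef};

// Illustrative helper showing the migration at a typical call site.
fn open_local_store(dir: &Path) -> ObjectStoreRef {
    // Before: object_store's filesystem backend, rooted at a &Path:
    //     Arc::new(LocalFileSystem::new_with_prefix(dir).unwrap())
    // After: the OpenDAL local-file service, which takes an owned String:
    let local_path = dir.to_string_lossy().to_string();
    Arc::new(local_file::try_new_with_default(local_path).unwrap())
}
```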
265 changes: 188 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
@@ -160,9 +160,10 @@ query_frontend = { path = "src/query_frontend" }
rand = "0.8.5"
regex = "1"
remote_engine_client = { path = "src/remote_engine_client" }
reqwest = { version = "0.11", default-features = false, features = [
reqwest = { version = "0.12.4", default-features = false, features = [
"rustls-tls",
"json",
"http2",
] }
router = { path = "src/router" }
runtime = { path = "src/components/runtime" }
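A note on the `reqwest` bump: with `default-features = false`, reqwest 0.12 gates HTTP/2 support behind the explicit `http2` feature, so the added entry preserves existing behavior rather than enabling something new. This reading is inferred from the feature list, not stated in the PR.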
@@ -200,14 +201,14 @@ zstd = { version = "0.12", default-features = false }
# This profile optimizes for good runtime performance.
[profile.release]
# reference: https://doc.rust-lang.org/rustc/codegen-options/index.html#codegen-units
- codegen-units   = 1
- debug           = true
+ codegen-units = 1
+ debug = true
overflow-checks = true

# This profile produces a smaller (no symbols) binary with slightly worse runtime
# performance, but it compiles faster and needs less memory to build.
[profile.release-slim]
inherits = "release"
inherits = "release"
codegen-units = 16
- debug         = false
- strip         = true
+ debug = false
+ strip = true
4 changes: 2 additions & 2 deletions Makefile
@@ -126,7 +126,7 @@ dev-setup:
echo "Installing dependencies using Homebrew..."
HOMEBREW_NO_AUTO_UPDATE=1 brew install git openssl protobuf cmake pre-commit
cargo install cargo-udeps
- cargo install cargo-sort
+ cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked
else ifeq ($(shell uname), Linux)
dev-setup:
echo "Detecting Linux system..."
@@ -137,7 +137,7 @@ dev-setup:
sudo apt-get update; \
sudo apt install -y git gcc g++ libssl-dev pkg-config protobuf-compiler cmake pre-commit; \
cargo install cargo-udeps; \
- cargo install cargo-sort; \
+ cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked; \
else \
echo "Error: Unsupported Linux distribution. Exiting..."; \
exit 1; \
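Both the macOS and Linux paths now install `cargo-sort` from a pinned git revision with `--locked`, which keeps `make dev-setup` reproducible; presumably the pinned commit carries a fix or formatting behavior not yet published to crates.io.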
6 changes: 4 additions & 2 deletions src/analytic_engine/src/manifest/details.rs
@@ -653,7 +653,7 @@ mod tests {
column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID,
};
use futures::future::BoxFuture;
- use object_store::LocalFileSystem;
+ use object_store::local_file;
use runtime::Runtime;
use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
use wal::rocksdb_impl::manager::Builder as WalBuilder;
@@ -836,7 +836,9 @@
.build()
.unwrap();

- let object_store = LocalFileSystem::new_with_prefix(&self.dir).unwrap();
+ let local_path = self.dir.to_string_lossy().to_string();
+ let object_store = local_file::try_new_with_default(local_path).unwrap();

ManifestImpl::open(
self.options.clone(),
Arc::new(manifest_wal),
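The pattern in this hunk recurs throughout the diff: the OpenDAL-backed constructors take an owned `String` rather than a `std::path::Path`, hence the `to_string_lossy().to_string()` conversions that now appear at each test and benchmark call site.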
31 changes: 20 additions & 11 deletions src/analytic_engine/src/setup.rs
@@ -25,10 +25,11 @@ use object_store::{
aliyun,
config::{ObjectStoreOptions, StorageOptions},
disk_cache::DiskCacheStore,
+ local_file,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
prefix::StoreWithPrefix,
- s3, LocalFileSystem, ObjectStoreRef,
+ s3, ObjectStoreRef,
};
use snafu::{ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
@@ -61,6 +62,9 @@ pub enum Error {
source: object_store::ObjectStoreError,
},

#[snafu(display("Failed to access object store by openDal , err:{}", source))]
OpenDal { source: object_store::OpenDalError },

#[snafu(display("Failed to create dir for {}, err:{}", path, source))]
CreateDir {
path: String,
@@ -192,27 +196,32 @@ fn open_storage(
) -> Pin<Box<dyn Future<Output = Result<OpenedStorages>> + Send>> {
Box::pin(async move {
let mut store = match opts.object_store {
- ObjectStoreOptions::Local(local_opts) => {
+ ObjectStoreOptions::Local(mut local_opts) => {
let data_path = Path::new(&local_opts.data_dir);
- let sst_path = data_path.join(STORE_DIR_NAME);
+ let sst_path = data_path
+     .join(STORE_DIR_NAME)
+     .to_string_lossy()
+     .into_owned();
tokio::fs::create_dir_all(&sst_path)
.await
.context(CreateDir {
- path: sst_path.to_string_lossy().into_owned(),
+ path: sst_path.clone(),
})?;
- let store = LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?;
+ local_opts.data_dir = sst_path;
+
+ let store: ObjectStoreRef =
+     Arc::new(local_file::try_new(&local_opts).context(OpenDal)?);
Arc::new(store) as _
}
ObjectStoreOptions::Aliyun(aliyun_opts) => {
- let oss: ObjectStoreRef =
-     Arc::new(aliyun::try_new(&aliyun_opts).context(OpenObjectStore)?);
- let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, oss);
+ let store: ObjectStoreRef =
+     Arc::new(aliyun::try_new(&aliyun_opts).context(OpenDal)?);
+ let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, store);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
ObjectStoreOptions::S3(s3_option) => {
- let oss: ObjectStoreRef =
-     Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
- let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, oss);
+ let store: ObjectStoreRef = Arc::new(s3::try_new(&s3_option).context(OpenDal)?);
+ let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, store);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
};
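`open_storage` now rewrites `local_opts.data_dir` to point at the `STORE_DIR_NAME` subdirectory and hands the whole options struct to `local_file::try_new`. Combined with the test fixtures later in this diff, that implies `LocalOptions` carries three fields; a sketch of its shape (the field types are assumptions inferred from the literals `3` and `Default::default()`, not taken from the PR):

```rust
// Hypothetical shape of LocalOptions as implied by this diff; only the field
// names are confirmed by the code above and the test fixtures below.
pub struct LocalOptions {
    pub data_dir: String,             // root handed to OpenDAL's fs service
    pub max_retries: usize,           // retry budget (the tests use 3)
    pub timeout: std::time::Duration, // per-request timeout; Default is zero
}
```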
6 changes: 4 additions & 2 deletions src/analytic_engine/src/sst/meta_data/cache.rs
@@ -180,7 +180,7 @@ mod tests {
schema::Builder as CustomSchemaBuilder,
time::{TimeRange, Timestamp},
};
- use object_store::{LocalFileSystem, ObjectStoreRef};
+ use object_store::{local_file, ObjectStoreRef};
use parquet::{arrow::ArrowWriter, file::footer};
use parquet_ext::ParquetMetaData;

@@ -329,7 +329,9 @@
parquet_filter: None,
column_values: None,
};
- let store = Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());
+
+ let local_path = temp_dir.as_ref().to_string_lossy().to_string();
+ let store = Arc::new(local_file::try_new_with_default(local_path).unwrap());
write_parquet_file_with_metadata(
store.clone(),
parquet_file_path.as_path(),
16 changes: 9 additions & 7 deletions src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,7 +28,10 @@ use datafusion::parquet::basic::Compression;
use futures::StreamExt;
use generic_error::BoxError;
use logger::{debug, error};
- use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, WriteMultipartRef};
+ use object_store::{
+     multi_part::{MultiUploadRef, MultiUploadWriter},
+     ObjectStore, ObjectStoreRef, Path,
+ };
use snafu::{OptionExt, ResultExt};
use tokio::io::AsyncWrite;

@@ -417,7 +420,7 @@ async fn write_metadata(
Ok(buf_size)
}

- async fn multi_upload_abort(aborter: WriteMultipartRef) {
+ async fn multi_upload_abort(aborter: MultiUploadRef) {
// The in-flight upload is leaked if the abort itself fails. A repair command
// will be provided to clean up such leaked files.
if let Err(e) = aborter.lock().await.abort().await {
@@ -589,7 +592,7 @@ mod tests {
time::{TimeRange, Timestamp},
};
use futures::stream;
- use object_store::LocalFileSystem;
+ use object_store::local_file;
use runtime::{self, Runtime};
use table_engine::predicate::Predicate;
use tempfile::tempdir;
@@ -613,7 +616,7 @@
fn test_parquet_build_and_read() {
test_util::init_log_for_test();

- let runtime = Arc::new(runtime::Builder::default().build().unwrap());
+ let runtime = Arc::new(runtime::Builder::default().enable_all().build().unwrap());
parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]);
parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3, 3, 2]);
parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 4]);
@@ -635,9 +638,8 @@
column_stats: Default::default(),
};

- let dir = tempdir().unwrap();
- let root = dir.path();
- let store: ObjectStoreRef = Arc::new(LocalFileSystem::new_with_prefix(root).unwrap());
+ let root = tempdir().unwrap().as_ref().to_string_lossy().to_string();
+ let store: ObjectStoreRef = Arc::new(local_file::try_new_with_default(root).unwrap());
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from("data.par");

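Two test-side changes here are easy to miss. The multipart handle moves from `WriteMultipartRef` to `multi_part::MultiUploadRef`, with the abort contract unchanged. And the runtime builder gains `.enable_all()`; a plausible reading (the PR doesn't say) is that the OpenDAL operator drives IO and timeouts through tokio, so both runtime drivers must be enabled. A sketch of the equivalent plain-tokio setup, assuming the project's `runtime::Builder` wraps tokio's:

```rust
use tokio::runtime::{Builder, Runtime};

// Without enable_all(), the first store call that needs the IO or time
// driver fails at runtime with a missing-reactor/driver panic.
fn build_test_runtime() -> Runtime {
    Builder::new_multi_thread()
        .enable_all() // enable both the IO and time drivers
        .build()
        .unwrap()
}
```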
8 changes: 8 additions & 0 deletions src/analytic_engine/src/tests/util.rs
@@ -510,6 +510,8 @@ impl Builder {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -588,6 +590,8 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -621,6 +625,8 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
};

@@ -685,6 +691,8 @@ impl Default for MemoryEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
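Every fixture gains the same two knobs, `max_retries: 3` and `timeout: Default::default()`; these presumably surface OpenDAL's retry and timeout layers, giving the local backend the same configuration surface as the remote ones.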
5 changes: 3 additions & 2 deletions src/benchmarks/src/merge_memtable_bench.rs
@@ -45,7 +45,7 @@ use common_types::{
projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, time::TimeRange,
};
use logger::info;
- use object_store::{LocalFileSystem, ObjectStoreRef};
+ use object_store::{local_file, ObjectStoreRef};
use runtime::Runtime;
use table_engine::{predicate::Predicate, table::TableId};

@@ -69,7 +69,8 @@ impl MergeMemTableBench {
pub fn new(config: MergeMemTableBenchConfig) -> Self {
assert!(!config.sst_file_ids.is_empty());

- let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let space_id = config.space_id;
let table_id = config.table_id;
5 changes: 3 additions & 2 deletions src/benchmarks/src/merge_sst_bench.rs
@@ -38,7 +38,7 @@ use analytic_engine::{
};
use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema};
use logger::info;
- use object_store::{LocalFileSystem, ObjectStoreRef};
+ use object_store::{local_file, ObjectStoreRef};
use runtime::Runtime;
use table_engine::{predicate::Predicate, table::TableId};
use tokio::sync::mpsc::{self, UnboundedReceiver};
@@ -65,7 +65,8 @@ impl MergeSstBench {
pub fn new(config: MergeSstBenchConfig) -> Self {
assert!(!config.sst_file_ids.is_empty());

- let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let space_id = config.space_id;
let table_id = config.table_id;
4 changes: 2 additions & 2 deletions src/benchmarks/src/parquet_bench.rs
@@ -23,7 +23,7 @@ use analytic_engine::sst::meta_data::cache::MetaCacheRef;
use common_types::schema::Schema;
use futures::StreamExt;
use logger::info;
- use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+ use object_store::{local_file, ObjectStoreRef, Path};
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ParquetRecordBatchStreamBuilder,
};
@@ -46,7 +46,7 @@ pub struct ParquetBench {

impl ParquetBench {
pub fn new(config: SstBenchConfig) -> Self {
- let store = Arc::new(LocalFileSystem::new_with_prefix(&config.store_path).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = util::new_runtime(config.runtime_thread_num);

4 changes: 2 additions & 2 deletions src/benchmarks/src/scan_memtable_bench.rs
@@ -33,7 +33,7 @@ use common_types::{
time::TimeRange,
};
use logger::info;
- use object_store::{LocalFileSystem, Path};
+ use object_store::{local_file, Path};

use crate::{config::ScanMemTableBenchConfig, util};

@@ -45,7 +45,7 @@ pub struct ScanMemTableBench {

impl ScanMemTableBench {
pub fn new(config: ScanMemTableBenchConfig) -> Self {
- let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let meta_cache: Option<MetaCacheRef> = None;
4 changes: 2 additions & 2 deletions src/benchmarks/src/sst_bench.rs
@@ -31,7 +31,7 @@ use common_types::{
schema::Schema,
};
use logger::info;
- use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+ use object_store::{local_file, ObjectStoreRef, Path};
use runtime::Runtime;

use crate::{config::SstBenchConfig, util};
@@ -50,7 +50,7 @@ impl SstBench {
pub fn new(config: SstBenchConfig) -> Self {
let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));

- let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
let sst_path = Path::from(config.sst_file_name.clone());
let meta_cache: Option<MetaCacheRef> = config
.sst_meta_cache_cap
9 changes: 5 additions & 4 deletions src/benchmarks/src/sst_tools.rs
@@ -48,7 +48,7 @@ use common_types::{
};
use generic_error::BoxError;
use logger::info;
- use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+ use object_store::{local_file, ObjectStoreRef, Path};
use runtime::Runtime;
use serde::Deserialize;
use table_engine::{predicate::Predicate, table::TableId};
@@ -81,7 +81,7 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa
);

let store: ObjectStoreRef =
-     Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap());
+     Arc::new(local_file::try_new_with_default(config.store_path).unwrap());
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from(config.sst_file_name);

@@ -115,7 +115,7 @@ pub struct RebuildSstConfig {
pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
info!("Start rebuild sst, config:{:?}", config);

- let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path.clone()).unwrap()) as _;
let input_path = Path::from(config.input_file_name);

let parquet_metadata = util::parquet_metadata(&store, &input_path).await;
@@ -210,7 +210,8 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {

let space_id = config.space_id;
let table_id = config.table_id;
- let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _;
+ let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let (tx, _rx) = mpsc::unbounded_channel();
let purge_queue = FilePurgeQueue::new(space_id, table_id, tx);

6 changes: 6 additions & 0 deletions src/benchmarks/src/util.rs
@@ -308,6 +308,8 @@ impl Builder {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -386,6 +388,8 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
},
wal: WalConfig {
@@ -419,6 +423,8 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
+ max_retries: 3,
+ timeout: Default::default(),
}),
};
