Skip to content

Commit

Permalink
feat: use opendal to access underlying storage (#1557)
Browse files Browse the repository at this point in the history
## Rationale
Use opendal to access the object store, thus unifying the access method
of the underlying storage.

## Detailed Changes
- use opendal to access s3/oss/local file

## Test Plan
- Existed tests
  • Loading branch information
baojinri authored Aug 27, 2024
1 parent b94c99e commit 9321505
Show file tree
Hide file tree
Showing 27 changed files with 462 additions and 225 deletions.
265 changes: 188 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ query_frontend = { path = "src/query_frontend" }
rand = "0.8.5"
regex = "1"
remote_engine_client = { path = "src/remote_engine_client" }
reqwest = { version = "0.11", default-features = false, features = [
reqwest = { version = "0.12.4", default-features = false, features = [
"rustls-tls",
"json",
"http2",
] }
router = { path = "src/router" }
runtime = { path = "src/components/runtime" }
Expand Down Expand Up @@ -200,14 +201,14 @@ zstd = { version = "0.12", default-features = false }
# This profile optimizes for good runtime performance.
[profile.release]
# reference: https://doc.rust-lang.org/rustc/codegen-options/index.html#codegen-units
codegen-units = 1
debug = true
codegen-units = 1
debug = true
overflow-checks = true

# This profile is used to produce a smaller (no symbols) binary with a little bit poorer performance,
# but with a faster speed and low memory consumption required by compiling.
[profile.release-slim]
inherits = "release"
inherits = "release"
codegen-units = 16
debug = false
strip = true
debug = false
strip = true
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ dev-setup:
echo "Installing dependencies using Homebrew..."
HOMEBREW_NO_AUTO_UPDATE=1 brew install git openssl protobuf cmake pre-commit
cargo install cargo-udeps
cargo install cargo-sort
cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked
else ifeq ($(shell uname), Linux)
dev-setup:
echo "Detecting Linux system..."
Expand All @@ -137,7 +137,7 @@ dev-setup:
sudo apt-get update; \
sudo apt install -y git gcc g++ libssl-dev pkg-config protobuf-compiler cmake pre-commit; \
cargo install cargo-udeps; \
cargo install cargo-sort; \
cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked; \
else \
echo "Error: Unsupported Linux distribution. Exiting..."; \
exit 1; \
Expand Down
6 changes: 4 additions & 2 deletions src/analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ mod tests {
column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID,
};
use futures::future::BoxFuture;
use object_store::LocalFileSystem;
use object_store::local_file;
use runtime::Runtime;
use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
use wal::rocksdb_impl::manager::Builder as WalBuilder;
Expand Down Expand Up @@ -836,7 +836,9 @@ mod tests {
.build()
.unwrap();

let object_store = LocalFileSystem::new_with_prefix(&self.dir).unwrap();
let local_path = self.dir.to_string_lossy().to_string();
let object_store = local_file::try_new_with_default(local_path).unwrap();

ManifestImpl::open(
self.options.clone(),
Arc::new(manifest_wal),
Expand Down
31 changes: 20 additions & 11 deletions src/analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ use object_store::{
aliyun,
config::{ObjectStoreOptions, StorageOptions},
disk_cache::DiskCacheStore,
local_file,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
prefix::StoreWithPrefix,
s3, LocalFileSystem, ObjectStoreRef,
s3, ObjectStoreRef,
};
use snafu::{ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
Expand Down Expand Up @@ -61,6 +62,9 @@ pub enum Error {
source: object_store::ObjectStoreError,
},

#[snafu(display("Failed to access object store by openDal , err:{}", source))]
OpenDal { source: object_store::OpenDalError },

#[snafu(display("Failed to create dir for {}, err:{}", path, source))]
CreateDir {
path: String,
Expand Down Expand Up @@ -192,27 +196,32 @@ fn open_storage(
) -> Pin<Box<dyn Future<Output = Result<OpenedStorages>> + Send>> {
Box::pin(async move {
let mut store = match opts.object_store {
ObjectStoreOptions::Local(local_opts) => {
ObjectStoreOptions::Local(mut local_opts) => {
let data_path = Path::new(&local_opts.data_dir);
let sst_path = data_path.join(STORE_DIR_NAME);
let sst_path = data_path
.join(STORE_DIR_NAME)
.to_string_lossy()
.into_owned();
tokio::fs::create_dir_all(&sst_path)
.await
.context(CreateDir {
path: sst_path.to_string_lossy().into_owned(),
path: sst_path.clone(),
})?;
let store = LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?;
local_opts.data_dir = sst_path;

let store: ObjectStoreRef =
Arc::new(local_file::try_new(&local_opts).context(OpenDal)?);
Arc::new(store) as _
}
ObjectStoreOptions::Aliyun(aliyun_opts) => {
let oss: ObjectStoreRef =
Arc::new(aliyun::try_new(&aliyun_opts).context(OpenObjectStore)?);
let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, oss);
let store: ObjectStoreRef =
Arc::new(aliyun::try_new(&aliyun_opts).context(OpenDal)?);
let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, store);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
ObjectStoreOptions::S3(s3_option) => {
let oss: ObjectStoreRef =
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, oss);
let store: ObjectStoreRef = Arc::new(s3::try_new(&s3_option).context(OpenDal)?);
let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, store);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
};
Expand Down
6 changes: 4 additions & 2 deletions src/analytic_engine/src/sst/meta_data/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ mod tests {
schema::Builder as CustomSchemaBuilder,
time::{TimeRange, Timestamp},
};
use object_store::{LocalFileSystem, ObjectStoreRef};
use object_store::{local_file, ObjectStoreRef};
use parquet::{arrow::ArrowWriter, file::footer};
use parquet_ext::ParquetMetaData;

Expand Down Expand Up @@ -329,7 +329,9 @@ mod tests {
parquet_filter: None,
column_values: None,
};
let store = Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());

let local_path = temp_dir.as_ref().to_string_lossy().to_string();
let store = Arc::new(local_file::try_new_with_default(local_path).unwrap());
write_parquet_file_with_metadata(
store.clone(),
parquet_file_path.as_path(),
Expand Down
16 changes: 9 additions & 7 deletions src/analytic_engine/src/sst/parquet/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use datafusion::parquet::basic::Compression;
use futures::StreamExt;
use generic_error::BoxError;
use logger::{debug, error};
use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, WriteMultipartRef};
use object_store::{
multi_part::{MultiUploadRef, MultiUploadWriter},
ObjectStore, ObjectStoreRef, Path,
};
use snafu::{OptionExt, ResultExt};
use tokio::io::AsyncWrite;

Expand Down Expand Up @@ -417,7 +420,7 @@ async fn write_metadata(
Ok(buf_size)
}

async fn multi_upload_abort(aborter: WriteMultipartRef) {
async fn multi_upload_abort(aborter: MultiUploadRef) {
// The uploading file will be leaked if failed to abort. A repair command
// will be provided to clean up the leaked files.
if let Err(e) = aborter.lock().await.abort().await {
Expand Down Expand Up @@ -589,7 +592,7 @@ mod tests {
time::{TimeRange, Timestamp},
};
use futures::stream;
use object_store::LocalFileSystem;
use object_store::local_file;
use runtime::{self, Runtime};
use table_engine::predicate::Predicate;
use tempfile::tempdir;
Expand All @@ -613,7 +616,7 @@ mod tests {
fn test_parquet_build_and_read() {
test_util::init_log_for_test();

let runtime = Arc::new(runtime::Builder::default().build().unwrap());
let runtime = Arc::new(runtime::Builder::default().enable_all().build().unwrap());
parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]);
parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3, 3, 2]);
parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 4]);
Expand All @@ -635,9 +638,8 @@ mod tests {
column_stats: Default::default(),
};

let dir = tempdir().unwrap();
let root = dir.path();
let store: ObjectStoreRef = Arc::new(LocalFileSystem::new_with_prefix(root).unwrap());
let root = tempdir().unwrap().as_ref().to_string_lossy().to_string();
let store: ObjectStoreRef = Arc::new(local_file::try_new_with_default(root).unwrap());
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from("data.par");

Expand Down
8 changes: 8 additions & 0 deletions src/analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,8 @@ impl Builder {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
},
wal: WalConfig {
Expand Down Expand Up @@ -588,6 +590,8 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
},
wal: WalConfig {
Expand Down Expand Up @@ -621,6 +625,8 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
};

Expand Down Expand Up @@ -685,6 +691,8 @@ impl Default for MemoryEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
},
wal: WalConfig {
Expand Down
5 changes: 3 additions & 2 deletions src/benchmarks/src/merge_memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use common_types::{
projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, time::TimeRange,
};
use logger::info;
use object_store::{LocalFileSystem, ObjectStoreRef};
use object_store::{local_file, ObjectStoreRef};
use runtime::Runtime;
use table_engine::{predicate::Predicate, table::TableId};

Expand All @@ -69,7 +69,8 @@ impl MergeMemTableBench {
pub fn new(config: MergeMemTableBenchConfig) -> Self {
assert!(!config.sst_file_ids.is_empty());

let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let space_id = config.space_id;
let table_id = config.table_id;
Expand Down
5 changes: 3 additions & 2 deletions src/benchmarks/src/merge_sst_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use analytic_engine::{
};
use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema};
use logger::info;
use object_store::{LocalFileSystem, ObjectStoreRef};
use object_store::{local_file, ObjectStoreRef};
use runtime::Runtime;
use table_engine::{predicate::Predicate, table::TableId};
use tokio::sync::mpsc::{self, UnboundedReceiver};
Expand All @@ -65,7 +65,8 @@ impl MergeSstBench {
pub fn new(config: MergeSstBenchConfig) -> Self {
assert!(!config.sst_file_ids.is_empty());

let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let space_id = config.space_id;
let table_id = config.table_id;
Expand Down
4 changes: 2 additions & 2 deletions src/benchmarks/src/parquet_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use analytic_engine::sst::meta_data::cache::MetaCacheRef;
use common_types::schema::Schema;
use futures::StreamExt;
use logger::info;
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use object_store::{local_file, ObjectStoreRef, Path};
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ParquetRecordBatchStreamBuilder,
};
Expand All @@ -46,7 +46,7 @@ pub struct ParquetBench {

impl ParquetBench {
pub fn new(config: SstBenchConfig) -> Self {
let store = Arc::new(LocalFileSystem::new_with_prefix(&config.store_path).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = util::new_runtime(config.runtime_thread_num);

Expand Down
4 changes: 2 additions & 2 deletions src/benchmarks/src/scan_memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use common_types::{
time::TimeRange,
};
use logger::info;
use object_store::{LocalFileSystem, Path};
use object_store::{local_file, Path};

use crate::{config::ScanMemTableBenchConfig, util};

Expand All @@ -45,7 +45,7 @@ pub struct ScanMemTableBench {

impl ScanMemTableBench {
pub fn new(config: ScanMemTableBenchConfig) -> Self {
let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
let meta_cache: Option<MetaCacheRef> = None;
Expand Down
4 changes: 2 additions & 2 deletions src/benchmarks/src/sst_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use common_types::{
schema::Schema,
};
use logger::info;
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use object_store::{local_file, ObjectStoreRef, Path};
use runtime::Runtime;

use crate::{config::SstBenchConfig, util};
Expand All @@ -50,7 +50,7 @@ impl SstBench {
pub fn new(config: SstBenchConfig) -> Self {
let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));

let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
let sst_path = Path::from(config.sst_file_name.clone());
let meta_cache: Option<MetaCacheRef> = config
.sst_meta_cache_cap
Expand Down
9 changes: 5 additions & 4 deletions src/benchmarks/src/sst_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use common_types::{
};
use generic_error::BoxError;
use logger::info;
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use object_store::{local_file, ObjectStoreRef, Path};
use runtime::Runtime;
use serde::Deserialize;
use table_engine::{predicate::Predicate, table::TableId};
Expand Down Expand Up @@ -81,7 +81,7 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa
);

let store: ObjectStoreRef =
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap());
Arc::new(local_file::try_new_with_default(config.store_path).unwrap());
let store_picker: ObjectStorePickerRef = Arc::new(store);
let sst_file_path = Path::from(config.sst_file_name);

Expand Down Expand Up @@ -115,7 +115,7 @@ pub struct RebuildSstConfig {
pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
info!("Start rebuild sst, config:{:?}", config);

let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path.clone()).unwrap()) as _;
let input_path = Path::from(config.input_file_name);

let parquet_metadata = util::parquet_metadata(&store, &input_path).await;
Expand Down Expand Up @@ -210,7 +210,8 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {

let space_id = config.space_id;
let table_id = config.table_id;
let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _;
let store = Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;

let (tx, _rx) = mpsc::unbounded_channel();
let purge_queue = FilePurgeQueue::new(space_id, table_id, tx);

Expand Down
6 changes: 6 additions & 0 deletions src/benchmarks/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ impl Builder {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
},
wal: WalConfig {
Expand Down Expand Up @@ -386,6 +388,8 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
},
wal: WalConfig {
Expand Down Expand Up @@ -419,6 +423,8 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
max_retries: 3,
timeout: Default::default(),
}),
};

Expand Down
Loading

0 comments on commit 9321505

Please sign in to comment.