chore: upgrade object store version (#1541)
## Rationale
The object_store dependency is upgraded to 0.10.1 to prepare for integrating opendal.

## Detailed Changes
- Implement AsyncWrite for ObjectStoreMultiUpload
- Implement MultipartUpload for ObkvMultiPartUpload
- Adapt the query write path to the new multipart upload API (see the sketch below)
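
For context, a minimal sketch of the upstream API change behind these edits, assuming object_store 0.10's `ObjectStore::put_multipart` and the `WriteMultipart` helper (the in-memory store, path, and payload are illustrative, not code from this repo):

```rust
use object_store::{memory::InMemory, path::Path, ObjectStore, WriteMultipart};

// Sketch only: exercises the 0.10-style multipart flow against an in-memory store.
// Typically needs a Tokio runtime, since WriteMultipart uploads parts on background tasks.
async fn multipart_sketch() -> object_store::Result<()> {
    let store = InMemory::new();
    let path = Path::from("sst/1.sst");

    // 0.10: put_multipart returns a MultipartUpload handle; WriteMultipart
    // wraps it with buffering and chunked, concurrent part uploads.
    let upload = store.put_multipart(&path).await?;
    let mut writer = WriteMultipart::new(upload);
    writer.write(b"sst bytes");
    writer.finish().await?;

    // 0.5 shape removed by this upgrade, for contrast:
    //   let (id, mut sink) = store.put_multipart(&path).await?; // AsyncWrite sink
    //   sink.write_all(b"...").await?;
    //   sink.shutdown().await?;
    //   store.abort_multipart(&path, &id).await?;               // on failure
    Ok(())
}
```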

## Test Plan
- Existing tests

---------

Co-authored-by: jiacai2050 <dev@liujiacai.net>
baojinri and jiacai2050 authored Aug 19, 2024
1 parent 8a28840 commit b94c99e
Showing 18 changed files with 822 additions and 2,149 deletions.
367 changes: 303 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 0 additions & 25 deletions src/analytic_engine/src/setup.rs
@@ -27,13 +27,11 @@ use object_store::{
disk_cache::DiskCacheStore,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
obkv,
prefix::StoreWithPrefix,
s3, LocalFileSystem, ObjectStoreRef,
};
use snafu::{ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
use table_kv::obkv::ObkvImpl;
use wal::manager::{OpenedWals, WalManagerRef};

use crate::{
@@ -55,9 +53,6 @@ pub enum Error {
source: crate::instance::engine::Error,
},

#[snafu(display("Failed to open obkv, err:{}", source))]
OpenObkv { source: table_kv::obkv::Error },

#[snafu(display("Failed to execute in runtime, err:{}", source))]
RuntimeExec { source: runtime::Error },

@@ -214,26 +209,6 @@ fn open_storage(
let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, oss);
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
}
ObjectStoreOptions::Obkv(obkv_opts) => {
let obkv_config = obkv_opts.client;
let obkv = engine_runtimes
.write_runtime
.spawn_blocking(move || ObkvImpl::new(obkv_config).context(OpenObkv))
.await
.context(RuntimeExec)??;

let oss: ObjectStoreRef = Arc::new(
obkv::ObkvObjectStore::try_new(
Arc::new(obkv),
obkv_opts.shard_num,
obkv_opts.part_size.0 as usize,
obkv_opts.max_object_size.0 as usize,
obkv_opts.upload_parallelism,
)
.context(OpenObjectStore)?,
);
Arc::new(StoreWithPrefix::new(obkv_opts.prefix, oss).context(OpenObjectStore)?) as _
}
ObjectStoreOptions::S3(s3_option) => {
let oss: ObjectStoreRef =
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
2 changes: 1 addition & 1 deletion src/analytic_engine/src/sst/meta_data/cache.rs
@@ -290,7 +290,7 @@ mod tests {

let bytes = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
let meta_path = object_store::Path::from(meta_path);
store.put(&meta_path, bytes).await.unwrap();
store.put(&meta_path, bytes.into()).await.unwrap();
}

#[tokio::test]
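The added `.into()` above reflects that `ObjectStore::put` now takes a `PutPayload` rather than `Bytes` in object_store 0.10. A minimal sketch, with an illustrative in-memory store and payload (not code from this repo):

```rust
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

async fn put_sketch() -> object_store::Result<()> {
    let store = InMemory::new();
    let bytes = Bytes::from_static(b"encoded sst meta");
    // Bytes converts into PutPayload, so existing call sites only need `.into()`.
    store.put(&Path::from("meta/0"), PutPayload::from(bytes)).await?;
    Ok(())
}
```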
100 changes: 30 additions & 70 deletions src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,10 +28,9 @@ use datafusion::parquet::basic::Compression;
use futures::StreamExt;
use generic_error::BoxError;
use logger::{debug, error};
use object_store::{ObjectStoreRef, Path};
use parquet::data_type::AsBytes;
use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, WriteMultipartRef};
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::io::AsyncWrite;

use crate::{
sst::{
@@ -45,8 +44,8 @@ use crate::{
},
},
writer::{
self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io,
MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, MetaData,
PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
},
},
table::sst_util,
@@ -405,67 +404,24 @@ impl<'a> RecordBatchGroupWriter<'a> {
}
}

struct ObjectStoreMultiUploadAborter<'a> {
location: &'a Path,
session_id: String,
object_store: &'a ObjectStoreRef,
}

impl<'a> ObjectStoreMultiUploadAborter<'a> {
async fn initialize_upload(
object_store: &'a ObjectStoreRef,
location: &'a Path,
) -> Result<(
ObjectStoreMultiUploadAborter<'a>,
Box<dyn AsyncWrite + Unpin + Send>,
)> {
let (session_id, upload_writer) = object_store
.put_multipart(location)
.await
.context(Storage)?;
let aborter = Self {
location,
session_id,
object_store,
};
Ok((aborter, upload_writer))
}

async fn abort(self) -> Result<()> {
self.object_store
.abort_multipart(self.location, &self.session_id)
.await
.context(Storage)
}
}

async fn write_metadata<W>(
mut meta_sink: W,
async fn write_metadata(
meta_sink: MultiUploadWriter,
parquet_metadata: ParquetMetaData,
meta_path: &object_store::Path,
) -> writer::Result<usize>
where
W: AsyncWrite + Send + Unpin,
{
) -> Result<usize> {
let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
let bytes = buf.as_bytes();
let bytes_size = bytes.len();
meta_sink.write_all(bytes).await.with_context(|| Io {
file: meta_path.clone(),
})?;

meta_sink.shutdown().await.with_context(|| Io {
file: meta_path.clone(),
})?;
let buf_size = buf.len();
let mut uploader = meta_sink.multi_upload.lock().await;
uploader.put(buf);
uploader.finish().await.context(Storage)?;

Ok(bytes_size)
Ok(buf_size)
}

async fn multi_upload_abort(path: &Path, aborter: ObjectStoreMultiUploadAborter<'_>) {
// The uploading file will be leaked if failed to abort. A repair command will
// be provided to clean up the leaked files.
if let Err(e) = aborter.abort().await {
error!("Failed to abort multi-upload for sst:{}, err:{}", path, e);
async fn multi_upload_abort(aborter: WriteMultipartRef) {
// The uploading file will be leaked if failed to abort. A repair command
// will be provided to clean up the leaked files.
if let Err(e) = aborter.lock().await.abort().await {
error!("Failed to abort multi-upload sst, err:{}", e);
}
}

@@ -476,7 +432,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
request_id: RequestId,
meta: &MetaData,
input: RecordBatchStream,
) -> writer::Result<SstInfo> {
) -> Result<SstInfo> {
debug!(
"Build parquet file, request_id:{}, meta:{:?}, num_rows_per_row_group:{}",
request_id, meta, self.options.num_rows_per_row_group
@@ -491,28 +447,32 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
};
let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, write_options);

let (aborter, sink) =
ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?;
let sink = MultiUploadWriter::new(self.store, self.path)
.await
.context(Storage)?;
let aborter = sink.aborter();

let meta_path = Path::from(sst_util::new_metadata_path(self.path.as_ref()));

let (total_num_rows, parquet_metadata, mut data_encoder) =
match group_writer.write_all(sink, &meta_path).await {
Ok(v) => v,
Err(e) => {
multi_upload_abort(self.path, aborter).await;
multi_upload_abort(aborter).await;
return Err(e);
}
};
let time_range = parquet_metadata.time_range;

let (meta_aborter, meta_sink) =
ObjectStoreMultiUploadAborter::initialize_upload(self.store, &meta_path).await?;
let meta_size = match write_metadata(meta_sink, parquet_metadata, &meta_path).await {
let meta_sink = MultiUploadWriter::new(self.store, &meta_path)
.await
.context(Storage)?;
let meta_aborter = meta_sink.aborter();
let meta_size = match write_metadata(meta_sink, parquet_metadata).await {
Ok(v) => v,
Err(e) => {
multi_upload_abort(self.path, aborter).await;
multi_upload_abort(&meta_path, meta_aborter).await;
multi_upload_abort(aborter).await;
multi_upload_abort(meta_aborter).await;
return Err(e);
}
};
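Under object_store 0.10, aborting now happens on the upload handle itself instead of `ObjectStore::abort_multipart`, which is what the `multi_upload_abort` change above relies on. A rough sketch against the upstream trait (the store, path, and payload are illustrative; `WriteMultipartRef` is this repo's own wrapper and is not shown here):

```rust
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, MultipartUpload, ObjectStore};

async fn abort_sketch() -> object_store::Result<()> {
    let store = InMemory::new();
    let mut upload = store.put_multipart(&Path::from("sst/1.sst")).await?;

    // Upload one part, then abandon the object; abort cleans up the
    // partially uploaded parts so nothing is leaked.
    upload.put_part(Bytes::from_static(b"partial part").into()).await?;
    upload.abort().await?;
    Ok(())
}
```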
2 changes: 1 addition & 1 deletion src/components/object_store/Cargo.toml
@@ -59,7 +59,7 @@ table_kv = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
twox-hash = "1.6"
upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ] }
upstream = { package = "object_store", version = "0.10.1", features = [ "aws" ] }
uuid = { version = "1.3.3", features = ["v4"] }

[dev-dependencies]
35 changes: 0 additions & 35 deletions src/components/object_store/src/config.rs
@@ -19,7 +19,6 @@ use std::time::Duration;

use serde::{Deserialize, Serialize};
use size_ext::ReadableSize;
use table_kv::config::ObkvConfig;
use time_ext::ReadableDuration;

#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -63,7 +62,6 @@ impl Default for StorageOptions {
pub enum ObjectStoreOptions {
Local(LocalOptions),
Aliyun(AliyunOptions),
Obkv(ObkvOptions),
S3(S3Options),
}

@@ -85,39 +83,6 @@ pub struct AliyunOptions {
pub retry: RetryOptions,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObkvOptions {
pub prefix: String,
#[serde(default = "ObkvOptions::default_shard_num")]
pub shard_num: usize,
#[serde(default = "ObkvOptions::default_part_size")]
pub part_size: ReadableSize,
#[serde(default = "ObkvOptions::default_max_object_size")]
pub max_object_size: ReadableSize,
#[serde(default = "ObkvOptions::default_upload_parallelism")]
pub upload_parallelism: usize,
/// Obkv client config
pub client: ObkvConfig,
}

impl ObkvOptions {
fn default_max_object_size() -> ReadableSize {
ReadableSize::gb(1)
}

fn default_part_size() -> ReadableSize {
ReadableSize::mb(1)
}

fn default_shard_num() -> usize {
512
}

fn default_upload_parallelism() -> usize {
8
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct S3Options {
pub region: String,
