Skip to content

Commit

Permalink
add BoxError trait to simplify the way to create generic error.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Feb 9, 2023
1 parent 8407006 commit 673c6f6
Show file tree
Hide file tree
Showing 84 changed files with 661 additions and 909 deletions.
11 changes: 4 additions & 7 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use log::info;
use snafu::{OptionExt, ResultExt};
use table_engine::{
Expand Down Expand Up @@ -58,11 +59,7 @@ impl TableEngine for TableEngineImpl {
info!("Try to close table engine");

// Close the instance.
self.instance
.close()
.await
.map_err(|e| Box::new(e) as _)
.context(Close)?;
self.instance.close().await.box_err().context(Close)?;

info!("Table engine closed");

Expand Down Expand Up @@ -99,7 +96,7 @@ impl TableEngine for TableEngineImpl {
ANALYTIC_ENGINE_TYPE.to_string(),
space_table,
)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(Unexpected)?,
),
};
Expand Down Expand Up @@ -151,7 +148,7 @@ impl TableEngine for TableEngineImpl {
ANALYTIC_ENGINE_TYPE.to_string(),
space_table,
)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(Unexpected)?,
),
};
Expand Down
7 changes: 4 additions & 3 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::{collections::HashMap, sync::Arc};

use common_util::error::BoxError;
use log::info;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
Expand Down Expand Up @@ -146,7 +147,7 @@ impl Instance {
.wal_manager
.write(&write_ctx, &log_batch)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.context(WriteWal {
space_id: table_data.space_id,
table: &table_data.name,
Expand Down Expand Up @@ -271,7 +272,7 @@ impl Instance {
);
let mut table_opts =
table_options::merge_table_options_for_alter(&options, &current_table_options)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(InvalidOptions {
space_id: table_data.space_id,
table: &table_data.name,
Expand Down Expand Up @@ -316,7 +317,7 @@ impl Instance {
.wal_manager
.write(&write_ctx, &log_batch)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.context(WriteWal {
space_id: table_data.space_id,
table: &table_data.name,
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::sync::Arc;

use common_util::error::BoxError;
use log::info;
use snafu::ResultExt;
use table_engine::engine::CreateTableRequest;
Expand Down Expand Up @@ -32,7 +33,7 @@ impl Instance {

let mut table_opts =
table_options::merge_table_options_for_create(&request.options, &self.table_opts)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(InvalidOptions {
space_id: space.id,
table: &request.table_name,
Expand Down
14 changes: 6 additions & 8 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;

use common_types::schema::Version;
use common_util::define_result;
use common_util::{define_result, error::GenericError};
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
Expand Down Expand Up @@ -36,7 +36,7 @@ pub enum Error {
#[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))]
ReadMetaUpdate {
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand Down Expand Up @@ -106,7 +106,7 @@ pub enum Error {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand All @@ -120,7 +120,7 @@ pub enum Error {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand All @@ -134,7 +134,7 @@ pub enum Error {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand Down Expand Up @@ -187,9 +187,7 @@ pub enum Error {
AlterDroppedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to store version edit, err:{}", source))]
StoreVersionEdit {
source: Box<dyn std::error::Error + Send + Sync>,
},
StoreVersionEdit { source: GenericError },

#[snafu(display(
"Failed to get to log batch encoder, table:{}, wal_location:{:?}, err:{}",
Expand Down
42 changes: 17 additions & 25 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ use common_types::{
time::TimeRange,
SequenceNumber,
};
use common_util::{config::ReadableDuration, define_result, runtime::Runtime, time};
use common_util::{
config::ReadableDuration,
define_result,
error::{BoxError, GenericError},
runtime::Runtime,
time,
};
use futures::{
channel::{mpsc, mpsc::channel},
future::try_join_all,
Expand Down Expand Up @@ -62,9 +68,7 @@ const DEFAULT_CHANNEL_SIZE: usize = 5;
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display("Failed to store version edit, err:{}", source))]
StoreVersionEdit {
source: Box<dyn std::error::Error + Send + Sync>,
},
StoreVersionEdit { source: GenericError },

#[snafu(display(
"Failed to purge wal, wal_location:{:?}, sequence:{}",
Expand All @@ -78,9 +82,7 @@ pub enum Error {
},

#[snafu(display("Failed to build mem table iterator, source:{}", source))]
InvalidMemIter {
source: Box<dyn std::error::Error + Send + Sync>,
},
InvalidMemIter { source: GenericError },

#[snafu(display(
"Failed to create sst writer, storage_format_hint:{:?}, err:{}",
Expand All @@ -93,10 +95,7 @@ pub enum Error {
},

#[snafu(display("Failed to write sst, file_path:{}, source:{}", path, source))]
WriteSst {
path: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
WriteSst { path: String, source: GenericError },

#[snafu(display("Background flush failed, cannot schedule flush task, err:{}", source))]
BackgroundFlushFailed {
Expand Down Expand Up @@ -125,9 +124,7 @@ pub enum Error {
},

#[snafu(display("Failed to split record batch, source:{}", source))]
SplitRecordBatch {
source: Box<dyn std::error::Error + Send + Sync>,
},
SplitRecordBatch { source: GenericError },

#[snafu(display("Failed to read sst meta, source:{}", source))]
ReadSstMeta {
Expand Down Expand Up @@ -680,7 +677,7 @@ impl Instance {

for data in iter {
for (idx, record_batch) in split_record_batch_with_time_ranges(
data.map_err(|e| Box::new(e) as _).context(InvalidMemIter)?,
data.box_err().context(InvalidMemIter)?,
&time_ranges,
timestamp_idx,
)?
Expand Down Expand Up @@ -771,7 +768,7 @@ impl Instance {
let sst_info = writer
.write(request_id, &sst_meta, record_batch_stream)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.with_context(|| WriteSst {
path: sst_file_path.to_string(),
})?;
Expand Down Expand Up @@ -995,7 +992,7 @@ impl SpaceStore {
let sst_info = sst_writer
.write(request_id, &sst_meta, record_batch_stream)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.with_context(|| WriteSst {
path: sst_file_path.to_string(),
})?;
Expand Down Expand Up @@ -1098,7 +1095,7 @@ fn split_record_batch_with_time_ranges(
};
builders[idx]
.append_row_view(&view)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(SplitRecordBatch)?;
} else {
panic!(
Expand All @@ -1109,12 +1106,7 @@ fn split_record_batch_with_time_ranges(
}
let mut ret = Vec::with_capacity(builders.len());
for mut builder in builders {
ret.push(
builder
.build()
.map_err(|e| Box::new(e) as _)
.context(SplitRecordBatch)?,
);
ret.push(builder.build().box_err().context(SplitRecordBatch)?);
}
Ok(ret)
}
Expand All @@ -1131,7 +1123,7 @@ fn build_mem_table_iter(memtable: MemTableRef, table_data: &TableData) -> Result
};
memtable
.scan(scan_ctx, scan_req)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(InvalidMemIter)
}

Expand Down
13 changes: 5 additions & 8 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use common_types::{
projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
time::TimeRange,
};
use common_util::{define_result, runtime::Runtime};
use common_util::{define_result, error::BoxError, runtime::Runtime};
use futures::stream::Stream;
use log::{debug, error, trace};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -333,20 +333,17 @@ where
runtime.spawn(async move {
for mut iter in collection {
while let Some(record_batch) = iter.next_batch().await.transpose() {
let record_batch =
record_batch
.map_err(|e| Box::new(e) as _)
.context(ErrWithSource {
msg: "read record batch",
});
let record_batch = record_batch.box_err().context(ErrWithSource {
msg: "read record batch",
});

// Apply the projection to RecordBatchWithKey and gets the final RecordBatch.
let record_batch = record_batch.and_then(|batch_with_key| {
// TODO(yingwen): Try to use projector to do this, which precompute row
// indexes to project.
batch_with_key
.try_project(&projected_schema)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(ErrWithSource {
msg: "project record batch",
})
Expand Down
17 changes: 7 additions & 10 deletions analytic_engine/src/instance/write_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{

use common_util::{
define_result,
error::{BoxError, GenericError},
runtime::{JoinHandle, Runtime},
time::InstantExt,
};
Expand Down Expand Up @@ -43,9 +44,7 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to wait flush completed, channel disconnected, err:{}", source))]
WaitFlush {
source: Box<dyn std::error::Error + Send + Sync>,
},
WaitFlush { source: GenericError },

#[snafu(display(
"Background flush failed, cannot write more data, err:{}.\nBacktrace:\n{}",
Expand All @@ -67,9 +66,7 @@ pub enum Error {
},

#[snafu(display("Channel error, err:{}", source))]
Channel {
source: Box<dyn std::error::Error + Send + Sync>,
},
Channel { source: GenericError },

#[snafu(display(
"Disallowed to manipulate table data, table does not belong to the worker, table:{}, worker:{}.\nBacktrace:\n{}",
Expand Down Expand Up @@ -232,7 +229,7 @@ impl WorkerLocal {
self.background_rx
.changed()
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.context(WaitFlush)?;
}
assert!(!self.data.is_flushing());
Expand Down Expand Up @@ -534,7 +531,7 @@ pub async fn process_command_in_write_worker<T, E: std::error::Error + Send + Sy

// Receive alter options result.
match rx.await {
Ok(res) => res.map_err(|e| Box::new(e) as _).context(Channel),
Ok(res) => res.box_err().context(Channel),
Err(_) => ReceiveFromWorker {
table: &table_data.name,
worker_id: table_data.write_handle.worker_id(),
Expand All @@ -553,7 +550,7 @@ pub async fn join_all<T, E: std::error::Error + Send + Sync + 'static>(
let table_data = &table_vec[pos];
match res {
Ok(res) => {
res.map_err(|e| Box::new(e) as _).context(Channel)?;
res.box_err().context(Channel)?;
}
Err(_) => {
return ReceiveFromWorker {
Expand Down Expand Up @@ -910,7 +907,7 @@ impl WriteWorker {
TableAlterSchemaPolicy::Unknown,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
.map_err(|e| Box::new(e) as GenericError)
.context(Channel);
if let Err(res) = tx.send(alter_res) {
error!(
Expand Down
10 changes: 3 additions & 7 deletions analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use common_types::{
schema::{IndexInWriterSchema, Schema},
SequenceNumber,
};
use common_util::define_result;
use common_util::{define_result, error::GenericError};
use snafu::{Backtrace, Snafu};

use crate::memtable::key::KeySequence;
Expand Down Expand Up @@ -65,14 +65,10 @@ pub enum Error {
},

#[snafu(display("Invalid row, err:{}", source))]
InvalidRow {
source: Box<dyn std::error::Error + Send + Sync>,
},
InvalidRow { source: GenericError },

#[snafu(display("Fail to iter in reverse order, err:{}", source))]
IterReverse {
source: Box<dyn std::error::Error + Send + Sync>,
},
IterReverse { source: GenericError },

#[snafu(display("Timeout when iter memtable.\nBacktrace:\n{}", backtrace))]
IterTimeout { backtrace: Backtrace },
Expand Down
Loading

0 comments on commit 673c6f6

Please sign in to comment.