Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl key partition rule #507

Merged
merged 5 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions table_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
df_operator = { workspace = true }
futures = { workspace = true }
itertools = "0.10.5"
log = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
Expand All @@ -29,3 +31,6 @@ serde_derive = { workspace = true }
smallvec = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
env_logger = { workspace = true }
34 changes: 33 additions & 1 deletion table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,38 @@ pub mod rule;

use common_types::bytes::Bytes;
use proto::meta_update as meta_pb;
use snafu::{Backtrace, Snafu};

// TODO: we should refactor for splitting the errors.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display(
"Failed to build partition rule, msg:{}.\nBacktrace:{}\n",
msg,
backtrace
))]
BuildPartitionRule { msg: String, backtrace: Backtrace },

#[snafu(display(
"Failed to locate partitions for write, msg:{}.\nBacktrace:{}\n",
msg,
backtrace
))]
LocateWritePartition { msg: String, backtrace: Backtrace },

#[snafu(display(
"Failed to locate partitions for read, msg:{}.\nBacktrace:{}\n",
msg,
backtrace
))]
LocateReadPartition { msg: String, backtrace: Backtrace },

#[snafu(display("Internal error occurred, msg:{}", msg,))]
Internal { msg: String },
}

define_result!(Error);

/// Info for how to partition table
#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -23,7 +55,7 @@ impl PartitionInfo {
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct Definition {
pub name: String,
pub origin_name: Option<String>,
Expand Down
110 changes: 110 additions & 0 deletions table_engine/src/partition/rule/df_adapter/extractor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Partition filter extractor

use std::collections::HashSet;

use common_types::datum::Datum;
use datafusion_expr::{Expr, Operator};
use df_operator::visitor::find_columns_by_expr;

use crate::partition::rule::filter::{PartitionCondition, PartitionFilter};

/// The datafusion filter exprs extractor
///
/// It's used to extract the meaningful `Expr`s and convert them to
/// [PartitionFilter](the inner filter type in ceresdb).
///
/// NOTICE: When you implements [PartitionRule] for specific partition strategy,
/// you should implement the corresponding [FilterExtractor], too.
///
/// For example: [KeyRule] and [KeyExtractor].
/// If they are not related, [PartitionRule] may not take effect.
pub trait FilterExtractor: Send + Sync + 'static {
fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter>;
}
pub struct KeyExtractor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add one more newline above.


impl FilterExtractor for KeyExtractor {
fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter> {
if filters.is_empty() {
return Vec::default();
}

let mut target = Vec::with_capacity(filters.len());
for filter in filters {
// If no target columns included in `filter`, ignore this `filter`.
let columns_in_filter = find_columns_by_expr(filter)
.into_iter()
.collect::<HashSet<_>>();
let find_result = columns
.iter()
.find(|col| columns_in_filter.contains(col.as_str()));

if find_result.is_none() {
continue;
}

// If target columns included, now only the situation that only target column in
// filter is supported. Once other type column found here, we ignore it.
// TODO: support above situation.
if columns_in_filter.len() != 1 {
continue;
}

// Finally, we try to convert `filter` to `PartitionFilter`.
// We just support the simple situation: "colum = value" now.
// TODO: support "colum in [value list]".
// TODO: we need to compare and check the datatype of column and value.
// (Actually, there is type conversion on high-level, but when converted data
// is overflow, it may take no effect).
let partition_filter = match filter.clone() {
Expr::BinaryExpr { left, op, right } => match (*left, op, *right) {
(Expr::Column(col), Operator::Eq, Expr::Literal(val))
| (Expr::Literal(val), Operator::Eq, Expr::Column(col)) => {
let datum_opt = Datum::from_scalar_value(&val);
datum_opt.map(|d| PartitionFilter::new(col.name, PartitionCondition::Eq(d)))
}
_ => None,
},
_ => None,
};

if let Some(pf) = partition_filter {
target.push(pf);
}
}

target
}
}

pub type FilterExtractorRef = Box<dyn FilterExtractor>;

#[cfg(test)]
mod tests {
use datafusion::scalar::ScalarValue;
use datafusion_expr::col;

use super::{FilterExtractor, *};

#[test]
fn test_key_extractor_basic() {
let extractor = KeyExtractor;

// `Eq` expr will be accepted.
let columns = vec!["col1".to_string()];
let accepted_expr = col("col1").eq(Expr::Literal(ScalarValue::Int32(Some(42))));
let partition_filter = extractor.extract(&[accepted_expr], &columns);
let expected = PartitionFilter {
column: "col1".to_string(),
condition: PartitionCondition::Eq(Datum::Int32(42)),
};
assert_eq!(partition_filter.get(0).unwrap(), &expected);

// Other expr will be rejected now.
let rejected_expr = col("col1").gt(Expr::Literal(ScalarValue::Int32(Some(42))));
let partition_filter = extractor.extract(&[rejected_expr], &columns);
assert!(partition_filter.is_empty());
}
}
Loading