Skip to content

Commit

Permalink
feat: impl key partition rule (#507)
Browse files Browse the repository at this point in the history
* impl key partition rule(draft).

* refactor to more extensible and testable version.

* add tests for key partition.

* add df adapter and partition rule's building.

* address CR.
  • Loading branch information
Rachelint authored Dec 27, 2022
1 parent 6139d89 commit eb6af7a
Show file tree
Hide file tree
Showing 10 changed files with 941 additions and 44 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions table_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
df_operator = { workspace = true }
futures = { workspace = true }
itertools = "0.10.5"
log = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
Expand All @@ -29,3 +31,6 @@ serde_derive = { workspace = true }
smallvec = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
env_logger = { workspace = true }
34 changes: 33 additions & 1 deletion table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,38 @@ pub mod rule;

use common_types::bytes::Bytes;
use proto::meta_update as meta_pb;
use snafu::{Backtrace, Snafu};

// TODO: we should refactor for splitting the errors.
/// Errors produced by the partition module.
///
/// Every variant carries a free-form `msg`; the first three additionally
/// capture a [`Backtrace`] which is rendered as part of the display message.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
/// Building a partition rule failed.
#[snafu(display(
"Failed to build partition rule, msg:{}.\nBacktrace:{}\n",
msg,
backtrace
))]
BuildPartitionRule { msg: String, backtrace: Backtrace },

/// Locating the target partitions for a write failed.
#[snafu(display(
"Failed to locate partitions for write, msg:{}.\nBacktrace:{}\n",
msg,
backtrace
))]
LocateWritePartition { msg: String, backtrace: Backtrace },

/// Locating the target partitions for a read failed.
#[snafu(display(
"Failed to locate partitions for read, msg:{}.\nBacktrace:{}\n",
msg,
backtrace
))]
LocateReadPartition { msg: String, backtrace: Backtrace },

/// Internal error; unlike the other variants, no backtrace is captured.
#[snafu(display("Internal error occurred, msg:{}", msg,))]
Internal { msg: String },
}

// NOTE(review): `define_result!` is defined outside this file; presumably it
// declares a `Result<T>` alias whose error type is `Error` — confirm.
define_result!(Error);

/// Info for how to partition table
#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -23,7 +55,7 @@ impl PartitionInfo {
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct Definition {
pub name: String,
pub origin_name: Option<String>,
Expand Down
110 changes: 110 additions & 0 deletions table_engine/src/partition/rule/df_adapter/extractor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Partition filter extractor

use std::collections::HashSet;

use common_types::datum::Datum;
use datafusion_expr::{Expr, Operator};
use df_operator::visitor::find_columns_by_expr;

use crate::partition::rule::filter::{PartitionCondition, PartitionFilter};

/// The datafusion filter exprs extractor.
///
/// It is used to extract the meaningful `Expr`s and convert them to
/// [`PartitionFilter`]s (the inner filter type in ceresdb).
///
/// NOTICE: When you implement `PartitionRule` for a specific partition
/// strategy, you should implement the corresponding [`FilterExtractor`], too.
///
/// For example: `KeyRule` and [`KeyExtractor`].
/// If they are not related, the `PartitionRule` may not take effect.
pub trait FilterExtractor: Send + Sync + 'static {
/// Converts the usable `filters` into [`PartitionFilter`]s.
///
/// `columns` holds the names of the target columns to look for in the
/// filters. Which filters are considered usable is up to the implementation.
fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter>;
}
/// [`FilterExtractor`] for the key partition strategy.
///
/// Only filters of the form `column = literal` (or `literal = column`) that
/// reference exactly one column, which must be one of the target columns, are
/// converted. Every other filter is silently ignored.
pub struct KeyExtractor;

impl FilterExtractor for KeyExtractor {
    /// Extracts the `Eq` partition filters on the target `columns` from
    /// `filters`.
    ///
    /// Returns one [`PartitionFilter`] per accepted filter, in the order the
    /// filters were given. Filters that cannot be converted are skipped, so
    /// the result may be empty.
    fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter> {
        if filters.is_empty() {
            return Vec::default();
        }

        let mut target = Vec::with_capacity(filters.len());
        for filter in filters {
            // If no target column is included in `filter`, ignore this `filter`.
            let columns_in_filter = find_columns_by_expr(filter)
                .into_iter()
                .collect::<HashSet<_>>();
            let has_target_column = columns
                .iter()
                .any(|col| columns_in_filter.contains(col.as_str()));
            if !has_target_column {
                continue;
            }

            // Only filters that reference the target column and nothing else are
            // supported now; filters mixing in other columns are ignored.
            // TODO: support above situation.
            if columns_in_filter.len() != 1 {
                continue;
            }

            // Finally, we try to convert `filter` to `PartitionFilter`.
            // We just support the simple situation: "column = value" now.
            // TODO: support "column in [value list]".
            // TODO: we need to compare and check the datatype of column and value.
            // (Actually, there is type conversion on high-level, but when converted data
            // is overflow, it may take no effect).
            //
            // The expression is matched by reference so that only the column name
            // of an accepted filter is cloned, not the whole expression tree.
            let partition_filter = match filter {
                Expr::BinaryExpr { left, op, right } => match (&**left, op, &**right) {
                    (Expr::Column(col), Operator::Eq, Expr::Literal(val))
                    | (Expr::Literal(val), Operator::Eq, Expr::Column(col)) => {
                        // `None` here means the literal has no `Datum` counterpart.
                        Datum::from_scalar_value(val).map(|d| {
                            PartitionFilter::new(col.name.clone(), PartitionCondition::Eq(d))
                        })
                    }
                    _ => None,
                },
                _ => None,
            };

            if let Some(pf) = partition_filter {
                target.push(pf);
            }
        }

        target
    }
}

/// Owned, dynamically dispatched [`FilterExtractor`] trait object.
pub type FilterExtractorRef = Box<dyn FilterExtractor>;

#[cfg(test)]
mod tests {
    use datafusion::scalar::ScalarValue;
    use datafusion_expr::col;

    use super::{FilterExtractor, *};

    /// `KeyExtractor` keeps `column = literal` filters and drops other
    /// comparisons on the same column.
    #[test]
    fn test_key_extractor_basic() {
        let key_extractor = KeyExtractor;
        let partition_columns = vec!["col1".to_string()];
        let literal_42 = || Expr::Literal(ScalarValue::Int32(Some(42)));

        // An equality comparison on the partition column is accepted.
        let eq_filters = [col("col1").eq(literal_42())];
        let extracted = key_extractor.extract(&eq_filters, &partition_columns);
        let want = PartitionFilter {
            column: "col1".to_string(),
            condition: PartitionCondition::Eq(Datum::Int32(42)),
        };
        assert_eq!(extracted.first().unwrap(), &want);

        // Any other comparison is rejected for now.
        let gt_filters = [col("col1").gt(literal_42())];
        let extracted = key_extractor.extract(&gt_filters, &partition_columns);
        assert!(extracted.is_empty());
    }
}
Loading

0 comments on commit eb6af7a

Please sign in to comment.