refactor: bump datafusion, add influxql aggregator support #778

Merged (13 commits, Apr 6, 2023)
Changes from 12 commits
2,138 changes: 1,234 additions & 904 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -54,8 +54,8 @@ name = "ceresdb-server"
path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
arrow = { version = "32.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "32.0.0" }
arrow = { version = "34.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "34.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
@@ -73,10 +73,10 @@ cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
common_util = { path = "common_util" }
datafusion = "18.0.0"
datafusion-expr = "18.0.0"
datafusion-optimizer = "18.0.0"
datafusion-proto = "18.0.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e1b20ea3362ea62cb713004a0636b8af6a16d7" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e1b20ea3362ea62cb713004a0636b8af6a16d7" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e1b20ea3362ea62cb713004a0636b8af6a16d7" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e1b20ea3362ea62cb713004a0636b8af6a16d7" }
df_operator = { path = "df_operator" }
env_logger = "0.6"
futures = "0.3"
@@ -92,7 +92,7 @@ itertools = "0.10.5"
meta_client = { path = "meta_client" }
object_store = { path = "components/object_store" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "32.0.0" }
parquet = { version = "34.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
profile = { path = "components/profile" }
@@ -111,7 +111,7 @@ server = { path = "server" }
smallvec = "1.6"
slog = "2.7"
sql = { path = "sql" }
sqlparser = { version = "0.30", features = ["serde"] }
sqlparser = { version = "0.32", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
2 changes: 1 addition & 1 deletion common_types/src/time.rs
@@ -152,7 +152,7 @@ impl From<&i64> for Timestamp {
///
/// The start time is inclusive and the end time is exclusive: [start, end).
/// The range is empty if start equals end.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TimeRange {
/// The start timestamp (inclusive)
inclusive_start: Timestamp,
22 changes: 22 additions & 0 deletions common_util/src/macros.rs
@@ -10,6 +10,19 @@ macro_rules! define_result {
};
}

#[macro_export]
macro_rules! hash_map(
{ $($key:expr => $value:expr),+ } => {
{
let mut m = ::std::collections::HashMap::new();
$(
m.insert($key, $value);
)+
m
}
};
);

#[cfg(test)]
mod tests {
#[test]
@@ -22,4 +35,13 @@ mod tests {

assert_eq!(Err(18), return_i32_error());
}

#[test]
fn test_hash_map() {
let m = hash_map! { 1 => "hello", 2 => "world" };

assert_eq!(2, m.len());
assert_eq!("hello", *m.get(&1).unwrap());
assert_eq!("world", *m.get(&2).unwrap());
}
}
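
For reference, a minimal self-contained sketch of the new `hash_map!` macro in use (the macro body is copied verbatim from above so the example compiles on its own; the map contents are illustrative, not from the PR):

```rust
use std::collections::HashMap;

// Copied from common_util/src/macros.rs above; inside the workspace the
// #[macro_export] attribute makes it available as `common_util::hash_map!`.
macro_rules! hash_map(
    { $($key:expr => $value:expr),+ } => {
        {
            let mut m = ::std::collections::HashMap::new();
            $(
                m.insert($key, $value);
            )+
            m
        }
    };
);

fn main() {
    // One or more `key => value` pairs; the `),+` repetition does not accept a
    // trailing comma.
    let ages: HashMap<&str, u32> = hash_map! { "alice" => 30, "bob" => 25 };
    assert_eq!(2, ages.len());
    assert_eq!(30, ages["alice"]);
}
```
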
17 changes: 15 additions & 2 deletions components/parquet_ext/src/prune/min_max.rs
@@ -4,7 +4,11 @@ use std::sync::Arc;

use arrow::{array::ArrayRef, datatypes::Schema as ArrowSchema};
use datafusion::{
common::ToDFSchema,
error::Result as DataFusionResult,
physical_expr::{create_physical_expr, execution_props::ExecutionProps},
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_plan::PhysicalExpr,
prelude::{Column, Expr},
scalar::ScalarValue,
};
@@ -40,9 +44,10 @@ fn filter_row_groups_inner(
row_groups: &[RowGroupMetaData],
) -> Vec<bool> {
let mut results = vec![true; row_groups.len()];
// let arrow_schema: SchemaRef = schema.clone().into_arrow_schema_ref();
for expr in exprs {
match PruningPredicate::try_new(expr.clone(), schema.clone()) {
match logical2physical(expr, &schema)
.and_then(|physical_expr| PruningPredicate::try_new(physical_expr, schema.clone()))
{
Ok(pruning_predicate) => {
trace!("pruning_predicate is:{:?}", pruning_predicate);

@@ -66,6 +71,14 @@
results
}

fn logical2physical(expr: &Expr, schema: &ArrowSchema) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
schema.clone().to_dfschema().and_then(|df_schema| {
// TODO: props should be an argument
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props)
})
}

fn build_row_group_predicate(
predicate_builder: &PruningPredicate,
row_group_metadata: &[RowGroupMetaData],
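
Some context on the `logical2physical` helper added above: with the bumped DataFusion, `PruningPredicate::try_new` takes a physical expression rather than a logical `Expr`, so filter expressions are lowered first. A minimal self-contained sketch of the same pattern, assuming the API at the revision pinned in Cargo.toml; the schema, the column `v`, and the `v > 10` predicate are made up for illustration:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
    common::ToDFSchema,
    error::Result as DataFusionResult,
    physical_expr::{create_physical_expr, execution_props::ExecutionProps},
    physical_optimizer::pruning::PruningPredicate,
    physical_plan::PhysicalExpr,
    prelude::{col, lit, Expr},
};

// Same lowering step as the `logical2physical` helper in the diff above.
fn logical2physical(expr: &Expr, schema: &Schema) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
    schema.clone().to_dfschema().and_then(|df_schema| {
        let execution_props = ExecutionProps::new();
        create_physical_expr(expr, &df_schema, schema, &execution_props)
    })
}

fn main() -> DataFusionResult<()> {
    // Schema of a hypothetical parquet file whose row groups we want to prune.
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));

    // Logical predicate `v > 10`, as it would arrive from the planner.
    let expr = col("v").gt(lit(10i64));

    // Lower the logical expression to a physical one, then build the pruning
    // predicate from it, mirroring `filter_row_groups_inner`.
    let physical = logical2physical(&expr, &schema)?;
    let pruning_predicate = PruningPredicate::try_new(physical, schema.clone())?;
    println!("{pruning_predicate:?}");
    Ok(())
}
```
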
4 changes: 2 additions & 2 deletions integration_tests/cases/common/dml/issue-59.result
@@ -24,8 +24,8 @@ FROM issue59
GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: issue59.id + Int64(1), COUNT(DISTINCT issue59.account)\n Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[issue59.id + Int64(1)@0 as issue59.id + Int64(1), COUNT(DISTINCT issue59.account)@1 as COUNT(DISTINCT issue59.account)]\n ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n Projection: issue59.id, issue59.account\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),


DROP TABLE IF EXISTS issue59;
2 changes: 1 addition & 1 deletion integration_tests/cases/common/dummy/select_1.result
@@ -6,7 +6,7 @@ Int64(1),

SELECT x;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SELECT x;. Caused by: Failed to create plan, err:Failed to generate datafusion plan, err:Schema error: No field named 'x'." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SELECT x;. Caused by: Failed to create plan, err:Failed to generate datafusion plan, err:Schema error: No field named \"x\"." })

SELECT 'a';

4 changes: 2 additions & 2 deletions integration_tests/cases/common/optimizer/optimizer.result
@@ -9,8 +9,8 @@ affected_rows: 0
EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY name;

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n Projection: 07_optimizer_t.name, 07_optimizer_t.value\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8, order=None, \n"),


DROP TABLE `07_optimizer_t`;
16 changes: 14 additions & 2 deletions integration_tests/cases/env/local/influxql/basic.result
@@ -27,12 +27,12 @@ affected_rows: 6
-- SQLNESS ARG protocol=influxql
SELECT * FROM "h2o_feet";

{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",2.064],[1439827200000,"between 6 and 9 feet","coyote_creek",8.12],[1439827560000,"below 3 feet","santa_monica",2.116],[1439827560000,"between 6 and 9 feet","coyote_creek",8.005],[1439827620000,"below 3 feet","santa_monica",2.028],[1439827620000,"between 6 and 9 feet","coyote_creek",7.887]]}]}]}
{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","tsid","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",8247797837995683878,2.064],[1439827200000,"between 6 and 9 feet","coyote_creek",4483051411356144610,8.12],[1439827560000,"below 3 feet","santa_monica",8247797837995683878,2.116],[1439827560000,"between 6 and 9 feet","coyote_creek",4483051411356144610,8.005],[1439827620000,"below 3 feet","santa_monica",8247797837995683878,2.028],[1439827620000,"between 6 and 9 feet","coyote_creek",4483051411356144610,7.887]]}]}]}

-- SQLNESS ARG protocol=influxql method=get
SELECT * FROM "h2o_feet";

{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",2.064],[1439827200000,"between 6 and 9 feet","coyote_creek",8.12],[1439827560000,"below 3 feet","santa_monica",2.116],[1439827560000,"between 6 and 9 feet","coyote_creek",8.005],[1439827620000,"below 3 feet","santa_monica",2.028],[1439827620000,"between 6 and 9 feet","coyote_creek",7.887]]}]}]}
{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","tsid","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",8247797837995683878,2.064],[1439827200000,"between 6 and 9 feet","coyote_creek",4483051411356144610,8.12],[1439827560000,"below 3 feet","santa_monica",8247797837995683878,2.116],[1439827560000,"between 6 and 9 feet","coyote_creek",4483051411356144610,8.005],[1439827620000,"below 3 feet","santa_monica",8247797837995683878,2.028],[1439827620000,"between 6 and 9 feet","coyote_creek",4483051411356144610,7.887]]}]}]}

-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';
@@ -44,6 +44,18 @@

{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name"],"values":[["h2o_feet"]]}]}]}

-- SQLNESS ARG protocol=influxql
SELECT count(water_level) FROM "h2o_feet"
group by location;

{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","location","count"],"values":[[0,"coyote_creek",3],[0,"santa_monica",3]]}]}]}

-- SQLNESS ARG protocol=influxql
SELECT count(water_level) FROM "h2o_feet"
group by time(5m), location;

{"code":500,"message":"UNKNOWN_ERROR: Rejection(InfluxDbHandlerWithCause { msg: \"failed to query by influxql\", source: CreatePlan { query: \" SELECT count(water_level) FROM \\\"h2o_feet\\\" \\n group by time(5m), location;\", source: CreatePlan { source: BuildInfluxqlPlan { source: BuildPlanWithCause { msg: \"planner stmt to plan\", source: NotImplemented(\"FILL(NULL)\") } } } } })"}

DROP TABLE IF EXISTS `h2o_feet`;

affected_rows: 0
8 changes: 8 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.sql
@@ -38,4 +38,12 @@ SELECT "level_description", location, water_level FROM "h2o_feet" where location
-- SQLNESS ARG protocol=influxql
show measurements;

-- SQLNESS ARG protocol=influxql
SELECT count(water_level) FROM "h2o_feet"
group by location;

-- SQLNESS ARG protocol=influxql
SELECT count(water_level) FROM "h2o_feet"
group by time(5m), location;

DROP TABLE IF EXISTS `h2o_feet`;
2 changes: 1 addition & 1 deletion interpreters/src/show.rs
@@ -145,7 +145,7 @@ impl ShowInterpreter {
QueryType::InfluxQL => {
// TODO: refactor those constants
let schema = DataSchema::new(vec![
Field::new("ceresdb::measurement", DataType::Utf8, false),
Field::new("iox::measurement", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
]);

4 changes: 2 additions & 2 deletions interpreters/src/tests.rs
@@ -138,8 +138,8 @@ where
"+------------+---------------------+--------+--------+------------+--------------+",
"| key1 | key2 | field1 | field2 | field3 | field4 |",
"+------------+---------------------+--------+--------+------------+--------------+",
"| 7461676b | 2021-12-02T07:00:34 | 100 | hello3 | 2022-10-10 | 10:10:10.234 |",
"| 7461676b32 | 2021-12-02T07:00:34 | 100 | hello3 | 2022-10-11 | 11:10:10.234 |",
"| 7461676b | 2021-12-02T07:00:34 | 100.0 | hello3 | 2022-10-10 | 10:10:10.234 |",
"| 7461676b32 | 2021-12-02T07:00:34 | 100.0 | hello3 | 2022-10-11 | 11:10:10.234 |",
"+------------+---------------------+--------+--------+------------+--------------+",
];
common_util::record_batch::assert_record_batches_eq(&expected, records);
2 changes: 1 addition & 1 deletion query_engine/src/context.rs
@@ -57,7 +57,7 @@ impl Context {
)
.with_target_partitions(config.read_parallelism);
df_session_config
.config_options_mut()
.options_mut()
.extensions
.insert(ceresdb_options);

@@ -3,6 +3,7 @@
use std::{
any::Any,
fmt::{Debug, Formatter},
hash::{Hash, Hasher},
sync::Arc,
};

@@ -50,7 +51,7 @@ impl ExtensionPlanner for Planner {
/// It differs from the default [`TableScan`] in that its corresponding
/// [`ExecutionPlan`] is a special [`ScanTable`] which can control the scan
/// order.
#[derive(Clone)]
#[derive(Clone, Hash, PartialEq)]
pub struct TableScanByPrimaryKey {
asc: bool,
scan_plan: Arc<LogicalPlan>,
@@ -154,4 +155,20 @@ impl UserDefinedLogicalNode for TableScanByPrimaryKey {
scan_plan: self.scan_plan.clone(),
})
}

fn name(&self) -> &str {
"ScanTableInPrimaryKeyOrder"
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.hash(&mut s);
}

fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
match other.as_any().downcast_ref::<Self>() {
Some(o) => self == o,
None => false,
}
}
}
2 changes: 1 addition & 1 deletion query_engine/src/logical_optimizer/tests.rs
@@ -136,7 +136,7 @@ impl LogicalPlanNodeBuilder {
let projected_schema = self.df_schema_ref();

let plan = LogicalPlan::TableScan(TableScan {
table_name: self.table_name.clone(),
table_name: self.table_name.clone().into(),
source: Arc::new(provider),
projection: None,
projected_schema,
14 changes: 7 additions & 7 deletions query_engine/src/logical_optimizer/type_conversion.rs
@@ -31,6 +31,7 @@ pub struct TypeConversion;

impl OptimizerRule for TypeConversion {
#[allow(clippy::only_used_in_recursion)]
#[allow(deprecated)]
fn try_optimize(
&self,
plan: &LogicalPlan,
@@ -374,13 +375,12 @@ mod tests {
Arc::new(
DFSchema::new_with_metadata(
vec![
DFField::new(None, "c1", DataType::Utf8, true),
DFField::new(None, "c2", DataType::Int64, true),
DFField::new(None, "c3", DataType::Float64, true),
DFField::new(None, "c4", DataType::Float32, true),
DFField::new(None, "c5", DataType::Boolean, true),
DFField::new(
None,
DFField::new_unqualified("c1", DataType::Utf8, true),
DFField::new_unqualified("c2", DataType::Int64, true),
DFField::new_unqualified("c3", DataType::Float64, true),
DFField::new_unqualified("c4", DataType::Float32, true),
DFField::new_unqualified("c5", DataType::Boolean, true),
DFField::new_unqualified(
"c6",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
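
A short aside on the test change above: the bumped DataFusion builds unqualified fields with `DFField::new_unqualified` instead of `DFField::new(None, ..)`. A minimal sketch under that assumption, with illustrative field names:

```rust
use arrow::datatypes::DataType;
use datafusion::common::{DFField, DFSchema};

fn main() -> datafusion::error::Result<()> {
    // `DFField::new_unqualified(name, data_type, nullable)` replaces the old
    // `DFField::new(None, name, data_type, nullable)` calls used before the bump.
    let fields = vec![
        DFField::new_unqualified("c1", DataType::Utf8, true),
        DFField::new_unqualified("c2", DataType::Int64, true),
    ];

    // Schema construction itself is unchanged.
    let df_schema = DFSchema::new_with_metadata(fields, Default::default())?;
    println!("{df_schema:?}");
    Ok(())
}
```
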
2 changes: 1 addition & 1 deletion server/src/handlers/influxdb.rs
@@ -909,7 +909,7 @@ mod tests {
let arrow_schema = schema.to_arrow_schema_ref();
let fields = arrow_schema.fields.to_owned();
let measurement_field = ArrowField::new(
"ceresdb::measurement".to_string(),
CERESDB_MEASUREMENT_COLUMN_NAME.to_string(),
schema::DataType::Utf8,
false,
);
6 changes: 5 additions & 1 deletion server/src/handlers/query.rs
@@ -196,7 +196,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.influxql_stmt_to_plan(
&mut sql_ctx,
stmts.remove(0),
instance.catalog_manager.clone(),
)
.context(CreatePlan {
query: &request.query,
})?