From 21bafd8c3b23f808e368de2c003ce525674cce88 Mon Sep 17 00:00:00 2001 From: Wenxuan Shi Date: Fri, 26 Apr 2019 21:11:51 +0800 Subject: [PATCH] Adds BatchSelectionExecutor (#4562) Signed-off-by: Breezewish --- src/coprocessor/dag/batch/executors/mod.rs | 2 + .../dag/batch/executors/selection_executor.rs | 674 ++++++++++++++++++ .../dag/batch/executors/util/mock_executor.rs | 38 + .../dag/batch/executors/util/mod.rs | 2 + src/coprocessor/dag/batch_handler.rs | 22 +- src/coprocessor/dag/builder.rs | 61 +- src/coprocessor/dag/expr/ctx.rs | 4 +- src/coprocessor/dag/mod.rs | 2 +- src/coprocessor/dag/rpn_expr/mod.rs | 4 +- .../dag/rpn_expr/types/expr_builder.rs | 28 + tests/integrations/coprocessor/test_select.rs | 17 + 11 files changed, 831 insertions(+), 23 deletions(-) create mode 100644 src/coprocessor/dag/batch/executors/selection_executor.rs create mode 100644 src/coprocessor/dag/batch/executors/util/mock_executor.rs diff --git a/src/coprocessor/dag/batch/executors/mod.rs b/src/coprocessor/dag/batch/executors/mod.rs index d5956caf77a..9bf95ea2a78 100644 --- a/src/coprocessor/dag/batch/executors/mod.rs +++ b/src/coprocessor/dag/batch/executors/mod.rs @@ -2,9 +2,11 @@ mod index_scan_executor; mod limit; +mod selection_executor; mod table_scan_executor; mod util; pub use self::index_scan_executor::BatchIndexScanExecutor; pub use self::limit::BatchLimitExecutor; +pub use self::selection_executor::BatchSelectionExecutor; pub use self::table_scan_executor::BatchTableScanExecutor; diff --git a/src/coprocessor/dag/batch/executors/selection_executor.rs b/src/coprocessor/dag/batch/executors/selection_executor.rs new file mode 100644 index 00000000000..60905b920f6 --- /dev/null +++ b/src/coprocessor/dag/batch/executors/selection_executor.rs @@ -0,0 +1,674 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
+ +use std::sync::Arc; + +use tipb::executor::Selection; +use tipb::expression::Expr; +use tipb::expression::FieldType; + +use super::super::interface::*; +use crate::coprocessor::dag::batch::statistics::ExecSummaryCollectorDisabled; +use crate::coprocessor::dag::expr::{EvalConfig, EvalContext}; +use crate::coprocessor::dag::rpn_expr::{RpnExpression, RpnExpressionBuilder}; +use crate::coprocessor::{Error, Result}; + +pub struct BatchSelectionExecutor { + summary_collector: C, + context: EvalContext, + src: Src, + + conditions: Vec, +} + +impl BatchSelectionExecutor> { + /// Checks whether this executor can be used. + #[inline] + pub fn check_supported(descriptor: &Selection) -> Result<()> { + let conditions = descriptor.get_conditions(); + for c in conditions { + RpnExpressionBuilder::check_expr_tree_supported(c).map_err(|e| { + Error::Other(box_err!("Unable to use BatchSelectionExecutor: {}", e)) + })?; + } + Ok(()) + } +} + +impl BatchSelectionExecutor { + #[cfg(test)] + pub fn new_for_test(src: Src, conditions: Vec) -> Self { + Self { + summary_collector: ExecSummaryCollectorDisabled, + context: EvalContext::default(), + src, + conditions, + } + } +} + +impl BatchSelectionExecutor { + pub fn new( + summary_collector: C, + config: Arc, + src: Src, + conditions_def: Vec, + ) -> Result { + let mut conditions = Vec::with_capacity(conditions_def.len()); + for def in conditions_def { + conditions.push(RpnExpressionBuilder::build_from_expr_tree( + def, + &config.tz, + src.schema().len(), + )?); + } + + Ok(Self { + summary_collector, + context: EvalContext::new(config), + src, + conditions, + }) + } + + #[inline] + fn handle_next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult { + let mut src_result = self.src.next_batch(scan_rows); + + // When there are errors during the `next_batch()` in the src executor, it means that the + // first several rows do not have error, which should be filtered according to predicate + // in this executor. 
So we actually don't care whether or not there are errors from src + // executor. + + let rows_len = src_result.data.rows_len(); + if rows_len > 0 { + let mut base_retain_map = vec![true; rows_len]; + let mut head_retain_map = vec![false; rows_len]; + + for condition in &self.conditions { + let r = condition.eval_as_mysql_bools( + &mut self.context, + rows_len, + self.src.schema(), + &mut src_result.data, + head_retain_map.as_mut_slice(), + ); + if let Err(e) = r { + // TODO: Rows before the one that hits an evaluation error are innocent. + src_result.is_drained = src_result.is_drained.and(Err(e)); + src_result.data.truncate(0); + return src_result; + } + for i in 0..rows_len { + base_retain_map[i] &= head_retain_map[i]; + } + } + + // TODO: When there are many conditions, it would be better to filter column each time. + + src_result + .data + .retain_rows_by_index(|idx| base_retain_map[idx]); + } + + // Only append warnings when there is no error during filtering because we clear the data + // when there are errors. + src_result.warnings.merge(&mut self.context.warnings); + src_result + } +} + +impl BatchExecutor for BatchSelectionExecutor { + #[inline] + fn schema(&self) -> &[FieldType] { + // The selection executor's schema comes from its child. 
+ self.src.schema() + } + + #[inline] + fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult { + let timer = self.summary_collector.on_start_batch(); + let result = self.handle_next_batch(scan_rows); + self.summary_collector + .on_finish_batch(timer, result.data.rows_len()); + result + } + + #[inline] + fn collect_statistics(&mut self, destination: &mut BatchExecuteStatistics) { + self.src.collect_statistics(destination); + self.summary_collector + .collect_into(&mut destination.summary_per_executor); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use cop_datatype::FieldTypeTp; + + use crate::coprocessor::codec::batch::LazyBatchColumnVec; + use crate::coprocessor::codec::data_type::VectorValue; + use crate::coprocessor::dag::batch::executors::util::mock_executor::MockExecutor; + use crate::coprocessor::dag::expr::EvalWarnings; + use crate::coprocessor::dag::rpn_expr::types::RpnFnCallPayload; + use crate::coprocessor::dag::rpn_expr::RpnFunction; + + #[test] + fn test_empty_rows() { + #[derive(Debug, Clone, Copy)] + struct FnFoo; + + impl RpnFunction for FnFoo { + fn name(&self) -> &'static str { + "FnFoo" + } + + fn args_len(&self) -> usize { + 0 + } + + fn eval( + &self, + _rows: usize, + _context: &mut EvalContext, + _payload: RpnFnCallPayload<'_>, + ) -> Result { + // This function should never be called because we filter no rows + unreachable!() + } + + fn box_clone(&self) -> Box { + Box::new(*self) + } + } + + let src_exec = MockExecutor::new( + vec![], + vec![ + BatchExecuteResult { + data: LazyBatchColumnVec::empty(), + warnings: EvalWarnings::default(), + is_drained: Ok(false), + }, + BatchExecuteResult { + data: LazyBatchColumnVec::empty(), + warnings: EvalWarnings::default(), + is_drained: Ok(true), + }, + ], + ); + + let mut exec = BatchSelectionExecutor::new_for_test( + src_exec, + vec![RpnExpressionBuilder::new() + .push_fn_call(FnFoo, FieldTypeTp::LongLong) + .build()], + ); + + // When source executor returns empty rows, selection 
executor should process correctly. + // No errors should be generated and the predicate function should not be called. + + let r = exec.next_batch(1); + // The scan rows parameter has no effect for mock executor. We don't care. + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.unwrap()); + } + + /// Builds an executor that will return these data: + /// + /// == Schema == + /// Col0 (Int) Col1(Real) + /// == Call #1 == + /// 1 NULL + /// NULL 7.0 + /// == Call #2 == + /// == Call #3 == + /// NULL NULL + /// (drained) + fn make_src_executor_using_fixture_1() -> MockExecutor { + MockExecutor::new( + vec![FieldTypeTp::LongLong.into(), FieldTypeTp::Double.into()], + vec![ + BatchExecuteResult { + data: LazyBatchColumnVec::from(vec![ + VectorValue::Int(vec![Some(1), None]), + VectorValue::Real(vec![None, Some(7.0)]), + ]), + warnings: EvalWarnings::default(), + is_drained: Ok(false), + }, + BatchExecuteResult { + data: LazyBatchColumnVec::empty(), + warnings: EvalWarnings::default(), + is_drained: Ok(false), + }, + BatchExecuteResult { + data: LazyBatchColumnVec::from(vec![ + VectorValue::Int(vec![None]), + VectorValue::Real(vec![None]), + ]), + warnings: EvalWarnings::default(), + is_drained: Ok(true), + }, + ], + ) + } + + /// Tests the scenario that there is no predicate or there is a predicate but always returns + /// true (no data is filtered). + #[test] + fn test_no_predicate_or_predicate_always_true() { + // Build a selection executor without predicate. + let exec_no_predicate = + |src_exec: MockExecutor| BatchSelectionExecutor::new_for_test(src_exec, vec![]); + + // Build a selection executor with a predicate that always returns true. 
+ let exec_predicate_true = |src_exec: MockExecutor| { + let predicate = RpnExpressionBuilder::new() + .push_constant(1i64, FieldTypeTp::LongLong) + .build(); + BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]) + }; + + let executor_builders: Vec _>> = + vec![Box::new(exec_no_predicate), Box::new(exec_predicate_true)]; + + for exec_builder in executor_builders { + let src_exec = make_src_executor_using_fixture_1(); + + let mut exec = exec_builder(src_exec); + + // The selection executor should return data as it is. + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 2); + assert_eq!(r.data.columns_len(), 2); + assert_eq!(r.data[0].decoded().as_int_slice(), &[Some(1), None]); + assert_eq!(r.data[1].decoded().as_real_slice(), &[None, Some(7.0)]); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 1); + assert_eq!(r.data.columns_len(), 2); + assert_eq!(r.data[0].decoded().as_int_slice(), &[None]); + assert_eq!(r.data[1].decoded().as_real_slice(), &[None]); + assert!(r.is_drained.unwrap()); + } + } + + /// Tests the scenario that the predicate always returns false. + #[test] + fn test_predicate_always_false() { + let src_exec = make_src_executor_using_fixture_1(); + + let predicate = RpnExpressionBuilder::new() + .push_constant(0i64, FieldTypeTp::LongLong) + .build(); + let mut exec = BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]); + + // The selection executor should always return empty rows. + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.unwrap()); + } + + /// This function returns 1 when the value is even, 0 otherwise. 
+ #[derive(Debug, Clone, Copy)] + struct FnIsEven; + + impl FnIsEven { + fn call( + _ctx: &mut EvalContext, + _payload: RpnFnCallPayload<'_>, + v: &Option, + ) -> Result> { + let r = match v { + None => Some(0), + Some(v) => { + if v % 2 == 0 { + Some(1) + } else { + Some(0) + } + } + }; + Ok(r) + } + } + + impl RpnFunction for FnIsEven { + fn name(&self) -> &'static str { + "FnIsEven" + } + + fn args_len(&self) -> usize { + 1 + } + + fn eval( + &self, + rows: usize, + context: &mut EvalContext, + payload: RpnFnCallPayload<'_>, + ) -> Result { + // Delegate to the shared 1-argument helper, passing `Self::call` as the callback + crate::coprocessor::dag::rpn_expr::function::Helper::eval_1_arg( + rows, + Self::call, + context, + payload, + ) + } + + fn box_clone(&self) -> Box { + Box::new(*self) + } + } + + /// Builds an executor that will return these data: + /// + /// == Schema == + /// Col0 (Int) Col1(Int) Col2(Int) + /// == Call #1 == + /// 4 NULL 1 + /// NULL NULL 2 + /// 2 4 3 + /// NULL 2 4 + /// == Call #2 == + /// == Call #3 == + /// NULL NULL 2 + /// (drained) + fn make_src_executor_using_fixture_2() -> MockExecutor { + MockExecutor::new( + vec![ + FieldTypeTp::LongLong.into(), + FieldTypeTp::LongLong.into(), + FieldTypeTp::LongLong.into(), + ], + vec![ + BatchExecuteResult { + data: LazyBatchColumnVec::from(vec![ + VectorValue::Int(vec![Some(4), None, Some(2), None]), + VectorValue::Int(vec![None, None, Some(4), Some(2)]), + VectorValue::Int(vec![Some(1), Some(2), Some(3), Some(4)]), + ]), + warnings: EvalWarnings::default(), + is_drained: Ok(false), + }, + BatchExecuteResult { + data: LazyBatchColumnVec::empty(), + warnings: EvalWarnings::default(), + is_drained: Ok(false), + }, + BatchExecuteResult { + data: LazyBatchColumnVec::from(vec![ + VectorValue::Int(vec![None]), + VectorValue::Int(vec![None]), + VectorValue::Int(vec![Some(2)]), + ]), + warnings: EvalWarnings::default(), + is_drained: Ok(true), + }, + ], + ) + } + + /// Tests the scenario that the predicate 
returns both true and false. Rows that predicate + /// returns false should be removed from the result. + #[test] + fn test_predicate_1() { + let src_exec = make_src_executor_using_fixture_2(); + + // Use FnIsEven(column[0]) as the predicate. + + let predicate = RpnExpressionBuilder::new() + .push_column_ref(0) + .push_fn_call(FnIsEven, FieldTypeTp::LongLong) + .build(); + let mut exec = BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 2); + assert_eq!(r.data.columns_len(), 3); + assert_eq!(r.data[0].decoded().as_int_slice(), &[Some(4), Some(2)]); + assert_eq!(r.data[1].decoded().as_int_slice(), &[None, Some(4)]); + assert_eq!(r.data[2].decoded().as_int_slice(), &[Some(1), Some(3)]); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.unwrap()); + } + + #[test] + fn test_predicate_2() { + let src_exec = make_src_executor_using_fixture_2(); + + // Use FnIsEven(column[1]) as the predicate. + + let predicate = RpnExpressionBuilder::new() + .push_column_ref(1) + .push_fn_call(FnIsEven, FieldTypeTp::LongLong) + .build(); + let mut exec = BatchSelectionExecutor::new_for_test(src_exec, vec![predicate]); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 2); + assert_eq!(r.data.columns_len(), 3); + assert_eq!(r.data[0].decoded().as_int_slice(), &[Some(2), None]); + assert_eq!(r.data[1].decoded().as_int_slice(), &[Some(4), Some(2)]); + assert_eq!(r.data[2].decoded().as_int_slice(), &[Some(3), Some(4)]); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.unwrap()); + } + + /// Tests the scenario that there are multiple predicates. 
Only the rows for which all predicates + /// return true should remain. + #[test] + fn test_multiple_predicate_1() { + // Use [FnIsEven(column[0]), FnIsEven(column[1])] as the predicate. + + let predicate: Vec<_> = (0..=1) + .map(|offset| { + RpnExpressionBuilder::new() + .push_column_ref(offset) + .push_fn_call(FnIsEven, FieldTypeTp::LongLong) + .build() + }) + .collect(); + + for predicates in vec![ + // Swap predicates should produce same results. + vec![predicate[0].clone(), predicate[1].clone()], + vec![predicate[1].clone(), predicate[0].clone()], + ] { + let src_exec = make_src_executor_using_fixture_2(); + let mut exec = BatchSelectionExecutor::new_for_test(src_exec, predicates); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 1); + assert_eq!(r.data.columns_len(), 3); + assert_eq!(r.data[0].decoded().as_int_slice(), &[Some(2)]); + assert_eq!(r.data[1].decoded().as_int_slice(), &[Some(4)]); + assert_eq!(r.data[2].decoded().as_int_slice(), &[Some(3)]); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.unwrap()); + } + } + + #[test] + fn test_multiple_predicate_2() { + let predicate: Vec<_> = (0..=2) + .map(|offset| { + RpnExpressionBuilder::new() + .push_column_ref(offset) + .push_fn_call(FnIsEven, FieldTypeTp::LongLong) + .build() + }) + .collect(); + + for predicates in vec![ + // Swap predicates should produce same results. 
+ vec![ + predicate[0].clone(), + predicate[1].clone(), + predicate[2].clone(), + ], + vec![ + predicate[1].clone(), + predicate[2].clone(), + predicate[0].clone(), + ], + ] { + let src_exec = make_src_executor_using_fixture_2(); + let mut exec = BatchSelectionExecutor::new_for_test(src_exec, predicates); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(!r.is_drained.unwrap()); + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.unwrap()); + } + } + + #[test] + fn test_predicate_error() { + /// This function returns error when value is None. + #[derive(Debug, Clone, Copy)] + struct FnFoo; + + impl FnFoo { + fn call( + _ctx: &mut EvalContext, + _payload: RpnFnCallPayload<'_>, + v: &Option, + ) -> Result> { + match v { + None => Err(Error::Other(box_err!("foo"))), + Some(v) => Ok(Some(*v)), + } + } + } + + impl RpnFunction for FnFoo { + fn name(&self) -> &'static str { + "FnFoo" + } + + fn args_len(&self) -> usize { + 1 + } + + fn eval( + &self, + rows: usize, + context: &mut EvalContext, + payload: RpnFnCallPayload<'_>, + ) -> Result { + // Delegate to the shared 1-argument helper, passing `Self::call` as the callback + crate::coprocessor::dag::rpn_expr::function::Helper::eval_1_arg( + rows, + Self::call, + context, + payload, + ) + } + + fn box_clone(&self) -> Box { + Box::new(*self) + } + } + + // The built data is as follows: + // + // == Schema == + // Col0 (Int) Col1(Int) + // == Call #1 == + // 4 4 + // 1 2 + // 2 NULL + // 1 NULL + // == Call #2 == + // (drained) + let src_exec = MockExecutor::new( + vec![FieldTypeTp::LongLong.into(), FieldTypeTp::LongLong.into()], + vec![ + BatchExecuteResult { + data: LazyBatchColumnVec::from(vec![ + VectorValue::Int(vec![Some(4), Some(1), Some(2), Some(1)]), + VectorValue::Int(vec![Some(4), Some(2), None, None]), + ]), + warnings: EvalWarnings::default(), + is_drained: 
Ok(false), + }, + BatchExecuteResult { + data: LazyBatchColumnVec::empty(), + warnings: EvalWarnings::default(), + is_drained: Ok(true), + }, + ], + ); + + // When evaluating predicates[0], there will be no error. However we will meet errors for + // predicates[1]. + + let predicates = (0..=1) + .map(|offset| { + RpnExpressionBuilder::new() + .push_column_ref(offset) + .push_fn_call(FnFoo, FieldTypeTp::LongLong) + .build() + }) + .collect(); + let mut exec = BatchSelectionExecutor::new_for_test(src_exec, predicates); + + // TODO: A more precise result is that the first two rows are returned and error starts from + // the third row. + + let r = exec.next_batch(1); + assert_eq!(r.data.rows_len(), 0); + assert!(r.is_drained.is_err()); + } +} diff --git a/src/coprocessor/dag/batch/executors/util/mock_executor.rs b/src/coprocessor/dag/batch/executors/util/mock_executor.rs new file mode 100644 index 00000000000..112395b4f9b --- /dev/null +++ b/src/coprocessor/dag/batch/executors/util/mock_executor.rs @@ -0,0 +1,38 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use tipb::expression::FieldType; + +use crate::coprocessor::dag::batch::interface::*; + +/// A simple mock executor that will return batch data according to a fixture without any +/// modification. +/// +/// Normally this should be only used in tests. 
+pub struct MockExecutor { + schema: Vec, + results: std::vec::IntoIter, +} + +impl MockExecutor { + pub fn new(schema: Vec, results: Vec) -> Self { + assert!(!results.is_empty()); + Self { + schema, + results: results.into_iter(), + } + } +} + +impl BatchExecutor for MockExecutor { + fn schema(&self) -> &[FieldType] { + &self.schema + } + + fn next_batch(&mut self, _scan_rows: usize) -> BatchExecuteResult { + self.results.next().unwrap() + } + + fn collect_statistics(&mut self, _destination: &mut BatchExecuteStatistics) { + // Do nothing + } +} diff --git a/src/coprocessor/dag/batch/executors/util/mod.rs b/src/coprocessor/dag/batch/executors/util/mod.rs index 2b25c2153f1..69466c452b2 100644 --- a/src/coprocessor/dag/batch/executors/util/mod.rs +++ b/src/coprocessor/dag/batch/executors/util/mod.rs @@ -1,4 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +#[cfg(test)] +pub mod mock_executor; pub mod ranges_iter; pub mod scan_executor; diff --git a/src/coprocessor/dag/batch_handler.rs b/src/coprocessor/dag/batch_handler.rs index 0548004072a..e24aedd6bc1 100644 --- a/src/coprocessor/dag/batch_handler.rs +++ b/src/coprocessor/dag/batch_handler.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use protobuf::{Message, RepeatedField}; use kvproto::coprocessor::Response; +use tipb::executor::ExecutorExecutionSummary; use tipb::select::{Chunk, SelectResponse}; use super::batch::interface::{BatchExecuteStatistics, BatchExecutor}; @@ -131,7 +132,7 @@ impl RequestHandler for BatchDAGHandler { let mut resp = Response::new(); let mut sel_resp = SelectResponse::new(); - sel_resp.set_chunks(RepeatedField::from_vec(chunks)); + sel_resp.set_chunks(chunks.into()); // TODO: output_counts should not be i64. Let's fix it in Coprocessor DAG V2. 
sel_resp.set_output_counts( self.statistics @@ -140,8 +141,25 @@ impl RequestHandler for BatchDAGHandler { .map(|v| *v as i64) .collect(), ); + sel_resp.set_execution_summaries(RepeatedField::from_vec( + self.statistics + .summary_per_executor + .iter() + .map(|summary| { + let mut ret = ExecutorExecutionSummary::new(); + if let Some(summary) = summary { + ret.set_num_iterations(summary.num_iterations as u64); + ret.set_num_produced_rows(summary.num_produced_rows as u64); + ret.set_time_processed_ns( + summary.time_processed_ms as u64 * 1_000_000, + ); + } + ret + }) + .collect(), + )); - sel_resp.set_warnings(RepeatedField::from_vec(warnings.warnings)); + sel_resp.set_warnings(warnings.warnings.into()); sel_resp.set_warning_count(warnings.warning_cnt as i64); let data = box_try!(sel_resp.write_to_bytes()); diff --git a/src/coprocessor/dag/builder.rs b/src/coprocessor/dag/builder.rs index 7c6a9ce3ef0..5c749595290 100644 --- a/src/coprocessor/dag/builder.rs +++ b/src/coprocessor/dag/builder.rs @@ -14,7 +14,7 @@ use super::executor::{ Executor, HashAggExecutor, LimitExecutor, ScanExecutor, SelectionExecutor, StreamAggExecutor, TopNExecutor, }; -use crate::coprocessor::dag::batch::statistics::ExecSummaryCollectorDisabled; +use crate::coprocessor::dag::batch::statistics::*; use crate::coprocessor::dag::expr::{EvalConfig, Flag, SqlMode}; use crate::coprocessor::metrics::*; use crate::coprocessor::*; @@ -42,6 +42,10 @@ impl DAGBuilder { let descriptor = ed.get_idx_scan(); BatchIndexScanExecutor::check_supported(&descriptor)?; } + ExecType::TypeSelection => { + let descriptor = ed.get_selection(); + BatchSelectionExecutor::check_supported(&descriptor)?; + } ExecType::TypeLimit => {} _ => { return Err(box_err!("Unsupported executor {:?}", ed.get_tp())); @@ -53,7 +57,7 @@ impl DAGBuilder { } // Note: `S` is `'static` because we have trait objects `Executor`. 
- pub fn build_batch( + pub fn build_batch( executor_descriptors: Vec, store: S, ranges: Vec, @@ -65,6 +69,7 @@ impl DAGBuilder { .ok_or_else(|| Error::Other(box_err!("No executors")))?; let mut executor: Box; + let mut summary_slot_index = 0; match first_ed.get_tp() { ExecType::TypeTableScan => { @@ -74,7 +79,7 @@ impl DAGBuilder { let mut descriptor = first_ed.take_tbl_scan(); let columns_info = descriptor.take_columns().into_vec(); executor = Box::new(BatchTableScanExecutor::new( - ExecSummaryCollectorDisabled, + C::new(summary_slot_index), store, config.clone(), columns_info, @@ -88,7 +93,7 @@ impl DAGBuilder { let mut descriptor = first_ed.take_idx_scan(); let columns_info = descriptor.take_columns().into_vec(); executor = Box::new(BatchIndexScanExecutor::new( - ExecSummaryCollectorDisabled, + C::new(summary_slot_index), store, config.clone(), columns_info, @@ -105,14 +110,28 @@ impl DAGBuilder { } } - for ed in executor_descriptors { - match ed.get_tp() { + for mut ed in executor_descriptors { + summary_slot_index += 1; + + let new_executor: Box = match ed.get_tp() { + ExecType::TypeSelection => { + COPR_EXECUTOR_COUNT.with_label_values(&["selection"]).inc(); + + Box::new(BatchSelectionExecutor::new( + C::new(summary_slot_index), + config.clone(), + executor, + ed.take_selection().take_conditions().into_vec(), + )?) + } ExecType::TypeLimit => { - executor = Box::new(BatchLimitExecutor::new( - ExecSummaryCollectorDisabled, + COPR_EXECUTOR_COUNT.with_label_values(&["limit"]).inc(); + + Box::new(BatchLimitExecutor::new( + C::new(summary_slot_index), executor, ed.get_limit().get_limit() as usize, - )?); + )?) 
} _ => { return Err(Error::Other(box_err!( @@ -120,7 +139,8 @@ impl DAGBuilder { first_ed.get_tp() ))); } - } + }; + executor = new_executor; } Ok(executor) @@ -243,12 +263,21 @@ impl DAGBuilder { let executors_len = req.get_executors().len(); let config = Arc::new(config); - let out_most_executor = super::builder::DAGBuilder::build_batch( - req.take_executors().into_vec(), - store, - ranges, - config.clone(), - )?; + let out_most_executor = if req.get_collect_execution_summaries() { + super::builder::DAGBuilder::build_batch::<_, ExecSummaryCollectorEnabled>( + req.take_executors().into_vec(), + store, + ranges, + config.clone(), + )? + } else { + super::builder::DAGBuilder::build_batch::<_, ExecSummaryCollectorDisabled>( + req.take_executors().into_vec(), + store, + ranges, + config.clone(), + )? + }; // Check output offsets let output_offsets = req.take_output_offsets(); diff --git a/src/coprocessor/dag/expr/ctx.rs b/src/coprocessor/dag/expr/ctx.rs index 4843229b97b..a0fe88ab066 100644 --- a/src/coprocessor/dag/expr/ctx.rs +++ b/src/coprocessor/dag/expr/ctx.rs @@ -140,7 +140,7 @@ impl EvalConfig { } // Warning details caused in eval computation. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct EvalWarnings { // max number of warnings to return. 
max_warning_cnt: usize, @@ -151,7 +151,7 @@ pub struct EvalWarnings { } impl EvalWarnings { - pub fn new(max_warning_cnt: usize) -> EvalWarnings { + fn new(max_warning_cnt: usize) -> EvalWarnings { EvalWarnings { max_warning_cnt, warning_cnt: 0, diff --git a/src/coprocessor/dag/mod.rs b/src/coprocessor/dag/mod.rs index 2129e61d8e4..6aaafdca6df 100644 --- a/src/coprocessor/dag/mod.rs +++ b/src/coprocessor/dag/mod.rs @@ -25,7 +25,7 @@ pub mod aggr_fn; pub mod batch; pub mod batch_handler; -mod builder; +pub mod builder; pub mod executor; pub mod expr; pub mod handler; diff --git a/src/coprocessor/dag/rpn_expr/mod.rs b/src/coprocessor/dag/rpn_expr/mod.rs index 516aab2d05f..534b121a04d 100644 --- a/src/coprocessor/dag/rpn_expr/mod.rs +++ b/src/coprocessor/dag/rpn_expr/mod.rs @@ -1,8 +1,8 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. #[macro_use] -mod function; -mod types; +pub mod function; +pub mod types; pub use self::function::RpnFunction; pub use self::types::{RpnExpression, RpnExpressionBuilder}; diff --git a/src/coprocessor/dag/rpn_expr/types/expr_builder.rs b/src/coprocessor/dag/rpn_expr/types/expr_builder.rs index 1ee66a57aae..47e1baabf3c 100644 --- a/src/coprocessor/dag/rpn_expr/types/expr_builder.rs +++ b/src/coprocessor/dag/rpn_expr/types/expr_builder.rs @@ -17,6 +17,34 @@ use crate::coprocessor::{Error, Result}; pub struct RpnExpressionBuilder(Vec); impl RpnExpressionBuilder { + /// Checks whether the given expression definition tree is supported. 
+ pub fn check_expr_tree_supported(c: &Expr) -> Result<()> { + EvalType::try_from(c.get_field_type().tp()).map_err(|e| Error::Other(box_err!(e)))?; + + match c.get_tp() { + ExprType::ScalarFunc => { + let sig = c.get_sig(); + super::super::map_pb_sig_to_rpn_func(sig)?; + for n in c.get_children() { + RpnExpressionBuilder::check_expr_tree_supported(n)?; + } + } + ExprType::Null => {} + ExprType::Int64 => {} + ExprType::Uint64 => {} + ExprType::String | ExprType::Bytes => {} + ExprType::Float32 | ExprType::Float64 => {} + ExprType::MysqlTime => {} + ExprType::MysqlDuration => {} + ExprType::MysqlDecimal => {} + ExprType::MysqlJson => {} + ExprType::ColumnRef => {} + _ => return Err(box_err!("Unsupported expression type {:?}", c.get_tp())), + } + + Ok(()) + } + /// Builds the RPN expression node list from an expression definition tree. pub fn build_from_expr_tree( tree_node: Expr, diff --git a/tests/integrations/coprocessor/test_select.rs b/tests/integrations/coprocessor/test_select.rs index 2b1a681abd8..390d0ba00a4 100644 --- a/tests/integrations/coprocessor/test_select.rs +++ b/tests/integrations/coprocessor/test_select.rs @@ -1159,6 +1159,8 @@ fn test_index_aggr_extre() { #[test] fn test_where() { + use cop_datatype::{FieldTypeAccessor, FieldTypeTp}; + let data = vec![ (1, Some("name:0"), 2), (2, Some("name:4"), 3), @@ -1174,18 +1176,33 @@ fn test_where() { col.set_tp(ExprType::ColumnRef); let count_offset = offset_for_column(&cols, product["count"].id); col.mut_val().encode_i64(count_offset).unwrap(); + col.mut_field_type() + .as_mut_accessor() + .set_tp(FieldTypeTp::LongLong); let mut value = Expr::new(); value.set_tp(ExprType::String); value.set_val(String::from("2").into_bytes()); + value + .mut_field_type() + .as_mut_accessor() + .set_tp(FieldTypeTp::VarString); + let mut right = Expr::new(); right.set_tp(ExprType::ScalarFunc); right.set_sig(ScalarFuncSig::CastStringAsInt); + right + .mut_field_type() + .as_mut_accessor() + .set_tp(FieldTypeTp::LongLong); 
right.mut_children().push(value); let mut cond = Expr::new(); cond.set_tp(ExprType::ScalarFunc); cond.set_sig(ScalarFuncSig::LTInt); + cond.mut_field_type() + .as_mut_accessor() + .set_tp(FieldTypeTp::LongLong); cond.mut_children().push(col); cond.mut_children().push(right); cond