feat: upgrade to Datafusion 17 (apache#601)
* feat: upgrade to datafusion 17

* fix integration tests

* fix integration test

* make CI happy

* refactor code

* refactor by CR

* chore: remove useless file
chunshao90 authored Feb 8, 2023
1 parent 7fdaa4e commit b9e4992
Showing 48 changed files with 1,095 additions and 506 deletions.
647 changes: 543 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/record_batch_stream.rs
@@ -17,7 +17,7 @@ use common_util::define_result;
use datafusion::{
common::ToDFSchema,
error::DataFusionError,
logical_expr::expr_fn,
optimizer::utils::conjunction,
physical_expr::{self, execution_props::ExecutionProps},
physical_plan::PhysicalExpr,
};
@@ -171,7 +171,7 @@ pub fn filter_stream(
input_schema: ArrowSchemaRef,
predicate: &Predicate,
) -> Result<SequencedRecordBatchStream> {
let filter = match expr_fn::combine_filters(predicate.exprs()) {
let filter = match conjunction(predicate.exprs().to_owned()) {
Some(filter) => filter,
None => return Ok(origin_stream),
};
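The change above swaps `expr_fn::combine_filters` for the optimizer's `conjunction` helper, which takes the predicate expressions by value. A minimal sketch of the new call, assuming DataFusion 17 (`col`/`lit` usage here is illustrative, not from this file):

```rust
use datafusion::optimizer::utils::conjunction;
use datafusion_expr::{col, lit, Expr};

fn combine_predicates(exprs: Vec<Expr>) -> Option<Expr> {
    // DataFusion 16: expr_fn::combine_filters(&exprs)
    // DataFusion 17: conjunction folds the owned list into `a AND b AND ...`,
    // returning None when the list is empty.
    conjunction(exprs)
}

fn main() {
    let combined = combine_predicates(vec![col("a").gt(lit(1i64)), col("b").eq(lit(2i64))]);
    assert!(combined.is_some());
}
```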
8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -3,7 +3,7 @@
use std::{convert::TryFrom, sync::Arc};

use arrow::{
array::{Array, ArrayData, ArrayRef},
array::{make_array, Array, ArrayData, ArrayRef},
buffer::MutableBuffer,
compute,
record_batch::RecordBatch as ArrowRecordBatch,
@@ -557,7 +557,7 @@ impl HybridRecordDecoder {
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
Ok(make_array(array_data))
}

/// Like `stretch_variable_length_column`, but array value is fixed-size
@@ -601,7 +601,7 @@ impl HybridRecordDecoder {
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
Ok(make_array(array_data))
}

/// Decode offset slices into Vec<i32>
@@ -645,7 +645,7 @@ impl RecordDecoder for HybridRecordDecoder {
// are collapsed by hybrid storage format, to differentiate
// List column in original records
DataType::List(_nested_field) => {
Ok(array_ref.data().child_data()[0].clone().into())
Ok(make_array(array_ref.data().child_data()[0].clone()))
}
_ => {
let datum_kind = DatumKind::from_data_type(data_type).unwrap();
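The `.into()` conversions above become explicit `make_array` calls; the arrow release pulled in by DataFusion 17 builds an `ArrayRef` from `ArrayData` through that helper. A rough sketch under that assumption (the `Int32Array` is only for illustration):

```rust
use arrow::array::{make_array, Array, ArrayRef, Int32Array};

fn to_array_ref(values: Vec<i32>) -> ArrayRef {
    let data = Int32Array::from(values).data().clone();
    // Previously: `data.into()`; the explicit constructor now builds the
    // dynamically typed ArrayRef from the raw ArrayData.
    make_array(data)
}

fn main() {
    let array = to_array_ref(vec![1, 2, 3]);
    assert_eq!(array.len(), 3);
}
```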
11 changes: 5 additions & 6 deletions analytic_engine/src/sst/parquet/hybrid.rs
@@ -8,7 +8,7 @@ use arrow::{
UInt64Array,
},
bitmap::Bitmap,
buffer::{Buffer, MutableBuffer},
buffer::MutableBuffer,
datatypes::Schema as ArrowSchema,
record_batch::RecordBatch as ArrowRecordBatch,
util::bit_util,
@@ -153,7 +153,7 @@ trait VariableSizeArray {
// Returns the length for the element at index i.
fn value_length(&self, index: usize) -> i32;
// Returns a clone of the value data buffer.
fn value_data(&self) -> Buffer;
fn value_data(&self) -> &[u8];
}

macro_rules! impl_offsets {
@@ -167,7 +167,7 @@ macro_rules! impl_offsets {
self.0.value_length(index)
}

fn value_data(&self) -> Buffer {
fn value_data(&self) -> &[u8] {
self.0.value_data()
}
}
@@ -404,9 +404,8 @@ impl ListArrayBuilder {
inner_offsets.push(inner_length_so_far);
}

inner_values.extend_from_slice(
&array.value_data().as_slice()[start as usize..end as usize],
);
inner_values
.extend_from_slice(&array.value_data()[start as usize..end as usize]);
}
}
// The data in the arrays belong to the same tsid, so the offsets is the total
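With the newer arrow, `value_data()` on string/binary arrays yields `&[u8]` directly instead of a `Buffer`, which is why the `as_slice()` call disappears above. A small sketch under that assumption:

```rust
use arrow::array::StringArray;

// Copy the raw value bytes in [start, end) out of the values buffer.
fn raw_value_bytes(array: &StringArray, start: usize, end: usize) -> Vec<u8> {
    // Older arrow: array.value_data().as_slice()[start..end].to_vec()
    array.value_data()[start..end].to_vec()
}

fn main() {
    let array = StringArray::from(vec!["ab", "cd"]);
    assert_eq!(raw_value_bytes(&array, 0, 4), b"abcd".to_vec());
}
```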
3 changes: 2 additions & 1 deletion analytic_engine/src/table/mod.rs
@@ -6,7 +6,8 @@ use std::{collections::HashMap, fmt};

use async_trait::async_trait;
use common_types::{row::Row, schema::Schema, time::TimeRange};
use datafusion::logical_plan::{Column, Expr};
use datafusion::common::Column;
use datafusion_expr::Expr;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
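`datafusion::logical_plan` is gone in DataFusion 17; `Column` now comes from `datafusion::common` and `Expr` from the `datafusion_expr` crate. A tiny sketch of the relocated types (the helper itself is illustrative):

```rust
use datafusion::common::Column;
use datafusion_expr::{col, Expr};

// Return the referenced column if the expression is a bare column reference.
fn as_column(expr: &Expr) -> Option<&Column> {
    match expr {
        Expr::Column(column) => Some(column),
        _ => None,
    }
}

fn main() {
    let expr = col("tsid");
    assert_eq!(as_column(&expr).map(|c| c.name.as_str()), Some("tsid"));
}
```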
32 changes: 16 additions & 16 deletions common_types/src/column_schema.rs
@@ -2,7 +2,7 @@

//! Schema of column

use std::{collections::BTreeMap, convert::TryFrom, str::FromStr};
use std::{collections::HashMap, convert::TryFrom, str::FromStr};

use arrow::datatypes::{DataType, Field};
use proto::common as common_pb;
@@ -281,11 +281,7 @@ impl TryFrom<&Field> for ColumnSchema {
id,
is_tag,
comment,
} = field
.metadata()
.map(decode_arrow_field_meta_data)
.transpose()?
.unwrap_or_default();
} = decode_arrow_field_meta_data(field.metadata())?;
Ok(Self {
id,
name: field.name().clone(),
@@ -311,14 +307,14 @@ impl From<&ColumnSchema> for Field {
col_schema.data_type.into(),
col_schema.is_nullable,
);
field.set_metadata(Some(metadata));
field.set_metadata(metadata);

field
}
}

fn parse_arrow_field_meta_value<T>(
meta: &BTreeMap<String, String>,
meta: &HashMap<String, String>,
key: ArrowFieldMetaKey,
) -> Result<T>
where
@@ -333,16 +329,20 @@ where
.context(InvalidArrowFieldMetaValue { key, raw_value })
}

fn decode_arrow_field_meta_data(meta: &BTreeMap<String, String>) -> Result<ArrowFieldMeta> {
Ok(ArrowFieldMeta {
id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?,
is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?,
comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?,
})
fn decode_arrow_field_meta_data(meta: &HashMap<String, String>) -> Result<ArrowFieldMeta> {
if meta.is_empty() {
Ok(ArrowFieldMeta::default())
} else {
Ok(ArrowFieldMeta {
id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?,
is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?,
comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?,
})
}
}

fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> BTreeMap<String, String> {
let mut meta = BTreeMap::new();
fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> HashMap<String, String> {
let mut meta = HashMap::new();

meta.insert(ArrowFieldMetaKey::Id.to_string(), col_schema.id.to_string());
meta.insert(
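These changes track arrow's new `Field` metadata API: metadata is a plain `HashMap<String, String>` (an empty map instead of `None`), and `set_metadata` no longer takes an `Option`. A minimal sketch under that assumption; the `"field::id"` key is made up for illustration:

```rust
use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

fn field_with_id(name: &str, id: u32) -> Field {
    let mut meta = HashMap::new();
    meta.insert("field::id".to_string(), id.to_string());

    let mut field = Field::new(name, DataType::Utf8, true);
    // Older arrow: field.set_metadata(Some(meta));
    field.set_metadata(meta);
    field
}

fn main() {
    let field = field_with_id("host", 1);
    // metadata() now returns the map directly; "no metadata" is just an empty map,
    // which is why the decode path above special-cases meta.is_empty().
    assert_eq!(field.metadata().get("field::id").map(String::as_str), Some("1"));
}
```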
24 changes: 17 additions & 7 deletions common_types/src/datum.rs
@@ -205,15 +205,15 @@ impl TryFrom<&SqlDataType> for DatumKind {
fn try_from(sql_type: &SqlDataType) -> Result<Self> {
match sql_type {
// TODO(yingwen): Consider timezone
SqlDataType::Timestamp => Ok(Self::Timestamp),
SqlDataType::Timestamp(_, _) => Ok(Self::Timestamp),
SqlDataType::Real | SqlDataType::Float(_) => Ok(Self::Float),
SqlDataType::Double => Ok(Self::Double),
SqlDataType::Boolean => Ok(Self::Boolean),
SqlDataType::BigInt(_) => Ok(Self::Int64),
SqlDataType::Int(_) => Ok(Self::Int32),
SqlDataType::SmallInt(_) => Ok(Self::Int16),
SqlDataType::String => Ok(Self::String),
SqlDataType::Custom(objects) if objects.0.len() == 1 => {
SqlDataType::Custom(objects, _) if objects.0.len() == 1 => {
match objects.0[0].value.as_str() {
"UINT64" | "uint64" => Ok(Self::UInt64),
"UINT32" | "uint32" => Ok(Self::UInt32),
@@ -538,7 +538,7 @@ impl Datum {
pub fn display_string(&self) -> String {
match self {
Datum::Null => "null".to_string(),
Datum::Timestamp(v) => Local.timestamp_millis(v.as_i64()).to_rfc3339(),
Datum::Timestamp(v) => Local.timestamp_millis_opt(v.as_i64()).unwrap().to_rfc3339(),
Datum::Double(v) => v.to_string(),
Datum::Float(v) => v.to_string(),
Datum::Varbinary(v) => format!("{:?}", v),
@@ -887,7 +887,9 @@ pub mod arrow_convert {
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v
.as_ref()
.map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))),
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v
ScalarValue::Binary(v)
| ScalarValue::FixedSizeBinary(_, v)
| ScalarValue::LargeBinary(v) => v
.as_ref()
.map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))),
ScalarValue::TimestampMillisecond(v, _) => {
@@ -896,7 +898,10 @@
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
@@ -928,7 +933,9 @@ pub mod arrow_convert {
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => {
v.as_ref().map(|v| DatumView::String(v.as_str()))
}
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => {
ScalarValue::Binary(v)
| ScalarValue::FixedSizeBinary(_, v)
| ScalarValue::LargeBinary(v) => {
v.as_ref().map(|v| DatumView::Varbinary(v.as_slice()))
}
ScalarValue::TimestampMillisecond(v, _) => {
@@ -937,7 +944,10 @@
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
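The `Local.timestamp_millis(..)` call is replaced because the chrono version pulled in alongside this upgrade deprecates the panicking constructors; the `_opt` variants return a `LocalResult` instead. A small sketch under that assumption:

```rust
use chrono::{Local, TimeZone};

fn millis_to_rfc3339(millis: i64) -> String {
    // Older chrono: Local.timestamp_millis(millis).to_rfc3339()
    // The _opt form returns LocalResult; unwrap() keeps the previous
    // panic-on-invalid-timestamp behavior.
    Local.timestamp_millis_opt(millis).unwrap().to_rfc3339()
}

fn main() {
    println!("{}", millis_to_rfc3339(1_675_839_600_000));
}
```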
4 changes: 2 additions & 2 deletions components/arrow_ext/src/display.rs
@@ -259,7 +259,7 @@ pub fn make_string_from_decimal<T: DecimalType>(
) -> Result<String> {
let array = column
.as_any()
.downcast_ref::<array::DecimalArray<T>>()
.downcast_ref::<array::PrimitiveArray<T>>()
.unwrap();

let formatted_decimal = array.value_as_string(row);
@@ -389,7 +389,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<Str

let mut s = String::new();
s.push('{');
let mut kv_iter = st.columns().into_iter().zip(st.column_names().into_iter());
let mut kv_iter = st.columns().iter().zip(st.column_names().into_iter());
if let Some((col, name)) = kv_iter.next() {
append_struct_field_string(&mut s, name, col, row)?;
}
2 changes: 1 addition & 1 deletion components/message_queue/src/tests/util.rs
@@ -10,7 +10,7 @@ use crate::Message;

pub fn generate_test_data(cnt: usize) -> Vec<Message> {
let mut messages = Vec::with_capacity(cnt);
let base_ts = Utc.timestamp_millis(1337);
let base_ts = Utc.timestamp_millis_opt(1337).unwrap();
for i in 0..cnt {
let key = format!("test_key_{}", i);
let val = format!("test_val_{}", i);
6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/equal.rs
@@ -1,8 +1,8 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow::datatypes::SchemaRef;
use datafusion::{logical_plan::Column, scalar::ScalarValue};
use datafusion_expr::{Expr, Operator};
use datafusion::{common::Column, scalar::ScalarValue};
use datafusion_expr::{self, Expr, Operator};

const MAX_ELEMS_IN_LIST_FOR_FILTER: usize = 100;

@@ -149,7 +149,7 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr {
let unhandled = NormalizedExpr::True;

match expr {
Expr::BinaryExpr { left, op, right } => match op {
Expr::BinaryExpr(datafusion_expr::BinaryExpr { left, op, right }) => match op {
Operator::And => {
let left = normalize_predicate_expression(left);
let right = normalize_predicate_expression(right);
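In DataFusion 17, `Expr::BinaryExpr` wraps a dedicated `BinaryExpr` struct rather than carrying named fields, so the match arm changes as shown above. A minimal sketch of matching the new shape (the helper is illustrative):

```rust
use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};

// Return true if the expression is a top-level equality comparison.
fn is_equality(expr: &Expr) -> bool {
    match expr {
        // DataFusion 16: Expr::BinaryExpr { left, op, right }
        Expr::BinaryExpr(BinaryExpr { op, .. }) => matches!(op, Operator::Eq),
        _ => false,
    }
}

fn main() {
    assert!(is_equality(&col("a").eq(lit(5i64))));
    assert!(!is_equality(&col("a")));
}
```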
6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/min_max.rs
@@ -248,10 +248,10 @@ mod test {
let testcases = vec![
// (expr, min, max, schema, expected)
(
col("a").eq(lit(5i32)), // a == 5
col("a").eq(lit(5i64)), // a == 5
10,
20,
vec![("a", ArrowDataType::Int32)],
vec![("a", ArrowDataType::Int64)],
vec![],
),
(
@@ -273,7 +273,7 @@
col("a").in_list(vec![lit(17i64), lit(100i64)], false), // a in (17, 100)
101,
200,
vec![("a", ArrowDataType::Int32)],
vec![("a", ArrowDataType::Int64)],
vec![],
),
];
14 changes: 9 additions & 5 deletions df_operator/src/aggregate.rs
@@ -11,7 +11,6 @@ use datafusion::{
physical_plan::Accumulator as DfAccumulator,
scalar::ScalarValue as DfScalarValue,
};
use datafusion_expr::AggregateState as DfAggregateState;
use snafu::Snafu;

use crate::functions::{ScalarValue, ScalarValueRef};
@@ -35,8 +34,9 @@ define_result!(Error);
pub struct State(Vec<DfScalarValue>);

impl State {
fn into_df_aggregate_states(self) -> Vec<DfAggregateState> {
self.0.into_iter().map(DfAggregateState::Scalar).collect()
/// Convert to a set of ScalarValues
fn into_state(self) -> Vec<DfScalarValue> {
self.0
}
}

@@ -112,11 +112,11 @@ impl<T> ToDfAccumulator<T> {
}

impl<T: Accumulator> DfAccumulator for ToDfAccumulator<T> {
fn state(&self) -> DfResult<Vec<DfAggregateState>> {
fn state(&self) -> DfResult<Vec<DfScalarValue>> {
let state = self.accumulator.state().map_err(|e| {
DataFusionError::Execution(format!("Accumulator failed to get state, err:{}", e))
})?;
Ok(state.into_df_aggregate_states())
Ok(state.into_state())
}

fn update_batch(&mut self, values: &[DfArrayRef]) -> DfResult<()> {
@@ -160,4 +160,8 @@ impl<T: Accumulator> DfAccumulator for ToDfAccumulator<T> {

Ok(value.into_df_scalar_value())
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}
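The wrapper above adapts to DataFusion 17's `Accumulator` trait: `AggregateState` is gone, `state()` returns plain `ScalarValue`s, and a `size()` method is now required for memory accounting. A toy accumulator sketching that surface (not the project's implementation):

```rust
use arrow::array::{ArrayRef, UInt64Array};
use datafusion::error::Result;
use datafusion::physical_plan::Accumulator;
use datafusion::scalar::ScalarValue;

#[derive(Debug, Default)]
struct CountAccumulator {
    count: u64,
}

impl Accumulator for CountAccumulator {
    fn state(&self) -> Result<Vec<ScalarValue>> {
        // DataFusion 16 wrapped each value in AggregateState::Scalar.
        Ok(vec![ScalarValue::UInt64(Some(self.count))])
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        self.count += values[0].len() as u64;
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let partial = states[0]
            .as_any()
            .downcast_ref::<UInt64Array>()
            .expect("count state should be a UInt64 column");
        self.count += partial.iter().flatten().sum::<u64>();
        Ok(())
    }

    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::UInt64(Some(self.count)))
    }

    // New in DataFusion 17: report the accumulator's in-memory size.
    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }
}
```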