feat: upgrade to Datafusion 17 (#601)
* feat: upgrade to datafusion 17

* fix integration tests

* fix integration test

* make CI happy

* refactor code

* refactor by CR

* chore: remove useless file
chunshao90 authored Feb 8, 2023
1 parent d6f5e77 commit 47e4518
Showing 55 changed files with 1,110 additions and 525 deletions.
647 changes: 543 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 8 additions & 15 deletions Cargo.toml
@@ -51,8 +51,8 @@ name = "ceresdb-server"
path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
arrow = { version = "23.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "23.0.0" }
arrow = { version = "31.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "31.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
@@ -69,6 +69,10 @@ cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
common_util = { path = "common_util" }
datafusion = "17.0.0"
datafusion-expr = "17.0.0"
datafusion-optimizer = "17.0.0"
datafusion-proto = "17.0.0"
df_operator = { path = "df_operator" }
env_logger = "0.6"
ethbloom = "0.13.0"
@@ -81,7 +85,7 @@ interpreters = { path = "interpreters" }
meta_client = { path = "meta_client" }
object_store = { path = "components/object_store" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "23.0.0" }
parquet = { version = "31.0.0" }
paste = "1.0"
profile = { path = "components/profile" }
prometheus = "0.12"
@@ -100,6 +104,7 @@ server = { path = "server" }
smallvec = "1.6"
slog = "2.7"
sql = { path = "sql" }
sqlparser = { version = "0.30", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
@@ -115,18 +120,6 @@ zstd = { version = "0.12", default-features = false }
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "55495dd395d12a1f97f6c98e355290671d107c44"

[workspace.dependencies.datafusion]
git = "https://github.com/CeresDB/arrow-datafusion.git"
rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48"

[workspace.dependencies.datafusion-expr]
git = "https://github.com/CeresDB/arrow-datafusion.git"
rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48"

[workspace.dependencies.datafusion-proto]
git = "https://github.com/CeresDB/arrow-datafusion.git"
rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48"

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
@@ -24,6 +24,7 @@ bytes = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
ethbloom = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/record_batch_stream.rs
@@ -17,7 +17,7 @@ use common_util::define_result;
use datafusion::{
common::ToDFSchema,
error::DataFusionError,
logical_expr::expr_fn,
optimizer::utils::conjunction,
physical_expr::{self, execution_props::ExecutionProps},
physical_plan::PhysicalExpr,
};
@@ -171,7 +171,7 @@ pub fn filter_stream(
input_schema: ArrowSchemaRef,
predicate: &Predicate,
) -> Result<SequencedRecordBatchStream> {
let filter = match expr_fn::combine_filters(predicate.exprs()) {
let filter = match conjunction(predicate.exprs().to_owned()) {
Some(filter) => filter,
None => return Ok(origin_stream),
};
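For context on this migration, here is a minimal sketch (not taken from the repository) of the DataFusion 17 helper that replaces `expr_fn::combine_filters`: `optimizer::utils::conjunction` folds an owned list of predicates into a single `Expr` joined by AND and yields `None` for an empty list, which is why the code above returns the original stream in that case.

// Hedged sketch: combining filter expressions with DataFusion 17.
use datafusion::optimizer::utils::conjunction;
use datafusion_expr::{col, lit, Expr};

fn combine_predicates(predicates: Vec<Expr>) -> Option<Expr> {
    // Returns Some(p1 AND p2 AND ...) or None when there is nothing to filter.
    conjunction(predicates)
}

fn example() -> Option<Expr> {
    combine_predicates(vec![col("tsid").eq(lit(42i64)), col("value").gt(lit(0.0))])
}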
8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -3,7 +3,7 @@
use std::{convert::TryFrom, sync::Arc};

use arrow::{
array::{Array, ArrayData, ArrayRef},
array::{make_array, Array, ArrayData, ArrayRef},
buffer::MutableBuffer,
compute,
record_batch::RecordBatch as ArrowRecordBatch,
@@ -557,7 +557,7 @@ impl HybridRecordDecoder {
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
Ok(make_array(array_data))
}

/// Like `stretch_variable_length_column`, but array value is fixed-size
@@ -601,7 +601,7 @@ impl HybridRecordDecoder {
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
Ok(make_array(array_data))
}

/// Decode offset slices into Vec<i32>
@@ -645,7 +645,7 @@ impl RecordDecoder for HybridRecordDecoder {
// are collapsed by hybrid storage format, to differentiate
// List column in original records
DataType::List(_nested_field) => {
Ok(array_ref.data().child_data()[0].clone().into())
Ok(make_array(array_ref.data().child_data()[0].clone()))
}
_ => {
let datum_kind = DatumKind::from_data_type(data_type).unwrap();
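As an aside, a small sketch (assuming any concrete array type behaves the same way) of the arrow 31 idiom adopted above: `ArrayData` is no longer turned into an `ArrayRef` via `.into()`, so `make_array` rebuilds the dynamically typed array instead.

// Hedged sketch: rebuilding an ArrayRef from ArrayData with arrow 31.
use arrow::array::{make_array, Array, ArrayRef, Int64Array};

fn rebuild(array: &Int64Array) -> ArrayRef {
    let data = array.data().clone(); // clone the underlying ArrayData
    make_array(data)                 // replaces the old `data.into()`
}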
11 changes: 5 additions & 6 deletions analytic_engine/src/sst/parquet/hybrid.rs
@@ -8,7 +8,7 @@ use arrow::{
UInt64Array,
},
bitmap::Bitmap,
buffer::{Buffer, MutableBuffer},
buffer::MutableBuffer,
datatypes::Schema as ArrowSchema,
record_batch::RecordBatch as ArrowRecordBatch,
util::bit_util,
@@ -153,7 +153,7 @@ trait VariableSizeArray {
// Returns the length for the element at index i.
fn value_length(&self, index: usize) -> i32;
// Returns a clone of the value data buffer.
fn value_data(&self) -> Buffer;
fn value_data(&self) -> &[u8];
}

macro_rules! impl_offsets {
@@ -167,7 +167,7 @@ macro_rules! impl_offsets {
self.0.value_length(index)
}

fn value_data(&self) -> Buffer {
fn value_data(&self) -> &[u8] {
self.0.value_data()
}
}
@@ -404,9 +404,8 @@ impl ListArrayBuilder {
inner_offsets.push(inner_length_so_far);
}

inner_values.extend_from_slice(
&array.value_data().as_slice()[start as usize..end as usize],
);
inner_values
.extend_from_slice(&array.value_data()[start as usize..end as usize]);
}
}
// The data in the arrays belong to the same tsid, so the offsets is the total
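A hedged illustration of the signature change behind this hunk, assuming `StringArray` is among the types wrapped by `impl_offsets!`: in arrow 31 `value_data()` hands back a byte slice rather than a `Buffer`, so the extra `as_slice()` call above is dropped.

// Hedged sketch: slicing the raw value bytes of a variable-size array.
use arrow::array::StringArray;

fn value_bytes(array: &StringArray, start: usize, end: usize) -> Vec<u8> {
    // `value_data()` now returns `&[u8]` directly.
    array.value_data()[start..end].to_vec()
}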
3 changes: 2 additions & 1 deletion analytic_engine/src/table/mod.rs
@@ -6,7 +6,8 @@ use std::{collections::HashMap, fmt};

use async_trait::async_trait;
use common_types::{row::Row, schema::Schema, time::TimeRange};
use datafusion::logical_plan::{Column, Expr};
use datafusion::common::Column;
use datafusion_expr::Expr;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
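A tiny sketch (illustrative only) of the import relocation this hunk reflects: in DataFusion 17, `Column` lives under `datafusion::common`, while `Expr` and its builder functions come from the separate `datafusion-expr` crate.

// Hedged sketch: building a column reference and a predicate with the
// relocated DataFusion 17 types.
use datafusion::common::Column;
use datafusion_expr::{col, lit, Expr};

fn timestamp_filter() -> (Column, Expr) {
    let column = Column::from_name("timestamp");
    let expr = col("timestamp").gt_eq(lit(0i64));
    (column, expr)
}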
3 changes: 1 addition & 2 deletions common_types/Cargo.toml
@@ -30,5 +30,4 @@ serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
# TODO(yingwen): Make sqlparser support a feature
sqlparser = { version = "0.23.0", features = ["serde"] }
sqlparser = { workspace = true }
32 changes: 16 additions & 16 deletions common_types/src/column_schema.rs
@@ -2,7 +2,7 @@

//! Schema of column

use std::{collections::BTreeMap, convert::TryFrom, str::FromStr};
use std::{collections::HashMap, convert::TryFrom, str::FromStr};

use arrow::datatypes::{DataType, Field};
use proto::common as common_pb;
@@ -281,11 +281,7 @@ impl TryFrom<&Field> for ColumnSchema {
id,
is_tag,
comment,
} = field
.metadata()
.map(decode_arrow_field_meta_data)
.transpose()?
.unwrap_or_default();
} = decode_arrow_field_meta_data(field.metadata())?;
Ok(Self {
id,
name: field.name().clone(),
@@ -311,14 +307,14 @@ impl From<&ColumnSchema> for Field {
col_schema.data_type.into(),
col_schema.is_nullable,
);
field.set_metadata(Some(metadata));
field.set_metadata(metadata);

field
}
}

fn parse_arrow_field_meta_value<T>(
meta: &BTreeMap<String, String>,
meta: &HashMap<String, String>,
key: ArrowFieldMetaKey,
) -> Result<T>
where
@@ -333,16 +329,20 @@ where
.context(InvalidArrowFieldMetaValue { key, raw_value })
}

fn decode_arrow_field_meta_data(meta: &BTreeMap<String, String>) -> Result<ArrowFieldMeta> {
Ok(ArrowFieldMeta {
id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?,
is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?,
comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?,
})
fn decode_arrow_field_meta_data(meta: &HashMap<String, String>) -> Result<ArrowFieldMeta> {
if meta.is_empty() {
Ok(ArrowFieldMeta::default())
} else {
Ok(ArrowFieldMeta {
id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?,
is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?,
comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?,
})
}
}

fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> BTreeMap<String, String> {
let mut meta = BTreeMap::new();
fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> HashMap<String, String> {
let mut meta = HashMap::new();

meta.insert(ArrowFieldMetaKey::Id.to_string(), col_schema.id.to_string());
meta.insert(
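For illustration, a hedged sketch of the arrow 31 field-metadata API this hunk migrates to; the metadata key below is hypothetical (the real keys come from `ArrowFieldMetaKey`): metadata is now a plain `HashMap<String, String>`, and `set_metadata` takes it by value instead of an `Option<BTreeMap<_, _>>`.

// Hedged sketch: attaching field metadata in arrow 31.
use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

fn tag_field(name: &str) -> Field {
    let mut field = Field::new(name, DataType::Utf8, true);
    let mut meta = HashMap::new();
    // Hypothetical key for illustration only.
    meta.insert("field::is_tag".to_string(), "true".to_string());
    field.set_metadata(meta); // no longer wrapped in Some(..)
    field
}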
24 changes: 17 additions & 7 deletions common_types/src/datum.rs
@@ -205,15 +205,15 @@ impl TryFrom<&SqlDataType> for DatumKind {
fn try_from(sql_type: &SqlDataType) -> Result<Self> {
match sql_type {
// TODO(yingwen): Consider timezone
SqlDataType::Timestamp => Ok(Self::Timestamp),
SqlDataType::Timestamp(_, _) => Ok(Self::Timestamp),
SqlDataType::Real | SqlDataType::Float(_) => Ok(Self::Float),
SqlDataType::Double => Ok(Self::Double),
SqlDataType::Boolean => Ok(Self::Boolean),
SqlDataType::BigInt(_) => Ok(Self::Int64),
SqlDataType::Int(_) => Ok(Self::Int32),
SqlDataType::SmallInt(_) => Ok(Self::Int16),
SqlDataType::String => Ok(Self::String),
SqlDataType::Custom(objects) if objects.0.len() == 1 => {
SqlDataType::Custom(objects, _) if objects.0.len() == 1 => {
match objects.0[0].value.as_str() {
"UINT64" | "uint64" => Ok(Self::UInt64),
"UINT32" | "uint32" => Ok(Self::UInt32),
@@ -538,7 +538,7 @@ impl Datum {
pub fn display_string(&self) -> String {
match self {
Datum::Null => "null".to_string(),
Datum::Timestamp(v) => Local.timestamp_millis(v.as_i64()).to_rfc3339(),
Datum::Timestamp(v) => Local.timestamp_millis_opt(v.as_i64()).unwrap().to_rfc3339(),
Datum::Double(v) => v.to_string(),
Datum::Float(v) => v.to_string(),
Datum::Varbinary(v) => format!("{:?}", v),
@@ -887,7 +887,9 @@ pub mod arrow_convert {
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v
.as_ref()
.map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))),
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v
ScalarValue::Binary(v)
| ScalarValue::FixedSizeBinary(_, v)
| ScalarValue::LargeBinary(v) => v
.as_ref()
.map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))),
ScalarValue::TimestampMillisecond(v, _) => {
@@ -896,7 +898,10 @@
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
@@ -928,7 +933,9 @@ pub mod arrow_convert {
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => {
v.as_ref().map(|v| DatumView::String(v.as_str()))
}
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => {
ScalarValue::Binary(v)
| ScalarValue::FixedSizeBinary(_, v)
| ScalarValue::LargeBinary(v) => {
v.as_ref().map(|v| DatumView::Varbinary(v.as_slice()))
}
ScalarValue::TimestampMillisecond(v, _) => {
@@ -937,7 +944,10 @@
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
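A short sketch of the chrono migration visible in the `display_string` hunk above (the same pattern recurs in `components/message_queue` below): `timestamp_millis` is deprecated in favour of `timestamp_millis_opt`, which returns a `LocalResult` the caller must resolve; the diff unwraps it, while this sketch (assumption: falling back to a placeholder string) handles the invalid case explicitly.

// Hedged sketch: formatting a millisecond timestamp without panicking.
use chrono::{Local, TimeZone};

fn millis_to_rfc3339(ms: i64) -> String {
    Local
        .timestamp_millis_opt(ms)
        .single() // None if the instant is invalid or ambiguous
        .map(|dt| dt.to_rfc3339())
        .unwrap_or_else(|| "<invalid timestamp>".to_string())
}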
4 changes: 2 additions & 2 deletions components/arrow_ext/src/display.rs
@@ -259,7 +259,7 @@ pub fn make_string_from_decimal<T: DecimalType>(
) -> Result<String> {
let array = column
.as_any()
.downcast_ref::<array::DecimalArray<T>>()
.downcast_ref::<array::PrimitiveArray<T>>()
.unwrap();

let formatted_decimal = array.value_as_string(row);
@@ -389,7 +389,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<Str

let mut s = String::new();
s.push('{');
let mut kv_iter = st.columns().into_iter().zip(st.column_names().into_iter());
let mut kv_iter = st.columns().iter().zip(st.column_names().into_iter());
if let Some((col, name)) = kv_iter.next() {
append_struct_field_string(&mut s, name, col, row)?;
}
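A hedged aside on the decimal change above: arrow 31 drops the dedicated `DecimalArray<T>` wrapper, and decimal data is an ordinary `PrimitiveArray` over a decimal type, so a concrete downcast might look like this (assuming `Decimal128Type`).

// Hedged sketch: reading a decimal value as a string from a dyn column.
use arrow::array::{Array, ArrayRef, PrimitiveArray};
use arrow::datatypes::Decimal128Type;

fn decimal_as_string(column: &ArrayRef, row: usize) -> Option<String> {
    column
        .as_any()
        .downcast_ref::<PrimitiveArray<Decimal128Type>>()
        .map(|array| array.value_as_string(row))
}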
2 changes: 1 addition & 1 deletion components/message_queue/src/tests/util.rs
@@ -10,7 +10,7 @@ use crate::Message;

pub fn generate_test_data(cnt: usize) -> Vec<Message> {
let mut messages = Vec::with_capacity(cnt);
let base_ts = Utc.timestamp_millis(1337);
let base_ts = Utc.timestamp_millis_opt(1337).unwrap();
for i in 0..cnt {
let key = format!("test_key_{}", i);
let val = format!("test_val_{}", i);
2 changes: 1 addition & 1 deletion components/object_store/Cargo.toml
@@ -31,7 +31,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
upstream = { package = "object_store", version = "0.5.1" }
upstream = { package = "object_store", version = "0.5.3" }

[dev-dependencies]
tempfile = { workspace = true }
6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/equal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow::datatypes::SchemaRef;
use datafusion::{logical_plan::Column, scalar::ScalarValue};
use datafusion_expr::{Expr, Operator};
use datafusion::{common::Column, scalar::ScalarValue};
use datafusion_expr::{self, Expr, Operator};

const MAX_ELEMS_IN_LIST_FOR_FILTER: usize = 100;

@@ -149,7 +149,7 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr {
let unhandled = NormalizedExpr::True;

match expr {
Expr::BinaryExpr { left, op, right } => match op {
Expr::BinaryExpr(datafusion_expr::BinaryExpr { left, op, right }) => match op {
Operator::And => {
let left = normalize_predicate_expression(left);
let right = normalize_predicate_expression(right);
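A brief sketch of the pattern-matching change this file adapts to (illustrative, not the project's actual helper): DataFusion 17 makes `Expr::BinaryExpr` a tuple variant wrapping the `BinaryExpr` struct, so destructuring now goes through that inner struct.

// Hedged sketch: matching a binary AND expression in DataFusion 17.
use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};

fn is_and(expr: &Expr) -> bool {
    matches!(
        expr,
        Expr::BinaryExpr(BinaryExpr {
            op: Operator::And,
            ..
        })
    )
}

fn example() -> bool {
    let expr = col("a").gt(lit(1i64)).and(col("b").lt(lit(10i64)));
    is_and(&expr)
}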
6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/min_max.rs
@@ -248,10 +248,10 @@ mod test {
let testcases = vec![
// (expr, min, max, schema, expected)
(
col("a").eq(lit(5i32)), // a == 5
col("a").eq(lit(5i64)), // a == 5
10,
20,
vec![("a", ArrowDataType::Int32)],
vec![("a", ArrowDataType::Int64)],
vec![],
),
(
@@ -273,7 +273,7 @@
col("a").in_list(vec![lit(17i64), lit(100i64)], false), // a in (17, 100)
101,
200,
vec![("a", ArrowDataType::Int32)],
vec![("a", ArrowDataType::Int64)],
vec![],
),
];