feat: Upgrade to Datafusion 17 #601

Merged · 9 commits · Feb 8, 2023 · showing changes from 6 commits
666 changes: 558 additions & 108 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 8 additions & 15 deletions Cargo.toml
@@ -51,8 +51,8 @@ name = "ceresdb-server"
path = "src/bin/ceresdb-server.rs"

[workspace.dependencies]
arrow = { version = "23.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "23.0.0" }
arrow = { version = "31.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "31.0.0" }
arrow_ext = { path = "components/arrow_ext" }
analytic_engine = { path = "analytic_engine" }
arena = { path = "components/arena" }
@@ -69,6 +69,10 @@ cluster = { path = "cluster" }
criterion = "0.3"
common_types = { path = "common_types" }
common_util = { path = "common_util" }
datafusion = "17.0.0"
datafusion-expr = "17.0.0"
datafusion-optimizer = "17.0.0"
datafusion-proto = "17.0.0"
df_operator = { path = "df_operator" }
env_logger = "0.6"
ethbloom = "0.13.0"
@@ -81,7 +85,7 @@ interpreters = { path = "interpreters" }
meta_client = { path = "meta_client" }
object_store = { path = "components/object_store" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "23.0.0" }
parquet = { version = "31.0.0" }
paste = "1.0"
profile = { path = "components/profile" }
prometheus = "0.12"
@@ -100,6 +104,7 @@ server = { path = "server" }
smallvec = "1.6"
slog = "2.7"
sql = { path = "sql" }
sqlparser = { version = "0.30", features = ["serde"] }
system_catalog = { path = "system_catalog" }
table_engine = { path = "table_engine" }
table_kv = { path = "components/table_kv" }
@@ -115,18 +120,6 @@ zstd = { version = "0.12", default-features = false }
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "9430fe7d321955878ca06c431b115d73640f5d8f"

[workspace.dependencies.datafusion]
git = "https://github.com/CeresDB/arrow-datafusion.git"
rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48"

[workspace.dependencies.datafusion-expr]
git = "https://github.com/CeresDB/arrow-datafusion.git"
rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48"

[workspace.dependencies.datafusion-proto]
git = "https://github.com/CeresDB/arrow-datafusion.git"
rev = "d84ea9c79c9e83ff0b4dadf8880a4983af59ef48"

[dependencies]
analytic_engine = { workspace = true }
catalog = { workspace = true }
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
@@ -24,6 +24,7 @@ bytes = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
ethbloom = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/record_batch_stream.rs
@@ -17,7 +17,7 @@ use common_util::define_result;
use datafusion::{
common::ToDFSchema,
error::DataFusionError,
logical_expr::expr_fn,
optimizer::utils::conjunction,
physical_expr::{self, execution_props::ExecutionProps},
physical_plan::PhysicalExpr,
};
@@ -171,7 +171,7 @@ pub fn filter_stream(
input_schema: ArrowSchemaRef,
predicate: &Predicate,
) -> Result<SequencedRecordBatchStream> {
let filter = match expr_fn::combine_filters(predicate.exprs()) {
let filter = match conjunction(predicate.exprs().to_owned()) {
Some(filter) => filter,
None => return Ok(origin_stream),
};
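For context on the `combine_filters` → `conjunction` change, a minimal sketch against datafusion/datafusion-expr 17 (the `combined_filter` helper is illustrative, not part of this PR): `conjunction` takes owned expressions, folds them with AND, and returns `None` for an empty input, which is why the call site above adds `to_owned()`.

```rust
// Sketch only: `combined_filter` is a hypothetical helper, not code from this PR.
use datafusion::optimizer::utils::conjunction;
use datafusion_expr::{col, lit, Expr};

fn combined_filter(exprs: &[Expr]) -> Option<Expr> {
    // DataFusion 17: `conjunction` consumes its input (hence the `to_owned()` in the diff)
    // and folds the expressions with AND; an empty slice yields None.
    conjunction(exprs.to_owned())
}

fn main() {
    let exprs = vec![col("a").gt(lit(1i64)), col("b").lt(lit(10i64))];
    // Prints the folded AND predicate wrapped in Some(..).
    println!("{:?}", combined_filter(&exprs));
}
```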
8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -3,7 +3,7 @@
use std::{convert::TryFrom, sync::Arc};

use arrow::{
array::{Array, ArrayData, ArrayRef},
array::{make_array, Array, ArrayData, ArrayRef},
buffer::MutableBuffer,
compute,
record_batch::RecordBatch as ArrowRecordBatch,
@@ -557,7 +557,7 @@ impl HybridRecordDecoder {
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
Ok(make_array(array_data))
}

/// Like `stretch_variable_length_column`, but array value is fixed-size
@@ -601,7 +601,7 @@ impl HybridRecordDecoder {
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
Ok(make_array(array_data))
}

/// Decode offset slices into Vec<i32>
@@ -645,7 +645,7 @@ impl RecordDecoder for HybridRecordDecoder {
// are collapsed by hybrid storage format, to differentiate
// List column in original records
DataType::List(_nested_field) => {
Ok(array_ref.data().child_data()[0].clone().into())
Ok(make_array(array_ref.data().child_data()[0].clone()))
}
_ => {
let datum_kind = DatumKind::from_data_type(data_type).unwrap();
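A minimal sketch of the `ArrayData` → `ArrayRef` change above, assuming arrow 31 (the `as_array_ref` helper is illustrative only): the code switches from the `.into()` conversion to an explicit `make_array` call.

```rust
// Sketch only: `as_array_ref` is a hypothetical helper, not code from this PR.
use arrow::array::{make_array, Array, ArrayRef, Int32Array};

fn as_array_ref(values: Vec<i32>) -> ArrayRef {
    let array = Int32Array::from(values);
    // Detach the raw ArrayData and rebuild an ArrayRef from it; with arrow 31 this
    // goes through `make_array(...)` rather than `array_data.into()`.
    let data = array.data().clone();
    make_array(data)
}
```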
11 changes: 5 additions & 6 deletions analytic_engine/src/sst/parquet/hybrid.rs
@@ -8,7 +8,7 @@ use arrow::{
UInt64Array,
},
bitmap::Bitmap,
buffer::{Buffer, MutableBuffer},
buffer::MutableBuffer,
datatypes::Schema as ArrowSchema,
record_batch::RecordBatch as ArrowRecordBatch,
util::bit_util,
@@ -153,7 +153,7 @@ trait VariableSizeArray {
// Returns the length for the element at index i.
fn value_length(&self, index: usize) -> i32;
// Returns a clone of the value data buffer.
fn value_data(&self) -> Buffer;
fn value_data(&self) -> &[u8];
}

macro_rules! impl_offsets {
@@ -167,7 +167,7 @@ macro_rules! impl_offsets {
self.0.value_length(index)
}

fn value_data(&self) -> Buffer {
fn value_data(&self) -> &[u8] {
self.0.value_data()
}
}
@@ -404,9 +404,8 @@ impl ListArrayBuilder {
inner_offsets.push(inner_length_so_far);
}

inner_values.extend_from_slice(
&array.value_data().as_slice()[start as usize..end as usize],
);
inner_values
.extend_from_slice(&array.value_data()[start as usize..end as usize]);
}
}
// The data in the arrays belong to the same tsid, so the offsets is the total
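A small sketch of the arrow 31 `value_data()` change, using hypothetical data: string/binary arrays now hand back `&[u8]` directly, so the earlier `value_data().as_slice()` collapses to a plain slice index, as in `ListArrayBuilder` above.

```rust
// Sketch only: illustrative helper and data, not code from this PR.
use arrow::array::StringArray;

fn raw_bytes_between(array: &StringArray, start: usize, end: usize) -> Vec<u8> {
    // arrow 31: `value_data()` returns `&[u8]` (it used to return a `Buffer`),
    // so byte ranges into the concatenated value buffer are sliced directly.
    array.value_data()[start..end].to_vec()
}

fn main() {
    let array = StringArray::from(vec!["ab", "cd"]);
    assert_eq!(raw_bytes_between(&array, 0, 3), b"abc".to_vec());
}
```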
3 changes: 2 additions & 1 deletion analytic_engine/src/table/mod.rs
@@ -6,7 +6,8 @@ use std::{collections::HashMap, fmt};

use async_trait::async_trait;
use common_types::{row::Row, schema::Schema, time::TimeRange};
use datafusion::logical_plan::{Column, Expr};
use datafusion::common::Column;
use datafusion_expr::Expr;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
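A small sketch of the import relocation above, assuming datafusion/datafusion-expr 17 (helpers are illustrative): the old `logical_plan` module is gone, with `Column` now under `datafusion::common` and `Expr` in the `datafusion_expr` crate.

```rust
// Sketch only: illustrative usage of the relocated types.
use datafusion::common::Column;
use datafusion_expr::{col, lit, Expr};

fn tsid_column() -> Column {
    Column::from_name("tsid")
}

fn tsid_filter(value: i64) -> Expr {
    col("tsid").eq(lit(value))
}
```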
3 changes: 1 addition & 2 deletions common_types/Cargo.toml
@@ -30,5 +30,4 @@ serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
# TODO(yingwen): Make sqlparser support a feature
sqlparser = { version = "0.23.0", features = ["serde"] }
sqlparser = { workspace = true }
32 changes: 16 additions & 16 deletions common_types/src/column_schema.rs
@@ -2,7 +2,7 @@

//! Schema of column

use std::{collections::BTreeMap, convert::TryFrom, str::FromStr};
use std::{collections::HashMap, convert::TryFrom, str::FromStr};

use arrow::datatypes::{DataType, Field};
use proto::common as common_pb;
@@ -281,11 +281,7 @@ impl TryFrom<&Field> for ColumnSchema {
id,
is_tag,
comment,
} = field
.metadata()
.map(decode_arrow_field_meta_data)
.transpose()?
.unwrap_or_default();
} = decode_arrow_field_meta_data(field.metadata())?;
Ok(Self {
id,
name: field.name().clone(),
@@ -311,14 +307,14 @@ impl From<&ColumnSchema> for Field {
col_schema.data_type.into(),
col_schema.is_nullable,
);
field.set_metadata(Some(metadata));
field.set_metadata(metadata);

field
}
}

fn parse_arrow_field_meta_value<T>(
meta: &BTreeMap<String, String>,
meta: &HashMap<String, String>,
key: ArrowFieldMetaKey,
) -> Result<T>
where
@@ -333,16 +329,20 @@ where
.context(InvalidArrowFieldMetaValue { key, raw_value })
}

fn decode_arrow_field_meta_data(meta: &BTreeMap<String, String>) -> Result<ArrowFieldMeta> {
Ok(ArrowFieldMeta {
id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?,
is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?,
comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?,
})
fn decode_arrow_field_meta_data(meta: &HashMap<String, String>) -> Result<ArrowFieldMeta> {
if meta.is_empty() {
Ok(ArrowFieldMeta::default())
} else {
Ok(ArrowFieldMeta {
id: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Id)?,
is_tag: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::IsTag)?,
comment: parse_arrow_field_meta_value(meta, ArrowFieldMetaKey::Comment)?,
})
}
}

fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> BTreeMap<String, String> {
let mut meta = BTreeMap::new();
fn encode_arrow_field_meta_data(col_schema: &ColumnSchema) -> HashMap<String, String> {
let mut meta = HashMap::new();

meta.insert(ArrowFieldMetaKey::Id.to_string(), col_schema.id.to_string());
meta.insert(
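For the metadata changes above, a minimal sketch assuming arrow 31 (the key string and helper are illustrative; the real keys live in `ArrowFieldMetaKey`): `Field` metadata is now a plain `HashMap` rather than an optional `BTreeMap`, so "no metadata" is an empty map, which is what the new `is_empty()` branch handles.

```rust
// Sketch only: the key string and helper are illustrative, not code from this PR.
use std::collections::HashMap;
use arrow::datatypes::{DataType, Field};

fn tagged_field(name: &str, id: u32) -> Field {
    let mut field = Field::new(name, DataType::Utf8, true);
    let mut meta = HashMap::new();
    meta.insert("field::id".to_string(), id.to_string());
    // arrow 31: `set_metadata` takes the map directly instead of `Option<BTreeMap<_, _>>`.
    field.set_metadata(meta);
    field
}

fn main() {
    let field = tagged_field("tag1", 3);
    // `metadata()` now returns `&HashMap<String, String>`; an empty map means "no metadata".
    assert_eq!(field.metadata().get("field::id").map(String::as_str), Some("3"));
}
```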
24 changes: 17 additions & 7 deletions common_types/src/datum.rs
@@ -205,15 +205,15 @@ impl TryFrom<&SqlDataType> for DatumKind {
fn try_from(sql_type: &SqlDataType) -> Result<Self> {
match sql_type {
// TODO(yingwen): Consider timezone
SqlDataType::Timestamp => Ok(Self::Timestamp),
SqlDataType::Timestamp(_, _) => Ok(Self::Timestamp),
SqlDataType::Real | SqlDataType::Float(_) => Ok(Self::Float),
SqlDataType::Double => Ok(Self::Double),
SqlDataType::Boolean => Ok(Self::Boolean),
SqlDataType::BigInt(_) => Ok(Self::Int64),
SqlDataType::Int(_) => Ok(Self::Int32),
SqlDataType::SmallInt(_) => Ok(Self::Int16),
SqlDataType::String => Ok(Self::String),
SqlDataType::Custom(objects) if objects.0.len() == 1 => {
SqlDataType::Custom(objects, _) if objects.0.len() == 1 => {
match objects.0[0].value.as_str() {
"UINT64" | "uint64" => Ok(Self::UInt64),
"UINT32" | "uint32" => Ok(Self::UInt32),
@@ -538,7 +538,7 @@ impl Datum {
pub fn display_string(&self) -> String {
match self {
Datum::Null => "null".to_string(),
Datum::Timestamp(v) => Local.timestamp_millis(v.as_i64()).to_rfc3339(),
Datum::Timestamp(v) => Local.timestamp_millis_opt(v.as_i64()).unwrap().to_rfc3339(),
Datum::Double(v) => v.to_string(),
Datum::Float(v) => v.to_string(),
Datum::Varbinary(v) => format!("{:?}", v),
@@ -887,7 +887,9 @@ pub mod arrow_convert {
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => v
.as_ref()
.map(|v| Datum::String(StringBytes::copy_from_str(v.as_str()))),
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => v
ScalarValue::Binary(v)
| ScalarValue::FixedSizeBinary(_, v)
| ScalarValue::LargeBinary(v) => v
.as_ref()
.map(|v| Datum::Varbinary(Bytes::copy_from_slice(v.as_slice()))),
ScalarValue::TimestampMillisecond(v, _) => {
@@ -896,7 +898,10 @@
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
@@ -928,7 +933,9 @@ pub mod arrow_convert {
ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => {
v.as_ref().map(|v| DatumView::String(v.as_str()))
}
ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => {
ScalarValue::Binary(v)
| ScalarValue::FixedSizeBinary(_, v)
| ScalarValue::LargeBinary(v) => {
v.as_ref().map(|v| DatumView::Varbinary(v.as_slice()))
}
ScalarValue::TimestampMillisecond(v, _) => {
@@ -937,7 +944,10 @@
ScalarValue::List(_, _)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::TimestampSecond(_, _)
| ScalarValue::TimestampMicrosecond(_, _)
| ScalarValue::TimestampNanosecond(_, _)
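Two of the changes in this file are plain API migrations; a minimal sketch assuming chrono 0.4 and sqlparser 0.30 (helper names are illustrative): `DataType::Timestamp` now carries precision and timezone info, and the deprecated panicking `timestamp_millis` becomes `timestamp_millis_opt(..).unwrap()`.

```rust
// Sketch only: `is_timestamp` / `display_millis` are illustrative helpers.
use chrono::{Local, TimeZone};
use sqlparser::ast::DataType as SqlDataType;

fn is_timestamp(sql_type: &SqlDataType) -> bool {
    // sqlparser 0.30: Timestamp(precision, timezone info) instead of a unit variant.
    matches!(sql_type, SqlDataType::Timestamp(_, _))
}

fn display_millis(ms: i64) -> String {
    // `timestamp_millis_opt` returns a LocalResult; unwrap keeps the old
    // panic-on-invalid behaviour of `timestamp_millis`.
    Local.timestamp_millis_opt(ms).unwrap().to_rfc3339()
}
```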
4 changes: 2 additions & 2 deletions components/arrow_ext/src/display.rs
@@ -259,7 +259,7 @@ pub fn make_string_from_decimal<T: DecimalType>(
) -> Result<String> {
let array = column
.as_any()
.downcast_ref::<array::DecimalArray<T>>()
.downcast_ref::<array::PrimitiveArray<T>>()
.unwrap();

let formatted_decimal = array.value_as_string(row);
@@ -389,7 +389,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<Str

let mut s = String::new();
s.push('{');
let mut kv_iter = st.columns().into_iter().zip(st.column_names().into_iter());
let mut kv_iter = st.columns().iter().zip(st.column_names().into_iter());
if let Some((col, name)) = kv_iter.next() {
append_struct_field_string(&mut s, name, col, row)?;
}
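A minimal sketch of the decimal change above, assuming arrow 31 (the helper and sample values are illustrative): the dedicated `DecimalArray` wrapper is gone, so decimal columns downcast to `PrimitiveArray` over a `DecimalType` (`Decimal128Array` is an alias for `PrimitiveArray<Decimal128Type>`).

```rust
// Sketch only: illustrative helper and sample values, not code from this PR.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Decimal128Array, PrimitiveArray};
use arrow::datatypes::Decimal128Type;

fn decimal_value_string(column: &ArrayRef, row: usize) -> String {
    let array = column
        .as_any()
        .downcast_ref::<PrimitiveArray<Decimal128Type>>()
        .expect("decimal column");
    array.value_as_string(row)
}

fn main() {
    let array: ArrayRef = Arc::new(
        Decimal128Array::from(vec![12345_i128])
            .with_precision_and_scale(10, 2)
            .unwrap(),
    );
    // 12345 at scale 2 renders as "123.45".
    assert_eq!(decimal_value_string(&array, 0), "123.45");
}
```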
2 changes: 1 addition & 1 deletion components/message_queue/src/tests/util.rs
@@ -10,7 +10,7 @@ use crate::Message;

pub fn generate_test_data(cnt: usize) -> Vec<Message> {
let mut messages = Vec::with_capacity(cnt);
let base_ts = Utc.timestamp_millis(1337);
let base_ts = Utc.timestamp_millis_opt(1337).unwrap();
for i in 0..cnt {
let key = format!("test_key_{}", i);
let val = format!("test_val_{}", i);
2 changes: 1 addition & 1 deletion components/object_store/Cargo.toml
@@ -31,7 +31,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
upstream = { package = "object_store", version = "0.5.1" }
upstream = { package = "object_store", version = "0.5.3" }

[dev-dependencies]
tempfile = { workspace = true }
6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/equal.rs
@@ -1,8 +1,8 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use arrow::datatypes::SchemaRef;
use datafusion::{logical_plan::Column, scalar::ScalarValue};
use datafusion_expr::{Expr, Operator};
use datafusion::{common::Column, scalar::ScalarValue};
use datafusion_expr::{self, Expr, Operator};

const MAX_ELEMS_IN_LIST_FOR_FILTER: usize = 100;

@@ -149,7 +149,7 @@ fn normalize_predicate_expression(expr: &Expr) -> NormalizedExpr {
let unhandled = NormalizedExpr::True;

match expr {
Expr::BinaryExpr { left, op, right } => match op {
Expr::BinaryExpr(datafusion_expr::BinaryExpr { left, op, right }) => match op {
Operator::And => {
let left = normalize_predicate_expression(left);
let right = normalize_predicate_expression(right);
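A minimal sketch of the `BinaryExpr` change above, assuming datafusion-expr 17 (the `is_equality` helper is illustrative): the variant now wraps a `BinaryExpr` struct, so matches destructure `Expr::BinaryExpr(BinaryExpr { .. })` instead of the old inline `{ left, op, right }` fields.

```rust
// Sketch only: `is_equality` is an illustrative helper, not code from this PR.
use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};

fn is_equality(expr: &Expr) -> bool {
    // DataFusion 17: the binary-expression fields live in a dedicated struct.
    matches!(expr, Expr::BinaryExpr(BinaryExpr { op: Operator::Eq, .. }))
}

fn main() {
    let expr = col("name").eq(lit("ceresdb"));
    assert!(is_equality(&expr));
}
```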
6 changes: 3 additions & 3 deletions components/parquet_ext/src/prune/min_max.rs
@@ -248,10 +248,10 @@ mod test {
let testcases = vec![
// (expr, min, max, schema, expected)
(
col("a").eq(lit(5i32)), // a == 5
col("a").eq(lit(5i64)), // a == 5
10,
20,
vec![("a", ArrowDataType::Int32)],
vec![("a", ArrowDataType::Int64)],
vec![],
),
(
@@ -273,7 +273,7 @@
col("a").in_list(vec![lit(17i64), lit(100i64)], false), // a in (17, 100)
101,
200,
vec![("a", ArrowDataType::Int32)],
vec![("a", ArrowDataType::Int64)],
vec![],
),
];