Skip to content

Commit

Permalink
update to datafusion-v42/arrow-v53/arrow-java-v16 (#574)
Browse files: browse the repository at this point in the history
Co-authored-by: zhangli20 <zhangli20@kuaishou.com>
  • Loading branch information
richox and zhangli20 authored Oct 9, 2024
1 parent ed846af commit 4027465
Show file tree
Hide file tree
Showing 56 changed files with 1,296 additions and 806 deletions.
608 changes: 368 additions & 240 deletions Cargo.lock

Large diffs are not rendered by default.

54 changes: 27 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,42 @@ datafusion-ext-exprs = { path = "./native-engine/datafusion-ext-exprs" }
datafusion-ext-functions = { path = "./native-engine/datafusion-ext-functions" }
datafusion-ext-plans = { path = "./native-engine/datafusion-ext-plans" }

# datafusion: branch=v36-blaze
datafusion = { version = "36.0.0" }
# datafusion: branch=v42-blaze
datafusion = { version = "42.0.0" }

orc-rust = { version = "0.3.1" }

# arrow: branch=v50-blaze
arrow = { version = "50.0.0", features = ["ffi"]}
arrow-schema = { version = "50.0.0", features = ["serde"] }
parquet = { version = "50.0.0" }
# arrow: branch=v53-blaze
arrow = { version = "53.0.0", features = ["ffi"]}
arrow-schema = { version = "53.0.0", features = ["serde"] }
parquet = { version = "53.0.0" }

# serde_json: branch=v1.0.96-blaze
serde_json = { version = "1.0.96" }

[patch.crates-io]
# datafusion: branch=v36-blaze
datafusion = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
datafusion-common = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
datafusion-expr = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
datafusion-execution = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
datafusion-optimizer = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
datafusion-physical-expr = { git = "https://github.com/harveyyue/datafusion.git", rev = "d33877f8fbc7c57de946dc6081b2b357eedd0df9"}
orc-rust = { git = "https://github.com/harveyyue/datafusion-orc.git", rev = "f0ff4bcffa762b62e8c57ed4c2f6e1a9547b4abb"}
# datafusion: branch=v42-blaze
datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "dc799de77"}
orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "c54bfb5"}

# arrow: branch=v50-blaze
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "7471d70f7ae6edd5d4da82b7d966a8ede720e499"}
# arrow: branch=v53-blaze
arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}

# serde_json: branch=v1.0.96-blaze
serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" }
2 changes: 1 addition & 1 deletion native-engine/blaze-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ datafusion-ext-exprs = { workspace = true }
datafusion-ext-functions = { workspace = true }
datafusion-ext-plans = { workspace = true }
log = "0.4.22"
object_store = "0.9.0"
object_store = "0.11.0"
prost = "0.13.3"

[build-dependencies]
Expand Down
3 changes: 1 addition & 2 deletions native-engine/blaze-serde/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ fn main() -> Result<(), String> {
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");

println!("cargo:rerun-if-changed=proto/blaze.proto");
tonic_build::configure()
.compile(&["proto/blaze.proto"], &["proto"])
tonic_build::compile_protos("proto/blaze.proto")
.map_err(|e| format!("protobuf compilation failed: {}", e))
}
2 changes: 1 addition & 1 deletion native-engine/blaze-serde/proto/blaze.proto
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ enum ScalarFunction {
Sqrt=17;
Tan=18;
Trunc=19;
Array=20;
NullIf=20;
RegexpMatch=21;
BitLength=22;
Btrim=23;
Expand Down
166 changes: 87 additions & 79 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ use datafusion::{
physical_plan::FileScanConfig,
},
error::DataFusionError,
execution::context::ExecutionProps,
logical_expr::{BuiltinScalarFunction, ColumnarValue, Operator},
logical_expr::{ColumnarValue, Operator, ScalarUDF, Volatility},
physical_expr::{
expressions::{in_list, LikeExpr, SCAndExpr, SCOrExpr},
functions, ScalarFunctionExpr,
ScalarFunctionExpr,
},
physical_plan::{
expressions as phys_expr,
Expand All @@ -48,6 +47,7 @@ use datafusion::{
union::UnionExec,
ColumnStatistics, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
},
prelude::create_udf,
};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_exprs::{
Expand Down Expand Up @@ -116,7 +116,7 @@ fn bind(
let new_children = expr_in
.children()
.iter()
.map(|child_expr| bind(child_expr.clone(), input_schema))
.map(|&child_expr| bind(child_expr.clone(), input_schema))
.collect::<Result<Vec<_>, DataFusionError>>()?;
Ok(expr_in.with_new_children(new_children)?)
}
Expand Down Expand Up @@ -804,74 +804,75 @@ impl From<&protobuf::BoundReference> for Column {
}
}

impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
fn from(f: &protobuf::ScalarFunction) -> BuiltinScalarFunction {
impl From<protobuf::ScalarFunction> for Arc<ScalarUDF> {
fn from(f: protobuf::ScalarFunction) -> Self {
use datafusion::functions as f;
use protobuf::ScalarFunction;

match f {
ScalarFunction::Sqrt => Self::Sqrt,
ScalarFunction::Sin => Self::Sin,
ScalarFunction::Cos => Self::Cos,
ScalarFunction::Tan => Self::Tan,
ScalarFunction::Asin => Self::Asin,
ScalarFunction::Acos => Self::Acos,
ScalarFunction::Atan => Self::Atan,
ScalarFunction::Exp => Self::Exp,
ScalarFunction::Log => Self::Log,
ScalarFunction::Ln => Self::Ln,
ScalarFunction::Log10 => Self::Log10,
ScalarFunction::Floor => Self::Floor,
ScalarFunction::Ceil => Self::Ceil,
ScalarFunction::Round => Self::Round,
ScalarFunction::Trunc => Self::Trunc,
ScalarFunction::Abs => Self::Abs,
ScalarFunction::OctetLength => Self::OctetLength,
ScalarFunction::Concat => Self::Concat,
ScalarFunction::Lower => Self::Lower,
ScalarFunction::Upper => Self::Upper,
ScalarFunction::Trim => Self::Trim,
ScalarFunction::Ltrim => Self::Ltrim,
ScalarFunction::Rtrim => Self::Rtrim,
ScalarFunction::ToTimestamp => Self::ToTimestamp,
ScalarFunction::Array => Self::MakeArray,
// ScalarFunction::NullIf => todo!(),
ScalarFunction::DatePart => Self::DatePart,
ScalarFunction::DateTrunc => Self::DateTrunc,
ScalarFunction::Md5 => Self::MD5,
ScalarFunction::Sha224 => Self::SHA224,
ScalarFunction::Sha256 => Self::SHA256,
ScalarFunction::Sha384 => Self::SHA384,
ScalarFunction::Sha512 => Self::SHA512,
ScalarFunction::Digest => Self::Digest,
ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis,
ScalarFunction::Log2 => Self::Log2,
ScalarFunction::Signum => Self::Signum,
ScalarFunction::Ascii => Self::Ascii,
ScalarFunction::BitLength => Self::BitLength,
ScalarFunction::Btrim => Self::Btrim,
ScalarFunction::CharacterLength => Self::CharacterLength,
ScalarFunction::Chr => Self::Chr,
ScalarFunction::ConcatWithSeparator => Self::ConcatWithSeparator,
ScalarFunction::InitCap => Self::InitCap,
ScalarFunction::Left => Self::Left,
ScalarFunction::Lpad => Self::Lpad,
ScalarFunction::Random => Self::Random,
ScalarFunction::RegexpReplace => Self::RegexpReplace,
ScalarFunction::Repeat => Self::Repeat,
ScalarFunction::Replace => Self::Replace,
ScalarFunction::Reverse => Self::Reverse,
ScalarFunction::Right => Self::Right,
ScalarFunction::Rpad => Self::Rpad,
ScalarFunction::SplitPart => Self::SplitPart,
ScalarFunction::StartsWith => Self::StartsWith,
ScalarFunction::Strpos => Self::Strpos,
ScalarFunction::Substr => Self::Substr,
ScalarFunction::ToHex => Self::ToHex,
ScalarFunction::ToTimestampMicros => Self::ToTimestampMicros,
ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
ScalarFunction::Now => Self::Now,
ScalarFunction::Translate => Self::Translate,
ScalarFunction::RegexpMatch => Self::RegexpMatch,
ScalarFunction::Coalesce => Self::Coalesce,
ScalarFunction::Sqrt => f::math::sqrt(),
ScalarFunction::Sin => f::math::sin(),
ScalarFunction::Cos => f::math::cos(),
ScalarFunction::Tan => f::math::tan(),
ScalarFunction::Asin => f::math::asin(),
ScalarFunction::Acos => f::math::acos(),
ScalarFunction::Atan => f::math::atan(),
ScalarFunction::Exp => f::math::exp(),
ScalarFunction::Log => f::math::log(),
ScalarFunction::Ln => f::math::ln(),
ScalarFunction::Log10 => f::math::log10(),
ScalarFunction::Floor => f::math::floor(),
ScalarFunction::Ceil => f::math::ceil(),
ScalarFunction::Round => f::math::round(),
ScalarFunction::Trunc => f::math::trunc(),
ScalarFunction::Abs => f::math::abs(),
ScalarFunction::OctetLength => f::string::octet_length(),
ScalarFunction::Concat => f::string::concat(),
ScalarFunction::Lower => f::string::lower(),
ScalarFunction::Upper => f::string::upper(),
ScalarFunction::Trim => f::string::btrim(),
ScalarFunction::Ltrim => f::string::ltrim(),
ScalarFunction::Rtrim => f::string::rtrim(),
ScalarFunction::ToTimestamp => f::datetime::to_timestamp(),
ScalarFunction::NullIf => f::core::nullif(),
ScalarFunction::DatePart => f::datetime::date_part(),
ScalarFunction::DateTrunc => f::datetime::date_trunc(),
ScalarFunction::Md5 => f::crypto::md5(),
ScalarFunction::Sha224 => f::crypto::sha224(),
ScalarFunction::Sha256 => f::crypto::sha256(),
ScalarFunction::Sha384 => f::crypto::sha384(),
ScalarFunction::Sha512 => f::crypto::sha512(),
ScalarFunction::Digest => f::crypto::digest(),
ScalarFunction::ToTimestampMillis => f::datetime::to_timestamp_millis(),
ScalarFunction::Log2 => f::math::log2(),
ScalarFunction::Signum => f::math::signum(),
ScalarFunction::Ascii => f::string::ascii(),
ScalarFunction::BitLength => f::string::bit_length(),
ScalarFunction::Btrim => f::string::btrim(),
ScalarFunction::CharacterLength => f::unicode::character_length(),
ScalarFunction::Chr => f::string::chr(),
ScalarFunction::ConcatWithSeparator => f::string::concat_ws(),
ScalarFunction::InitCap => f::string::initcap(),
ScalarFunction::Left => f::unicode::left(),
ScalarFunction::Lpad => f::unicode::lpad(),
ScalarFunction::Random => f::math::random(),
ScalarFunction::RegexpReplace => f::regex::regexp_replace(),
ScalarFunction::Repeat => f::string::repeat(),
ScalarFunction::Replace => f::string::replace(),
ScalarFunction::Reverse => f::unicode::reverse(),
ScalarFunction::Right => f::unicode::right(),
ScalarFunction::Rpad => f::unicode::rpad(),
ScalarFunction::SplitPart => f::string::split_part(),
ScalarFunction::StartsWith => f::string::starts_with(),
ScalarFunction::Strpos => f::unicode::strpos(),
ScalarFunction::Substr => f::unicode::substr(),
ScalarFunction::ToHex => f::string::to_hex(),
ScalarFunction::ToTimestampMicros => f::datetime::to_timestamp_micros(),
ScalarFunction::ToTimestampSeconds => f::datetime::to_timestamp_seconds(),
ScalarFunction::Now => f::datetime::now(),
ScalarFunction::Translate => f::unicode::translate(),
ScalarFunction::RegexpMatch => f::regex::regexp_match(),
ScalarFunction::Coalesce => f::core::coalesce(),
ScalarFunction::SparkExtFunctions => {
unreachable!()
}
Expand Down Expand Up @@ -998,20 +999,26 @@ fn try_parse_physical_expr(
.map(|x| try_parse_physical_expr(x, input_schema))
.collect::<Result<Vec<_>, _>>()?;

let execution_props = ExecutionProps::new();
let fun_expr = if scalar_function == protobuf::ScalarFunction::SparkExtFunctions {
datafusion_ext_functions::create_spark_ext_function(&e.name)?
let scalar_udf = if scalar_function == protobuf::ScalarFunction::SparkExtFunctions {
let fun = datafusion_ext_functions::create_spark_ext_function(&e.name)?;
Arc::new(create_udf(
"spark_ext_function",
args.iter()
.map(|e| e.data_type(input_schema))
.collect::<Result<Vec<_>, _>>()?,
Arc::new(convert_required!(e.return_type)?),
Volatility::Volatile,
fun,
))
} else {
functions::create_physical_fun(&(&scalar_function).into(), &execution_props)?
let scalar_udf: Arc<ScalarUDF> = scalar_function.into();
scalar_udf
};

Arc::new(ScalarFunctionExpr::new(
&e.name,
fun_expr,
scalar_udf.name(),
scalar_udf.clone(),
args,
convert_required!(e.return_type)?,
None,
false,
))
}
ExprType::SparkUdfWrapperExpr(e) => Arc::new(SparkUDFWrapperExpr::try_new(
Expand Down Expand Up @@ -1153,6 +1160,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
statistics: None,
extensions: None,
})
}
Expand Down
7 changes: 6 additions & 1 deletion native-engine/blaze-serde/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,11 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarValue {
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()?;
let scalar_type: DataType = pb_scalar_type.try_into()?;
ScalarValue::List(ScalarValue::new_list(&typechecked_values, &scalar_type))
ScalarValue::List(ScalarValue::new_list(
&typechecked_values,
&scalar_type,
true,
))
}
protobuf::scalar_value::Value::NullValue(v) => {
match v.datatype.as_ref().expect("missing scalar data type") {
Expand Down Expand Up @@ -633,6 +637,7 @@ impl TryInto<ScalarValue> for &protobuf::ScalarListValue {
Ok(ScalarValue::List(ScalarValue::new_list(
&values,
&element_scalar_type,
true,
)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion native-engine/blaze/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn update_spark_metric_node(
)?;

// update children nodes
for (i, child_plan) in execution_plan.children().iter().enumerate() {
for (i, &child_plan) in execution_plan.children().iter().enumerate() {
let child_metric_node = jni_call!(
SparkMetricNode(metric_node).getChild(i as i32) -> JObject
)?;
Expand Down
5 changes: 2 additions & 3 deletions native-engine/datafusion-ext-commons/src/io/batch_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,13 @@ fn write_bits_buffer<W: Write>(

fn read_bits_buffer<R: Read>(input: &mut R, bits_len: usize) -> Result<Buffer> {
let buf = read_bytes_slice(input, (bits_len + 7) / 8)?;
Ok(Buffer::from(buf))
Ok(Buffer::from_vec(buf.into()))
}

fn write_primitive_array<W: Write, PT: ArrowPrimitiveType>(
array: &PrimitiveArray<PT>,
output: &mut W,
) -> Result<()> {
let _item_size = PT::get_byte_width();
let offset = array.offset();
let len = array.len();
let array_data = array.to_data();
Expand Down Expand Up @@ -510,7 +509,7 @@ fn read_bytes_array<R: Read>(
let offsets_buffer: Buffer = offsets_buffer.into();

let data_len = cur_offset as usize;
let data_buffer = Buffer::from(read_bytes_slice(input, data_len)?);
let data_buffer = Buffer::from_vec(read_bytes_slice(input, data_len)?.into());
let array_data = ArrayData::try_new(
data_type,
num_rows,
Expand Down
10 changes: 5 additions & 5 deletions native-engine/datafusion-ext-commons/src/io/scalar_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ pub fn write_scalar<W: Write>(value: &ScalarValue, nullable: bool, output: &mut
write_array(col, output)?;
}
}
ScalarValue::Map(value, _bool) => {
write_scalar(value, nullable, output)?;
ScalarValue::Map(v) => {
write_array(v.as_ref(), output)?;
}
other => df_unimplemented_err!("unsupported scalarValue type: {other}")?,
}
Expand Down Expand Up @@ -186,9 +186,9 @@ pub fn read_scalar<R: Read>(
.collect::<Result<Vec<_>>>()?;
ScalarValue::Struct(Arc::new(StructArray::new(fields.clone(), columns, None)))
}
DataType::Map(field, bool) => {
let map_value = read_scalar(input, field.data_type(), field.is_nullable())?;
ScalarValue::Map(Box::new(map_value), *bool)
DataType::Map(field, _bool) => {
let map = read_array(input, field.data_type(), 1)?.as_map().clone();
ScalarValue::Map(Arc::new(map))
}
other => df_unimplemented_err!("unsupported data type: {other}")?,
})
Expand Down
Loading

0 comments on commit 4027465

Please sign in to comment.