Skip to content

Commit 14176ff

Browse files
authored
Update to arrow-7.0.0 (#1523)
* Update tonic/prost deps
* Update to arrow 7.0.0-SNAPSHOT
* Update datafusion and tests for arrow changes
* fix doc tests
* Update avro support
* Use released arrow 7.0.0
1 parent e1e7b86 commit 14176ff

File tree

21 files changed

+182
-101
lines changed

21 files changed

+182
-101
lines changed

ballista-examples/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ rust-version = "1.57"
3131
[dependencies]
3232
datafusion = { path = "../datafusion" }
3333
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
34-
prost = "0.8"
35-
tonic = "0.5"
34+
prost = "0.9"
35+
tonic = "0.6"
3636
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
3737
futures = "0.3"
3838
num_cpus = "1.13.0"

ballista/rust/core/Cargo.toml

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,20 @@ async-trait = "0.1.36"
3535
futures = "0.3"
3636
hashbrown = "0.11"
3737
log = "0.4"
38-
prost = "0.8"
38+
prost = "0.9"
3939
serde = {version = "1", features = ["derive"]}
4040
sqlparser = "0.13"
4141
tokio = "1.0"
42-
tonic = "0.5"
42+
tonic = "0.6"
4343
uuid = { version = "0.8", features = ["v4"] }
4444
chrono = { version = "0.4", default-features = false }
4545

46-
# workaround for https://github.com/apache/arrow-datafusion/issues/1498
47-
# should be able to remove when we update arrow-flight
48-
quote = "=1.0.10"
49-
arrow-flight = { version = "6.4.0" }
46+
arrow-flight = { version = "7.0.0" }
5047

5148
datafusion = { path = "../../../datafusion", version = "6.0.0" }
5249

5350
[dev-dependencies]
5451
tempfile = "3"
5552

5653
[build-dependencies]
57-
tonic-build = { version = "0.5" }
54+
tonic-build = { version = "0.6" }

ballista/rust/core/proto/ballista.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,7 @@ enum TimeUnit{
10151015
enum IntervalUnit{
10161016
YearMonth = 0;
10171017
DayTime = 1;
1018+
MonthDayNano = 2;
10181019
}
10191020

10201021
message Decimal{
@@ -1040,11 +1041,18 @@ message Struct{
10401041
repeated Field sub_field_types = 1;
10411042
}
10421043

1044+
enum UnionMode{
1045+
sparse = 0;
1046+
dense = 1;
1047+
}
1048+
10431049
message Union{
10441050
repeated Field union_types = 1;
1051+
UnionMode union_mode = 2;
10451052
}
10461053

10471054

1055+
10481056
message ScalarListValue{
10491057
ScalarType datatype = 1;
10501058
repeated ScalarValue values = 2;

ballista/rust/core/src/client.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Client API for sending requests to executors.
1919
20-
use std::sync::Arc;
20+
use std::sync::{Arc, Mutex};
2121
use std::{collections::HashMap, pin::Pin};
2222
use std::{
2323
convert::{TryFrom, TryInto},
@@ -135,24 +135,28 @@ impl BallistaClient {
135135
}
136136

137137
struct FlightDataStream {
138-
stream: Streaming<FlightData>,
138+
stream: Mutex<Streaming<FlightData>>,
139139
schema: SchemaRef,
140140
}
141141

142142
impl FlightDataStream {
143143
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
144-
Self { stream, schema }
144+
Self {
145+
stream: Mutex::new(stream),
146+
schema,
147+
}
145148
}
146149
}
147150

148151
impl Stream for FlightDataStream {
149152
type Item = ArrowResult<RecordBatch>;
150153

151154
fn poll_next(
152-
mut self: std::pin::Pin<&mut Self>,
155+
self: std::pin::Pin<&mut Self>,
153156
cx: &mut Context<'_>,
154157
) -> Poll<Option<Self::Item>> {
155-
self.stream.poll_next_unpin(cx).map(|x| match x {
158+
let mut stream = self.stream.lock().expect("mutex is bad");
159+
stream.poll_next_unpin(cx).map(|x| match x {
156160
Some(flight_data_chunk_result) => {
157161
let converted_chunk = flight_data_chunk_result
158162
.map_err(|e| ArrowError::from_external_error(Box::new(e)))

ballista/rust/core/src/serde/logical_plan/mod.rs

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod roundtrip_tests {
2424
use super::super::{super::error::Result, protobuf};
2525
use crate::error::BallistaError;
2626
use core::panic;
27+
use datafusion::arrow::datatypes::UnionMode;
2728
use datafusion::logical_plan::Repartition;
2829
use datafusion::{
2930
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
@@ -413,25 +414,31 @@ mod roundtrip_tests {
413414
true,
414415
),
415416
]),
416-
DataType::Union(vec![
417-
Field::new("nullable", DataType::Boolean, false),
418-
Field::new("name", DataType::Utf8, false),
419-
Field::new("datatype", DataType::Binary, false),
420-
]),
421-
DataType::Union(vec![
422-
Field::new("nullable", DataType::Boolean, false),
423-
Field::new("name", DataType::Utf8, false),
424-
Field::new("datatype", DataType::Binary, false),
425-
Field::new(
426-
"nested_struct",
427-
DataType::Struct(vec![
428-
Field::new("nullable", DataType::Boolean, false),
429-
Field::new("name", DataType::Utf8, false),
430-
Field::new("datatype", DataType::Binary, false),
431-
]),
432-
true,
433-
),
434-
]),
417+
DataType::Union(
418+
vec![
419+
Field::new("nullable", DataType::Boolean, false),
420+
Field::new("name", DataType::Utf8, false),
421+
Field::new("datatype", DataType::Binary, false),
422+
],
423+
UnionMode::Dense,
424+
),
425+
DataType::Union(
426+
vec![
427+
Field::new("nullable", DataType::Boolean, false),
428+
Field::new("name", DataType::Utf8, false),
429+
Field::new("datatype", DataType::Binary, false),
430+
Field::new(
431+
"nested_struct",
432+
DataType::Struct(vec![
433+
Field::new("nullable", DataType::Boolean, false),
434+
Field::new("name", DataType::Utf8, false),
435+
Field::new("datatype", DataType::Binary, false),
436+
]),
437+
true,
438+
),
439+
],
440+
UnionMode::Sparse,
441+
),
435442
DataType::Dictionary(
436443
Box::new(DataType::Utf8),
437444
Box::new(DataType::Struct(vec![
@@ -558,25 +565,31 @@ mod roundtrip_tests {
558565
true,
559566
),
560567
]),
561-
DataType::Union(vec![
562-
Field::new("nullable", DataType::Boolean, false),
563-
Field::new("name", DataType::Utf8, false),
564-
Field::new("datatype", DataType::Binary, false),
565-
]),
566-
DataType::Union(vec![
567-
Field::new("nullable", DataType::Boolean, false),
568-
Field::new("name", DataType::Utf8, false),
569-
Field::new("datatype", DataType::Binary, false),
570-
Field::new(
571-
"nested_struct",
572-
DataType::Struct(vec![
573-
Field::new("nullable", DataType::Boolean, false),
574-
Field::new("name", DataType::Utf8, false),
575-
Field::new("datatype", DataType::Binary, false),
576-
]),
577-
true,
578-
),
579-
]),
568+
DataType::Union(
569+
vec![
570+
Field::new("nullable", DataType::Boolean, false),
571+
Field::new("name", DataType::Utf8, false),
572+
Field::new("datatype", DataType::Binary, false),
573+
],
574+
UnionMode::Sparse,
575+
),
576+
DataType::Union(
577+
vec![
578+
Field::new("nullable", DataType::Boolean, false),
579+
Field::new("name", DataType::Utf8, false),
580+
Field::new("datatype", DataType::Binary, false),
581+
Field::new(
582+
"nested_struct",
583+
DataType::Struct(vec![
584+
Field::new("nullable", DataType::Boolean, false),
585+
Field::new("name", DataType::Utf8, false),
586+
Field::new("datatype", DataType::Binary, false),
587+
]),
588+
true,
589+
),
590+
],
591+
UnionMode::Dense,
592+
),
580593
DataType::Dictionary(
581594
Box::new(DataType::Utf8),
582595
Box::new(DataType::Struct(vec![

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
use super::super::proto_error;
2323
use crate::serde::{byte_to_string, protobuf, BallistaError};
2424
use datafusion::arrow::datatypes::{
25-
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
25+
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
2626
};
2727
use datafusion::datasource::file_format::avro::AvroFormat;
2828
use datafusion::datasource::file_format::csv::CsvFormat;
@@ -60,6 +60,7 @@ impl protobuf::IntervalUnit {
6060
match interval_unit {
6161
IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
6262
IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
63+
IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
6364
}
6465
}
6566

@@ -71,6 +72,7 @@ impl protobuf::IntervalUnit {
7172
Some(interval_unit) => Ok(match interval_unit {
7273
protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
7374
protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
75+
protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
7476
}),
7577
None => Err(proto_error(
7678
"Error converting i32 to DateUnit: Passed invalid variant",
@@ -238,12 +240,19 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
238240
.map(|field| field.into())
239241
.collect::<Vec<_>>(),
240242
}),
241-
DataType::Union(union_types) => ArrowTypeEnum::Union(protobuf::Union {
242-
union_types: union_types
243-
.iter()
244-
.map(|field| field.into())
245-
.collect::<Vec<_>>(),
246-
}),
243+
DataType::Union(union_types, union_mode) => {
244+
let union_mode = match union_mode {
245+
UnionMode::Sparse => protobuf::UnionMode::Sparse,
246+
UnionMode::Dense => protobuf::UnionMode::Dense,
247+
};
248+
ArrowTypeEnum::Union(protobuf::Union {
249+
union_types: union_types
250+
.iter()
251+
.map(|field| field.into())
252+
.collect::<Vec<_>>(),
253+
union_mode: union_mode.into(),
254+
})
255+
}
247256
DataType::Dictionary(key_type, value_type) => {
248257
ArrowTypeEnum::Dictionary(Box::new(protobuf::Dictionary {
249258
key: Some(Box::new(key_type.as_ref().into())),
@@ -387,7 +396,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
387396
| DataType::FixedSizeList(_, _)
388397
| DataType::LargeList(_)
389398
| DataType::Struct(_)
390-
| DataType::Union(_)
399+
| DataType::Union(_, _)
391400
| DataType::Dictionary(_, _)
392401
| DataType::Map(_, _)
393402
| DataType::Decimal(_, _) => {

ballista/rust/core/src/serde/mod.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
2121
use std::{convert::TryInto, io::Cursor};
2222

23+
use datafusion::arrow::datatypes::UnionMode;
2324
use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
2425
use datafusion::physical_plan::aggregates::AggregateFunction;
2526
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
@@ -246,13 +247,24 @@ impl TryInto<datafusion::arrow::datatypes::DataType>
246247
.map(|field| field.try_into())
247248
.collect::<Result<Vec<_>, _>>()?,
248249
),
249-
arrow_type::ArrowTypeEnum::Union(union) => DataType::Union(
250-
union
250+
arrow_type::ArrowTypeEnum::Union(union) => {
251+
let union_mode = protobuf::UnionMode::from_i32(union.union_mode)
252+
.ok_or_else(|| {
253+
proto_error(
254+
"Protobuf deserialization error: Unknown union mode type",
255+
)
256+
})?;
257+
let union_mode = match union_mode {
258+
protobuf::UnionMode::Dense => UnionMode::Dense,
259+
protobuf::UnionMode::Sparse => UnionMode::Sparse,
260+
};
261+
let union_types = union
251262
.union_types
252263
.iter()
253264
.map(|field| field.try_into())
254-
.collect::<Result<Vec<_>, _>>()?,
255-
),
265+
.collect::<Result<Vec<_>, _>>()?;
266+
DataType::Union(union_types, union_mode)
267+
}
256268
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
257269
let pb_key_datatype = dict
258270
.as_ref()

ballista/rust/executor/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ edition = "2018"
2929
snmalloc = ["snmalloc-rs"]
3030

3131
[dependencies]
32-
arrow = { version = "6.4.0" }
33-
arrow-flight = { version = "6.4.0" }
32+
arrow = { version = "7.0.0" }
33+
arrow-flight = { version = "7.0.0" }
3434
anyhow = "1"
3535
async-trait = "0.1.36"
3636
ballista-core = { path = "../core", version = "0.6.0" }
@@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
4343
tempfile = "3"
4444
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
4545
tokio-stream = { version = "0.1", features = ["net"] }
46-
tonic = "0.5"
46+
tonic = "0.6"
4747
uuid = { version = "0.8", features = ["v4"] }
4848

4949
[dev-dependencies]

ballista/rust/scheduler/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ http-body = "0.4"
4444
hyper = "0.14.4"
4545
log = "0.4"
4646
parse_arg = "0.1.3"
47-
prost = "0.8"
47+
prost = "0.9"
4848
rand = "0.8"
4949
serde = {version = "1", features = ["derive"]}
5050
sled_package = { package = "sled", version = "0.34", optional = true }
5151
tokio = { version = "1.0", features = ["full"] }
5252
tokio-stream = { version = "0.1", features = ["net"], optional = true }
53-
tonic = "0.5"
53+
tonic = "0.6"
5454
tower = { version = "0.4" }
5555
warp = "0.3"
5656

@@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] }
6060

6161
[build-dependencies]
6262
configure_me_codegen = "0.4.1"
63-
tonic-build = { version = "0.5" }
63+
tonic-build = { version = "0.6" }
6464

6565
[package.metadata.configure_me.bin]
6666
scheduler = "scheduler_config_spec.toml"

datafusion-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,5 @@ clap = "2.33"
3131
rustyline = "9.0"
3232
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
3333
datafusion = { path = "../datafusion", version = "6.0.0" }
34-
arrow = { version = "6.4.0" }
34+
arrow = { version = "7.0.0" }
3535
ballista = { path = "../ballista/rust/client", version = "0.6.0" }

0 commit comments

Comments
 (0)