
Update Arrow/Parquet to 51.0.0, tonic to 0.11 #9613

Merged · 11 commits · Mar 19, 2024
18 changes: 9 additions & 9 deletions Cargo.toml
@@ -57,14 +57,14 @@ version = "36.0.0"
# for the inherited dependency but cannot do the reverse (override from true to false).
#
# See for more details: https://github.com/rust-lang/cargo/issues/11329
arrow = { version = "50.0.0", features = ["prettyprint"] }
arrow-array = { version = "50.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "50.0.0", default-features = false }
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-ipc = { version = "50.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "50.0.0", default-features = false }
arrow-schema = { version = "50.0.0", default-features = false }
arrow-string = { version = "50.0.0", default-features = false }
arrow = { version = "51.0.0", features = ["prettyprint"] }
arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "51.0.0", default-features = false }
arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "51.0.0", default-features = false }
arrow-schema = { version = "51.0.0", default-features = false }
arrow-string = { version = "51.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
@@ -95,7 +95,7 @@ log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.9.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "50.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
rstest = "0.18.0"
serde_json = "1"
61 changes: 29 additions & 32 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
@@ -30,7 +30,7 @@ rust-version = "1.72"
readme = "README.md"

[dependencies]
arrow = "50.0.0"
arrow = "51.0.0"
async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
@@ -52,7 +52,7 @@ futures = "0.3"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.9.0", features = ["aws", "gcp", "http"] }
parking_lot = { version = "0.12" }
parquet = { version = "50.0.0", default-features = false }
parquet = { version = "51.0.0", default-features = false }
regex = "1.8"
rustyline = "11.0"
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -75,6 +75,6 @@ serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
-# 0.10 and 0.11 are incompatible. Need to upgrade tonic to 0.11 when upgrading to arrow 51
Review comment (Contributor):
Suggested change: remove the comment line above ("# 0.10 and 0.11 are incompatible..."). No longer need this comment.

Reply (Contributor):
Removed in 456e2fe

tonic = "0.10"
tonic = "0.11"
url = { workspace = true }
uuid = "1.2"
58 changes: 29 additions & 29 deletions datafusion-examples/examples/deserialize_to_struct.rs
@@ -15,61 +15,61 @@
// specific language governing permissions and limitations
// under the License.

+use arrow::array::AsArray;
+use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use serde::Deserialize;
+use futures::StreamExt;

 /// This example shows that it is possible to convert query results into Rust structs.
-/// It will collect the query results into RecordBatch, then convert it to serde_json::Value.
Review comment (Contributor Author):
apache/arrow-rs#5318 deprecated the serde_json based APIs.

-/// Then, serde_json::Value is turned into Rust's struct.
-/// Any datatype with `Deserialize` implemeneted works.
 #[tokio::main]
 async fn main() -> Result<()> {
     let data_list = Data::new().await?;
     println!("{data_list:#?}");
     Ok(())
 }

-#[derive(Deserialize, Debug)]
+#[derive(Debug)]
 struct Data {
     #[allow(dead_code)]
-    int_col: i64,
+    int_col: i32,
     #[allow(dead_code)]
     double_col: f64,
 }

 impl Data {
     pub async fn new() -> Result<Vec<Self>> {
-        // this group is almost the same as the one you find it in parquet_sql.rs
-        let batches = {
-            let ctx = SessionContext::new();
+        let ctx = SessionContext::new();

-            let testdata = datafusion::test_util::parquet_test_data();
+        let testdata = datafusion::test_util::parquet_test_data();

-            ctx.register_parquet(
-                "alltypes_plain",
-                &format!("{testdata}/alltypes_plain.parquet"),
-                ParquetReadOptions::default(),
-            )
-            .await?;
+        ctx.register_parquet(
+            "alltypes_plain",
+            &format!("{testdata}/alltypes_plain.parquet"),
+            ParquetReadOptions::default(),
+        )
+        .await?;

-            let df = ctx
-                .sql("SELECT int_col, double_col FROM alltypes_plain")
-                .await?;
+        let df = ctx
+            .sql("SELECT int_col, double_col FROM alltypes_plain")
+            .await?;

-            df.clone().show().await?;
+        df.clone().show().await?;

-            df.collect().await?
-        };
-        let batches: Vec<_> = batches.iter().collect();
+        let mut stream = df.execute_stream().await?;
+        let mut list = vec![];
+        while let Some(b) = stream.next().await.transpose()? {
+            let int_col = b.column(0).as_primitive::<Int32Type>();
+            let float_col = b.column(1).as_primitive::<Float64Type>();

-            // converts it to serde_json type and then convert that into Rust type
Review comment (Contributor):
I do think showing how to use serde to convert arrow --> rust structs is important. While I am well aware its performance is not good, the serde concept is widely understood and supported in the Rust ecosystem.

Is there any API that can do serde into Rust structs in the core arrow crates anymore?

If not, perhaps we can point in comments at a crate like https://github.com/chmp/serde_arrow (or bring an example that parses the JSON back to Json::Value and then serde's).

We/I can do this as a follow-on PR.

Reply (Contributor Author, @tustvold, Mar 18, 2024):
You can serialize to JSON and parse it, but I would rather encourage people towards the performant way of doing things.

> Is there any API that can do serde into Rust structs in the core arrow crates anymore?

I'd dispute that we ever really had a way to do this; going via serde_json::Value is more of a hack than anything else. Serializing to a JSON string and back will likely be faster.

Reply (Contributor):
> I'd dispute that we ever really had a way to do this; going via serde_json::Value is more of a hack than anything else.

The key thing in my mind is to make it easy / quick for new users to get something working quickly. I am well aware that custom array -> struct conversion will be the fastest, but I think it takes non-trivial expertise in manipulating the arrow-rs API (especially when it comes to StructArray and ListArray), so offering a fast way to get started with a slower API is important, I think.

Reply (Contributor):
I think since this is an example, we can always update / improve it as a follow-on PR.

-            let list = arrow::json::writer::record_batches_to_json_rows(&batches[..])?
-                .into_iter()
-                .map(|val| serde_json::from_value(serde_json::Value::Object(val)))
-                .take_while(|val| val.is_ok())
-                .map(|val| val.unwrap())
-                .collect();
+            for (i, f) in int_col.values().iter().zip(float_col.values()) {
+                list.push(Data {
+                    int_col: *i,
+                    double_col: *f,
+                })
+            }
+        }

         Ok(list)
     }
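The review thread above defers the serde route to a follow-on PR. For reference, a minimal sketch of the JSON round trip @tustvold describes ("serializing to a JSON string and back"), using arrow's ArrayWriter; this is not code from this PR, and the Data struct and error type here are illustrative only:

    use arrow::json::ArrayWriter;
    use arrow::record_batch::RecordBatch;
    use serde::Deserialize;

    #[derive(Deserialize, Debug)]
    struct Data {
        int_col: i32,
        double_col: f64,
    }

    // Renders the batches as a single JSON array of row objects,
    // [{"int_col":0,"double_col":0.0}, ...], then lets serde_json
    // deserialize that buffer straight into Vec<Data>.
    fn batches_to_structs(batches: &[RecordBatch]) -> Result<Vec<Data>, Box<dyn std::error::Error>> {
        let mut writer = ArrayWriter::new(Vec::new());
        for batch in batches {
            writer.write(batch)?;
        }
        writer.finish()?;
        let json = writer.into_inner();
        Ok(serde_json::from_slice(&json)?)
    }

This trades speed for brevity; the serde_arrow crate linked in the thread avoids the JSON intermediary entirely.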
9 changes: 8 additions & 1 deletion datafusion-examples/examples/flight/flight_server.rs
@@ -18,7 +18,7 @@
use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
use std::sync::Arc;

-use arrow_flight::SchemaAsIpc;
+use arrow_flight::{PollInfo, SchemaAsIpc};
use datafusion::arrow::error::ArrowError;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
@@ -177,6 +177,13 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<Self::DoExchangeStream>, Status> {
         Err(Status::unimplemented("Not yet implemented"))
     }
+
+    async fn poll_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<PollInfo>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
 }

fn to_tonic_err(e: datafusion::error::DataFusionError) -> Status {
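poll_flight_info is new to the FlightService trait in arrow-flight 51, which is why this stub is now required. As a rough sketch of what a non-stub implementation might return (field names assumed from the Flight protocol's PollInfo message; not code from this PR):

    use arrow_flight::{FlightInfo, PollInfo};

    // Hypothetical helper: report a query as finished. `flight_descriptor: None`
    // signals that the client should stop polling, and `progress` is a fraction
    // between 0.0 and 1.0.
    fn completed_poll(info: FlightInfo) -> PollInfo {
        PollInfo {
            info: Some(info),
            flight_descriptor: None,
            progress: Some(1.0),
            expiration_time: None,
        }
    }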
3 changes: 3 additions & 0 deletions datafusion-examples/examples/flight/flight_sql_server.rs
@@ -307,6 +307,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let endpoint = FlightEndpoint {
             ticket: Some(ticket),
             location: vec![],
+            expiration_time: None,
+            app_metadata: Default::default(),
         };
         let endpoints = vec![endpoint];

@@ -329,6 +331,7 @@
             total_records: -1_i64,
             total_bytes: -1_i64,
             ordered: false,
+            app_metadata: Default::default(),
         };
         let resp = Response::new(info);
         Ok(resp)
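Both hunks in this file exist only because arrow-flight 51 added fields to the generated FlightEndpoint and FlightInfo structs. Since prost-generated messages implement Default, struct update syntax would absorb such additions automatically on future upgrades; a sketch, not what this PR does:

    use arrow_flight::{FlightEndpoint, Ticket};

    fn endpoint_for(ticket: Ticket) -> FlightEndpoint {
        FlightEndpoint {
            ticket: Some(ticket),
            // location, expiration_time, app_metadata, and any fields added
            // by future releases all take their protobuf defaults.
            ..Default::default()
        }
    }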
1 change: 1 addition & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -156,6 +156,7 @@ pub(crate) fn parse_encoding_string(
"plain" => Ok(parquet::basic::Encoding::PLAIN),
"plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
"rle" => Ok(parquet::basic::Encoding::RLE),
#[allow(deprecated)]
Review comment (Contributor Author):
apache/arrow-rs#5318 deprecated the serde_json based APIs.

Reply (Contributor):
I don't understand the reference (to the JSON writer) when this is for parquet encoding. Is there some other encoding/compression scheme that was deprecated too?

Reply (Contributor Author):
This is a copypasta meant to link to apache/arrow-rs#5348.

"bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
"delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
"delta_length_byte_array" => {