Conversation

I've added nested field access for the pg/generic dialect for things like

@alamb Thanks

alamb left a comment
Thank you for this contribution @Igosuki. Sorry that took so long; it was quite a bit of code 😅!
The code is really nicely done, seems very well tested, and was a pleasure to review.
High level things I think we need prior to merging this in:

- Make the new avro-rs dependency optional
- Commit changes to the arrow-testing repo (or maybe we need a new repo if they decide that having avro files in an arrow repo is not ideal)
- Remove the changes for GetFieldExpr (I think we should make that its own PR -- perhaps based off of #628, it just needed someone to push it over the line)
- Add one basic end-to-end test to sql.rs that reads one of the newly added avro files and does some simple query on it
I didn't look at the ballista changes carefully -- but I skimmed them and they looked reasonable to me
Also, FWIW I think this PR may be related to #811 (but I would say that PR is still in a bit of a WIP so we shouldn't hold this PR up).
.gitmodules
Outdated
| [submodule "testing"] | ||
| path = testing | ||
| url = https://github.com/apache/arrow-testing | ||
| url = https://github.com/Igosuki/arrow-testing.git |
prior to merging this PR we should request/merge the changes into apache/arrow-testing I think

This should be good to change back now that the PR is merged.
| BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256), | ||
| BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384), | ||
| BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512), | ||
| BuiltinScalarFunction::ToTimestampMillis => { |
the addition of ToTimestampMillis seems unrelated to this PR, but is a good addition
datafusion/Cargo.toml
Outdated
| lazy_static = { version = "^1.4.0", optional = true } | ||
| smallvec = { version = "1.6", features = ["union"] } | ||
| rand = "0.8" | ||
| avro-rs = { version = "0.13", features = ["snappy"] } |
As the datafusion codebase already has a non-trivial number of dependencies, I would personally prefer we did not add additional required ones.
What would you think about making
| avro-rs = { version = "0.13", features = ["snappy"] } | |
| avro-rs = { version = "0.13", features = ["snappy"], optional=true } |
And then document it as a crate feature -- e.g. like regex_expressions (but not a default feature)?
I think most of the rest of the code in this PR could be left the same and only the part that interacts with avro-rs could be #[cfg(...)] out
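To illustrate the gating pattern being suggested, here is a minimal, self-contained sketch (the function and module names are hypothetical, not the PR's actual code; the Cargo.toml side would declare an `avro` feature that enables the optional `avro-rs` dependency):

```rust
// Hypothetical sketch of feature-gating: only the code that touches
// avro-rs is compiled when the "avro" feature is enabled; everything
// else builds unchanged.

// Compiled only with `--features avro`; in the real code this is where
// the avro-rs calls would live.
#[cfg(feature = "avro")]
pub fn read_avro(_path: &str) -> Result<(), String> {
    Ok(())
}

// Stub compiled when the feature is off, so callers get a clear runtime
// error instead of a missing-symbol compile failure.
#[cfg(not(feature = "avro"))]
pub fn read_avro(_path: &str) -> Result<(), String> {
    Err("datafusion was built without the `avro` feature".to_string())
}
```

With a stub like this, the rest of the codebase can call the function unconditionally and only the avro-rs glue needs `#[cfg(...)]`.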
| return Ok(i); | ||
| } | ||
| } | ||
| println!("{}", name); |
perhaps a stray debugging leftover
datafusion/src/logical_plan/expr.rs
Outdated
| /// arithmetic negation of an expression, the operand must be of a signed numeric data type | ||
| Negative(Box<Expr>), | ||
| /// Returns the field of a [`StructArray`] by name | ||
| GetField { |
This appears to be some part of #628 -- I think it would be cleaner if we revived that PR to get the GetField functionality separately rather than including it in a single large PR
| n.into_iter() | ||
| .map(|v| { | ||
| resolve_string(&v) | ||
| // else if matches!( |
perhaps this should return an error?
datafusion/src/physical_plan/avro.rs
Outdated
| Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit))) | ||
| } else { | ||
| Err(DataFusionError::Execution( | ||
| "Error reading CSV: Data can only be read a single time when the source is a reader" |
| "Error reading CSV: Data can only be read a single time when the source is a reader" | |
| "Error reading AVRO: Data can only be read a single time when the source is a reader" |
datafusion/src/physical_plan/avro.rs
Outdated
| Source::Reader(rdr) => { | ||
| if partition != 0 { | ||
| Err(DataFusionError::Internal( | ||
| "Only partition 0 is valid when CSV comes from a reader" |
| "Only partition 0 is valid when CSV comes from a reader" | |
| "Only partition 0 is valid when AVRO comes from a reader" |
| Vec::new() | ||
| } | ||
| fn with_new_children( | ||
This code seems to support creating a table backed by multiple avro files (as is supported by the csv and parquet readers), but I don't see a test for that functionality anywhere.
Maybe you could have a test in sql.rs that referred to the same test files twice and ensured we got back two copies of the data.
Added a test for that in sql.rs
@alamb will address everything soon

PR in arrow-testing: apache/arrow-testing#62

I think I handled all your comments, let me know if there's more I can do
datafusion/Cargo.toml
Outdated
| smallvec = { version = "1.6", features = ["union"] } | ||
| rand = "0.8" | ||
| avro-rs = { version = "0.13", features = ["snappy"], optional = true } | ||
| num-traits = "0.2" |
I am not seeing num_traits being used in the code?
NVM, I see it's used in the avro reader code :)

It looks like num-traits should be marked as optional and be included as part of the avro feature flag?
| num-traits = "0.2" | |
| num-traits = { version = "0.2", optional = true } |
| //! DataFusion data sources | ||
| pub mod avro; | ||
this should be gated by feature flag, no?

Yes, then the feature flags in the avro module can be removed
| /// Create a new NTH_VALUE window aggregate function | ||
| pub fn nth_value( | ||
| pub fn value( | ||
maybe first, last and nth would be better names here, cc @jimexist
@nevi-me I think having a reader in arrow-rs is definitely something to consider (I can definitely see other projects wanting to read from avro files into Arrow arrays). Given this PR is already fairly large, I suggest we get it in as is, and if there is interest/appetite for moving the avro reader to arrow-rs we can do it as a follow-on PR -- I filed apache/arrow-rs#727 to track this idea.

(And welcome back @nevi-me!)
alamb left a comment
I think this is looking great @Igosuki - thank you so much!
I think the needed items prior to merging this are:

- Merge apache/arrow-testing#62 and update references in this PR
- Get a clean CI run (looks like maybe a few more places need #[cfg(feature = "avro")] sprinkled), as well as adding some Apache license files

Again, thank you so much
datafusion/tests/sql.rs
Outdated
| \n CoalescePartitionsExec\ | ||
| \n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ | ||
| \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ | ||
| \n ExecutionPlan(PlaceHolder)\ |
note: the fact that this says PlaceHolder can be fixed by implementing fmt_as in the Avro ExecutionPlan
datafusion/tests/sql.rs
Outdated
| #[cfg(feature = "avro")] | ||
| #[tokio::test] | ||
| async fn avro_explain_analyze() { |
I don't think this particular test adds a lot of test coverage -- the non-avro version exists to ensure execution metrics are propagated correctly.
| #[cfg(feature = "avro")] | ||
| #[tokio::test] | ||
| async fn avro_query_multiple_files() { |
| export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data | ||
| # run tests on all workspace members with default feature list | ||
| cargo test | ||
| # run tests on all workspace members with default feature list + avro |
Looks like I should have run the entire CI pipeline before committing, will fix
| }; | ||
| Ok(lit(utf8_val)) | ||
| Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => { | ||
| let utf8_val = if utf8_val == "foo" { |
maybe this isn't intended to be merged?

@jimexist I get a clippy failure on nightly:
error: unnecessary nested `if let` or `match`
--> datafusion/src/logical_plan/expr.rs:1915:21
|
1915 | / if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
1916 | | let utf8_val = if utf8_val == "foo" {
1917 | | "bar".to_string()
1918 | | } else {
... |
1923 | | Ok(Expr::Literal(scalar))
1924 | | }
| |_____________________^
|
= note: `-D clippy::collapsible-match` implied by `-D warnings`
help: the outer pattern can be modified to include the inner pattern
--> datafusion/src/logical_plan/expr.rs:1914:31
|
1914 | Expr::Literal(scalar) => {
| ^^^^^^ replace this binding
1915 | if let ScalarValue::Utf8(Some(utf8_val)) = scalar {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ with this pattern
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_match
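For readers unfamiliar with the collapsible-match lint, a minimal self-contained sketch of the collapse clippy is asking for (the enum names below are simplified stand-ins for DataFusion's Expr/ScalarValue, not the actual types):

```rust
// Simplified stand-ins for DataFusion's ScalarValue / Expr.
#[derive(Debug)]
enum ScalarValue {
    Utf8(Option<String>),
}

#[derive(Debug)]
enum Expr {
    Literal(ScalarValue),
}

// Collapsed form: instead of binding `Expr::Literal(scalar)` and then
// nesting `if let ScalarValue::Utf8(Some(utf8_val)) = scalar { ... }`,
// the outer binding is replaced by the inner pattern, as clippy suggests.
fn rewrite(expr: Expr) -> Expr {
    match expr {
        Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => {
            let utf8_val = if utf8_val == "foo" {
                "bar".to_string()
            } else {
                utf8_val
            };
            Expr::Literal(ScalarValue::Utf8(Some(utf8_val)))
        }
        other => other,
    }
}
```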
rebased

Rebased again after new conflict
nevi-me left a comment
This looks great. I only reviewed the avro changes in datafusion in detail, so skimmed through ballista.
The feature flag can be simplified before this is merged.
Avro -> Arrow related changes can be addressed when/if moving the code to arrow-rs (which I'd be happy to do)
| //! This module contains utilities to manipulate avro metadata. | ||
| #[cfg(feature = "avro")] | ||
It should be possible to move this feature flag to datafusion/src/lib.rs so you don't include it for each module here

I think this would also be ok to do as a follow on PR
datafusion/src/avro_to_arrow/mod.rs
Outdated
| #[cfg(feature = "avro")] | ||
| /// Infer Avro schema given a reader | ||
| pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> { |
nit: is it inference, or reading the schema from the file/input?

It is actually reading the schema from the input, not inferring it. I went that way initially because, in the Java world, there is a use case for inferring avro schemas from a stream of avro datums. That use case would be streaming data into arrow from avro, but it doesn't apply here, so we can simply remove Seek and rename this.
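For context on why "read" is more accurate than "infer" here: the Avro object container format stores the writer's schema verbatim in the file header, right after a 4-byte magic number, so no inference is involved. A std-only sketch of checking that magic (illustrative; the PR itself relies on avro-rs to parse the header):

```rust
use std::io::{Read, Result as IoResult};

// An Avro object container file begins with the 4 bytes `O`, `b`, `j`, 0x01,
// followed by a metadata map that includes the full writer schema under the
// "avro.schema" key. Getting the schema is therefore a header lookup, not
// inference over the data.
pub fn has_avro_magic<R: Read>(reader: &mut R) -> IoResult<bool> {
    let mut magic = [0u8; 4];
    reader.read_exact(&mut magic)?;
    Ok(&magic == b"Obj\x01")
}
```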
| name: Option<&str>, | ||
| nullable: bool, | ||
| ) -> Result<Field> { | ||
| schema_to_field_with_props(schema, name, nullable, Some(&Default::default())) |
You could pass None as the props, and then only set the metadata if there are actual properties.
We can adjust this when moving this module to arrow-rs though
| AvroSchema::Decimal { | ||
| precision, scale, .. | ||
| } => DataType::Decimal(*precision, *scale), | ||
| AvroSchema::Uuid => DataType::Utf8, |
This should be a binary field, ideally DataType::FixedSizeBinary, because you otherwise lose the type information (that you're dealing with a UUID).
Extension arrays will also be helpful in future for this. UUID is common enough across different data sources (SQL, Parquet) that we might want to preserve its properties.
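To make the reviewer's point concrete: a UUID is 16 raw bytes, so a fixed-width binary type (e.g. DataType::FixedSizeBinary(16) in arrow) preserves more structure than Utf8. A std-only sketch of the text-to-bytes conversion such a mapping would need (the helper name is hypothetical; a real implementation would likely use the `uuid` crate):

```rust
// Converts the canonical hyphenated UUID text form (36 chars) into its
// 16 raw bytes; returns None on malformed input. Illustrative only.
fn uuid_text_to_bytes(s: &str) -> Option<[u8; 16]> {
    // Drop the hyphens, leaving 32 hex digits.
    let hex: String = s.chars().filter(|c| *c != '-').collect();
    if hex.len() != 32 {
        return None;
    }
    let mut out = [0u8; 16];
    for i in 0..16 {
        // Parse each pair of hex digits into one byte.
        out[i] = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16).ok()?;
    }
    Some(out)
}
```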
| let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose(); | ||
|
|
||
| Ok(match array { | ||
| ColumnarValue::Scalar(scalar) => { |
is this change related to the avro addition?

@nevi-me This is another clippy issue in nightly
| /// Create a new NTH_VALUE window aggregate function | ||
| pub fn nth_value( | ||
| pub fn value( | ||
If no decision is made here, we could defer the clippy changes until they land on stable
| pub mod aggregates; | ||
| pub mod analyze; | ||
| pub mod array_expressions; | ||
| pub mod avro; |
This should also be feature-flagged so that the flags inside its file can be removed. There's no use compiling the avro module if the flag is disabled.

That implies adding the avro feature flag in a number of other places, such as execution/context.rs; is that what you are asking for implicitly?

I think @nevi-me is suggesting something like
| pub mod avro; | |
| #[cfg(feature = "avro")] | |
| pub mod avro; |
Which would then let you avoid so much #[cfg...] in #910
I am not sure what other changes that entails
@alamb The physical plan is used everywhere in the codebase, so feature gating in the avro submod of physical plan seemed to be the best thing to do
It seems adding (compile time obviously increased by quite a bit). However, those tests now fail somewhere else (extract), because the arrow test data is not available here:
Tests pass on my local environment setting RUST_MIN_STACK_SIZE without changing cargo; I put it in the github workflow to see if it passes
Thanks for merging, hope to contribute some more.

Thanks @Igosuki great contribution! Looking forward to future contributions 😁
| AvroSchema::Enum { symbols, name, .. } => { | ||
| return Ok(Field::new_dict( | ||
| &name.fullname(None), | ||
| index_type(symbols.len()), |
Sorry for being late to the party. While porting this to arrow2 I noticed it: I think we want here
DataType::Dictionary(Box::new(index_type(symbols.len())), Box::new(DataType::Utf8))
Field::new_dict does not create a new dict datatype; it is only used to set the dict_id, order, etc.

@jorgecarleitao thanks for pointing that out, I should fix it
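For background on the index_type(symbols.len()) call under discussion, a simplified sketch of what such a helper typically does: pick the narrowest integer index wide enough for the enum's cardinality. This is an illustrative stand-in (returning a label instead of an arrow DataType), not the PR's actual code:

```rust
// Pick the narrowest signed index type that can address `len` distinct
// dictionary entries. The reviewer's fix then wraps the chosen index in
// DataType::Dictionary(Box::new(index), Box::new(DataType::Utf8)).
fn index_type(len: usize) -> &'static str {
    if len <= i8::MAX as usize {
        "Int8"
    } else if len <= i16::MAX as usize {
        "Int16"
    } else {
        "Int32"
    }
}
```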
Which issue does this PR close?
Closes #903.
Rationale for this change
Enables loading avro data files through datafusion.
What changes are included in this PR?
Avro is added as a table provider and a supported file format.
Avro schemas can be translated into arrow schemas.
Are there any user-facing changes?
Yes: one can now call register_avro on a DataFrame and use 'STORED AS AVRO' in SQL.
N.B.:
Missing :
I find there is duplication between modules with these additions; I should probably do some refactoring.