-
Notifications
You must be signed in to change notification settings - Fork 2
data source v2 prototype #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
39036b9
to
bc078e1
Compare
8f59eaa
to
4d37934
Compare
|
||
public interface FilterPushDownSupport { | ||
// TODO: shall we introduce the concept of "unhandledFilters"? Personally I feel it's not very | ||
// useful and we can always execute the filter again at Spark side. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fine unless the filter is very expensive (for example if it is some kind of UDF).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the public Filter
interface, we can only push down very simple filters, so this should not have performance issues. But we may need to add unhandledFilters
to CatalystFilterPushDownSupport
, which support arbitrary expressions and may have very expensive predicates.
package org.apache.spark.sql.sources.v2.reader; | ||
|
||
public interface ColumnPruningSupport { | ||
void pruneColumns(String[] requiredColumns); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work with nested columns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good question. It will be hard/tricky to support nested column pruning with this API, how about we use requiredSchema: StrcutType
as parameter?
4d37934
to
b78cc3d
Compare
* @param partitionColumns The columns for hash partition keys. | ||
* @param hashFunction the hash function, can be `murmur3` or `hive`. | ||
*/ | ||
void pushDownHashPartition(String[] partitionColumns, String hashFunction); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we keep the hashFunction
parameter or have 2 interfaces: DefaultHashPartitionPushDownSupport
and HiveHashPartitionPushDownSupport
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* required columns during scan. | ||
*/ | ||
public interface ColumnPruningSupport { | ||
void pruneColumns(String[] requiredColumns); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be hard/tricky to support nested column pruning with this API, how about we use requiredSchema: StrcutType
as parameter? cc @marmbrus
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That could work.
b78cc3d
to
b197884
Compare
|
||
/** | ||
* The main interface and minimal requirement for data source v2 implementations. Users can mix in | ||
* more interfaces to implement more functionalists other than just scan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
functionalists -> functions
*/ | ||
DataSourceV2Reader getReader( | ||
StructType schema, | ||
Map<String, String> options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd create an abstraction to make it clear this is case insensitive.
*/ | ||
DataSourceV2Writer getWriter(SaveMode mode, Map<String, String> options); | ||
|
||
// TODO: shall we force users to implement this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should
* careful when reading these options in the implementations. | ||
* @return a writer that implements the actual write logic. | ||
*/ | ||
DataSourceV2Writer getWriter(SaveMode mode, Map<String, String> options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getWriter -> createWriter
* careful when reading these options in the implementations. | ||
* @return a reader that implements the actual read logic. | ||
*/ | ||
DataSourceV2Reader getReader( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getReader -> createReader
* sources may have to infer the schema and reject any user specified schemas, they can overwrite | ||
* this method to achieve this. | ||
*/ | ||
default boolean allowUserSpecifiedSchema() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allow -> allows, or accepts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or supportsUserDefinedSchema
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to propagate size | ||
* information to Spark. | ||
*/ | ||
public interface SizeEstimationSupport { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe generalize it to statistics support, and look for the useful statistics ....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the name more general? StatsEstimationSupport
* The actual schema of this data source reader, which may be different from the physical schema | ||
* of the underlying storage, as column pruning or other optimizations may happen. | ||
*/ | ||
public abstract StructType schema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's unclear whether this is the returned data's schema, or the full schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about renaming it to actualSchema
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readSchema?
* A generator that returned by a data source reader and is responsible to generate data for an RDD | ||
* partition. | ||
*/ | ||
public interface DataGenerator<T> extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"generator" is not a great name here since it almost implies the data is just generated rather than being scanned (read).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK after reading through all the code I'd call this DataReadTask, or just ReadTask
* at least implement the full scan logic, users can mix in more interfaces to implement scan | ||
* optimizations like column pruning, filter push down, etc. | ||
*/ | ||
public abstract class DataSourceV2Reader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't we put the rest of the interfaces for enhancing this reader in one file? we can also get rid of the reader package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually after thinking more about it, i think this is fine.
* output. | ||
*/ | ||
// TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row. | ||
public abstract List<DataGenerator<Row>> readRows(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we deal with the case in which a data source can read some schema in column format, and some cannot? do we require the data source itself to handle that inside and do the conversion from row to column when needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If users implement the ColumnarReadSupport
, they have to output columnar data for all required columns. If some columns are not in columnar format, the implementation itself is responsible to handle it inside and do the conversion.
Alternatively, we can add one more method in ColumnarReadSupport
: boolean supportColumnar()
. By default it's true, but users can overwrite it according to the required schema, to decide to do this scan in columnar fashion or row fashion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd add the supportsColumnarReads or supportsColumnBatch method
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-partition | ||
* the data and avoid shuffle at Spark side. | ||
*/ | ||
public interface HashPartitionPushDownSupport { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a weird one and i'm not sure it can be useful as is. there are typically two cases:
- the data source is already partitioned by a key, and as a result Spark doesn't need to do a shuffle.
- the data source is capable of doing shuffle by arbitrary column itself.
This interface is designed for case 2, but case 1 is far more common than case 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it's designed for case 1...
At the beginning, I was thinking of doing hash partitioning propagation, which looks more natural to support case 1. However, some data sources may be partitioned in multiple ways, e.g. it's hash partitioned by a, b, and also hash partitioned by c, d. It will be a little messy to ask a data source to propagate all the possible hash partitionings.
Instead, I think it can be simpler if we do hash partitioning push down. The implementation just need to tell us if it can do this hash partitioning or not. That said, this method should return a boolean instead of void.
*/ | ||
DataSourceV2Writer getWriter(SaveMode mode, Map<String, String> options); | ||
|
||
// TODO: shall we force users to implement this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd force them to implement everything. When implementing a data source this is really trivial if it is no-op. Forcing them to implement it would make it less likely for somebody to forget.
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} | ||
import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, SizeEstimationSupport} | ||
|
||
case class DataSourceV2Relation( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would we be able to implement v1 using v2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. For data source v1, users give us RDD
directly, and I'm not sure how to turn a RDD
into a list of ReadTask
.
} | ||
} | ||
|
||
class JavaAdvancedDataGenerator implements DataGenerator <Row> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix the space around <Row>
import org.apache.spark.sql.Row; | ||
import org.apache.spark.sql.test.TestSparkSession; | ||
|
||
public class JavaDataSourceV2Suite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to write the test suite itself in scala since scala test has better reporting, etc
|
||
/** | ||
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to only read | ||
* required columns during scan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some comment about nested columns so it is clear that this is not just for top level pruning.
b197884
to
f587fb5
Compare
write(encoder.fromRow(row)); | ||
} | ||
|
||
public abstract void commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return a message
* output. | ||
*/ | ||
// TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row. | ||
public abstract List<ReadTask<Row>> readRows(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use Array
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readRows
-> getReadTasks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get -> create, since it might be expensive. get suggests that it is cheap
public interface LimitPushDownSupport { | ||
// todo: shall we make this method return boolean, so that we can safely avoid to re-do the limit | ||
// at Spark side? Personlly I think we don't need to bother as limit is very cheap at Spark side. | ||
void pushDownLimit(int limit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return boolean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add TopKPushDown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need that? if we push both sort and limit down, it is topk isn't it?
@Experimental | ||
@InterfaceStability.Unstable | ||
public interface CatalystFilterPushDownSupport { | ||
void pushDownCatalystFilters(Expression[] filters); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add unHandledFilters
* Whether or not this data source can accept user specified schema. When Spark scans a data | ||
* source, users can specify the schema to avoid expensive schema inference. However some data | ||
* sources may have to infer the schema and reject any user specified schemas, they can overwrite | ||
* this method to achieve this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Also, describe the meaning of the returned boolean by adding @return
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the first sentence Whether or not this data source can accept user specified schema
already described the meaning?
|
||
/** | ||
* A mix in interface for `DataSourceV2`. Users can implement this interface to provide data writing | ||
* ability with job level transaction. The writing procedure is: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
job-level transaction support
* 3. for each partition, if all data are written successfully, call writer.commit(). | ||
* 4. for each partition, if exception happens during the writing, call writer.abort(). | ||
* 5. if all partitions are written successfully, call WritableDataSourceV2.commit(). | ||
* 6. if some partitions are failed, call WritableDataSourceV2.abort(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are failed
-> failed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean some partitions abort
? We will hold until all the partitions return results (either commit
or abort
)?
|
||
/** | ||
* Inside Spark, the data is `UnsafeRow` and will be converted to `Row` before sending it out of | ||
* Spark and writing to data source. The avoid this conversion, implementations can overwrite |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The avoid
-> To avoid
/** | ||
* Inside Spark, the data is `UnsafeRow` and will be converted to `Row` before sending it out of | ||
* Spark and writing to data source. The avoid this conversion, implementations can overwrite | ||
* this method and write `UnsafeRow`s directly. Note that, this is an experimental and unstable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and write
-> writes
import org.apache.spark.sql.types.StructType; | ||
|
||
/** | ||
* A writer which is responsible to write the data for each input partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which is responsible to write
-> that is responsible for writing
* Returns true if the implementation can handle this sample operation, so that we can reduce | ||
* the data size to be read at the very beginning. | ||
*/ | ||
boolean pushDownSample(double fraction); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about the other parameters
seed
?
withReplacement
?
lowerBound
?
upperBound
?
Actually, sample has different algorithms. Do we need to pass the method identifier?
For example, https://www.toadworld.com/platforms/ibmdb2/w/wiki/7748.data-sampling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Spark or end users need to care about sample algorithms? I think it's more flexible for data source implementations to implement sample if fraction
is the only requirement.
import java.io.Serializable; | ||
|
||
/** | ||
* A read task that returned by a data source reader and is responsible to output data for an RDD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove that
and is responsible to output
-> is responsible for outputing
/** | ||
* The preferred locations for this read task to run faster, but Spark can't guarantee that this | ||
* task will always be run on these locations, implementations should make sure that it can | ||
* be run on any locations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
locations
-> location
public interface ReadTask<T> extends Serializable { | ||
/** | ||
* The preferred locations for this read task to run faster, but Spark can't guarantee that this | ||
* task will always be run on these locations, implementations should make sure that it can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove be
.
, implementations
-> . Implementations
f587fb5
to
3e14861
Compare
3e14861
to
d1818b6
Compare
## What changes were proposed in this pull request? This PR aims at improving the way physical plans are explained in spark. Currently, the explain output for physical plan may look very cluttered and each operator's string representation can be very wide and wraps around in the display making it little hard to follow. This especially happens when explaining a query 1) Operating on wide tables 2) Has complex expressions etc. This PR attempts to split the output into two sections. In the header section, we display the basic operator tree with a number associated with each operator. In this section, we strictly control what we output for each operator. In the footer section, each operator is verbosely displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be correlated by the originating expression id from its parent plan. To illustrate, here is a simple plan displayed in old vs new way. Example query1 : ``` EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0 ``` Old : ``` *(2) Project [key#2, max(val)#15] +- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0)) +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18]) +- Exchange hashpartitioning(key#2, 200) +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21]) +- *(1) Project [key#2, val#3] +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0)) +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int> ``` New : ``` Project (8) +- Filter (7) +- HashAggregate (6) +- Exchange (5) +- HashAggregate (4) +- Project (3) +- Filter (2) +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 [codegen id : 1] Output: [key#2, val#3] (2) Filter [codegen id : 1] Input : [key#2, val#3] Condition : (isnotnull(key#2) AND (key#2 > 0)) (3) Project [codegen id : 1] Output : [key#2, val#3] Input : [key#2, val#3] (4) HashAggregate [codegen id : 1] Input: [key#2, val#3] (5) Exchange Input: [key#2, max#11] (6) HashAggregate [codegen id : 2] Input: [key#2, max#11] (7) Filter [codegen id : 2] Input : [key#2, max(val)#5, max(val#3)#8] Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0)) (8) Project [codegen id : 2] Output : [key#2, max(val)#5] Input : [key#2, max(val)#5, max(val#3)#8] ``` Example Query2 (subquery): ``` SELECT * FROM explain_temp1 WHERE KEY = (SELECT Max(KEY) FROM explain_temp2 WHERE KEY = (SELECT Max(KEY) FROM explain_temp3 WHERE val > 0) AND val = 2) AND val > 3 ``` Old: ``` *(1) Project [key#2, val#3] +- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3)) : +- Subquery scalar-subquery#39 : +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)apache#45]) : +- Exchange SinglePartition : +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47]) : +- *(1) Project [key#26] : +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2)) : : +- Subquery scalar-subquery#38 : : +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)apache#43]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49]) : : +- *(1) Project [key#28] : : +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0)) : : +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int> : +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int> +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int> ``` New: ``` Project (3) +- Filter (2) +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 [codegen id : 1] Output: [key#2, val#3] (2) Filter [codegen id : 1] Input : [key#2, val#3] Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3)) (3) Project [codegen id : 1] Output : [key#2, val#3] Input : [key#2, val#3] ===== Subqueries ===== Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23 HashAggregate (9) +- Exchange (8) +- HashAggregate (7) +- Project (6) +- Filter (5) +- Scan parquet default.explain_temp2 (4) (4) Scan parquet default.explain_temp2 [codegen id : 1] Output: [key#26, val#27] (5) Filter [codegen id : 1] Input : [key#26, val#27] Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2)) (6) Project [codegen id : 1] Output : [key#26] Input : [key#26, val#27] (7) HashAggregate [codegen id : 1] Input: [key#26] (8) Exchange Input: [max#35] (9) HashAggregate [codegen id : 2] Input: [max#35] Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22 HashAggregate (15) +- Exchange (14) +- HashAggregate (13) +- Project (12) +- Filter (11) +- Scan parquet default.explain_temp3 (10) (10) Scan parquet default.explain_temp3 [codegen id : 1] Output: [key#28, val#29] (11) Filter [codegen id : 1] Input : [key#28, val#29] Condition : (isnotnull(val#29) AND (val#29 > 0)) (12) Project [codegen id : 1] Output : [key#28] Input : [key#28, val#29] (13) HashAggregate [codegen id : 1] Input: [key#28] (14) Exchange Input: [max#37] (15) HashAggregate [codegen id : 2] Input: [max#37] ``` Note: I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow would not be able to immediately incorporate the feedback. I will start to work on them as soon as i can. Also, currently this PR provides a basic infrastructure for explain enhancement. The details about individual operators will be implemented in follow-up prs ## How was this patch tested? Added a new test `explain.sql` that tests basic scenarios. Need to add more tests. Closes apache#24759 from dilipbiswal/explain_feature. Authored-by: Dilip Biswal <dbiswal@us.ibm.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? This PR aims to disable `to_timestamp('366', 'DD')` to recover `ansi` test suite in Java11+. ### Why are the changes needed? Currently, Daily Java 11 and 17 GitHub Action jobs are broken. - https://github.com/apache/spark/runs/5511239176?check_suite_focus=true - https://github.com/apache/spark/runs/5513540604?check_suite_focus=true **Java 8** ``` $ bin/spark-shell --conf spark.sql.ansi.enabled=true Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/03/12 00:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://172.16.0.31:4040 Spark context available as 'sc' (master = local[*], app id = local-1647075572229). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.3.0-SNAPSHOT /_/ Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_322) Type in expressions to have them evaluated. Type :help for more information. scala> sql("select to_timestamp('366', 'DD')").show java.time.format.DateTimeParseException: Text '366' could not be parsed, unparsed text found at index 2. If necessary set spark.sql.ansi.enabled to false to bypass this error. ``` **Java 11+** ``` $ bin/spark-shell --conf spark.sql.ansi.enabled=true Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/03/12 01:00:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://172.16.0.31:4040 Spark context available as 'sc' (master = local[*], app id = local-1647075607932). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.3.0-SNAPSHOT /_/ Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12) Type in expressions to have them evaluated. Type :help for more information. scala> sql("select to_timestamp('366', 'DD')").show java.time.DateTimeException: Invalid date 'DayOfYear 366' as '1970' is not a leap year. If necessary set spark.sql.ansi.enabled to false to bypass this error. ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Test with Java 11+. **BEFORE** ``` $ java -version openjdk version "17.0.2" 2022-01-18 LTS OpenJDK Runtime Environment Zulu17.32+13-CA (build 17.0.2+8-LTS) OpenJDK 64-Bit Server VM Zulu17.32+13-CA (build 17.0.2+8-LTS, mixed mode, sharing) $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z ansi/datetime-parsing-invalid.sql" ... [info] SQLQueryTestSuite: 01:23:00.219 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 01:23:05.209 ERROR org.apache.spark.sql.SQLQueryTestSuite: Error using configs: [info] - ansi/datetime-parsing-invalid.sql *** FAILED *** (267 milliseconds) [info] ansi/datetime-parsing-invalid.sql [info] Expected "java.time.[format.DateTimeParseException [info] Text '366' could not be parsed, unparsed text found at index 2]. If necessary set s...", but got "java.time.[DateTimeException [info] Invalid date 'DayOfYear 366' as '1970' is not a leap year]. If necessary set s..." Result did not match for query #8 [info] select to_timestamp('366', 'DD') (SQLQueryTestSuite.scala:476) ... [info] Run completed in 7 seconds, 389 milliseconds. [info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0 [info] *** 1 TEST FAILED *** [error] Failed tests: [error] org.apache.spark.sql.SQLQueryTestSuite [error] (sql / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful [error] Total time: 21 s, completed Mar 12, 2022, 1:23:05 AM ``` **AFTER** ``` $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z ansi/datetime-parsing-invalid.sql" ... [info] SQLQueryTestSuite: [info] - ansi/datetime-parsing-invalid.sql (390 milliseconds) ... [info] Run completed in 7 seconds, 673 milliseconds. [info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [success] Total time: 20 s, completed Mar 12, 2022, 1:24:52 AM ``` Closes apache#35825 from dongjoon-hyun/SPARK-38534. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…n properly ### What changes were proposed in this pull request? Make `ResolveRelations` handle plan id properly ### Why are the changes needed? bug fix for Spark Connect, it won't affect classic Spark SQL before this PR: ``` from pyspark.sql import functions as sf spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1") spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2") df1 = spark.read.table("test_table_1") df2 = spark.read.table("test_table_2") df3 = spark.read.table("test_table_1") join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2) join2 = df3.join(join1, how="left", on=join1.index==df3.id) join2.schema ``` fails with ``` AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704 ``` That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect ``` === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations === '[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id) !:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 !+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2] ! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index) ! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 ! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2 ! +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false Can not resolve 'id with plan 7 ``` `[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one ``` :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ``` ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? ci Closes apache#45214 from zhengruifeng/connect_fix_read_join. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…plan properly ### What changes were proposed in this pull request? Make `ResolveRelations` handle plan id properly cherry-pick bugfix apache#45214 to 3.5 ### Why are the changes needed? bug fix for Spark Connect, it won't affect classic Spark SQL before this PR: ``` from pyspark.sql import functions as sf spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1") spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2") df1 = spark.read.table("test_table_1") df2 = spark.read.table("test_table_2") df3 = spark.read.table("test_table_1") join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2) join2 = df3.join(join1, how="left", on=join1.index==df3.id) join2.schema ``` fails with ``` AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704 ``` That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect ``` === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations === '[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id) !:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 !+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2] ! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index) ! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 ! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2 ! +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false Can not resolve 'id with plan 7 ``` `[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one ``` :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ``` ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? ci Closes apache#46291 from zhengruifeng/connect_fix_read_join_35. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
This is a trivial change to replace the loop index from `int` to `long`. Surprisingly, microbenchmark shows more than double performance uplift. Analysis -------- The hot loop of `arrayEquals` method is simplifed as below. Loop index `i` is defined as `int`, it's compared with `length`, which is a `long`, to determine if the loop should end. ``` public static boolean arrayEquals( Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) { ...... int i = 0; while (i <= length - 8) { if (Platform.getLong(leftBase, leftOffset + i) != Platform.getLong(rightBase, rightOffset + i)) { return false; } i += 8; } ...... } ``` Strictly speaking, there's a code bug here. If `length` is greater than 2^31 + 8, this loop will never end because `i` as a 32 bit integer is at most 2^31 - 1. But compiler must consider this behaviour as intentional and generate code strictly match the logic. It prevents compiler from generating optimal code. Defining loop index `i` as `long` corrects this issue. Besides more accurate code logic, JIT is able to optimize this code much more aggressively. From microbenchmark, this trivial change improves performance significantly on both Arm and x86 platforms. Benchmark --------- Source code: https://gist.github.com/cyb70289/258e261f388e22f47e4d961431786d1a Result on Arm Neoverse N2: ``` Benchmark Mode Cnt Score Error Units ArrayEqualsBenchmark.arrayEqualsInt avgt 10 674.313 ± 0.213 ns/op ArrayEqualsBenchmark.arrayEqualsLong avgt 10 313.563 ± 2.338 ns/op ``` Result on Intel Cascake Lake: ``` Benchmark Mode Cnt Score Error Units ArrayEqualsBenchmark.arrayEqualsInt avgt 10 1130.695 ± 0.168 ns/op ArrayEqualsBenchmark.arrayEqualsLong avgt 10 461.979 ± 0.097 ns/op ``` Deep dive --------- Dive deep to the machine code level, we can see why the big gap. Listed below are arm64 assembly generated by Openjdk-17 C2 compiler. For `int i`, the machine code is similar to source code, no deep optimization. Safepoint polling is expensive in this short loop. ``` // jit c2 machine code snippet 0x0000ffff81ba8904: mov w15, wzr // int i = 0 0x0000ffff81ba8908: nop 0x0000ffff81ba890c: nop loop: 0x0000ffff81ba8910: ldr x10, [x13, w15, sxtw] // Platform.getLong(leftBase, leftOffset + i) 0x0000ffff81ba8914: ldr x14, [x12, w15, sxtw] // Platform.getLong(rightBase, rightOffset + i) 0x0000ffff81ba8918: cmp x10, x14 0x0000ffff81ba891c: b.ne 0x0000ffff81ba899c // return false if not equal 0x0000ffff81ba8920: ldr x14, [x28, apache#848] // x14 -> safepoint 0x0000ffff81ba8924: add w15, w15, #0x8 // i += 8 0x0000ffff81ba8928: ldr wzr, [x14] // safepoint polling 0x0000ffff81ba892c: sxtw x10, w15 // extend i to long 0x0000ffff81ba8930: cmp x10, x11 0x0000ffff81ba8934: b.le 0x0000ffff81ba8910 // if (i <= length - 8) goto loop ``` For `long i`, JIT is able to do much more aggressive optimization. E.g, below code snippet unrolls the loop by four. ``` // jit c2 machine code snippet unrolled_loop: 0x0000ffff91de6fe0: sxtw x10, w7 0x0000ffff91de6fe4: add x23, x22, x10 0x0000ffff91de6fe8: add x24, x21, x10 0x0000ffff91de6fec: ldr x13, [x23] // unroll-1 0x0000ffff91de6ff0: ldr x14, [x24] 0x0000ffff91de6ff4: cmp x13, x14 0x0000ffff91de6ff8: b.ne 0x0000ffff91de70a8 0x0000ffff91de6ffc: ldr x13, [x23, #8] // unroll-2 0x0000ffff91de7000: ldr x14, [x24, #8] 0x0000ffff91de7004: cmp x13, x14 0x0000ffff91de7008: b.ne 0x0000ffff91de70b4 0x0000ffff91de700c: ldr x13, [x23, #16] // unroll-3 0x0000ffff91de7010: ldr x14, [x24, #16] 0x0000ffff91de7014: cmp x13, x14 0x0000ffff91de7018: b.ne 0x0000ffff91de70a4 0x0000ffff91de701c: ldr x13, [x23, apache#24] // unroll-4 0x0000ffff91de7020: ldr x14, [x24, apache#24] 0x0000ffff91de7024: cmp x13, x14 0x0000ffff91de7028: b.ne 0x0000ffff91de70b0 0x0000ffff91de702c: add w7, w7, #0x20 0x0000ffff91de7030: cmp w7, w11 0x0000ffff91de7034: b.lt 0x0000ffff91de6fe0 ``` ### What changes were proposed in this pull request? A trivial change to replace loop index `i` of method `arrayEquals` from `int` to `long`. ### Why are the changes needed? To improve performance and fix a possible bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#49568 from cyb70289/arrayEquals. Authored-by: Yibo Cai <cyb70289@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com>
No description provided.