
data source v2 prototype #8


Closed
wants to merge 1 commit into from

Conversation

cloud-fan
Owner

No description provided.

@cloud-fan cloud-fan force-pushed the data-source-v2 branch 2 times, most recently from 39036b9 to bc078e1 Compare July 24, 2017 12:34
@cloud-fan cloud-fan force-pushed the data-source-v2 branch 2 times, most recently from 8f59eaa to 4d37934 Compare August 8, 2017 17:25

public interface FilterPushDownSupport {
// TODO: shall we introduce the concept of "unhandledFilters"? Personally I feel it's not very
// useful and we can always execute the filter again at Spark side.

This is fine unless the filter is very expensive (for example if it is some kind of UDF).

Owner Author

According to the public Filter interface, we can only push down very simple filters, so this should not cause performance issues. But we may need to add unhandledFilters to CatalystFilterPushDownSupport, which supports arbitrary expressions and may receive very expensive predicates.

package org.apache.spark.sql.sources.v2.reader;

public interface ColumnPruningSupport {
void pruneColumns(String[] requiredColumns);

Does this work with nested columns?

Owner Author

Good question. It will be hard/tricky to support nested column pruning with this API; how about we use requiredSchema: StructType as the parameter?

* @param partitionColumns The columns for hash partition keys.
* @param hashFunction the hash function, can be `murmur3` or `hive`.
*/
void pushDownHashPartition(String[] partitionColumns, String hashFunction);
Owner Author

shall we keep the hashFunction parameter or have 2 interfaces: DefaultHashPartitionPushDownSupport and HiveHashPartitionPushDownSupport?
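For comparison, a rough sketch of the two-interface alternative mentioned above (each interface would live in its own file; the method shape is an assumption based on the snippet being reviewed):

```java
// Sketch of the two-interface alternative: one mix-in per hash function,
// so the String hashFunction parameter goes away.
interface DefaultHashPartitionPushDownSupport {
  // The data source pre-partitions data with Spark's default (murmur3) hash.
  void pushDownHashPartition(String[] partitionColumns);
}

interface HiveHashPartitionPushDownSupport {
  // The data source pre-partitions data with Hive's hash function.
  void pushDownHashPartition(String[] partitionColumns);
}
```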


* required columns during scan.
*/
public interface ColumnPruningSupport {
void pruneColumns(String[] requiredColumns);
Owner Author

It will be hard/tricky to support nested column pruning with this API; how about we use requiredSchema: StructType as the parameter? cc @marmbrus


That could work.
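A minimal sketch of the StructType-based signature discussed above (illustrative only; not part of the prototype as posted):

```java
import org.apache.spark.sql.types.StructType;

// Variant of ColumnPruningSupport that takes a schema instead of top-level
// column names, so nested fields can be pruned as well.
interface ColumnPruningSupport {
  // requiredSchema may contain nested StructTypes, e.g. only `address.city`
  // out of a struct column `address`.
  void pruneColumns(StructType requiredSchema);
}
```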


/**
* The main interface and minimal requirement for data source v2 implementations. Users can mix in
* more interfaces to implement more functionalists other than just scan.

functionalists -> functions

*/
DataSourceV2Reader getReader(
StructType schema,
Map<String, String> options);

I'd create an abstraction to make it clear this is case insensitive.

*/
DataSourceV2Writer getWriter(SaveMode mode, Map<String, String> options);

// TODO: shall we force users to implement this?

we should

* careful when reading these options in the implementations.
* @return a writer that implements the actual write logic.
*/
DataSourceV2Writer getWriter(SaveMode mode, Map<String, String> options);

getWriter -> createWriter

* careful when reading these options in the implementations.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader getReader(

getReader -> createReader

* sources may have to infer the schema and reject any user specified schemas, they can overwrite
* this method to achieve this.
*/
default boolean allowUserSpecifiedSchema() {
@rxin rxin Aug 10, 2017

allow -> allows, or accepts


or supportsUserDefinedSchema

* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to propagate size
* information to Spark.
*/
public interface SizeEstimationSupport {

maybe generalize it to statistics support, and look for the useful statistics ....


Make the name more general? StatsEstimationSupport

* The actual schema of this data source reader, which may be different from the physical schema
* of the underlying storage, as column pruning or other optimizations may happen.
*/
public abstract StructType schema();

it's unclear whether this is the returned data's schema, or the full schema.

Owner Author

how about renaming it to actualSchema?


readSchema?

* A generator that returned by a data source reader and is responsible to generate data for an RDD
* partition.
*/
public interface DataGenerator<T> extends Serializable {

"generator" is not a great name here since it almost implies the data is just generated rather than being scanned (read).

@rxin rxin Aug 11, 2017

OK after reading through all the code I'd call this DataReadTask, or just ReadTask

* at least implement the full scan logic, users can mix in more interfaces to implement scan
* optimizations like column pruning, filter push down, etc.
*/
public abstract class DataSourceV2Reader {

why don't we put the rest of the interfaces for enhancing this reader in one file? we can also get rid of the reader package.


Actually, after thinking more about it, I think this is fine.

* output.
*/
// TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row.
public abstract List<DataGenerator<Row>> readRows();

how do we deal with the case in which a data source can read some schema in column format, and some cannot? do we require the data source itself to handle that inside and do the conversion from row to column when needed?

Owner Author

If users implement ColumnarReadSupport, they have to output columnar data for all required columns. If some columns are not in columnar format, the implementation itself is responsible for handling that internally and doing the conversion.

Alternatively, we can add one more method to ColumnarReadSupport: boolean supportColumnar(). By default it's true, but users can override it based on the required schema, to decide whether to do this scan in a columnar or row-based fashion.

@rxin rxin Aug 11, 2017

I'd add the supportsColumnarReads or supportsColumnBatch method
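A rough sketch of the capability check discussed in this thread (the method name follows the suggestion above; the columnar batch type itself is not shown here):

```java
// Sketch: a mix-in that lets the reader declare, per required schema,
// whether this scan can be done in columnar fashion at all.
interface ColumnarReadSupport {
  // Returns true if all required columns can be returned as columnar data.
  // Implementations that cannot may return false, or keep returning true and
  // convert row-based columns to columnar format internally.
  default boolean supportsColumnarReads() {
    return true;
  }
}
```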

* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-partition
* the data and avoid shuffle at Spark side.
*/
public interface HashPartitionPushDownSupport {

This is a weird one and I'm not sure it is useful as is. There are typically two cases:

  1. the data source is already partitioned by a key, and as a result Spark doesn't need to do a shuffle.
  2. the data source is capable of doing shuffle by arbitrary column itself.

This interface is designed for case 2, but case 1 is far more common than case 2.

Owner Author

Actually it's designed for case 1...

At the beginning, I was thinking of doing hash partitioning propagation, which looks more natural for supporting case 1. However, some data sources may be partitioned in multiple ways, e.g. hash partitioned by a, b, and also hash partitioned by c, d. It would be a little messy to ask a data source to propagate all the possible hash partitionings.

Instead, I think it is simpler if we do hash partitioning push down. The implementation just needs to tell us whether it can do this hash partitioning or not. That said, this method should return a boolean instead of void.
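Under that reading, the mix-in might look roughly like the following (a sketch; it keeps the hashFunction parameter from the snippet above and only changes the return type):

```java
// Sketch: hash partitioning "push down" where the data source reports whether
// it is already partitioned the way Spark needs, so Spark can skip the shuffle.
interface HashPartitionPushDownSupport {
  // Returns true if the data is (or can be) hash partitioned by these columns
  // with the given hash function; false means Spark must shuffle as usual.
  boolean pushDownHashPartition(String[] partitionColumns, String hashFunction);
}
```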

*/
DataSourceV2Writer getWriter(SaveMode mode, Map<String, String> options);

// TODO: shall we force users to implement this?

I'd force them to implement everything. When implementing a data source this is really trivial if it is a no-op. Forcing them to implement it would make it less likely for somebody to forget.

import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, SizeEstimationSupport}

case class DataSourceV2Relation(

would we be able to implement v1 using v2?

Owner Author

I don't think so. For data source v1, users give us an RDD directly, and I'm not sure how to turn an RDD into a list of ReadTasks.

}
}

class JavaAdvancedDataGenerator implements DataGenerator <Row> {

fix the space around <Row>

import org.apache.spark.sql.Row;
import org.apache.spark.sql.test.TestSparkSession;

public class JavaDataSourceV2Suite {

better to write the test suite itself in Scala, since ScalaTest has better reporting, etc.


/**
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to only read
* required columns during scan.

add a comment about nested columns so it is clear that this is not just for top-level pruning.

write(encoder.fromRow(row));
}

public abstract void commit();
Owner Author

return a message

* output.
*/
// TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row.
public abstract List<ReadTask<Row>> readRows();
Owner Author

use Array.

Owner Author
@cloud-fan cloud-fan Aug 15, 2017

readRows -> getReadTasks


get -> create, since it might be expensive. get suggests that it is cheap
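Combining the comments in this thread (read tasks instead of rows, an array instead of a List, and a `create` prefix), the reader method might end up as something like this sketch, assuming the prototype's ReadTask interface:

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.ReadTask;

// Hypothetical revised signature; only the read-task factory method is shown.
abstract class DataSourceV2Reader {
  // One ReadTask per RDD partition; `create` signals this may be expensive.
  public abstract ReadTask<Row>[] createReadTasks();
}
```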

public interface LimitPushDownSupport {
// todo: shall we make this method return boolean, so that we can safely avoid to re-do the limit
// at Spark side? Personlly I think we don't need to bother as limit is very cheap at Spark side.
void pushDownLimit(int limit);
Owner Author

return boolean

Owner Author

add TopKPushDown


Do we need that? If we push both sort and limit down, it is top-k, isn't it?
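For illustration, a boolean-returning version along the lines of the comments above (a sketch, not the prototype's final shape):

```java
// Sketch: limit push down that reports whether the source fully applied it.
interface LimitPushDownSupport {
  // Returns true if the data source guarantees it produces at most `limit`
  // rows; if false, Spark re-applies the limit on its side.
  boolean pushDownLimit(int limit);
}
```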

@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {
void pushDownCatalystFilters(Expression[] filters);
Owner Author

add unHandledFilters
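One possible shape for the unhandled-filters variant suggested above (illustrative; it mirrors how the v1 API reports unhandled filters):

```java
import org.apache.spark.sql.catalyst.expressions.Expression;

// Sketch: push down catalyst expressions and return the ones the data source
// cannot evaluate, so Spark only re-applies those.
interface CatalystFilterPushDownSupport {
  Expression[] pushDownCatalystFilters(Expression[] filters);
}
```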

* Whether or not this data source can accept user specified schema. When Spark scans a data
* source, users can specify the schema to avoid expensive schema inference. However some data
* sources may have to infer the schema and reject any user specified schemas, they can overwrite
* this method to achieve this.


Nit: Also, describe the meaning of the returned boolean by adding @return?

Owner Author

I think the first sentence, Whether or not this data source can accept user specified schema, already describes the meaning?


/**
* A mix in interface for `DataSourceV2`. Users can implement this interface to provide data writing
* ability with job level transaction. The writing procedure is:


job-level transaction support

* 3. for each partition, if all data are written successfully, call writer.commit().
* 4. for each partition, if exception happens during the writing, call writer.abort().
* 5. if all partitions are written successfully, call WritableDataSourceV2.commit().
* 6. if some partitions are failed, call WritableDataSourceV2.abort().


are failed -> failed


Does it mean some partitions abort? Will we wait until all the partitions return results (either commit or abort)?
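The numbered write procedure quoted earlier in this thread roughly corresponds to the following sketch; the writer types here are simplified stand-ins for the prototype's classes, and only the commit/abort protocol is shown:

```java
import java.util.List;

// Simplified stand-ins for the per-partition writer and the job-level writer.
interface PartitionWriter {
  void write(Object row) throws Exception;
  void commit();
  void abort();
}

interface JobWriter {
  PartitionWriter createPartitionWriter();
  void commit();  // step 5: all partitions committed
  void abort();   // step 6: some partition failed
}

class WriteProtocolSketch {
  static void run(JobWriter jobWriter, List<List<Object>> partitions) {
    boolean anyPartitionFailed = false;
    for (List<Object> partition : partitions) {       // normally one task per partition
      PartitionWriter writer = jobWriter.createPartitionWriter();
      try {
        for (Object row : partition) {
          writer.write(row);                           // step 2
        }
        writer.commit();                               // step 3
      } catch (Exception e) {
        writer.abort();                                // step 4
        anyPartitionFailed = true;
      }
    }
    if (anyPartitionFailed) {
      jobWriter.abort();                               // step 6
    } else {
      jobWriter.commit();                              // step 5
    }
  }
}
```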


/**
* Inside Spark, the data is `UnsafeRow` and will be converted to `Row` before sending it out of
* Spark and writing to data source. The avoid this conversion, implementations can overwrite


The avoid -> To avoid

/**
* Inside Spark, the data is `UnsafeRow` and will be converted to `Row` before sending it out of
* Spark and writing to data source. The avoid this conversion, implementations can overwrite
* this method and write `UnsafeRow`s directly. Note that, this is an experimental and unstable


and write -> writes

import org.apache.spark.sql.types.StructType;

/**
* A writer which is responsible to write the data for each input partition.


which is responsible to write -> that is responsible for writing

* Returns true if the implementation can handle this sample operation, so that we can reduce
* the data size to be read at the very beginning.
*/
boolean pushDownSample(double fraction);
@gatorsmile gatorsmile Aug 17, 2017

How about the other parameters:
seed?
withReplacement?
lowerBound?
upperBound?

Actually, sample has different algorithms. Do we need to pass the method identifier?

For example, https://www.toadworld.com/platforms/ibmdb2/w/wiki/7748.data-sampling

Owner Author

Do Spark or end users need to care about sample algorithms? I think it's more flexible for data source implementations to implement sampling if fraction is the only requirement.
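For reference, a richer signature along the lines of the question above might look like this (purely hypothetical; the prototype only passes fraction):

```java
// Sketch: an extended sample push down carrying the extra parameters raised
// above. All parameter names are hypothetical.
interface SamplePushDownSupport {
  // Returns true if the data source can produce the sample itself, so Spark
  // can avoid reading the full data.
  boolean pushDownSample(double fraction, boolean withReplacement, long seed);
}
```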

import java.io.Serializable;

/**
* A read task that returned by a data source reader and is responsible to output data for an RDD
@gatorsmile gatorsmile Aug 17, 2017

remove that

and is responsible to output -> is responsible for outputting

/**
* The preferred locations for this read task to run faster, but Spark can't guarantee that this
* task will always be run on these locations, implementations should make sure that it can
* be run on any locations.


locations -> location

public interface ReadTask<T> extends Serializable {
/**
* The preferred locations for this read task to run faster, but Spark can't guarantee that this
* task will always be run on these locations, implementations should make sure that it can


remove be.

, implementations -> . Implementations
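Putting the naming and documentation suggestions together, the task interface might end up looking like this sketch (the data-access method name is an assumption, since it is not shown in this thread):

```java
import java.io.Serializable;
import java.util.Iterator;

// Illustrative shape of ReadTask after the review comments: it must run on
// any location, but may advertise preferred locations as a scheduling hint.
interface ReadTask<T> extends Serializable {
  // Hosts on which this task runs faster. Spark treats this only as a hint;
  // implementations must still work when scheduled elsewhere.
  default String[] preferredLocations() {
    return new String[0];
  }

  // Hypothetical accessor for the data of this RDD partition.
  Iterator<T> createDataIterator();
}
```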

@cloud-fan cloud-fan closed this Aug 17, 2017
cloud-fan pushed a commit that referenced this pull request Aug 27, 2019
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in Spark.

Currently, the explain output for a physical plan may look very cluttered, and each operator's
string representation can be very wide and wrap around in the display, making it a little
hard to follow. This especially happens when explaining a query that 1) operates on wide tables
or 2) has complex expressions, etc.

This PR attempts to split the output into two sections. In the header section, we display
the basic operator tree with a number associated with each operator. In this section, we strictly
control what we output for each operator. In the footer section, each operator is verbosely
displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be
correlated by the originating expression id from its parent plan.

To illustrate, here is a simple plan displayed in old vs new way.

Example query1 :
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```

Old :
```
*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New :
```
Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)#5]
Input     : [key#2, max(val)#5, max(val#3)#8]
```

Example Query2 (subquery):
```
SELECT * FROM   explain_temp1 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp2 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp3 WHERE  val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)apache#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)apache#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)

(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)

(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]
```

Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow
and will not be able to immediately incorporate the feedback. I will start to
work on it as soon as I can. Also, currently this PR provides a basic infrastructure
for explain enhancement. The details about individual operators will be implemented
in follow-up PRs.
## How was this patch tested?
Added a new test `explain.sql` that tests basic scenarios. Need to add more tests.

Closes apache#24759 from dilipbiswal/explain_feature.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Nov 24, 2020
cloud-fan pushed a commit that referenced this pull request Mar 17, 2022
### What changes were proposed in this pull request?

This PR aims to disable `to_timestamp('366', 'DD')` to recover `ansi` test suite in Java11+.

### Why are the changes needed?

Currently, Daily Java 11 and 17 GitHub Action jobs are broken.
- https://github.com/apache/spark/runs/5511239176?check_suite_focus=true
- https://github.com/apache/spark/runs/5513540604?check_suite_focus=true

**Java 8**
```
$ bin/spark-shell --conf spark.sql.ansi.enabled=true
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/12 00:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.16.0.31:4040
Spark context available as 'sc' (master = local[*], app id = local-1647075572229).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0-SNAPSHOT
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_322)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("select to_timestamp('366', 'DD')").show
java.time.format.DateTimeParseException: Text '366' could not be parsed, unparsed text found at index 2. If necessary set spark.sql.ansi.enabled to false to bypass this error.
```

**Java 11+**
```
$ bin/spark-shell --conf spark.sql.ansi.enabled=true
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/12 01:00:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.16.0.31:4040
Spark context available as 'sc' (master = local[*], app id = local-1647075607932).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0-SNAPSHOT
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("select to_timestamp('366', 'DD')").show
java.time.DateTimeException: Invalid date 'DayOfYear 366' as '1970' is not a leap year. If necessary set spark.sql.ansi.enabled to false to bypass this error.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Test with Java 11+.

**BEFORE**
```
$ java -version
openjdk version "17.0.2" 2022-01-18 LTS
OpenJDK Runtime Environment Zulu17.32+13-CA (build 17.0.2+8-LTS)
OpenJDK 64-Bit Server VM Zulu17.32+13-CA (build 17.0.2+8-LTS, mixed mode, sharing)

$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z ansi/datetime-parsing-invalid.sql"
...
[info] SQLQueryTestSuite:
01:23:00.219 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
01:23:05.209 ERROR org.apache.spark.sql.SQLQueryTestSuite: Error using configs:
[info] - ansi/datetime-parsing-invalid.sql *** FAILED *** (267 milliseconds)
[info]   ansi/datetime-parsing-invalid.sql
[info]   Expected "java.time.[format.DateTimeParseException
[info]   Text '366' could not be parsed, unparsed text found at index 2]. If necessary set s...", but got "java.time.[DateTimeException
[info]   Invalid date 'DayOfYear 366' as '1970' is not a leap year]. If necessary set s..." Result did not match for query #8
[info]   select to_timestamp('366', 'DD') (SQLQueryTestSuite.scala:476)
...
[info] Run completed in 7 seconds, 389 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.SQLQueryTestSuite
[error] (sql / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 21 s, completed Mar 12, 2022, 1:23:05 AM
```

**AFTER**
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z ansi/datetime-parsing-invalid.sql"
...
[info] SQLQueryTestSuite:
[info] - ansi/datetime-parsing-invalid.sql (390 milliseconds)
...
[info] Run completed in 7 seconds, 673 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 20 s, completed Mar 12, 2022, 1:24:52 AM
```

Closes apache#35825 from dongjoon-hyun/SPARK-38534.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Feb 27, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan pushed a commit that referenced this pull request May 7, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
cloud-fan pushed a commit that referenced this pull request Feb 6, 2025
This is a trivial change to replace the loop index from `int` to `long`. Surprisingly, a microbenchmark shows more than double the performance uplift.

Analysis
--------
The hot loop of the `arrayEquals` method is simplified below. The loop index `i` is defined as an `int`; it's compared with `length`, which is a `long`, to determine if the loop should end.
```
public static boolean arrayEquals(
    Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
  ......
  int i = 0;
  while (i <= length - 8) {
    if (Platform.getLong(leftBase, leftOffset + i) !=
        Platform.getLong(rightBase, rightOffset + i)) {
          return false;
    }
    i += 8;
  }
  ......
}
```

Strictly speaking, there's a code bug here. If `length` is greater than 2^31 + 8, this loop will never end because `i`, as a 32-bit integer, is at most 2^31 - 1. But the compiler must treat this behaviour as intentional and generate code that strictly matches the logic. It prevents the compiler from generating optimal code.

Defining the loop index `i` as `long` corrects this issue. Besides more accurate code logic, the JIT is able to optimize this code much more aggressively. The microbenchmark shows that this trivial change improves performance significantly on both Arm and x86 platforms.
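For reference, the corrected hot loop simply declares the index as `long` (a sketch of the change; the full method also handles the trailing bytes):

```java
// Same loop as above, but with a long index so the comparison with the long
// `length` is exact and the JIT can optimize (e.g. unroll) more aggressively.
long i = 0;
while (i <= length - 8) {
  if (Platform.getLong(leftBase, leftOffset + i) !=
      Platform.getLong(rightBase, rightOffset + i)) {
    return false;
  }
  i += 8;
}
```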

Benchmark
---------
Source code:
https://gist.github.com/cyb70289/258e261f388e22f47e4d961431786d1a

Result on Arm Neoverse N2:
```
Benchmark                             Mode  Cnt    Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  674.313 ± 0.213  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10  313.563 ± 2.338  ns/op
```

Result on Intel Cascade Lake:
```
Benchmark                             Mode  Cnt     Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  1130.695 ± 0.168  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10   461.979 ± 0.097  ns/op
```

Deep dive
---------
Diving deep to the machine code level, we can see why the gap is so big. Listed below is the arm64 assembly generated by the OpenJDK 17 C2 compiler.

For `int i`, the machine code is similar to the source code, with no deep optimization. Safepoint polling is expensive in this short loop.
```
// jit c2 machine code snippet
  0x0000ffff81ba8904:   mov        w15, wzr              // int i = 0
  0x0000ffff81ba8908:   nop
  0x0000ffff81ba890c:   nop
loop:
  0x0000ffff81ba8910:   ldr        x10, [x13, w15, sxtw] // Platform.getLong(leftBase, leftOffset + i)
  0x0000ffff81ba8914:   ldr        x14, [x12, w15, sxtw] // Platform.getLong(rightBase, rightOffset + i)
  0x0000ffff81ba8918:   cmp        x10, x14
  0x0000ffff81ba891c:   b.ne       0x0000ffff81ba899c    // return false if not equal
  0x0000ffff81ba8920:   ldr        x14, [x28, apache#848]      // x14 -> safepoint
  0x0000ffff81ba8924:   add        w15, w15, #0x8        // i += 8
  0x0000ffff81ba8928:   ldr        wzr, [x14]            // safepoint polling
  0x0000ffff81ba892c:   sxtw       x10, w15              // extend i to long
  0x0000ffff81ba8930:   cmp        x10, x11
  0x0000ffff81ba8934:   b.le       0x0000ffff81ba8910    // if (i <= length - 8) goto loop
```

For `long i`, the JIT is able to do much more aggressive optimization. E.g., the code snippet below unrolls the loop by four.
```
// jit c2 machine code snippet
unrolled_loop:
  0x0000ffff91de6fe0:   sxtw       x10, w7
  0x0000ffff91de6fe4:   add        x23, x22, x10
  0x0000ffff91de6fe8:   add        x24, x21, x10
  0x0000ffff91de6fec:   ldr        x13, [x23]          // unroll-1
  0x0000ffff91de6ff0:   ldr        x14, [x24]
  0x0000ffff91de6ff4:   cmp        x13, x14
  0x0000ffff91de6ff8:   b.ne       0x0000ffff91de70a8
  0x0000ffff91de6ffc:   ldr        x13, [x23, #8]      // unroll-2
  0x0000ffff91de7000:   ldr        x14, [x24, #8]
  0x0000ffff91de7004:   cmp        x13, x14
  0x0000ffff91de7008:   b.ne       0x0000ffff91de70b4
  0x0000ffff91de700c:   ldr        x13, [x23, #16]     // unroll-3
  0x0000ffff91de7010:   ldr        x14, [x24, #16]
  0x0000ffff91de7014:   cmp        x13, x14
  0x0000ffff91de7018:   b.ne       0x0000ffff91de70a4
  0x0000ffff91de701c:   ldr        x13, [x23, apache#24]     // unroll-4
  0x0000ffff91de7020:   ldr        x14, [x24, apache#24]
  0x0000ffff91de7024:   cmp        x13, x14
  0x0000ffff91de7028:   b.ne       0x0000ffff91de70b0
  0x0000ffff91de702c:   add        w7, w7, #0x20
  0x0000ffff91de7030:   cmp        w7, w11
  0x0000ffff91de7034:   b.lt       0x0000ffff91de6fe0
```

### What changes were proposed in this pull request?

A trivial change to replace loop index `i` of method `arrayEquals` from `int` to `long`.

### Why are the changes needed?

To improve performance and fix a possible bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49568 from cyb70289/arrayEquals.

Authored-by: Yibo Cai <cyb70289@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>