Prototype: Data Source V2 #10

Closed · wants to merge 4 commits

Conversation

cloud-fan
Owner

TODO:

  • write path
  • session-level options
  • various operator push down support

* interface to specify the partition columns of the reader/writer, to improve performance.
*/
public interface PartitioningSupport {
void setPartitionColumns(String[] partitionColumns);


This is good, but would this be separate from bucketing? I want to be able to dictate that rows whose values are equal for a column, or group of columns, will be found in the same task. I can't make guarantees about ordering; I can only guarantee clustering (re: Cassandra).

public interface DataSourceV2Writer {
WriteTask createWriteTask();

void commit(WriterCommitMessage[] messages);


I'm a little worried about the implication of the API name "commit". Again from the Cassandra angle, I can't provide a "commit" level of confidence that a write has been performed unless the end user is using a consistency level and replication factor that would ensure durable writes. Basically, an end user could request a level of write acknowledgement which wouldn't be durable, and I wouldn't want to signal to any downstream APIs that a "transaction" had occurred. Just a nit though.

Owner Author

You can treat this as a Spark-level transaction. Spark successfully writing data to the data source doesn't mean the data is persistent. Maybe it's an in-memory data source and all the data will go away when the application is stopped, or a Cassandra data source that doesn't always ensure durable writes.

But from Spark's point of view, the data has been successfully written.


That sounds good to me. I just want to make sure that, with all of the streaming interactions we are having, we don't necessarily equate "successful" with "committed".

@VincentPoncet VincentPoncet Nov 4, 2017

@cloud-fan If this is about transaction management, then the DataSourceV2 should be notified, before the data is sent, that the data is part of one transaction, by calling a beginTransaction method.

@RussellSpitzer Can't you infer a consistency level QUORUM / LOCAL_QUORUM if the Cassandra DataSourceV2 is used inside a Spark transaction through that beginTransaction/commit call ?


@VincentPoncet That still wouldn't really mean committed :) since there are a variety of read CL's that could miss the written data. The only way to be sure would be to force "ALL".

* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-partition
* the data and avoid shuffle at Spark side.
*
* Note that this interface is marked as unstable, as the implementation needs to be consistent


This is a qualification that most sources (can any?) cannot accomplish. Very few consistent-hashing sources will use the Spark SQL shuffle hash function.

Owner Author

@cloud-fan cloud-fan Aug 17, 2017

Maybe we should make Spark SQL's shuffle hash function configurable, so that this interface can be more useful.


I'm not sure that would be necessary. Within the existing infrastructure there is already the notion of "clustered" data, which would fit the criterion I want to support: consistently hashed data whose ordering is basically unknown. We could provide a "getTask/Partition" function that, given a row, returns the task?


It seems like the intent here is to expose the underlying data's structure. For example, tables that were stored with Hive bucketing could return true if partitionColumns matches how the table was bucketed, and then reduce-side tasks could take advantage of that.

If that's the intent, then I would rather see an API that allows the data source report its structure, rather than a way for the planner to ask about a specific partitioning. Also, using the term "partition" here is somewhat confusing since we're talking about RDD partitions, not table partitions, right?
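For illustration, a reporting-style alternative might look roughly like the sketch below (all names here are hypothetical, not part of this prototype): the source describes how its output is already clustered, and the planner decides whether that matches the query.

interface ReportsPartitioning {
    // Columns whose equal values are guaranteed to end up in the same ReadTask
    // (clustering only; no ordering guarantee is implied).
    String[] clusteringColumns();

    // Number of RDD partitions this reader will produce.
    int numPartitions();
}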

* task will always run on these locations. Implementations should make sure that it can
* be run on any location.
*/
default String[] preferredLocations() {


This is an excellent addition

* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;
@dongjoon-hyun dongjoon-hyun Aug 17, 2017

Is v2 going to be there at the final commit?
I'm just wondering because the names already have V2 (e.g. DataSourceV2Reader, DataSourceV2Writer, and DataSourceV2) and the package name org.apache.spark.sql.sources.v2 is different from the old one.

*/
DataSourceV2Reader createReader(
StructType schema,
CaseInsensitiveMap<String> options);


Maybe not a big deal at this stage, but I wonder if this should be just Map, as I guess this is going to be exposed as a public interface.

@rxin rxin Aug 30, 2017

Please don't use a Map. It's a huge disaster in v1.


There might also be data sources that expect case-sensitive key names. I think it would be a good idea to pass the options exactly as the user specified them and let the data source implementation handle them as appropriate for that source.


This has bitten me in a design before: not realizing that all the options come through lowercased.


Please don't do that (let the source handle case sensitivity). It will be the most confusing thing to the end user.


@rxin, what about using an option map was a disaster in v1?

For case sensitivity, what happens elsewhere? I didn't think that options were case insensitive in Spark.


Some sources interpret the options as case sensitive, and some as case insensitive. It's really confusing to the end users.

On top of that, Scala Map has really bad binary compatibility across Scala versions.


Thanks, that makes sense. What are you proposing instead of a map? A configuration object that handles resolution internally?

Owner Author

A configuration object that handles resolution internally?

CaseInsensitiveMap does this, but we can also create a new class for this purpose.
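As a rough sketch of such a dedicated class (hypothetical name, not the prototype's API), the wrapper could normalize keys once so every source resolves options the same way:

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public final class SourceOptions {
    private final Map<String, String> entries = new HashMap<>();

    public SourceOptions(Map<String, String> userOptions) {
        // Keys are matched case-insensitively; values are passed through untouched.
        userOptions.forEach((k, v) -> entries.put(k.toLowerCase(Locale.ROOT), v));
    }

    public Optional<String> get(String key) {
        return Optional.ofNullable(entries.get(key.toLowerCase(Locale.ROOT)));
    }
}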

*/
@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {
@dongjoon-hyun dongjoon-hyun Aug 17, 2017

In V2, can we introduce PlanPushDownSupport, too? Oops, sorry. I found that it's documented as out of scope.

@rdblue rdblue left a comment

Comments on the read interfaces. I haven't looked much at the write side yet.

* A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
* interface to specify the bucket/sort columns of the reader/writer, to improve performance.
*/
public interface BucketingSupport {

If this is used to set sort columns, why is it called BucketingSupport?

* false otherwise. This method might be called many times if more than one filter need to be
* pushed down.
*
* TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which

+1 for allowing residuals.


+1 this - see conclusion comment but I don't think we want to allow the current method.

Owner Author

Both the current method and the one in the TODO support residuals; it's just one-by-one style vs. batch style.


But one by one is potentially far harder to implement. Suppose I have a notional table which looks like

a b
---
1 2
1 2
---
1 3
1 3
---
2 2
2 2

and I've kept some amount of metadata per two rows which allows me to skip blocks of two rows which don't contain any matching data.

Take SELECT * FROM table WHERE a = 1 AND b = 2: I can push down the first filter first, then the second filter.

If I apply the filters one at a time, I see

after a = 1

some rows match
some rows match
no rows match

so I cannot say I can handle it.

Now after b = 2, I see

some rows match
no rows match
some rows match

so I cannot say I can handle it either.

However, if I apply a = 1 and b = 2 at the same time, I find that I get

all rows match
no rows match
no rows match

and I can handle the filter and avoid performance overheads. It is strictly more powerful to allow me to pick and choose which filters I can handle, rather than forcing me to build it up incrementally in an order Spark determines.
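The batch style mentioned in the TODO above would address this: the source sees every candidate filter at once and returns only the residuals it cannot evaluate. A minimal sketch (the signature comes from the TODO; everything else is illustrative):

import org.apache.spark.sql.catalyst.expressions.Expression;

public interface CatalystFilterPushDownSupport {
    // Receives all candidate filters in one call and returns the subset it
    // cannot handle; Spark re-applies only these residual filters.
    Expression[] pushDownCatalystFilters(Expression[] filters);
}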


You get similar things when you start considering combinations of filters and aggregations or sorts. For example, if I have a filter, I may be able to assert a sort order depending on what order the sort is applied.

E.g. if I do (at the moment)

.repartition("col1").sortWithinPartitions("col2", "col3") and then filter on equality on col1, I can easily assert that I am sorted on (col1, col2, col3), or (col2, col3), or (col2); but depending on what order you give me this information, I might not be able to push this down.

*/
public interface ColumnPruningSupport {
/**
* Returns true if the implementation can apple this column pruning optimization, so that we can

Nit: should be "apply" not "apple"

* Returns true if the implementation can apple this column pruning optimization, so that we can
* reduce the data size to be read at the very beginning.
*/
boolean pruneColumns(StructType requiredSchema);

Why allow the implementation to reject the required schema? If the data source supports column pruning, then it should throw an exception if the columns passed by this method are invalid (with more context). Otherwise, it should do the pruning. The only case where I think this might be valid is if there are some columns that can't be pruned, in which case it should return the actual schema that will be produced instead of rejecting by returning false. But then, the actual read schema is accessible from DataSourceV2Reader#readSchema.


agreed here!

Owner Author

Actually I don't have a use case for rejecting the required schema; I just wanted to follow the other push-down interfaces and return a boolean. I'm fine with returning void here.


* A read task returned by a data source reader and is responsible for outputting data for an RDD
* partition.
*/
public interface ReadTask<T> extends Serializable {

Why not use standard interfaces like Iterable and Iterator or at least the same pattern? This interface mixes the two together by having both open and next/get. That makes it easy to forget to call open and prevents the ReadTask from being used more than once (unless it has been serialized to two places). I would expect ReadTask to be like an Iterable so that it can be reused if needed, and for it to return an equivalent of Iterator that actually does the read and tracks state. Requesting the Iterator from a task opens it, so you never forget.


With these kinds of Serializable interfaces, it can be a pain to implement because you end up needing to make all of your data access objects serializable (or construct them all in the open method, which is also quite sad, for reasons @rdblue notes). In datasources V1, we've used a pattern where you include a serializable datastructure that contains enough information to construct your objects properly (so for example, the params map is serializable).

Ideally we could have something similar here; what if ReadTask<T> extends Serializable and has a method which returns a closeable Java 8 spliterator, a data structure which has a similar interface to your Iterator-like thing, but is easier to implement safely (you don't need to do a while (it.next()) { doSomething(it.get()); }, you can just do split.forEachRemaining(this::doSomething) or easily convert it into an iterator, with very little overhead, at a higher level).

Owner Author

I agree that we should make ReadTask like an Iterable, which can return an Iterator. But I can't find a standard closeable iterator interface in the JDK; shall we create one here?


can we just return an object with an interface like:

interface Reader<T> extends Spliterator<T>, Closeable {}

(spliterator is a much nicer interface than iterator to implement)

or is the issue here that we can't use Java 8?


I would use Closeable, which requires that multiple calls to close are safe. The iterator should close itself when it is exhausted, hasNext should return false, and NoSuchElementException should be thrown by next as required by Iterator.
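Putting these points together, the factory shape under discussion might look like the following sketch (illustrative only, though it matches the DataReader<T> extends Iterator<T>, Closeable shape that appears later in this prototype): the task acts as a serializable, reusable factory, and the reader owns all mutable read state.

import java.io.Closeable;
import java.io.Serializable;
import java.util.Iterator;

public interface ReadTask<T> extends Serializable {
    // Creating the reader is what "opens" the task; calling it again
    // starts a fresh read of the same partition.
    DataReader<T> createReader();
}

interface DataReader<T> extends Iterator<T>, Closeable {
    // close() must be safe to call multiple times; the reader should also
    // close itself once hasNext() returns false.
}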

* Returns true if the implementation can handle this sample operation, so that we can reduce
* the data size to be read at the very beginning.
*
* TODO: shall we add more parameters like seed, lowerBound, upperBound, etc.?

+1 for seed.


Seed is a must. I wouldn't do lowerBound/upperBound... in virtually all cases people just do sampling without replacement, so a fraction would be sufficient.


we should also document the value range of "fraction".


This should also document the assumptions that Spark has for a sample.

You can imagine a situation where I know the number of records ahead of time, so I turn pushDownSample into a limit(fraction*numRecords), which wouldn't be correct. Maybe this should instead rely on reporting bucketing to the planner?
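For reference, a possible shape once a seed is added (parameter names here are illustrative): the documentation would pin down that fraction is a row-level sampling probability in [0, 1], not a prefix or limit on the data.

public interface SamplePushDownSupport {
    // fraction: expected ratio of rows to keep, in the range [0, 1].
    // seed: makes the sample reproducible across retries of the same task.
    // Must be a true row-level sample, not a limit of fraction * numRecords rows.
    boolean pushDownSample(double fraction, long seed);
}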

public interface SortPushDown {
/**
* Returns true if the implementation can handle this sorting requirement and save a sort
* operation at Spark side.

Like the discussion on hash support, I would rather see this the other way around, where the data source reports its underlying sort order. Maybe there are some sources that can perform a sort before passing the data to Spark, but it's hard to know when a source should. I think the more useful case is when the data is already sorted.


+1

Owner Author

OK, I agree. Shall we distinguish between global sort and per-partition sort?


I'd leave this out of the first iteration and add it later, but some thoughts: specify the preferred sort orders, and the data source can return the sortedness.


Per-partition sort would be great. We usually can't handle a global sort, but there are some per-partition sorts (by partition here I mean SQL-style partitioning) we can handle.

SELECT pkey, RANK() OVER (PARTITION BY pkey ORDER BY clustCol) FROM ks.tab

Like in this case if pkey is the C* partition key then there is a natural ordering we can take advantage of when returning values.

@RussellSpitzer RussellSpitzer Sep 5, 2017

Note: @brianmhess wrote that SQL. I don't know SQL like he does, and he is great.

*/
public interface StatisticsSupport {
// todo: shall we add more statistics? what do we want?
long sizeInBytes();

I think this should be more clear that the statistics returned should be for the reader as configured with partition pruning or predicate push-down. That would make this more useful for things like deciding whether a read should be used for a broadcast join.

I'd also include # of rows for the read, plus a few stats for each column: # values, # nulls, NDV, and some form of histogram (TBD).


This method ideally would return an object; as it is, we can't add more statistics without adding default methods that return Optional. It'd be good to have a Statistics object (like in Catalyst) which is at least as expressive as the Catalyst one (so we can, where possible, plug it through to Catalyst), and then when we add new statistics we can add them to the object to avoid polluting the main interface (if you add 30 potential types of statistics, your base data source will have 30 methods).

Owner Author

If we create a Statistics object and use it in the interface, then it becomes public, and changing the Statistics object will break binary compatibility. I suggest we add some basic but useful statistics here via different methods, and create new interfaces if we want to add more in the future.


I don't think that's quite correct. In Java 8 we can add default methods which do not affect binary compatibility, if it's an interface. In all versions of Java, you can add a bean implementation which takes optional values.


Or, put differently, I don't see what the difference is for binary compat between:

interface MyReader {
    OptionalLong getSize();
    OptionalLong getRows();
    OptionalLong getDistinctValues(String columnName);
}

interface MyReader {
    Statistics getStatistics();
}

interface Statistics {
    OptionalLong getSize();
    OptionalLong getRows();
    OptionalLong getDistinctValues(String columnName);
}

in terms of backcompat?


but it would avoid interface bloat


There's also the implementation detail that it's probably simpler for me to get you all the statistics at once than it is to compute them one at a time. E.g. if I store my statistics in a remote store, then if I want to implement the stats methods efficiently I need to write a cache for statistics so I only fetch them once (which changes if anyone adds another filter, so that's not so easy), whilst if I return a stats object I can return you an immutable object which is valid at the time it was created.

Owner Author

+1

@cloud-fan
Owner Author

@RussellSpitzer @rdblue thanks for your suggestions! I generalized hash partition push down to clustering push down. Spark doesn't really care how the data is partitioned, hash or range; the only requirement is about clustering.

/**
* Return the inferred schema of this data source given these options.
*/
StructType inferSchema(CaseInsensitiveMap<String> options);

I think this should have a better name. Data sources with a metadata store won't infer anything; this should come from the table's metadata.

Owner Author

how about getSchema?


+1 for getSchema


I'd use a different name other than "get", since get is usually very cheap, whereas here it can potentially be very expensive. Maybe computeSchema ?


Good point about get. If we add an interface to signal schema-on-read behavior, we could also move inferSchema there. So this would only be used for data sources with a fixed schema. The down side to that approach is that tables have to implement one interface or the other.

@cloud-fan
Owner Author

The last comment adds more color to the clustering push down. Think about joining a Cassandra table and an HBase table in Spark: even assuming both the Cassandra table and the HBase table can satisfy the clustering requirement according to the join keys, Spark still can't perform the join directly, because I believe Cassandra and HBase have different partitioners.

For this case, we can just re-shuffle both join sides and cancel the clustering push down, or we could introduce the Partitioner concept and shuffle the Cassandra table with the HBase partitioner (or vice versa), so that we can still avoid the shuffle for one side.

Any thoughts?

@RussellSpitzer

I'm fine with adding a full "partitioner" concept. Doing a one sided shuffle seems better than shuffling both sides. I thought this is how the basic "RDD" join works. It checks whether one side has defined a partitioner and shuffles the other side if it doesn't.

We already did a lot of this work for the RDD world so it wouldn't be all that different for Dataframes.

@j-baker j-baker left a comment

The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).

In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed in again with the unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.

This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom: sorts, individual filters, clustering, sampling, maybe aggregations eventually; these operations are not all commutative, and computing my support one by one can easily end up being more expensive than computing it all in one go.

For some trivial examples:

  • After filtering, I might be sorted, whilst before filtering I might not be.
  • Filtering with certain filters might affect my ability to push down others.
  • Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all at once, and then I return the parts I can’t handle.

I'd prefer in general that this be implemented by passing some kind of query plan to the data source which enables this kind of replacement. I explicitly don't want to give the whole query plan (that sounds painful); I would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don't think we can guarantee the properties we want without a two-phase thing. I'd really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.

I think this ends up being a more elegant API for consumers, and also far more intuitive.

*/
@Experimental
@InterfaceStability.Unstable
public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {

Ideally we'd continue to be able to return InternalRows here. It's sometimes possible (with knowledge of filters, etc) to provide a more efficient implementation than using UnsafeRow; I should be able to provide my own (more efficient) implementation.

Owner Author

Inside Spark the input rows must be UnsafeRow. If we allow users to provide InternalRow here, we need an extra projection to turn InternalRow into UnsafeRow on the Spark side.


What's the use case for InternalRow then? It's better to operate in terms of interfaces instead of final classes. I am trying to find the place where Spark returns UnsafeRow explicitly on read/write and can only find InternalRow in signatures.

Should we make ExpressionEncoder#toRow return UnsafeRow? It's quite misleading otherwise.


My understanding is that you indicate whether you need the conversion into unsafe rows by having your BaseRelation (in datasources v1) say that needsConversion is true (and then spark will copy the rows if a shuffle is required). Am I reading this incorrectly?

I can see in GenerateUnsafeProjection that we are returning UnsafeRows, but this seems like a different approach which always involves a copy. In what case (with the current APIs) will my returning InternalRows result in an additional copy over my returning UnsafeRows?

An additional thing is that it's far easier for me to create an InternalRow than it is to create an UnsafeRow in my codebase.

Imagine I have a collection of list-like things representing columns: it is trivial for me to create an InternalRow implementation that efficiently accesses them and avoids copies. It is much less trivial for me to create an UnsafeRow with them all in; the code ends up being harder to implement (I have to start worrying about memory allocation, buffer sizes, batching).

Owner Author

Spark makes a contract that the data exchanged between operators must be UnsafeRow, except for some special typed operators; this is the reason why we still use InternalRow in the signature, and I agree it's confusing.

To avoid a copy, you have to return UnsafeRow; returning InternalRow is the same as returning Row.

For data source v1, needConversion cannot avoid the copy: if you look at RowDataSourceScanExec.doExecute, we have an extra conversion to turn the rows into unsafe rows.


import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

public class RowToUnsafeRowReadTask implements ReadTask<UnsafeRow> {

yeah, again, could this be InternalRow?


@Override
public UnsafeRow get() {
return (UnsafeRow) encoder.toRow(rowGenerator.get());

this is already an InternalRow :)

* Returns true if the implementation can handle this sorting requirement and save a sort
* operation at Spark side.
*/
boolean pushDownSort(String[] sortingColumns);

We probably want more specification here: ascending/descending, nulls first/last.
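A richer descriptor along those lines might look like this sketch (names are illustrative): each entry carries the column plus its direction and null ordering, and pushDownSort/setSortColumns would take an ordered array of these instead of bare column names.

public class SortColumn {
    public enum Direction { ASCENDING, DESCENDING }
    public enum NullOrdering { NULLS_FIRST, NULLS_LAST }

    private final String name;
    private final Direction direction;
    private final NullOrdering nullOrdering;

    public SortColumn(String name, Direction direction, NullOrdering nullOrdering) {
        this.name = name;
        this.direction = direction;
        this.nullOrdering = nullOrdering;
    }
}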

public interface BucketingSupport {
void setBucketColumns(String[] bucketColumns, int numBuckets);

void setSortColumns(String[] sortColumns);

see elsewhere, want asc/desc, nulls first/last.


/**
* A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
* interface to specify the partition columns of the reader/writer, to improve performance.

I'm a bit confused by this on the reader side. It feels like this is valuable when we have a FileBasedDataSource, but with the rest of this stuff this should easily be satisfiable with the rest of the APIs, right?

Owner Author

A data source may have metadata that keeps the partitioning/bucketing information, but it may also lack metadata and need to do expensive inference to retrieve that information. By mixing in BucketingSupport and PartitioningSupport, a data source can get the partitioning/bucketing information directly, avoiding potentially expensive inference.


Weird. Yeah, this feels very retrofitted towards FileBasedDataSources where you don't have any kind of metastore.

Ideally we'd be able to put this a level lower, right (provide a class which can handle storing and retrieving this information in the right places)? Like, this doesn't obviously feel like the right separation of concerns, the notion of Spark providing you with optional extra information is a little weird IMO.

Owner Author

Ideally we'd be able to put this a level lower

+1, I think it's doable to ask the parquet data source to talk with the metastore and get the partitioning/bucketing information.


* Cancel this clustering push down. This will be called if Spark finds out that we can't avoid
* the shuffle after we push down the clustering.
*/
void cancel();

Ideally we'd just have 'setClustering' and 'getSupportedClusterings' methods.

Owner Author

Yeah, that looks better.
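That shape could look roughly like this sketch (hypothetical names): Spark first asks what the source can offer and only then picks one, so no cancel() call is needed.

import java.util.List;

public interface ClusteringSupport {
    // Column sets for which the source can guarantee co-location without a shuffle.
    List<String[]> getSupportedClusterings();

    // Called with one of the clusterings reported above, or not at all.
    void setClustering(String[] clusteringColumns);
}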

*/
public interface LimitPushDownSupport {
/**
* Returns true if the implementation can handle the limit operation, so that we can reduce

I think we should clear up the language around 'can handle' here. After saying I can handle stuff, is that saying I should? It's clear from context, but the Javadoc is super ambiguous.

Owner Author

"Can handle" means the data source will do the limit itself, so Spark doesn't need to apply the limit again. I'll polish these documents.

*/
@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {


boolean pushDownCatalystFilter(Expression filter);

This interface is very nice. Just wondering: for a data source to implement this method, is it OK for the implementation to access subtypes of Expression in Spark, for example functions like Abs?

Owner Author

Implementations can pattern match on the given expression and access any subtype you like.
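For example, a Java implementation could do the matching with instanceof checks against the Catalyst expression classes; a rough sketch (hypothetical class name) handling only column = literal, with everything else left to Spark:

import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;

public class EqualityOnlyReader implements CatalystFilterPushDownSupport {
    @Override
    public boolean pushDownCatalystFilter(Expression filter) {
        if (filter instanceof EqualTo) {
            EqualTo eq = (EqualTo) filter;
            if (eq.left() instanceof Attribute && eq.right() instanceof Literal) {
                String column = ((Attribute) eq.left()).name();
                Object value = ((Literal) eq.right()).value();
                // Translate into the source's native "column = value" predicate here.
                return true;
            }
        }
        return false;  // anything unrecognized stays in Spark
    }
}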

@cloud-fan
Owner Author

cloud-fan commented Aug 29, 2017

@j-baker thanks for your feedback! I think we need more discussion about the current interface mix-in framework vs the query plan push down framework.

current interface mix-in framework
Pros:

  1. easy to extend, just add new interfaces for new functionalities.
  2. easy to maintain, we can still keep binary compatibility while adding more interfaces.
  3. easy to use it at Spark side. By looking at which interfaces a data source implements, Spark can easily decide which part of the query plan can be pushed down and which part should remain at Spark side.
  4. java friendly.

Cons:

  1. like @j-baker pointed out, it's hard to implement if you mix in a lot of interfaces, because you need to cache a lot of stuff. It can get worse if different push downs interact with each other and you need to go back and revert something.

query plan push down framework
Pros:

  1. push down everything at once, so it's easier to deal with the case that different push downs interact with each other.
  2. the interface will be very simple.

Cons:

  1. Hard to keep backward compatibility. Ideally we don't want to push down the whole query plan to the data source, and only push down the parts of it we deem to be stable. But later, if we think more plans can be stable and pushed to the data source, we can't just push them, because old data source implementations may not expect to see these new plans.
  2. Java friendly? It's easy to transform a query plan in Scala, but for Java, I think we can only come up with something like the visitor pattern. Using a visitor, I think we will face the same issue as the interface mix-in framework: you visit query plan nodes one by one, so the visitor needs to maintain state during the transformation.


/**
* A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
* interface to specify the bucket/sort columns of the reader/writer, to improve performance.

Ohhhhhh, so on write and read this is letting Spark assert that the data are bucketed in a certain way.






* The actual write logic should be implemented here. To correctly implement transaction,
* implementations should stage the writing or have a way to rollback.
*/
public abstract void write(Row row);

One other thing - I feel like the 'initialize' method is literally present to construct the converter.

In practice, StructType is serializable and we know it when producing the tasks. I think that we should not have an initialize method and should instead have it be a constructor arg of RowWriteTask.

The other thing is related to a concern I had wrt ReadTask - where it's typically a lot easier to ship the config required to construct something than it is to ship the actual writer objects (and I don't really want an initialize method - it makes it harder to guarantee that my objects are in a safe state).

What would be awesome is if WriteTask were more of a factory class, and if it could be made to return something that contains the 'write, commit, abort' methods (maybe have WriteTask have a method which returns a Writer?).
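A sketch of that factory split (hypothetical names; WriterCommitMessage is the message type from the writer interface quoted earlier): the task remains a small serializable description, and the writer it creates owns the connection-like state and the write/commit/abort lifecycle.

import java.io.Serializable;

public interface WriteTask<T> extends Serializable {
    // Called on the executor; constructing the writer replaces initialize().
    // The partitionId/attemptNumber parameters are illustrative.
    DataWriter<T> createWriter(int partitionId, int attemptNumber);
}

interface DataWriter<T> {
    void write(T record);
    WriterCommitMessage commit();
    void abort();
}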

/**
* Similar to `DataSourceV2Reader.createReadTasks`, but return data in columnar format.
*/
List<ReadTask<ColumnarBatch>> createColumnarReadTasks();

But if I can support columnar reads for all columns, why make me implement the row-based interface?


* This method will be called after finishing this read task, users can overwrite this method
* and put clean up logic here.
*/
default void close() {}

Ideally this interface would extend Closeable so that in tests we can use try-with-resources.


@j-baker

j-baker commented Aug 29, 2017

One thing I'd like to note here is that we don't consider the pain of serialization, which leads to sad init logic.

At the moment, the read and write tasks need to be serializable, and also directly do the work of getting and putting data. In practice, making an object graph be serializable is unpleasant (especially in Java 8, where lambdas end up being a frequent source of bugs), and so in general we've ended up serializing the objects required in order to construct the object graph - i.e. instead of serializing a complicated client, serialize () -> ComplicatedClient.construct(serializableParams) - the information required to create it.

However, this would make our impl of the read and write tasks look something like

class MyReader {
    private final ImmutableMap<String, String> params;
    private MyActualReader actualReader;

    public void initialize(StructType schema) {
        actualReader = MyActualReader.construct(params);
    }

    public T next() {
         return actualReader.next();
    }
}

which to me is a code smell. I'd much prefer that the tasks have no init method, and have a method which returns the actual reader, e.g.

class MyReader {
    private final ImmutableMap<String, String> params;

    public ActualReader reader() {
         return MyActualReader.construct(params);
    }
}

which both reduces the ability for my class to end up in a broken state because someone forgot to call initialise, and reduces the extent to which business logic ends up intertwined with the (essentially) scheduling information.

This is just a thing we've found generally useful as a pattern when interacting with the current Spark code.

* this method to achieve this.
*/
default boolean acceptsUserDefinedSchema() {
return true;

Why a boolean method instead of a UserDefinedSchema interface like the other traits? Also, doesn't the createReader method assume this since it passes the physical schema to the data source?

Owner Author

I explained in the doc, we have 3 requirements for the schema inference feature:

  1. the user-specified schema is required
  2. a user specified schema is not allowed and the schema is automatically inferred
  3. the user-specified schema is respected, and if unavailable, the schema can also be automatically inferred.


I left a longer comment above. I didn't get what you were doing with these interfaces at first since I think of most data sources having a schema instead of being schema-on-read.

/**
* The main entrance for read interface.
*
* @param schema the full schema of this data source reader. Full schema usually maps to the

I think the schema should be handled a little differently. We should assume that a data source has a schema that applies to all of its data, instead of passing one in. That's the case with most datasets, like Hive tables, relational tables, and Cassandra tables. I'd also argue that it is a best practice so we don't have expensive operations to infer the schema, like we do for non-Metastore tables.

So instead of passing the full schema here and having an interface to infer a schema, the data source should be expected to report its schema while analyzing the query (which may require inference). Then the schema passed to create a reader should be the expected schema, or projection schema after the query is optimized.

Also, I'd like to understand how column IDs should be handled. I'm building a data source that uses column IDs to implement schema evolution that supports add, delete, and rename. The column IDs for a table are only unique within that table (another table can reuse them), so it doesn't necessarily make sense to expose them to Spark. Spark will still need its own unique attribute IDs. But I'd prefer to have Spark request columns by ID rather than by name, so that Spark handles name resolution between a query and the data source.

I think what makes sense is for the data source to create a Spark schema with attributes that Spark will use in the projection schema. So if my data source has columns 0:x and 1:y, I create a Spark schema with those columns and they are assigned Spark attribute IDs in the process, say x#67 and y#68. When Spark requests a reader, the projection schema uses those attributes, which I can then map back to internal IDs. Spark should call createReader with a schema containing x#67, not x#104. Sound reasonable?

Owner Author

So instead of passing the full schema here and having an interface to infer a schema, the data source should be expected to report its schema while analyzing the query (which may require inference).

I'm +1 on it if we can rewrite the data source part from scratch...
But for now, since users can specify a schema in DataFrameReader, the data source should be able to take a user-specified schema.

For the column ID stuff, I don't totally understand it. Attribute IDs are internal to Spark; Spark only uses column names when interacting with data sources, which is why we only use StructType in the various APIs.


@rdblue

"We should assume that a data source has a schema that applies to all of its data, instead of passing one in. That's the case with most datasets, like Hive tables, relational tables, and Cassandra tables."

This is not true, though. For a Hive table, for example, the schema is specified in the catalog, which means it has to be passed into the underlying file-based data source. The file-based data source itself does not actually dictate the schema.


This is not true though. Hive table, for example, the schema is specified in the catalog, which means it has to be passed into the underlying file-based data source.

I think what I didn't understand is that the datasource API is between the metastore and the files in a table. So the metastore is queried to get a schema, then the data files are treated as schema-on-read. But why should that be the case? I would rather be able to have Hive tables entirely behind the DataSource API, so they can be handled the same way as a JDBC table or another source with a pre-determined schema for its data.

With that missing context, I can see how the interface proposed here looks designed for schema-on-read, rather than for tables that already have a fixed schema: it assumes that the data source will commonly use an externally provided schema, can't necessarily infer one (e.g. JSON), and the projection is separate from the data source schema. I think it's reasonable to consider whether this API should be based on schema-on-read or should assume that the data source has a schema along the lines of what I was suggesting before.

The proposal doc lists 3 cases: an external schema is required, schema is fixed and a user schema is not allowed, and a user schema is optional because it can be inferred.

For sources with fixed schemas, DataSourceV2SchemaProvider#inferSchema is required, but that also pulls in acceptsUserDefinedSchema, which defaults to true. A fixed-schema source also has to implement createReader with a "full schema of the data source" passed in. Together those imply that fixed-schema sources should support schema-on-read, which I think is a bad idea. The problem is similar to why we don't want to pass a map of options: it makes each source implementation decide how to reconcile the user-defined schema with the source schema, and that probably won't be consistent across sources. I think that's a good reason to separate the methods needed for schema-on-read into a different interface.

What about a SchemaOnRead interface that can be used to set the schema instead of passing it to the createReader method? Then acceptsUserDefinedSchema can be dropped from the provider interface because it is signalled by SchemaOnRead. (I'd also rename inferSchema to getSchema.) Then, createReader here would drop the schema argument. We could also have a createReader with a schema argument that passes the requested projection; that's where I would expect to pass the requested schema.

For the other two cases, the proposed API works okay. It's a little strange that sources requiring a user schema won't implement DataSourceV2SchemaProvider, which is what signals that a user-defined schema is accepted, even though one can be passed to any data source. A SchemaOnRead interface would be slightly better for the required case as well, by cleaning this up.
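A rough sketch of the split being proposed, assuming the StructType-based style used elsewhere in this prototype (the interface and method names below are only illustrative, not a settled API):

```java
import org.apache.spark.sql.types.StructType;

// Illustrative only: sources with a fixed schema implement just the provider;
// schema-on-read sources (JSON, CSV, ...) additionally mix in SchemaOnRead.
interface DataSourceV2SchemaProvider {
  // Renamed from inferSchema in the comment above; reports the source's own schema.
  StructType getSchema();
}

interface SchemaOnRead {
  // Accepts a user-defined schema instead of passing it to createReader.
  void setSchema(StructType userDefinedSchema);
}
```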

@rxin
Copy link

rxin commented Aug 31, 2017

Hey Wenchen - rather than implementing every single feature in one go, I think you should push a bare-minimum version upstream first, and then think about implementing the other ones. For example, I'd leave out sort pushdown, partitioning, clustering, etc.

In general we should design things so they are compatible, but with the current framework it is pretty easy to add those later. Otherwise this will just keep going on and on forever. For such a large change it'd be better to break it down piecemeal and merge the critical infrastructure early on, rather than having it merged a week or two before code freeze and then finding bigger issues that end up delaying the release.

* Note that, if the implementation always returns true here, then it can throw an exception in
* the row-based `DataSourceV2Reader.createReadTasks`, as that method will never be called.
*/
default boolean supportsColumnarReads() {
Copy link

this is a bit strange - shouldn't this return value sometimes depend on schema?

Copy link
Owner Author

The schema is part of the reader's state, so when a reader mixes in this interface, it should know what the current schema is, e.g. after column pruning.

Copy link

OK, so this is only called after all push-downs are done? We should specify that.
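For example, a reader might only be able to answer this once pruning has fixed its output schema; here is a minimal sketch (a hypothetical class, not part of the prototype's API):

```java
import java.util.Arrays;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructType;

// Hypothetical reader state: column pruning updates the current schema first,
// and only afterwards is supportsColumnarReads() consulted.
class ExampleReaderState {
  private StructType currentSchema;

  void pruneColumns(StructType requiredSchema) {
    this.currentSchema = requiredSchema;
  }

  boolean supportsColumnarReads() {
    // e.g. take the columnar path only when no remaining column is a nested type.
    return Arrays.stream(currentSchema.fields()).noneMatch(f ->
        f.dataType() instanceof StructType
            || f.dataType() instanceof ArrayType
            || f.dataType() instanceof MapType);
  }
}
```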

* A data reader returned by a read task, responsible for outputting data for an RDD
* partition.
*/
public interface DataReader<T> extends Iterator<T>, Closeable {}
Copy link

why is this an "Iterator"? Don't do this ...

Use explicit next(), with close().

Copy link

I think it makes sense to use something like Iterable and Iterator instead of requiring an open method that can be easily forgotten. What's the rationale behind not using Iterator? The remove method that's on the interface?
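For reference, a minimal sketch of the explicit-iteration style being suggested (the interface and method names are illustrative, not a settled API):

```java
import java.io.Closeable;
import java.io.IOException;

// Explicit iteration instead of java.util.Iterator: no hasNext()/remove(), and the
// caller drives both advancement and cleanup.
interface ExplicitDataReader<T> extends Closeable {
  // Advance to the next record; returns false once the input is exhausted.
  boolean next() throws IOException;

  // The current record; only valid after next() has returned true.
  T get();
}
```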

* Returns true if the implementation can handle the limit operation, so that we can reduce
* the data size to be read at the very beginning.
*/
boolean pushDownLimit(int limit);
Copy link

to make it slightly shorter, perhaps just call the method pushLimit, pushFilters, ...
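A small sketch of what the limit mix-in could look like with the shorter naming suggested here (the interface name is illustrative):

```java
// Illustrative mix-in only; the boolean still tells Spark whether the source could
// apply the limit itself, so Spark knows whether to keep its own limit operator.
interface SupportsPushLimit {
  boolean pushLimit(int limit);
}
```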

@cloud-fan cloud-fan closed this Nov 6, 2017
cloud-fan pushed a commit that referenced this pull request Jul 12, 2019
…nput of UDF as double in the failed test in udf-aggregate_part1.sql

## What changes were proposed in this pull request?

It can still be flaky in certain environments due to the float limitation described at apache#25110. See apache#25110 (comment)

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/6584/testReport/org.apache.spark.sql/SQLQueryTestSuite/udf_pgSQL_udf_aggregates_part1_sql___Regular_Python_UDF/

```
Expected "700000000000[6] 1", but got "700000000000[5] 1" Result did not match for query apache#33&#10;SELECT CAST(avg(udf(CAST(x AS DOUBLE))) AS long), CAST(udf(var_pop(CAST(x AS DOUBLE))) AS decimal(10,3))&#10;FROM (VALUES (7000000000005), (7000000000007)) v(x)
```

Here's what's going on: apache#25110 (comment)

```
scala> Seq("7000000000004.999", "7000000000006.999").toDF().selectExpr("CAST(avg(value) AS long)").show()
+--------------------------+
|CAST(avg(value) AS BIGINT)|
+--------------------------+
|             7000000000005|
+--------------------------+
```

Therefore, this PR just avoids the cast in that specific test.

This is a temporary fix. We need a more robust way to avoid such cases.

## How was this patch tested?

It passes with Maven locally before/after this PR. I believe the problem is likewise down to the Python or OS installed on the machine. I should test this against the PR builder with `test-maven` to be sure.

Closes apache#25128 from HyukjinKwon/SPARK-28270-2.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
cloud-fan pushed a commit that referenced this pull request Nov 18, 2019
… Arrow on JDK9+

### What changes were proposed in this pull request?

This PR aims to add `io.netty.tryReflectionSetAccessible=true` to the testing configuration for JDK11 because this is an officially documented requirement of Apache Arrow.

Apache Arrow community documented this requirement at `0.15.0` ([ARROW-6206](apache/arrow#5078)).
> #### For java 9 or later, should set "-Dio.netty.tryReflectionSetAccessible=true".
> This fixes `java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available`. thrown by netty.
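For context, a small sketch of how an application might verify the flag is present (the class here is hypothetical; the flag itself has to be passed as a JVM option, e.g. via spark-submit's `--driver-java-options` or `spark.executor.extraJavaOptions`, rather than set at runtime):

```java
// Sanity check that the JVM was started with the documented Netty flag.
public class CheckNettyFlag {
  public static void main(String[] args) {
    String value = System.getProperty("io.netty.tryReflectionSetAccessible");
    System.out.println("io.netty.tryReflectionSetAccessible=" + value);
    if (!"true".equals(value)) {
      System.out.println("Arrow on JDK 9+ will likely fail with the UnsupportedOperationException above.");
    }
  }
}
```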

### Why are the changes needed?

After ARROW-3191, the Arrow Java library requires the property `io.netty.tryReflectionSetAccessible` to be set to true for JDK >= 9. After apache#26133, the JDK11 Jenkins jobs seem to fail.

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/676/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/677/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/678/

```scala
Previous exception in task:
sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
    io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:473)
    io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
    io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
    io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
    org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with JDK11.

Closes apache#26552 from dongjoon-hyun/SPARK-ARROW-JDK11.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan pushed a commit that referenced this pull request Dec 22, 2020
…mand

### What changes were proposed in this pull request?

This PR proposes to sort table properties in the DESCRIBE TABLE command. This is consistent with the DSv2 command as well:
https://github.com/apache/spark/blob/e3058ba17cb4512537953eb4ded884e24ee93ba2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala#L63

This PR also fixes the test case in the Scala 2.13 build, where the table properties have a different order in the map.
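Purely as an illustration of key-sorted rendering (a hypothetical helper, not Spark's actual implementation):

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: rendering properties in key order gives deterministic output
// regardless of the underlying map's iteration order (which differs across Scala versions).
class SortedPropertiesExample {
  static String render(Map<String, String> props) {
    StringBuilder sb = new StringBuilder();
    new TreeMap<>(props).forEach((k, v) -> sb.append(k).append('=').append(v).append(", "));
    return sb.toString();
  }
}
```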

### Why are the changes needed?

To keep the output deterministic and pretty, and to fix the tests in the Scala 2.13 build.
See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-3.2-scala-2.13/49/testReport/junit/org.apache.spark.sql/SQLQueryTestSuite/describe_sql/

```
describe.sql
Expected "...spark_catalog, view.[query.out.col.2=c, view.referredTempFunctionsNames=[], view.catalogAndNamespace.part.1=default]]", but got "...spark_catalog, view.[catalogAndNamespace.part.1=default, view.query.out.col.2=c, view.referredTempFunctionsNames=[]]]" Result did not match for query apache#29
DESC FORMATTED v
```

### Does this PR introduce _any_ user-facing change?

Yes, it will change the text output from `DESCRIBE [EXTENDED|FORMATTED] table_name`.
Now the table properties are sorted by their keys.

### How was this patch tested?

Related unittests were fixed accordingly.

Closes apache#30799 from HyukjinKwon/SPARK-33803.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Jan 25, 2021
…mand

### What changes were proposed in this pull request?

This PR proposes to sort table properties in the DESCRIBE TABLE command. This is consistent with the DSv2 command as well:
https://github.com/apache/spark/blob/e3058ba17cb4512537953eb4ded884e24ee93ba2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala#L63

This PR also fixes the test case in the Scala 2.13 build, where the table properties have a different order in the map.

### Why are the changes needed?

To keep the output deterministic and pretty, and to fix the tests in the Scala 2.13 build.
See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-3.2-scala-2.13/49/testReport/junit/org.apache.spark.sql/SQLQueryTestSuite/describe_sql/

```
describe.sql
Expected "...spark_catalog, view.[query.out.col.2=c, view.referredTempFunctionsNames=[], view.catalogAndNamespace.part.1=default]]", but got "...spark_catalog, view.[catalogAndNamespace.part.1=default, view.query.out.col.2=c, view.referredTempFunctionsNames=[]]]" Result did not match for query apache#29
DESC FORMATTED v
```

### Does this PR introduce _any_ user-facing change?

Yes, it will change the text output from `DESCRIBE [EXTENDED|FORMATTED] table_name`.
Now the table properties are sorted by their keys.

### How was this patch tested?

Related unittests were fixed accordingly.

Closes apache#30799 from HyukjinKwon/SPARK-33803.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7845865)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Mar 17, 2022
…aceable

### What changes were proposed in this pull request?

This PR uses manual recursion to replace `RuntimeReplaceable` expressions instead of `transformAllExpressionsWithPruning`. The problem with `transformAllExpressionsWithPruning` is that it automatically makes the replacement expression inherit the function alias name from the parent node, which is quite misleading. For example, for `select date_part('month', c) from t`, the optimized plan in EXPLAIN before this PR is
```
Project [date_part(cast(c#18 as date)) AS date_part(month, c)#19]
+- Relation default.t[c#18] parquet
```
Now it's
```
Project [month(cast(c#9 as date)) AS date_part(month, c)#10]
+- Relation default.t[c#9] parquet
```

### Why are the changes needed?

fix misleading EXPLAIN result

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes apache#35821 from cloud-fan/follow2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Feb 27, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan pushed a commit that referenced this pull request May 7, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>