[SPARK-27181][SQL]: Add public transform API #24117

Closed
wants to merge 6 commits
SqlBase.g4
@@ -92,7 +92,7 @@ statement
| SHOW DATABASES (LIKE? pattern=STRING)? #showDatabases
| createTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitionColumnNames=identifierList) |
(PARTITIONED BY partitioning=transformList) |
bucketSpec |
locationSpec |
(COMMENT comment=STRING) |
@@ -587,6 +587,21 @@ namedExpressionSeq
: namedExpression (',' namedExpression)*
;

transformList
Contributor

why is this not a general expression?

Contributor Author

Because in this context, only transforms are allowed. You can't partition by an arbitrary expression.

Contributor

Shouldn't the parser be generic, and we just throw errors on things that are not supported? That'd give better error messages (a typical technique used by more mature databases) and also reduce the number of parsing rules in the grammar file.

Contributor Author

We can definitely do that and you're right about being able to produce better error messages.

Is it okay with you if we do this in a follow-up?

This PR is blocking the addition of TableCatalog in PR #24246, and that PR is blocking many PRs that we can do in parallel (Select from SQL, CreateTable, CTAS, DropTable, etc.). If we are going to get this done in time, I'd like to get these changes in and work on improvements like this later. I'd rather have a functional v2 implementation than friendly error messages right now.

Contributor

It's a good idea to make the parser rule general and produce a better error message. However, this is an existing problem in our parser, and it's also non-trivial work for this case. We need to catch UnresolvedFunction and validate the arguments; the arguments can only be attributes, ExtractValues, or literals. I'm OK to do it in a follow-up.

Contributor Author

I've opened an issue for this follow-up: https://issues.apache.org/jira/browse/SPARK-27386

: '(' transforms+=transform (',' transforms+=transform)* ')'
;

transform
: qualifiedName #identityTransform
Contributor

It's used in the CREATE TABLE statement only; do we really need qualifiedName? I think identifier is good enough here.

Contributor Author

I think it is better to use qualifiedName. This may be a logical name in the current use, but later Spark may need to resolve the transform using this name. For example, this could be set to builtin.bucket to tell Spark that it is the built-in bucket transform function. Using that information, Spark would know it can run a bucketed join.

Contributor

How about the transform arguments? Do they need to be qualifiedName as well?

Contributor Author

Yes, arguments need to be qualifiedName because they may reference nested fields.

| transformName=identifier
'(' argument+=transformArgument (',' argument+=transformArgument)* ')' #applyTransform
;

transformArgument
: qualifiedName
| constant
;

expression
: booleanExpression
;
Expression.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2.expressions;
Contributor

An issue with using the sql.catalog namespace is that it is not clear whether this is a "source" or "connector". In general, I think we should have separate API packages for library / data source developers vs. end users. We already use sql.catalog for end-user-facing catalog functionality.

Contributor Author

How about org.apache.spark.sql.logical.expressions then? It doesn't matter to me.

Contributor

I think it is better to create a namespace for data sources (or connectors), for example org.apache.spark.sql.sources or org.apache.spark.sql.connectors, and within it have catalog and table?

Contributor Author

It sounds like you're suggesting a larger reorganization than just where the expressions go. Let's discuss this at tonight's sync.

Contributor

I like the org.apache.spark.sql.connectors package, but moving classes will cause code conflicts for all the DS v2 PRs. Maybe do it later?

Contributor

When are we going to do this?

Contributor Author

First, we have to decide what we need to move. I don't think that is clear yet. The next concern is getting #24246 in, because it is a blocker for a lot of work that can be done in parallel. After that, it would be nice to coordinate to avoid breaking lots of PRs, but that's less of a concern.

So to answer your question, I think we should do this after #24246 and after we've decided what needs to move and what the new organization should be.

The actual changes should be simple and quick to review, but would cause too much delay combined into a PR with other changes.


import org.apache.spark.annotation.Experimental;

/**
* Base class of the public logical expression API.
*/
@Experimental
public interface Expression {
Member

I am confused about why we should expose a custom expression API that is currently only used by the transform API.
Maybe there was a discussion about this plan. Do we plan to switch Spark's current expressions to this expression API entirely in the future, and how is it different from using a UDF?

Contributor

It's better to give the semantics to the data source instead of a concrete UDF. Data sources can implement partitioning semantics efficiently if they don't need to call a Java UDF here and there.

/**
* Format the expression as a human-readable SQL-like string.
*/
String describe();
Contributor

why not toSQL?

Contributor Author

I didn't want to require that implementations of Expression produce SQL that can be parsed. Spark should do that to ensure it is reliable when Spark needs to embed these in SQL strings. But it is good to have a readable string representation.

I can change this to toSQL if you think that implementations will be able to produce reliable SQL strings.

Contributor

Only Spark internally will do this, right? If that's the case, we should just make it clear it will produce SQL strings.

Contributor Author

What do you mean by "do this"?

Transform is a public API that can be implemented to pass transforms into Spark. I don't think we can avoid that. As a result, Spark should produce the SQL representation itself using the transform name and arguments.

Contributor Author

Alternatively, we could add a helper that does the formatting and remove this from the API entirely.

Contributor

I prefer toSQL, given it will basically be toSQL, and it'd be useful to enforce this across the board.

}
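As a sketch of the "helper that does the formatting" idea mentioned in the thread above (not part of this PR; the class and method names are illustrative, and it is assumed to live alongside Expression and Transform in the same package), Spark could format a transform itself from its name and arguments:

// Hypothetical helper, sketching the formatting idea discussed above.
final class ExpressionFormat {
  private ExpressionFormat() {
  }

  // Formats a transform as name(arg1, arg2, ...) using each argument's describe().
  static String format(Transform transform) {
    StringBuilder sb = new StringBuilder(transform.name()).append("(");
    Expression[] args = transform.arguments();
    for (int i = 0; i < args.length; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(args[i].describe());
    }
    return sb.append(")").toString();
  }
}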
Expressions.java
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;
import scala.collection.JavaConverters;

import java.util.Arrays;

/**
* Helper methods to create logical transforms to pass into Spark.
*/
@Experimental
public class Expressions {
private Expressions() {
}

/**
* Create a logical transform for applying a named transform.
* <p>
* This transform can represent applying any named transform.
*
* @param name the transform name
* @param args expression arguments to the transform
* @return a logical transform
*/
public static Transform apply(String name, Expression... args) {
return LogicalExpressions.apply(name,
JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
}

/**
* Create a named reference expression for a column.
*
* @param name a column name
* @return a named reference for the column
*/
public static NamedReference column(String name) {
return LogicalExpressions.reference(name);
}

/**
* Create a literal from a value.
* <p>
* The JVM type of the value held by a literal must be the type used by Spark's InternalRow API
* for the literal's {@link DataType SQL data type}.
*
* @param value a value
* @param <T> the JVM type of the value
* @return a literal expression for the value
*/
public static <T> Literal<T> literal(T value) {
return LogicalExpressions.literal(value);
}

/**
* Create a bucket transform for one or more columns.
* <p>
* This transform represents a logical mapping from a value to a bucket id in [0, numBuckets)
* based on a hash of the value.
* <p>
* The name reported by transforms created with this method is "bucket".
*
* @param numBuckets the number of output buckets
* @param columns input columns for the bucket transform
* @return a logical bucket transform with name "bucket"
*/
public static Transform bucket(int numBuckets, String... columns) {
return LogicalExpressions.bucket(numBuckets,
JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
}

/**
* Create an identity transform for a column.
* <p>
* This transform represents a logical mapping from a value to itself.
* <p>
* The name reported by transforms created with this method is "identity".
*
* @param column an input column
* @return a logical identity transform with name "identity"
*/
public static Transform identity(String column) {
return LogicalExpressions.identity(column);
}

/**
* Create a yearly transform for a timestamp or date column.
* <p>
* This transform represents a logical mapping from a timestamp or date to a year, such as 2018.
* <p>
* The name reported by transforms created with this method is "years".
*
* @param column an input timestamp or date column
* @return a logical yearly transform with name "years"
*/
public static Transform years(String column) {
return LogicalExpressions.years(column);
}

/**
* Create a monthly transform for a timestamp or date column.
* <p>
* This transform represents a logical mapping from a timestamp or date to a month, such as
* 2018-05.
* <p>
* The name reported by transforms created with this method is "months".
*
* @param column an input timestamp or date column
* @return a logical monthly transform with name "months"
*/
public static Transform months(String column) {
return LogicalExpressions.months(column);
}

/**
* Create a daily transform for a timestamp or date column.
* <p>
* This transform represents a logical mapping from a timestamp or date to a date, such as
* 2018-05-13.
* <p>
* The name reported by transforms created with this method is "days".
*
* @param column an input timestamp or date column
* @return a logical daily transform with name "days"
*/
public static Transform days(String column) {
return LogicalExpressions.days(column);
}

/**
* Create an hourly transform for a timestamp column.
* <p>
* This transform represents a logical mapping from a timestamp to a date and hour, such as
* 2018-05-13, hour 19.
* <p>
* The name reported by transforms created with this method is "hours".
*
* @param column an input timestamp column
* @return a logical hourly transform with name "hours"
*/
public static Transform hours(String column) {
return LogicalExpressions.hours(column);
}

}
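For illustration, a brief usage sketch (not part of this PR) of the helpers above; the class name, column names, and the zorder transform are hypothetical:

import org.apache.spark.sql.catalog.v2.expressions.Expressions;
import org.apache.spark.sql.catalog.v2.expressions.Transform;

public class TransformExamples {
  public static void main(String[] args) {
    // Roughly equivalent to PARTITIONED BY (region, days(ts), bucket(16, id)) in SQL.
    Transform byRegion = Expressions.identity("region");
    Transform byDay = Expressions.days("ts");
    Transform byBucket = Expressions.bucket(16, "id");

    // Any named transform can be built directly, e.g. a source-specific one.
    Transform custom = Expressions.apply("zorder",
        Expressions.column("x"), Expressions.column("y"));

    System.out.println(byRegion.describe());
    System.out.println(custom.describe());
  }
}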
Literal.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;

/**
* Represents a constant literal value in the public expression API.
* <p>
* The JVM type of the value held by a literal must be the type used by Spark's InternalRow API for
* the literal's {@link DataType SQL data type}.
*
* @param <T> the JVM type of a value held by the literal
*/
@Experimental
public interface Literal<T> extends Expression {
Contributor

I'm a little hesitant to add a type parameter to an expression interface. I'm not sure how useful it is. When I deal with expressions, my method parameter and return types are usually Expression. Because of type erasure, I won't get the type parameter of a literal unless the method deals with literals only.

Contributor Author

What is the downside to using this? We have a typed literal in Iceberg and it is useful for maintaining type safety.

Contributor

The downside is that we may need to add a cast to read the value from this literal, e.g.

def func(e: Expression) = e match {
  case lit: Literal[_] => lit.asInstanceOf[Literal[Any]].value
}

Contributor

Actually, it would be good to see some examples. In general, my feeling is that adding a type parameter to a subclass but not the base class is not going to be very useful.

Contributor Author

The alternative is to cast the value instead, so you have to cast either way. You can't get around casting when the type is discarded. I don't think it is a good idea to throw away type information in all cases just because it isn't useful in some cases.

Here's an example of how it is used in Iceberg in expression evaluation:

    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
      Comparator<T> cmp = lit.comparator();
      return cmp.compare(ref.get(struct), lit.value()) < 0;
    }

In Iceberg, expression binding guarantees that the literal's type matches the reference's type. With that information, this code knows that the value returned by the reference's get method matches the type of the comparator and of the literal value.

/**
* Returns the literal value.
*/
T value();

/**
* Returns the SQL data type of the literal.
*/
DataType dataType();
}
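To make the casting trade-off discussed above concrete, here is a small sketch (not part of this PR; the class and method names are illustrative) of a source reading a transform argument when all it has is an Expression:

// Sketch only: extracting an int from a transform argument. With only an
// Expression in hand, an instanceof check and a cast are needed either way.
final class TransformArguments {
  private TransformArguments() {
  }

  static int bucketCount(Expression arg) {
    if (arg instanceof Literal) {
      Object value = ((Literal<?>) arg).value();
      if (value instanceof Integer) {
        return (Integer) value;
      }
    }
    throw new IllegalArgumentException("Expected an integer literal: " + arg.describe());
  }
}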
NamedReference.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;

/**
* Represents a field or column reference in the public logical expression API.
*/
@Experimental
public interface NamedReference extends Expression {
/**
* Returns the referenced field name as an array of String parts.
* <p>
* Each string in the returned array represents a field name.
*/
String[] fieldNames();
}
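For example, a hypothetical implementation (not part of this PR) referencing the nested field point.x would report the parts {"point", "x"}:

// Hypothetical NamedReference implementation, included only to illustrate the
// contract of fieldNames() for a nested field such as point.x.
final class FieldRef implements NamedReference {
  private final String[] parts;

  FieldRef(String... parts) {
    this.parts = parts;
  }

  @Override
  public String[] fieldNames() {
    return parts; // e.g. {"point", "x"}
  }

  @Override
  public String describe() {
    return String.join(".", parts);
  }
}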
Transform.java
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;

/**
* Represents a transform function in the public logical expression API.
* <p>
* For example, the transform date(ts) is used to derive a date value from a timestamp column. The
* transform name is "date" and its argument is a reference to the "ts" column.
*/
@Experimental
public interface Transform extends Expression {
/**
* Returns the transform function name.
*/
String name();
Contributor

how would a library developer know what names are out there?

Contributor Author (@rdblue, Apr 2, 2019)

There isn't a restriction on names. Using the current implementation, it would be possible to express a custom partition function like this:

PARTITION BY (range(last_name, 'A', 'C', 'G', ...))

Spark defines commonly used transformations like bucketing and date transforms. Those transforms will be well-defined so they can eventually be used to pass values to Spark for metadata-only queries.
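For reference, the same source-defined transform could be built through the helper API (a fragment sketch only; range is not a transform Spark defines, and the column and values here mirror the SQL example above):

// Sketch only: the custom range transform above, built programmatically.
Transform lastNameRange = Expressions.apply("range",
    Expressions.column("last_name"),
    Expressions.literal("A"),
    Expressions.literal("C"),
    Expressions.literal("G"));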

Contributor

Not sure if I asked the question clearly. Say I'm a developer who wants to build a new data source: will I need to know what the possible transform functions are? Or are those completely hidden from me?

Contributor Author

When the catalog API is added, it will link to the Expressions class that includes known transforms that have meaning to Spark. It will also note that any transform can be expressed, as I said above. The range transform example can be parsed and passed to the source; it just isn't one defined by Spark.


/**
* Returns all field references in the transform arguments.
*/
NamedReference[] references();
Contributor

why is this here?

Contributor Author

This is for validation. Spark should validate that there aren't any unresolved references.


/**
* Returns the arguments passed to the transform function.
*/
Expression[] arguments();
}
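To tie the pieces together, here is a sketch (not part of this PR; the supported transform names and the validation shown are hypothetical) of how a data source might inspect the transforms passed to it, for example through the TableCatalog API proposed in #24246:

// Sketch only: how a source might inspect requested partitioning.
final class PartitioningValidator {
  private PartitioningValidator() {
  }

  static void validate(Transform[] partitioning) {
    for (Transform t : partitioning) {
      switch (t.name()) {
        case "identity":
        case "bucket":
        case "days":
          break; // transforms this hypothetical source can implement natively
        default:
          throw new IllegalArgumentException("Unsupported transform: " + t.describe());
      }
      for (NamedReference ref : t.references()) {
        // Each reference should resolve against the table schema, e.g. by
        // checking String.join(".", ref.fieldNames()) against column names.
      }
    }
  }
}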