[SPARK-27181][SQL]: Add public transform API #24117
Changes from all commits: 9ef6f8e, 6906c5d, 42d89f5, 4841770, 76a4067, a4a87ac
Modified file: SqlBase.g4 (SQL parser grammar)

@@ -92,7 +92,7 @@ statement
     | SHOW DATABASES (LIKE? pattern=STRING)?                          #showDatabases
     | createTableHeader ('(' colTypeList ')')? tableProvider
         ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitionColumnNames=identifierList) |
+        (PARTITIONED BY partitioning=transformList) |
         bucketSpec |
         locationSpec |
         (COMMENT comment=STRING) |
@@ -587,6 +587,21 @@ namedExpressionSeq
     : namedExpression (',' namedExpression)*
     ;

+transformList
+    : '(' transforms+=transform (',' transforms+=transform)* ')'
+    ;
+
+transform
+    : qualifiedName                                                           #identityTransform
+    | transformName=identifier
+      '(' argument+=transformArgument (',' argument+=transformArgument)* ')'  #applyTransform
+    ;

Review thread on the transform rule:
- it's used in CREATE TABLE statement only, do we really need [...]?
- I think it is better to use [...]
- How about the transform arguments? Do they need to be [...]?
- Yes, arguments need to be [...]
+transformArgument
+    : qualifiedName
+    | constant
+    ;
+
 expression
     : booleanExpression
     ;
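For illustration, under these rules a clause like PARTITIONED BY (category, bucket(16, id), years(ts)) parses category through the #identityTransform alternative and the two function-style entries through #applyTransform; each transform argument must be a qualifiedName (a column reference) or a constant.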
New file: Expression.java (org.apache.spark.sql.catalog.v2.expressions)

@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2.expressions;
Review thread on the package name:
- an issue with using the sql.catalog namespace is that it is not clear this is a "source" or "connector". In general, I think we should have separate API packages for library / data source developers vs end users. We already use sql.catalog for end user facing catalog functionalities.
- How about [...]?
- I think it is better to create a namespace for data sources (or connectors). As an example: org.apache.spark.sql.sources or org.apache.spark.sql.connectors, and within it you have catalog and table?
- It sounds like you're suggesting a larger reorganization than just where the expressions go. Let's discuss this at tonight's sync.
- I like the [...]
- When are we going to do this?
- First, we have to decide on what all we need to move. I don't think that is clear yet. The next concern is getting #24246 in because it is a blocker for a lot of work that can be done in parallel. After that, it would be nice to coordinate to avoid breaking lots of PRs, but that's less of a concern. So to answer your question, I think we should do this after #24246 and after we've decided what needs to move and what the new organization should be. The actual changes should be simple and quick to review, but would cause too much delay combined into a PR with other changes.
import org.apache.spark.annotation.Experimental;

/**
 * Base class of the public logical expression API.
 */
@Experimental
public interface Expression {
Review thread on the Expression interface:
- I am confused why we should expose a custom API which is only used in the transform API currently.
- It's better to give the semantics to the data source instead of a concrete UDF. Data sources can implement the partitioning semantics efficiently if they don't need to call a Java UDF here and there.
  /**
   * Format the expression as a human readable SQL-like string.
   */
  String describe();
Review thread on describe():
- why not toSQL?
- I didn't want to require that implementations of [...] I can change this to [...]
- Only Spark internally will do this, right? If that's the case, we should just make it clear it will be SQL strings.
- What do you mean by "do this"?
- Alternatively, we could add a helper that does the formatting and remove this from the API entirely.
- I prefer toSQL, given it will basically be toSQL, and it'd be useful to enforce this across the board.
}
New file: Expressions.java (org.apache.spark.sql.catalog.v2.expressions)

@@ -0,0 +1,162 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;
import scala.collection.JavaConverters;

import java.util.Arrays;
/**
 * Helper methods to create logical transforms to pass into Spark.
 */
@Experimental
public class Expressions {
  private Expressions() {
  }

  /**
   * Create a logical transform for applying a named transform.
   * <p>
   * This transform can represent applying any named transform.
   *
   * @param name the transform name
   * @param args expression arguments to the transform
   * @return a logical transform
   */
  public static Transform apply(String name, Expression... args) {
    return LogicalExpressions.apply(name,
        JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
  }
  /**
   * Create a named reference expression for a column.
   *
   * @param name a column name
   * @return a named reference for the column
   */
  public static NamedReference column(String name) {
    return LogicalExpressions.reference(name);
  }

  /**
   * Create a literal from a value.
   * <p>
   * The JVM type of the value held by a literal must be the type used by Spark's InternalRow API
   * for the literal's {@link DataType SQL data type}.
   *
   * @param value a value
   * @param <T> the JVM type of the value
   * @return a literal expression for the value
   */
  public static <T> Literal<T> literal(T value) {
    return LogicalExpressions.literal(value);
  }
  /**
   * Create a bucket transform for one or more columns.
   * <p>
   * This transform represents a logical mapping from a value to a bucket id in [0, numBuckets)
   * based on a hash of the value.
   * <p>
   * The name reported by transforms created with this method is "bucket".
   *
   * @param numBuckets the number of output buckets
   * @param columns input columns for the bucket transform
   * @return a logical bucket transform with name "bucket"
   */
  public static Transform bucket(int numBuckets, String... columns) {
    return LogicalExpressions.bucket(numBuckets,
        JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
  }

  /**
   * Create an identity transform for a column.
   * <p>
   * This transform represents a logical mapping from a value to itself.
   * <p>
   * The name reported by transforms created with this method is "identity".
   *
   * @param column an input column
   * @return a logical identity transform with name "identity"
   */
  public static Transform identity(String column) {
    return LogicalExpressions.identity(column);
  }

  /**
   * Create a yearly transform for a timestamp or date column.
   * <p>
   * This transform represents a logical mapping from a timestamp or date to a year, such as 2018.
   * <p>
   * The name reported by transforms created with this method is "years".
   *
   * @param column an input timestamp or date column
   * @return a logical yearly transform with name "years"
   */
  public static Transform years(String column) {
    return LogicalExpressions.years(column);
  }

  /**
   * Create a monthly transform for a timestamp or date column.
   * <p>
   * This transform represents a logical mapping from a timestamp or date to a month, such as
   * 2018-05.
   * <p>
   * The name reported by transforms created with this method is "months".
   *
   * @param column an input timestamp or date column
   * @return a logical monthly transform with name "months"
   */
  public static Transform months(String column) {
    return LogicalExpressions.months(column);
  }

  /**
   * Create a daily transform for a timestamp or date column.
   * <p>
   * This transform represents a logical mapping from a timestamp or date to a date, such as
   * 2018-05-13.
   * <p>
   * The name reported by transforms created with this method is "days".
   *
   * @param column an input timestamp or date column
   * @return a logical daily transform with name "days"
   */
  public static Transform days(String column) {
    return LogicalExpressions.days(column);
  }

  /**
   * Create an hourly transform for a timestamp column.
   * <p>
   * This transform represents a logical mapping from a timestamp to a date and hour, such as
   * 2018-05-13, hour 19.
   * <p>
   * The name reported by transforms created with this method is "hours".
   *
   * @param column an input timestamp column
   * @return a logical hourly transform with name "hours"
   */
  public static Transform hours(String column) {
    return LogicalExpressions.hours(column);
  }
}
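To show how these helpers compose, here is a minimal sketch. It uses only the factory methods defined above; the "zorder" transform name and the wrapper class are hypothetical, not part of this PR.

import org.apache.spark.sql.catalog.v2.expressions.Expressions;
import org.apache.spark.sql.catalog.v2.expressions.Transform;

// Hypothetical helper showing how a caller might describe table partitioning.
class PartitioningExamples {
  static Transform[] eventTablePartitioning() {
    return new Transform[] {
        // Equivalent to SQL: PARTITIONED BY (days(event_time), bucket(128, id))
        Expressions.days("event_time"),
        Expressions.bucket(128, "id"),
        // A source-defined transform with no built-in helper can use apply():
        Expressions.apply("zorder", Expressions.column("x"), Expressions.column("y"))
    };
  }
}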
New file: Literal.java (org.apache.spark.sql.catalog.v2.expressions)

@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;

/**
 * Represents a constant literal value in the public expression API.
 * <p>
 * The JVM type of the value held by a literal must be the type used by Spark's InternalRow API for
 * the literal's {@link DataType SQL data type}.
 *
 * @param <T> the JVM type of a value held by the literal
 */
@Experimental
public interface Literal<T> extends Expression {
Review thread on the type parameter:
- I'm a little hesitant to add a type parameter to an expression interface. I'm not sure how useful it is. When I deal with expressions, my method parameter and return type is usually [...]
- What is the downside to using this? We have a typed literal in Iceberg and it is useful for maintaining type safety.
- The downside is, we may need to add a cast to read the value from this literal, e.g. [...]
- Actually it will be good to see some examples. In general, my feeling is that adding a type parameter to a sub-class but not the base class is not going to be very useful.
- The alternative is to cast the value instead, so you have to cast either way. You can't get around casting when the type is discarded. I don't think it is a good idea to throw away type information in all cases just because it isn't useful in some cases. Here's an example of how it is used in Iceberg in expression evaluation:

    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
      Comparator<T> cmp = lit.comparator();
      return cmp.compare(ref.get(struct), lit.value()) < 0;
    }

  In Iceberg, expression binding guarantees that the literal's type matches the reference's type. With that information, this code knows that the value returned by the reference's get method matches the type of the comparator and of the literal value.
  /**
   * Returns the literal value.
   */
  T value();

  /**
   * Returns the SQL data type of the literal.
   */
  DataType dataType();
}
New file: NamedReference.java (org.apache.spark.sql.catalog.v2.expressions)

@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;

/**
 * Represents a field or column reference in the public logical expression API.
 */
@Experimental
public interface NamedReference extends Expression {
  /**
   * Returns the referenced field name as an array of String parts.
   * <p>
   * Each string in the returned array represents a field name.
   */
  String[] fieldNames();
}
New file: Transform.java (org.apache.spark.sql.catalog.v2.expressions)

@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2.expressions;

import org.apache.spark.annotation.Experimental;

/**
 * Represents a transform function in the public logical expression API.
 * <p>
 * For example, the transform date(ts) is used to derive a date value from a timestamp column. The
 * transform name is "date" and its argument is a reference to the "ts" column.
 */
@Experimental
public interface Transform extends Expression {
  /**
   * Returns the transform function name.
   */
  String name();
Review thread on transform names:
- how would a library developer know what names are out there?
- There isn't a restriction on names. Using the current implementation, it would be possible to express a custom partition function like this: [...] Spark defines commonly used transformations like bucketing and date transforms. Those transforms will be well-defined so they can eventually be used to pass values to Spark for metadata-only queries.
- Not sure if I asked the question clearly. Say I'm a developer that wants to build a new data source, will I need to know what the possible transform functions are? Or are those completely hidden from me?
- When the catalog API is added, it will link to the [...]
  /**
   * Returns all field references in the transform arguments.
   */
  NamedReference[] references();
Review thread on references():
- why is this here?
- This is for validation. Spark should validate that there aren't any unresolved references.
  /**
   * Returns the arguments passed to the transform function.
   */
  Expression[] arguments();
}
Review thread on the grammar change:
- why is this not a general expression?
- Because in this context, only transforms are allowed. You can't partition by an arbitrary expression.
- shouldn't the parser be generic, and we just throw errors on things that are not supported? that'd give better error messages (a typical technique used by more mature databases) and also reduce the number of parsing rules in the grammar file.
- We can definitely do that, and you're right about being able to produce better error messages. Is it okay with you if we do this in a follow-up? This PR is blocking the addition of TableCatalog in PR #24246, and that PR is blocking many PRs that we can do in parallel (Select from SQL, CreateTable, CTAS, DropTable, etc.). If we are going to get this done in time, I'd like to get these changes in and work on improvements like this later. I'd rather have a functional v2 implementation than friendly error messages right now.
- It's a good idea to make the parser rule general and produce a better error message. However, this is an existing problem in our parser, and it's also non-trivial work for this case. We need to catch UnresolvedFunction and validate the arguments; the arguments can only be attributes, ExtractValues, or literals. I'm ok to do it in a follow-up.
- I've opened an issue for this follow-up: https://issues.apache.org/jira/browse/SPARK-27386