-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-30289][SQL][TEST] Partitioned by Nested Column for InMemoryTable
#26929
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
39bdcf1
to
98fec47
Compare
@@ -59,8 +60,11 @@ class InMemoryTable( | |||
|
|||
def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq | |||
|
|||
private val partFieldNames = partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The nested columns were flatten out here, and then we looked them up against top level columns resulting IllegalArgumentException
.
Test build #115475 has finished for PR 26929 at commit
|
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
Outdated
Show resolved
Hide resolved
I'll take a look at this tomorrow, but I think that not allowing partitioning by nested columns isn't the right solution. Iceberg can partition by columns nested in structs, but not columns inside lists or maps. Since it is just a logical grouping, I see no reason why it shouldn't be allowed. I think we just need to update the analysis check. |
@@ -78,6 +78,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table { | |||
partitions += spec.asTransform | |||
} | |||
|
|||
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper | |||
partitions.validatePartitionColumns() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that v1 tables should throw an exception if nested columns are used because it wasn't supported in v1, but there is no need to disallow nested columns in all v2 sources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Digging the v2 codebase a bit, and I feel currently, we need some work to actually get v2 sources supporting using nested columns in transform. For example, in FileTable
, override def partitioning: Array[Transform]
is converted from PartitionSpec
which doesn't support nested column at all. Since we are mixing v1 and v2 code here and there, do we have a plan to untangle them so it will be easier to extend nested column support in v2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FileTable doesn't support partitioning by nested columns, and that's okay because it is optional. Sources should just reject partitioning that is not supported.
@dbtsai, is the intent of this to disable nested columns for just the sources that can't handle them (In-memory, file sources, and v1) or is it to disable them more broadly? |
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
val t1 = s"${catalogAndNamespace}tbl" | ||
withTable(t1) { | ||
val e = intercept[IllegalArgumentException] { | ||
sql(s"CREATE TABLE $t1 (nested struct<id:bigint, data:string>) " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't support CREATE TABLE USING fileSourceV2
now, we only need to fix InMemoryTable
.
@@ -39,6 +40,9 @@ abstract class FileTable( | |||
userSpecifiedSchema: Option[StructType]) | |||
extends Table with SupportsRead with SupportsWrite { | |||
|
|||
// If `partitioning` contains nested columns, an `AnalysisException` will be thrown | |||
partitioning.toSeq.validatePartitionColumns() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The partitioning
here is always inferred, so this check always passes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah seems like so. Even after we support passing the partitioning from TableProvider
, the partitioning columns should be validated in Catalog already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SQL queries might be weird if partitioning by nested columns, e.g.,
We can use top columns like:
INSERT OVERWRITE TABLE table_with_partition PARTITION (p1='a', p2='b') SELECT 'blarr' FROM tmp_table
For nested columns:
INSERT OVERWRITE TABLE table_with_partition PARTITION (p1='a', struct.sub1='b') SELECT 'blarr' FROM tmp_table
I was on paternity leave, and sorry for the late reply. @rdblue the intent of this was to disable partition by nested columns because I saw some inconsistencies in the v2 codebase when I tried to create a new v2 filter api, and I thought partition by nested columns is not supported at all. For example, in @cloud-fan do you mean fix |
@dbtsai, Iceberg supports partitioning by fields in structs. We think of structs as a logical grouping of columns because values are still 1-to-1 with the row. Hive tables don't support partitioning by nested fields, which is why Hive tables should reject partition expressions that use nested fields. It's up to the catalog and table implementation to determine what is supported. The parser will recognize any multi-part identifier as a column in a partition expression. It will split on |
val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform]) | ||
|
||
if (nonIdTransforms.nonEmpty) { | ||
throw new AnalysisException("Transforms cannot be converted to partition columns: " + | ||
nonIdTransforms.map(_.describe).mkString(", ")) | ||
nonIdTransforms.map(_.describe).mkString(", ")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue do we allow to use bucket transform as a partition column? It's not allowed in ResolveSessionCatalog.scala
, but there is a test in DataFrameWriterV2Suite.scala
testing test("Create: partitioned by bucket(4, id)")
.
Note that in that test, there is a table property to enable "allow-unsupported-transforms"
, what's the usecase here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That table property allows is used to make the test table implementation accept configuration that it doesn't support when writing. It's used to test that the table was passed the right Transform
, even though the InMemoryTable
only supports identity transforms.
ResolveSessionCatalog should convert bucket Transforms to and from BucketSpec.
I think Spark should support all kinds of PARTITION BY expressions as long as it can be translated to v2 For the particular test failure, I think we should fix |
InMemoryTable
This comment has been minimized.
This comment has been minimized.
Test build #118193 has finished for PR 26929 at commit
|
Ping @cloud-fan @rdblue @dongjoon-hyun @viirya @gengliangwang again. I fixed |
+1 Thanks for updating tests, @dbtsai. This looks good to me and it's great to have cases for partitioning by nested fields. |
value | ||
} | ||
} | ||
partCols.map(filedNames => extractor(filedNames, schema, row)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filedNames? fieldNames?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks.
Looks good and thanks for working on this. |
Test build #118379 has finished for PR 26929 at commit
|
if (fieldNames.length > 1) { | ||
(value, schema(index).dataType) match { | ||
case (row: InternalRow, nestedSchema: StructType) => | ||
extractor(fieldNames.slice(1, fieldNames.length), nestedSchema, row) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fieldNames.drop(1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Addressed.
Test build #118408 has finished for PR 26929 at commit
|
Test build #118411 has finished for PR 26929 at commit
|
### What changes were proposed in this pull request? 1. `InMemoryTable` was flatting the nested columns, and then the flatten columns was used to look up the indices which is not correct. This PR implements partitioned by nested column for `InMemoryTable`. ### Why are the changes needed? This PR implements partitioned by nested column for `InMemoryTable`, so we can test this features in DSv2 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing unit tests and new tests. Closes #26929 from dbtsai/addTests. Authored-by: DB Tsai <d_tsai@apple.com> Signed-off-by: DB Tsai <d_tsai@apple.com> (cherry picked from commit d0f9614) Signed-off-by: DB Tsai <d_tsai@apple.com>
Thanks. Merged into master and branch-3.0. |
InMemoryTable
InMemoryTable
InMemoryTable
InMemoryTable
### What changes were proposed in this pull request? 1. `InMemoryTable` was flatting the nested columns, and then the flatten columns was used to look up the indices which is not correct. This PR implements partitioned by nested column for `InMemoryTable`. ### Why are the changes needed? This PR implements partitioned by nested column for `InMemoryTable`, so we can test this features in DSv2 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing unit tests and new tests. Closes apache#26929 from dbtsai/addTests. Authored-by: DB Tsai <d_tsai@apple.com> Signed-off-by: DB Tsai <d_tsai@apple.com>
What changes were proposed in this pull request?
InMemoryTable
was flatting the nested columns, and then the flatten columns was used to look up the indices which is not correct.This PR implements partitioned by nested column for
InMemoryTable
.Why are the changes needed?
This PR implements partitioned by nested column for
InMemoryTable
, so we can test this features in DSv2Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit tests and new tests.