
[SPARK-30289][SQL][TEST] Partitioned by Nested Column for InMemoryTable #26929


Closed
wants to merge 5 commits into apache:master from dbtsai:addTests

Conversation

dbtsai
Member

@dbtsai dbtsai commented Dec 18, 2019

What changes were proposed in this pull request?

  1. `InMemoryTable` was flattening the nested columns, and the flattened column names were then used to look up the field indices, which is not correct.

This PR implements partitioning by nested columns for `InMemoryTable`.

Why are the changes needed?

This PR implements partitioning by nested columns for `InMemoryTable`, so we can test this feature in DSv2.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests and new tests.

@dbtsai dbtsai added the SQL label Dec 18, 2019
@dbtsai dbtsai force-pushed the addTests branch 2 times, most recently from 39bdcf1 to 98fec47 Compare December 18, 2019 00:42
@dbtsai
Member Author

dbtsai commented Dec 18, 2019

@@ -59,8 +60,11 @@ class InMemoryTable(

def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq

private val partFieldNames = partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames)
Member Author

The nested columns were flattened out here, and then we looked them up against the top-level columns, resulting in an IllegalArgumentException.
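To illustrate the problem, here is a minimal sketch (the schema and names are made up for illustration, not taken from the actual test table):

```scala
import org.apache.spark.sql.types._

// Table schema: a single top-level struct column `nested` with fields `id` and `data`.
val schema = new StructType()
  .add("nested", new StructType()
    .add("id", LongType)
    .add("data", StringType))

// For a table partitioned by nested.id, the transform's reference has
// fieldNames = Array("nested", "id"). Flattening all references into one flat
// Seq[String] loses the path structure:
val partFieldNames = Seq("nested", "id")

// Looking each part up as a top-level column then fails, because "id" is not a
// top-level field of the schema:
partFieldNames.map(schema.fieldIndex)  // throws IllegalArgumentException for "id"
```

Keeping each reference's full field-name path instead of a flat list of name parts avoids this bogus lookup.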

@SparkQA

SparkQA commented Dec 18, 2019

Test build #115475 has finished for PR 26929 at commit 98fec47.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Dec 18, 2019

I'll take a look at this tomorrow, but I think that not allowing partitioning by nested columns isn't the right solution. Iceberg can partition by columns nested in structs, but not columns inside lists or maps. Since it is just a logical grouping, I see no reason why it shouldn't be allowed. I think we just need to update the analysis check.

@@ -78,6 +78,8 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
partitions += spec.asTransform
}

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
partitions.validatePartitionColumns()
Contributor

@rdblue rdblue Dec 18, 2019

I agree that v1 tables should throw an exception if nested columns are used because it wasn't supported in v1, but there is no need to disallow nested columns in all v2 sources.

Member Author

Digging into the v2 codebase a bit, I feel we currently need some work to actually get v2 sources to support nested columns in transforms. For example, in FileTable, override def partitioning: Array[Transform] is converted from PartitionSpec, which doesn't support nested columns at all. Since we are mixing v1 and v2 code here and there, do we have a plan to untangle them so it will be easier to extend nested column support in v2?

Contributor

FileTable doesn't support partitioning by nested columns, and that's okay because it is optional. Sources should just reject partitioning that is not supported.

@rdblue
Contributor

rdblue commented Dec 18, 2019

@dbtsai, is the intent of this to disable nested columns for just the sources that can't handle them (In-memory, file sources, and v1) or is it to disable them more broadly?

val t1 = s"${catalogAndNamespace}tbl"
withTable(t1) {
val e = intercept[IllegalArgumentException] {
sql(s"CREATE TABLE $t1 (nested struct<id:bigint, data:string>) " +
Contributor

We don't support CREATE TABLE USING fileSourceV2 yet; we only need to fix InMemoryTable.

@@ -39,6 +40,9 @@ abstract class FileTable(
userSpecifiedSchema: Option[StructType])
extends Table with SupportsRead with SupportsWrite {

// If `partitioning` contains nested columns, an `AnalysisException` will be thrown
partitioning.toSeq.validatePartitionColumns()
Contributor

The partitioning here is always inferred, so this check always passes.

Member

Yeah, it seems so. Even after we support passing the partitioning from TableProvider, the partitioning columns should already be validated in the catalog.

Member

@viirya viirya left a comment

SQL queries might look odd when partitioning by nested columns, e.g.,

For top-level columns we can write:
INSERT OVERWRITE TABLE table_with_partition PARTITION (p1='a', p2='b') SELECT 'blarr' FROM tmp_table

For nested columns it would be:
INSERT OVERWRITE TABLE table_with_partition PARTITION (p1='a', struct.sub1='b') SELECT 'blarr' FROM tmp_table

@dbtsai
Member Author

dbtsai commented Jan 14, 2020

I was on paternity leave, and sorry for the late reply.

@rdblue the intent of this was to disable partitioning by nested columns, because I saw some inconsistencies in the v2 codebase when I tried to create a new v2 filter API, and I thought partitioning by nested columns was not supported at all.

For example, in org.apache.spark.sql.catalyst.catalog.CatalogTable, we have partitionColumnNames: Seq[String] as partition columns instead of using NamedReference. How do we properly support using a nested column for partitioning? Are we parsing a string that contains . as a nested column? Can you give me an example?

@cloud-fan do you mean fix InMemoryTable so it supports it properly?

@rdblue
Contributor

rdblue commented Jan 15, 2020

@dbtsai, Iceberg supports partitioning by fields in structs. We think of structs as a logical grouping of columns because values are still 1-to-1 with the row.

Hive tables don't support partitioning by nested fields, which is why Hive tables should reject partition expressions that use nested fields. It's up to the catalog and table implementation to determine what is supported.

The parser will recognize any multi-part identifier as a column in a partition expression. It will split on . and also supports backticks for quoting, just like normal column and table identifiers.
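For example, the parser accepts statements of the following shape (a hedged sketch; the catalog, provider, and column names are assumptions, not taken from this PR):

```scala
// PARTITIONED BY accepts multi-part identifiers: point.x is split on the dot
// into the reference ["point", "x"], while backticks quote a name that itself
// contains a dot, so `a.b`.c refers to field c inside a top-level column named "a.b".
sql(
  """CREATE TABLE testcat.t (
    |  point STRUCT<x: DOUBLE, y: DOUBLE>,
    |  `a.b` STRUCT<c: INT>,
    |  data STRING)
    |USING foo
    |PARTITIONED BY (point.x, `a.b`.c)
    |""".stripMargin)
```

Whether such a transform is actually supported is then up to the catalog and table implementation, as described above.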

val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform])

if (nonIdTransforms.nonEmpty) {
throw new AnalysisException("Transforms cannot be converted to partition columns: " +
nonIdTransforms.map(_.describe).mkString(", "))
Member Author

@dbtsai dbtsai Jan 16, 2020

@rdblue do we allow using a bucket transform as a partition column? It's not allowed in ResolveSessionCatalog.scala, but there is a test in DataFrameWriterV2Suite.scala, test("Create: partitioned by bucket(4, id)"), that exercises it.

Note that in that test there is a table property, "allow-unsupported-transforms", to enable it. What's the use case here?

Contributor

That table property is used to make the test table implementation accept configuration that it doesn't support when writing. It's used to test that the table was passed the right Transform, even though InMemoryTable only supports identity transforms.

ResolveSessionCatalog should convert bucket Transforms to and from BucketSpec.
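For reference, that DataFrameWriterV2Suite test looks roughly like the sketch below (written from memory, so treat the exact calls as assumptions; it presumes a SparkSession `spark`, a "source" view, and a testcat catalog backed by the in-memory implementation):

```scala
import org.apache.spark.sql.functions.{bucket, col}

// InMemoryTable only implements identity partitioning, so it would normally
// reject a bucket transform; the table property below tells the test table to
// accept it anyway, so the suite can verify that the right Transform reached
// the catalog.
spark.table("source")
  .writeTo("testcat.table_name")
  .tableProperty("allow-unsupported-transforms", "true")
  .partitionedBy(bucket(4, col("id")))
  .create()
```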

@cloud-fan
Contributor

I think Spark should support all kinds of PARTITION BY expressions as long as they can be translated to a v2 Transform. The catalog implementation should decide whether it supports them or not. For example, the Hive catalog doesn't support partitioning by nested columns.

For the particular test failure, I think we should fix InMemoryTable so that, when flattening the fields, we keep the full column path, not just the leaf name.
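That is what the updated patch does. A consolidated sketch of the idea, pulled out into a standalone helper for illustration (the class name and exact shape are mine, not the actual InMemoryTable code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the approach in InMemoryTable.
class NestedPartitioning(schema: StructType, partitioning: Array[Transform]) {

  // Keep the full path of every partition reference instead of flattening the
  // name parts into one flat list.
  private val partCols: Array[Array[String]] =
    partitioning.flatMap(_.references).map(_.fieldNames)

  // Walk the path one level at a time to pull the partition value out of a row.
  private def extractor(fieldNames: Array[String], schema: StructType, row: InternalRow): Any = {
    val index = schema.fieldIndex(fieldNames(0))
    val value = row.get(index, schema(index).dataType)
    if (fieldNames.length > 1) {
      (value, schema(index).dataType) match {
        case (nestedRow: InternalRow, nestedSchema: StructType) =>
          extractor(fieldNames.drop(1), nestedSchema, nestedRow)
        case (_, dataType) =>
          throw new IllegalArgumentException(s"Unsupported type: ${dataType.simpleString}")
      }
    } else {
      value
    }
  }

  // The partition key of a row is the tuple of values at the referenced paths.
  def getKey(row: InternalRow): Seq[Any] =
    partCols.toSeq.map(fieldNames => extractor(fieldNames, schema, row))
}
```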

@dbtsai dbtsai changed the title [SPARK-30289][SQL] DSv2's partitioning should not accept nested columns [SPARK-30289][SQL] Partitioned by Nested Column for InMemoryTable Feb 7, 2020

@SparkQA

SparkQA commented Feb 11, 2020

Test build #118193 has finished for PR 26929 at commit cad1ab7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dbtsai
Member Author

dbtsai commented Feb 13, 2020

Ping @cloud-fan @rdblue @dongjoon-hyun @viirya @gengliangwang again. I fixed InMemoryTable so it accepts nested columns as partition columns, with tests.

@rdblue
Contributor

rdblue commented Feb 14, 2020

+1

Thanks for updating tests, @dbtsai. This looks good to me and it's great to have cases for partitioning by nested fields.

value
}
}
partCols.map(filedNames => extractor(filedNames, schema, row))
Member

filedNames? fieldNames?

Member Author

Fixed. Thanks.

@viirya
Member

viirya commented Feb 14, 2020

Looks good and thanks for working on this.

@SparkQA

SparkQA commented Feb 14, 2020

Test build #118379 has finished for PR 26929 at commit 259d9d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (fieldNames.length > 1) {
(value, schema(index).dataType) match {
case (row: InternalRow, nestedSchema: StructType) =>
extractor(fieldNames.slice(1, fieldNames.length), nestedSchema, row)
Contributor

nit: fieldNames.drop(1)

Member Author

Thanks. Addressed.

@SparkQA

SparkQA commented Feb 14, 2020

Test build #118408 has finished for PR 26929 at commit 21ebd26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 14, 2020

Test build #118411 has finished for PR 26929 at commit e2cd87f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dbtsai dbtsai closed this in d0f9614 Feb 14, 2020
dbtsai added a commit that referenced this pull request Feb 14, 2020

Closes #26929 from dbtsai/addTests.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(cherry picked from commit d0f9614)
Signed-off-by: DB Tsai <d_tsai@apple.com>
@dbtsai
Member Author

dbtsai commented Feb 14, 2020

Thanks. Merged into master and branch-3.0.

@dbtsai dbtsai deleted the addTests branch February 18, 2020 23:40
@gatorsmile gatorsmile changed the title [SPARK-30289][SQL] Partitioned by Nested Column for InMemoryTable [SPARK-30289][TEST] Partitioned by Nested Column for InMemoryTable Feb 19, 2020
@gatorsmile gatorsmile changed the title [SPARK-30289][TEST] Partitioned by Nested Column for InMemoryTable [SPARK-30289][SQL][TEST] Partitioned by Nested Column for InMemoryTable Feb 19, 2020
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020

Closes apache#26929 from dbtsai/addTests.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>