
[SPARK-18661] [SQL] Creating a partitioned datasource table should not scan all files for table #16090


Closed
wants to merge 6 commits

Conversation

ericl (Contributor) commented Nov 30, 2016

What changes were proposed in this pull request?

Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason.

We should avoid doing this when the user specifies a schema.
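For illustration, a minimal sketch of the scenario this targets; the table name, columns, and path below are made up and are not part of the patch:

    import org.apache.spark.sql.SparkSession

    object CreatePartitionedTableExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("spark-18661-example").getOrCreate()

        // With an explicit schema and partition columns, CREATE TABLE should only
        // write metastore metadata; nothing under the path should be listed yet.
        spark.sql("""
          CREATE TABLE logs (id BIGINT, event STRING, day STRING)
          USING parquet
          OPTIONS (path '/data/logs')
          PARTITIONED BY (day)
        """)

        // Partition discovery remains an explicit, separate step.
        spark.sql("MSCK REPAIR TABLE logs")
      }
    }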

How was this patch tested?

Perf stat tests.

@@ -58,13 +58,20 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val pathOption = table.storage.locationUri.map("path" -> _)
val uncreatedTable = table.copy(

Contributor Author:

@cloud-fan is there a better way to do this?

Contributor:

Can we do this before we pass the CatalogTable, e.g. in the parser and DataFrameWriter?

Contributor Author:

Hm, that seems more brittle since you'd have to duplicate the logic. I added a comment describing why we need to do this here.

ericl (Contributor Author) commented Nov 30, 2016

@rxin

SparkQA commented Nov 30, 2016

Test build #69435 has finished for PR 16090 at commit 51d2a41.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 1, 2016

Test build #69437 has finished for PR 16090 at commit 275f6b9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.provider == Some("parquet"))
- assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+ // a is ordered last since it is a user-specified partitioning column
+ assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))

Contributor Author:

@yhuai this is the minor behavior change for CREATE TABLE in 2.1 that we discussed.

SparkQA commented Dec 2, 2016

Test build #69514 has finished for PR 16090 at commit 89b0a64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val pathOption = table.storage.locationUri.map("path" -> _)
// Fill in some default table options from the session conf
val uncreatedTable = table.copy(

Contributor:

how about tableWithDefaultOptions?

Contributor Author:

Done

identifier = table.identifier.copy(
database = Some(
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)

Contributor:

Logically we don't know the value of tracksPartitionsInCatalog here, as the partition columns are not inferred yet.

Contributor Author:

I think this is true for all new tables. If the table is unpartitioned, the flag is harmless.

  className = table.provider.get,
  bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption).resolveRelation()
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(uncreatedTable)).resolveRelation()

Contributor:

Why do we need to pass in the catalogTable here?

Contributor Author:

Otherwise, we will construct an InMemoryFileIndex which scans the filesystem.

@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
pathToNonPartitionedTable,
userSpecifiedSchema = Option("num int, str string"),
userSpecifiedPartitionCols = partitionCols,
- expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedSchema = if (partitionCols.isDefined) {

Contributor:

Shall we just change the test to use str as the partition column?

Contributor Author:

I think that would be testing something slightly different.

val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,

Contributor:

It looks to me that this line and https://github.com/apache/spark/pull/16090/files#diff-7a6cb188d2ae31eb3347b5629a679cecR135 are the key changes. Did I miss something?

Contributor Author:

You also need to pass catalogTable in, so that on line 390 of DataSource we create a CatalogFileIndex instead of an InMemoryFileIndex.

cloud-fan (Contributor), Dec 2, 2016:

I think it's fine to create an InMemoryFileIndex for this case, as we call DataSource.resolveRelation here just to infer the schema and partition columns.

Contributor Author:

You don't want to do that, though. resolveRelation also does not always scan the filesystem if you pass in a user-defined schema.

Contributor:

I mean, if passing the catalogTable or not doesn't affect correctness (or performance), we can remove tableWithDefaultOptions and make the code simpler, right?

Contributor Author:

You do need to pass it in though.

        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
          new CatalogFileIndex(
            sparkSession,
            catalogTable.get,
            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
        } else {
          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
        }

Otherwise, this code will perform a full filesystem scan, independent of the other change to prevent getOrInferFileFormatSchema from performing a scan as well.

Contributor:

Then can we just make the InMemoryFileIndex scan the files lazily? If we only need to infer the schema and partition columns, it should not do the scan.

Contributor Author:

That's a pretty big change, considering how many classes depend on the eager behavior of InMemoryFileIndex.

SparkQA commented Dec 2, 2016

Test build #69534 has finished for PR 16090 at commit 2940d55.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 2, 2016

Test build #69536 has finished for PR 16090 at commit b405635.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor):

My main concern is that, in CreateDataSourceTableCommand, we call DataSource.resolveRelation to infer the schema and partition columns. At that time, the table is not created yet, so logically we should not pass a CatalogTable to DataSource and create a CatalogFileIndex inside it, which looks like a hack.

It seems to me that it's more logical to tweak InMemoryFileIndex to scan the files lazily, to avoid an unnecessary file scan for cases like this one.
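
To make the lazy-scan idea concrete, here is a tiny self-contained sketch of the eager vs. lazy difference in plain Scala; the types are made up and are not Spark's actual FileIndex API:

    import java.io.File

    // Eager: the listing happens in the constructor, like InMemoryFileIndex today.
    class EagerIndexSketch(root: File) {
      private val files: Seq[String] =
        Option(root.listFiles()).getOrElse(Array.empty[File]).map(_.getPath).toSeq // scan happens here
      def listFiles(): Seq[String] = files
    }

    // Lazy: callers that only need schema/partition inference never trigger the listing.
    class LazyIndexSketch(root: File) {
      private lazy val files: Seq[String] =
        Option(root.listFiles()).getOrElse(Array.empty[File]).map(_.getPath).toSeq // deferred until first use
      def listFiles(): Seq[String] = files
    }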

ericl (Contributor Author) commented Dec 2, 2016

I looked at avoiding the creation of a CatalogFileIndex, but the way table resolution works right now, the only way is to create some sort of dummy file index class that does not support scans. It's not clear to me that this is any better than just creating a CatalogFileIndex, even if the table is not yet ready.

We can probably clean this up so it is not necessary to create a file index for table creation, but that would be a pretty big change to land for 2.1.
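
For reference, the "dummy file index" alternative mentioned above would look roughly like this; purely illustrative, not Spark's FileIndex trait:

    // Illustrative only: a placeholder index that can stand in during table creation
    // but fails loudly if anything actually tries to scan through it.
    class NoScanIndexSketch {
      def listFiles(): Seq[String] =
        throw new UnsupportedOperationException(
          "file listing is not supported while the table is being created")
    }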

ericl (Contributor Author) commented Dec 3, 2016

Seems like we also create an InMemoryFileIndex twice for non-catalog tables. Let me try to fix that too.

ericl (Contributor Author) commented Dec 3, 2016

Fixed by adding a private cache to DataSource, which is used to avoid the duplicate file listing with InMemoryFileIndex.
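
Conceptually the cache works like the simplified sketch below; the names are made up and this is not the actual DataSource code:

    import java.io.File

    class DataSourceSketch(paths: Seq[String]) {
      // Computed at most once and shared, so schema inference and relation
      // resolution do not each pay for a separate file listing.
      private lazy val cachedFileList: Seq[String] =
        paths.flatMap { p =>
          Option(new File(p).listFiles()).getOrElse(Array.empty[File]).map(_.getPath)
        }

      def inferSchema(): Seq[String] = cachedFileList      // first caller triggers the listing
      def resolveRelation(): Seq[String] = cachedFileList  // later callers reuse it
    }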

ericl (Contributor Author) commented Dec 3, 2016

cc @rxin please merge unless wenchen gets to it first

SparkQA commented Dec 3, 2016

Test build #69600 has finished for PR 16090 at commit 5a250ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ericl (Contributor Author) commented Dec 3, 2016 via email

cloud-fan (Contributor):

If we are going to hack it, how about this?

val dataSource = DataSource(...)
if (classOf[FileFormat].isAssignableFrom(dataSource.providingClass)) {
  dataSource.getOrInferFileFormatSchema()
} else {
  dataSource.resolveRelation().schema -> new StructType
}

Then we don't need to create a FileIndex or scan files.

ericl (Contributor Author) commented Dec 4, 2016

Not sure I follow. Could you explain more about why that would resolve the issue?

Btw, I reverted this PR to b405635, which passes all tests.

SparkQA commented Dec 4, 2016

Test build #69630 has finished for PR 16090 at commit b405635.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor):

After looking more at the code, I now agree with your approach. One question: it seems we still scan the files when creating an unpartitioned external data source table?

ericl (Contributor Author) commented Dec 4, 2016 via email

asfgit pushed a commit that referenced this pull request Dec 4, 2016
[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table

Author: Eric Liang <ekl@databricks.com>

Closes #16090 from ericl/spark-18661.

(cherry picked from commit d9eb4c7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

cloud-fan (Contributor):

LGTM, merging to master/2.1!
@ericl please create tickets for the other 2 issues

asfgit closed this in d9eb4c7 on Dec 4, 2016

ericl (Contributor Author) commented Dec 5, 2016

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table
Closes apache#16090 from ericl/spark-18661.
asfgit pushed a commit that referenced this pull request Jan 13, 2017
…the saved files

### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`, so we should avoid the scan there.

The related PR: #16090

### How was this patch tested?
Updated the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16481 from gatorsmile/saveFileScan.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table
Closes apache#16090 from ericl/spark-18661.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…the saved files
Closes apache#16481 from gatorsmile/saveFileScan.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
…the saved files
Closes apache#16481 from gatorsmile/saveFileScan.