[SPARK-17153][SQL] Should read partition data when reading new files in filestream without globbing #14803


Closed
wants to merge 10 commits

Conversation

viirya
Member

@viirya viirya commented Aug 25, 2016

What changes were proposed in this pull request?

When reading a file stream with a non-globbing path, the results return data with all nulls for the
partitioned columns. E.g.,

case class A(id: Int, value: Int)
val data = spark.createDataset(Seq(
  A(1, 1), 
  A(2, 2), 
  A(2, 3))
) 
val url = "/tmp/test"
data.write.partitionBy("id").parquet(url)
spark.read.parquet(url).show

+-----+---+
|value| id|
+-----+---+
|    2|  2|
|    3|  2|
|    1|  1|
+-----+---+

val s = spark.readStream.schema(spark.read.load(url).schema).parquet(url)
s.writeStream.queryName("test").format("memory").start()

sql("SELECT * FROM test").show

+-----+----+
|value|  id|
+-----+----+
|    2|null|
|    3|null|
|    1|null|
+-----+----+

How was this patch tested?

Jenkins tests.

@viirya
Member Author

viirya commented Aug 25, 2016

cc @marmbrus

@SparkQA

SparkQA commented Aug 25, 2016

Test build #64405 has finished for PR 14803 at commit 2771d71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…option

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@viirya
Member Author

viirya commented Aug 29, 2016

ping @zsxwing @tdas Can you help review this? Thanks.

@@ -129,13 +129,20 @@ class FileStreamSource(
    val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+   val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
Contributor
@frreiss frreiss Aug 29, 2016

I recommend putting this check for globs into the initialization code at the top of this file that sets qualifiedBasePath (currently lines 47-50). That way all the code that interprets the meaning of the path parameter will be in one place.
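
For illustration, a minimal sketch of hoisting that check into the initialization block next to qualifiedBasePath (the val name is hypothetical; the actual change in this PR ends up building an options map there, as shown in a later diff):

// Computed once, alongside qualifiedBasePath, so that all interpretation of
// the `path` parameter lives in one place.
private val sourcePathHasGlob: Boolean =
  SparkHadoopUtil.get.isGlobPath(new Path(path))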

viirya (Member Author)

ok.

@SparkQA

SparkQA commented Aug 29, 2016

Test build #64565 has finished for PR 14803 at commit 0d841e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64650 has finished for PR 14803 at commit 6adf2e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@frreiss
Contributor

frreiss commented Aug 30, 2016

LGTM

@viirya
Member Author

viirya commented Sep 5, 2016

ping @marmbrus @zsxwing Can you take a quick look? Thanks.

@viirya
Member Author

viirya commented Sep 19, 2016

ping @marmbrus @zsxwing Would you mind taking a look at this and providing your feedback? If this is not going to be fixed, please let me know too. This is a small change, and I don't think it should wait this long. Thanks.

…option

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@srowen
Member

srowen commented Sep 19, 2016

I'm not so familiar with the logic here but tend to trust your analysis and the other positive review.

  format.inferSchema(
    sparkSession,
    caseInsensitiveOptions,
-   fileCatalog.allFiles())
+   fileCatalog.allFiles()).map { inferredSchema =>
Member

The wrapping threw me off for a minute. You might consider introducing an extra variable for the result of inferSchema. Also does .map(partitionCols.foldLeft(_)((struct, field) => struct.add(field))) work?
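
For reference, an explicit (non-placeholder) spelling of that fold, assuming inferred: Option[StructType] and partitionCols: Array[StructField] (names follow @marmbrus's version further down; the extra val is hypothetical):

// Append each partition column to the inferred schema.
val withPartitionCols: Option[StructType] =
  inferred.map { schema =>
    partitionCols.foldLeft(schema)((struct, field) => struct.add(field))
  }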

viirya (Member Author)

I will change it to use @marmbrus's version below.

@@ -49,6 +49,14 @@ class FileStreamSource(
    fs.makeQualified(new Path(path)) // can contain glob patterns
  }

+ private val optionsWithPartitionBasePath = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
Member

Also just a style thing but is this more direct?

private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
  if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
    Map("basePath" -> path)
  } else {
    Map()
  }
}

Not sure; it just avoids some repetition, but it's about the same amount of code.

viirya (Member Author)

OK. Looks better.

@SparkQA

SparkQA commented Sep 19, 2016

Test build #65595 has finished for PR 14803 at commit 04b61c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor
@marmbrus marmbrus left a comment

Overall this seems reasonable to me. I am a little concerned about the user experience in a few edge cases, so we should probably add a section in the programming guide. In particular:

  • If the partition directories are not present when the stream starts then I believe this breaks.
  • I think that for all but text you have to include the partition columns in the schema if inference is turned off (which it is by default).

Are there any others you can think of?

  format.inferSchema(
    sparkSession,
    caseInsensitiveOptions,
-   fileCatalog.allFiles())
+   fileCatalog.allFiles()).map { inferredSchema =>
Contributor

+1

I think this is much easier to understand as:

val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
val inferred = format.inferSchema(
  sparkSession,
  caseInsensitiveOptions,
  fileCatalog.allFiles())

inferred.map { inferredSchema =>
  StructType(inferredSchema ++ partitionCols)
}

@@ -608,6 +608,34 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

// =============== other tests ================

test("read new files in partitioned table without globbing, should read partition data") {
Contributor

We should probably also have an explicit test for the case where schema inference is turned on (you implicitly test it some with the code changed below)

viirya (Member Author)

Added a test for it.
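
For context, a minimal sketch of exercising schema inference through the public API (this is not the test added in the PR; the path and partition column reuse the example from the PR description):

// Turn on streaming schema inference (off by default).
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// No explicit .schema(...): the schema, including the partition column `id`
// whose directories exist at start, should be inferred from the files.
val inferred = spark.readStream.parquet("/tmp/test")
assert(inferred.schema.fieldNames.contains("id"))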

@viirya
Member Author

viirya commented Sep 20, 2016

  • If the partition directories are not present when the stream starts then I believe this breaks.

Yes. Schema inference only happens when starting the stream.

  • I think that for all but text you have to include the partition columns in the schema if inference is turned off (which it is by default).

I will add this to the programming guide.

@viirya
Member Author

viirya commented Sep 20, 2016

@marmbrus

  • I think that for all but text you have to include the partition columns in the schema if inference is turned off (which it is by default).

For the text format, when inference is turned off but there is a user-provided schema, we will use that schema. In this case, I think the user should also include the partition columns in the schema, right?

@SparkQA

SparkQA commented Sep 20, 2016

Test build #65623 has finished for PR 14803 at commit 23ba9a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 20, 2016

Test build #65625 has finished for PR 14803 at commit 5b101ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 20, 2016

Test build #65627 has finished for PR 14803 at commit 541dfdc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Sep 22, 2016

ping @marmbrus Can you take another look?

And I have a question about the suggested programming guide change. Can you clarify it too? Thanks!

@SparkQA

SparkQA commented Sep 22, 2016

Test build #65764 has finished for PR 14803 at commit 9d16631.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -512,6 +512,10 @@ csvDF = spark \

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

### Schema inference and partition of streaming DataFrames/Datasets

You can specify the schema for streaming DataFrames/Datasets with the API as shown in the above example (i.e., `userSchema`). Alternatively, for file-based streaming sources, you can configure Spark to infer the schema. By default, the streaming schema inference configuration `spark.sql.streaming.schemaInference` is turned off. If the streaming DataFrame/Dataset is partitioned, the partition columns will only be inferred if the partition directories are present when the stream starts. When schema inference is turned off, for all file-based streaming sources except the `text` format, you have to include the partition columns in the user-provided schema.
Contributor
@marmbrus marmbrus Sep 22, 2016

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.

Partition discovery does occur when subdirectories that are named `/key=value/` are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`).
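
As a concrete sketch of the second paragraph (reusing the /tmp/test layout from the PR description, where id is the partition column):

import org.apache.spark.sql.types._

// The partition column `id` appears in the user-provided schema; Spark fills it
// in from directory names such as /tmp/test/id=1/ and /tmp/test/id=2/.
val userSchema = new StructType()
  .add("value", IntegerType)
  .add("id", IntegerType)

val stream = spark.readStream.schema(userSchema).parquet("/tmp/test")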

@marmbrus
Contributor

Mostly looks good, I've also asked @tdas to take a look since he wrote this initially.

A few more cases came to mind while I was rephrasing your documentation. Specifically:

  • What error is printed (if any) if an invalid partition directory is created midstream?
  • Are we okay if all of the data disappears (that has already been processed) and then new data arrives?

@viirya
Member Author

viirya commented Sep 23, 2016

  • What error is printed (if any) if an invalid partition directory is created midstream?

The error is:

[info]   org.apache.spark.sql.streaming.StreamingQueryException: Query query-14 terminated with exception: assertion failed: Conflicting partition column names detected:
[info]
[info]  Partition column name list #0: partition2
[info]  Partition column name list #1: partition
[info]
[info] For partitioned table directories, data files should only live in leaf directories.
[info] And directories at the same level should have the same partition column name.
[info] Please check the following directories for unexpected files or inconsistent partition column names:
[info]
[info]  file:/root/repos/spark-1/target/tmp/streaming.src-c3a9895d-7be1-4ded-9154-7a24026513d7/partition2=bar
[info]  file:/root/repos/spark-1/target/tmp/streaming.src-c3a9895d-7be1-4ded-9154-7a24026513d7/partition=bar
[info]  file:/root/repos/spark-1/target/tmp/streaming.src-c3a9895d-7be1-4ded-9154-7a24026513d7/partition=foo
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:211)
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)
[info]   Cause: java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
[info]
[info]  Partition column name list #0: partition2
[info]  Partition column name list #1: partition
  • Are we okay if all of the data disappears (that has already been processed) and then new data arrives?

I enhanced the added test to cover this. It's okay, if I understand your point correctly.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65812 has finished for PR 14803 at commit e21536e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DeleteFile(file: File) extends ExternalAction
    • trait ExternalAction extends StreamAction

@@ -512,6 +512,12 @@ csvDF = spark \

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

### Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
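
A minimal sketch of the ad-hoc case described above (config key as named in this paragraph; the path reuses the PR example):

// Re-enable schema inference for this session; not advisable for production use.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// The schema, including partition columns whose directories are present when
// the stream starts, is inferred, so no .schema(...) call is needed.
val df = spark.readStream.parquet("/tmp/test")
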
Contributor

Schema inference can lead to many corner cases regarding whether the inferred schema is different after restart. So I think we should use stronger language saying that schema inference is not advisable in production use.

@marmbrus
Contributor

Thanks, I'm going to merge this to master.

@asfgit asfgit closed this in 8135e0e Sep 26, 2016
@viirya
Member Author

viirya commented Sep 27, 2016

@marmbrus @frreiss @srowen @tdas Thanks for the review!

@zsxwing
Member

zsxwing commented Oct 24, 2016

I'm going to merge this one to branch 2.0 since it only changes Structured Streaming.

asfgit pushed a commit that referenced this pull request Oct 24, 2016
…in filestream without globbing

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #14803 from viirya/filestreamsource-option.
CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")),

// Delete the two partition dirs
DeleteFile(partitionFooSubDir),
zsxwing (Member)

@viirya why do we need to delete dirs in this test? It's flaky since the source may be listing files.

Member

Removed them in #15699

viirya (Member Author)

@zsxwing I remember it is used to simulate a partition being deleted and data being re-inserted. Thanks for fixing this!
