[SPARK-17153][SQL] Should read partition data when reading new files in filestream without globbing #14803
Conversation
cc @marmbrus
Test build #64405 has finished for PR 14803 at commit
…option

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
```
@@ -129,13 +129,20 @@ class FileStreamSource(
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
     logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+    val newOptions = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
```
I recommend putting this check for globs into the initialization code at the top of this file that sets `qualifiedBasePath` (currently lines 47-50). That way all the code that interprets the meaning of the `path` parameter will be in one place.
ok.
Test build #64565 has finished for PR 14803 at commit
Test build #64650 has finished for PR 14803 at commit
LGTM
…option

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
I'm not so familiar with the logic here, but I tend to trust your analysis and the other positive review.
```
   format.inferSchema(
     sparkSession,
     caseInsensitiveOptions,
-    fileCatalog.allFiles())
+    fileCatalog.allFiles()).map { inferredSchema =>
```
The wrapping threw me off for a minute. You might consider introducing an extra variable for the result of `inferSchema`. Also, does `.map(partitionCols.foldLeft(_)((struct, field) => struct.add(field)))` work?
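For reference, a minimal standalone sketch of that `foldLeft` variant; the schema and partition fields here are made up purely for illustration:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical inputs: a schema inferred from the data files, plus the partition
// columns discovered from the directory layout.
val inferredSchema = StructType(Seq(StructField("value", StringType)))
val partitionCols = Array(StructField("id", IntegerType))

// Fold each partition column onto the inferred schema, one field at a time.
val combined = partitionCols.foldLeft(inferredSchema)((struct, field) => struct.add(field))
// combined now contains both "value" and "id".
```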
I will change it to use @marmbrus's version below.
```
@@ -49,6 +49,14 @@ class FileStreamSource(
     fs.makeQualified(new Path(path)) // can contains glob patterns
   }

+  private val optionsWithPartitionBasePath = if (!SparkHadoopUtil.get.isGlobPath(new Path(path))) {
```
Also, just a style thing, but is this more direct?

```scala
private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
  if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
    Map("basePath" -> path)
  } else {
    Map()
  }
}
```

Not sure; it just avoids some repetition but is about the same amount of code.
OK. Looks better.
Test build #65595 has finished for PR 14803 at commit
Overall this seems reasonable to me. I am a little concerned about the user experience in a few edge cases, so we should probably add a section in the programming guide. In particular:
- If the partition directories are not present when the stream starts, then I believe this breaks.
- I think that for all but `text` you have to include the partition columns in the schema if inference is turned off (which it is by default).

Are there any others you can think of?
```
   format.inferSchema(
     sparkSession,
     caseInsensitiveOptions,
-    fileCatalog.allFiles())
+    fileCatalog.allFiles()).map { inferredSchema =>
```
+1

I think this is much easier to understand as:

```scala
val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
val inferred = format.inferSchema(
  sparkSession,
  caseInsensitiveOptions,
  fileCatalog.allFiles())
inferred.map { inferredSchema =>
  StructType(inferredSchema ++ partitionCols)
}
```
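Compared with the `foldLeft` sketch earlier, this builds the same combined schema with a single concatenation. A small standalone illustration, again with made-up fields:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative inputs standing in for the inferred schema and partition columns above.
val inferredSchema = StructType(Seq(StructField("value", StringType)))
val partitionCols = Array(StructField("id", IntegerType))

// Append the partition columns after the inferred data columns.
val fullSchema = StructType(inferredSchema ++ partitionCols)
// fullSchema: value (string), id (int)
```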
```
@@ -608,6 +608,34 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

   // =============== other tests ================

+  test("read new files in partitioned table without globbing, should read partition data") {
```
We should probably also have an explicit test for the case where schema inference is turned on (you implicitly test it somewhat with the code changed below).
Added a test for it.
Yes. Schema inference only happens when starting the stream. I will add this to the programming guide.
For `text` format, when inference is turned off but a user-provided schema is given, we will use that schema. In this case, I think the user should also include the partition columns in the schema, right?
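A minimal sketch of such a user-provided schema; the path, column names, and types below are hypothetical and only mirror the example in the PR description:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[2]").appName("partitioned-file-stream").getOrCreate()

// With schema inference off (the default), the partition column "id" has to be listed
// explicitly alongside the data column "value".
val userSchema = StructType(Seq(
  StructField("value", IntegerType),
  StructField("id", IntegerType)   // partition column encoded in /id=.../ directories
))

val streamDF = spark.readStream
  .schema(userSchema)
  .parquet("/tmp/test")            // hypothetical non-globbed, partitioned path
```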
Test build #65623 has finished for PR 14803 at commit
Test build #65625 has finished for PR 14803 at commit
Test build #65627 has finished for PR 14803 at commit
ping @marmbrus. Can you take another look? I also have a question about the suggested programming guide change. Can you clarify it too? Thanks!
Test build #65764 has finished for PR 14803 at commit
```
@@ -512,6 +512,10 @@ csvDF = spark \

 These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

+### Schema inference and partition of streaming DataFrames/Datasets
+
+You can specify the schema for streaming DataFrames/Datasets to create with the API as shown in above example (i.e., `userSchema`). Alternatively, for file-based streaming source, you can config it to infer the schema. By default, the configure of streaming schema inference `spark.sql.streaming.schemaInference` is turned off. If the streaming DataFrame/Dataset is partitioned, the partition columns will only be inferred if the partition directories are present when the stream starts. When schema inference is turned off, for all file-based streaming sources except for `text` format, you have to include partition columns in the user provided schema.
```
By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.

Partition discovery does occur when subdirectories that are named `/key=value/` are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`).
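To make that concrete, a small hedged sketch of re-enabling inference for a partitioned file stream; the directory layout and names are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("schema-inference-example").getOrCreate()

// Re-enable schema inference for file-based streaming sources (off by default).
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// Assumed layout, with the partition directories present before the query starts:
//   /data/year=2015/part-00000.parquet
//   /data/year=2016/part-00000.parquet
// The "year" column is filled in from the directory names as files are read.
val events = spark.readStream.parquet("/data")
```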
Mostly looks good. I've also asked @tdas to take a look since he wrote this initially. A few more cases came to mind while I was rephrasing your documentation. Specifically,
The error is:
I enhanced the added test to cover this. It's okay, if I understand your point correctly here.
Test build #65812 has finished for PR 14803 at commit
```
@@ -512,6 +512,12 @@ csvDF = spark \

 These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

+### Schema inference and partition of streaming DataFrames/Datasets
+
+By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
```
Schema inference can lead to many corner cases regarding whether the inferred schema is different after a restart. So I think we should use stronger language saying that schema inference is not advisable for production use.
Thanks, I'm going to merge this to master.
I'm going to merge this one to branch-2.0 since it only changes Structured Streaming.
…in filestream without globbing

## What changes were proposed in this pull request?

When reading file stream with non-globbing path, the results return data with all `null`s for the partitioned columns. E.g.,

```
case class A(id: Int, value: Int)
val data = spark.createDataset(Seq(
  A(1, 1),
  A(2, 2),
  A(2, 3))
)
val url = "/tmp/test"
data.write.partitionBy("id").parquet(url)
spark.read.parquet(url).show

+-----+---+
|value| id|
+-----+---+
|    2|  2|
|    3|  2|
|    1|  1|
+-----+---+

val s = spark.readStream.schema(spark.read.load(url).schema).parquet(url)
s.writeStream.queryName("test").format("memory").start()
sql("SELECT * FROM test").show

+-----+----+
|value|  id|
+-----+----+
|    2|null|
|    3|null|
|    1|null|
+-----+----+
```

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #14803 from viirya/filestreamsource-option.
```
      CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")),

      // Delete the two partition dirs
      DeleteFile(partitionFooSubDir),
```
@viirya why do we need to delete the dirs in this test? It's flaky since the source may still be listing files.
Removed them in #15699
@zsxwing I remember it was used to simulate a partition being deleted and its data re-inserted. Thanks for fixing this!
What changes were proposed in this pull request?

When reading a file stream with a non-globbing path, the results return data with all `null`s for the partitioned columns. E.g., see the full reproduction in the merged commit message above.

How was this patch tested?

Jenkins tests.