[WIP][SQL] Add DataSourceSuite validating data sources limitations #19060
Closed
Changes from all commits (5 commits):

0bf9ad2  [SPARK-XXX][SQL] Add DataSource suite validating data sources limitat…  (dongjoon-hyun)
16031ac  Use TestHiveSingleton.  (dongjoon-hyun)
104f24c  Add JSON/CSV format and split user-defined schema and inferSchema.  (dongjoon-hyun)
4466325  Update ORC PPD test case more general.  (dongjoon-hyun)
fb72c89  Address comments.  (dongjoon-hyun)
sql/hive/src/test/scala/org/apache/spark/sql/sources/DataSourceSuite.scala (new file: 125 additions, 0 deletions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources

import java.sql.{Date, Timestamp}

import org.apache.orc.OrcConf

import org.apache.spark.sql.{Dataset, QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils

/**
 * Data Source qualification as Apache Spark Data Sources.
 * - Apache Spark Data Type Value Limits: CSV, JSON, ORC, Parquet
 * - Predicate Push Down: ORC
 */
class DataSourceSuite
  extends QueryTest
  with SQLTestUtils
  with TestHiveSingleton {

  import testImplicits._

  var df: Dataset[Row] = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark.conf.set("spark.sql.session.timeZone", "GMT")

    df = ((
      false,
      true,
      Byte.MinValue,
      Byte.MaxValue,
      Short.MinValue,
      Short.MaxValue,
      Int.MinValue,
      Int.MaxValue,
      Long.MinValue,
      Long.MaxValue,
      Float.MinValue,
      Float.MaxValue,
      Double.MinValue,
      Double.MaxValue,
      Date.valueOf("0001-01-01"),
      Date.valueOf("9999-12-31"),
      new Timestamp(-62135769600000L), // 0001-01-01 00:00:00.000
      new Timestamp(253402300799999L)  // 9999-12-31 23:59:59.999
    ) :: Nil).toDF()
  }

  override def afterAll(): Unit = {
    try {
      spark.conf.unset("spark.sql.session.timeZone")
    } finally {
      super.afterAll()
    }
  }

  Seq("parquet", "orc", "json", "csv").foreach { dataSource =>
    test(s"$dataSource - data type value limit") {
      withTempPath { dir =>
        df.write.format(dataSource).save(dir.getCanonicalPath)

        // Use the same schema for saving/loading
        checkAnswer(
          spark.read.format(dataSource).schema(df.schema).load(dir.getCanonicalPath),
          df)

        // Use schema inference, but skip text-based format due to its limitation
        if (Seq("parquet", "orc").contains(dataSource)) {
          withTable("tab1") {
            sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${dir.toURI}'")
            checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df)
          }
        }
      }
    }
  }

  Seq("orc").foreach { dataSource =>
    test(s"$dataSource - predicate push down") {
      withSQLConf(
        SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
        withTempPath { dir =>
          // write 4000 rows with the integer and the string in a single orc file with stride 1000
          spark
            .range(4000)
            .map(i => (i, s"$i"))
            .toDF("i", "s")
            .repartition(1)
            .write
            .option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000)
            // TODO: Add Parquet option, too.
            .format(dataSource)
            .save(dir.getCanonicalPath)

          val df = spark.read.format(dataSource).load(dir.getCanonicalPath)
            .where(s"i BETWEEN 1500 AND 1999")
          // skip over the rows except 1000 to 2000
          val answer = spark.range(1000, 2000).map(i => (i, s"$i")).toDF("i", "s")
          checkAnswer(stripSparkFilter(df), answer)
        }
      }
    }
  }
}
Conversations

This has been tested in OrcFilterSuite and ParquetFilterSuite, right? What is the goal of this test case?
OrcFilterSuite and ParquetFilterSuite do not test stripe-level filtering within a single file; they only cover file-level filtering, don't they? This test suite is designed to prove that PPD on the ORC data source does not filter out rows incorrectly. Since ORC does not filter row by row, this PR checks all values of the returned stripe, and all of the stripe's values are present in this test case.

IIRC, this is what you requested in SPARK-21783 (Turn on ORC filter push-down by default). If OrcFilterSuite is already complete, I wonder what the reason is to block turning on ORC PPD by default.
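A minimal sketch of how that stripe-level behavior could be asserted directly; it assumes it runs inside the same withTempPath block as the predicate push down test above (so dir, spark, and stripSparkFilter are in scope) and the same 4000-row file written with a 1000-row index stride:

// Sketch only: stripSparkFilter removes the Spark-side Filter, so the remaining rows
// are exactly what the ORC reader returned. With stripe/row-group level skipping,
// "i BETWEEN 1500 AND 1999" should leave the single 1000-row group covering 1000-1999,
// i.e. neither all 4000 rows nor only the 500 exact matches.
val returnedByOrc = stripSparkFilter(
  spark.read.format("orc").load(dir.getCanonicalPath).where("i BETWEEN 1500 AND 1999"))
assert(returnedByOrc.count() === 1000)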
I do not know why we did not turn it on by default. To me, to turn it on, we need to improve the coverage.

So far, the coverage of OrcFilterSuite and ParquetFilterSuite is not good; they only have very basic checks. Could you improve them? For example, by adding boundary values on both sides of the pushed-down predicates? We need to ensure that the predicates are pushed down, executed in the underlying data sources, and that the filters work properly (value comparison).
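One possible shape for such a boundary-value check, sketched here rather than taken from this PR; it assumes it lives inside a suite like the DataSourceSuite above, so testImplicits, withSQLConf, withTempPath, and checkAnswer are available:

// Sketch: write the extreme values of a type and push predicates that sit exactly on
// those boundaries, approaching each boundary from both sides.
test("orc - predicate push down with Int boundary values (sketch)") {
  withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
    withTempPath { dir =>
      Seq(Int.MinValue, 0, Int.MaxValue).toDF("i")
        .write.format("orc").save(dir.getCanonicalPath)
      val loaded = spark.read.format("orc").load(dir.getCanonicalPath)

      checkAnswer(loaded.where(s"i = ${Int.MaxValue}"), Row(Int.MaxValue))
      checkAnswer(loaded.where(s"i > ${Int.MaxValue - 1}"), Row(Int.MaxValue))
      checkAnswer(loaded.where(s"i <= ${Int.MinValue}"), Row(Int.MinValue))
      checkAnswer(loaded.where(s"i < ${Int.MinValue + 1}"), Row(Int.MinValue))
    }
  }
}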
I agree with you on improving the coverage, @gatorsmile. So I've started this and have been trying to improve it toward what you want.

Given that the situation is the same for Parquet and ORC, what I suggested in #18991 is to turn on ORC PPD by default at least before the 2.3.0 release. What do you think about that?
Parquet is the default format and is used by most of our Spark users. We have already received many related JIRA issues and then fixed or blocked them; you were also involved in some of those PRs.

To avoid repeating the same issues in ORC, we should improve the coverage before turning on ORC predicate pushdown by default.
Every JIRA issue in Apache Spark should have a test case before it is resolved. If so, we can focus on ORC test-case parity with Parquet; adding more test cases is beyond that scope.

Yes, ORC is not the default format. Even worse, although a user may try to use .orc(...), PPD is not enabled by default either, so we cannot find any issues in PPD. It does not form a virtuous cycle at all. To be honest with you, I trust the efforts of the Apache ORC community.
I have a different philosophy. I have worked on mission-critical enterprise software, and I do not trust anyone's code unless it passes the corresponding tests.

The initial ORC PPD does not have enough test coverage. To enable it by default, we should improve the test coverage now. The new test framework for PPD should cover all the sources; we can have source-specific tests where something is not applicable to the other sources.
Yep. This PR is based on the guidelines you describe above, and I hope it will continue to evolve. :)

For PPD, Parquet and ORC are the only built-in sources that support it, aren't they?
The only thing we have not agreed on is the corresponding tests.
So far, Parquet and ORC are the only built-in sources that support PPD. From a software-development point of view, we should make the test framework easily extensible.

Sorry, I do not think this covers what we need yet. First, to verify that the underlying data source works properly, we need the corresponding (boundary) data inserted into the sources and predicates that use those boundary values, and we also need to ensure these predicates are pushed down and not evaluated by Spark.
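As a sketch of the last point (checking that a predicate is reported as pushed down to the file source rather than only evaluated by Spark), one option in Spark 2.x is to inspect the FileSourceScanExec node, which exposes a PushedFilters entry in its metadata. A reported filter is only translated and handed to the source, not guaranteed to be evaluated there, so the check below is intentionally loose; it is an illustration, not part of this PR, and assumes it sits inside the suite above so Dataset, Row, and spark are in scope:

// Sketch: collect the file scan node and check that the predicate appears among the
// filters reported as pushed to the data source.
import org.apache.spark.sql.execution.FileSourceScanExec

def assertPushedFilter(query: Dataset[Row], fragment: String): Unit = {
  val scans = query.queryExecution.executedPlan.collect { case s: FileSourceScanExec => s }
  assert(scans.nonEmpty, "expected a FileSourceScanExec in the physical plan")
  assert(scans.exists(_.metadata.get("PushedFilters").exists(_.contains(fragment))),
    s"expected a pushed filter mentioning '$fragment'")
}

// Hypothetical usage, where orcDir points at an ORC directory written by a test:
// assertPushedFilter(spark.read.orc(orcDir).where("i = 10"), "i")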