[WIP][SQL] Add DataSourceSuite validating data sources limitations #19060

Closed · wants to merge 5 commits

Changes from all commits
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources

import java.sql.{Date, Timestamp}

import org.apache.orc.OrcConf

import org.apache.spark.sql.{Dataset, QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils

/**
 * Qualification tests for Apache Spark data sources.
 * - Data type value limits: CSV, JSON, ORC, Parquet
 * - Predicate push down: ORC
 */
class DataSourceSuite
  extends QueryTest
  with SQLTestUtils
  with TestHiveSingleton {

  import testImplicits._

  var df: Dataset[Row] = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark.conf.set("spark.sql.session.timeZone", "GMT")

    df = ((
      false,
      true,
      Byte.MinValue,
      Byte.MaxValue,
      Short.MinValue,
      Short.MaxValue,
      Int.MinValue,
      Int.MaxValue,
      Long.MinValue,
      Long.MaxValue,
      Float.MinValue,
      Float.MaxValue,
      Double.MinValue,
      Double.MaxValue,
      Date.valueOf("0001-01-01"),
      Date.valueOf("9999-12-31"),
      new Timestamp(-62135769600000L), // 0001-01-01 00:00:00.000
      new Timestamp(253402300799999L)  // 9999-12-31 23:59:59.999
    ) :: Nil).toDF()
  }

  override def afterAll(): Unit = {
    try {
      spark.conf.unset("spark.sql.session.timeZone")
    } finally {
      super.afterAll()
    }
  }

Seq("parquet", "orc", "json", "csv").foreach { dataSource =>
test(s"$dataSource - data type value limit") {
withTempPath { dir =>
df.write.format(dataSource).save(dir.getCanonicalPath)

// Use the same schema for saving/loading
checkAnswer(
spark.read.format(dataSource).schema(df.schema).load(dir.getCanonicalPath),
df)

// Use schema inference, but skip text-based format due to its limitation
if (Seq("parquet", "orc").contains(dataSource)) {
withTable("tab1") {
sql(s"CREATE TABLE tab1 USING $dataSource LOCATION '${dir.toURI}'")
checkAnswer(sql(s"SELECT ${df.schema.fieldNames.mkString(",")} FROM tab1"), df)
}
}
}
}
}

Seq("orc").foreach { dataSource =>
test(s"$dataSource - predicate push down") {
withSQLConf(
SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
// write 4000 rows with the integer and the string in a single orc file with stride 1000
spark
.range(4000)
.map(i => (i, s"$i"))
.toDF("i", "s")
.repartition(1)
.write
.option(OrcConf.ROW_INDEX_STRIDE.getAttribute, 1000)
// TODO: Add Parquet option, too.
.format(dataSource)
.save(dir.getCanonicalPath)

val df = spark.read.format(dataSource).load(dir.getCanonicalPath)
.where(s"i BETWEEN 1500 AND 1999")
Member
This has been tested in OrcFilterSuite and ParquetFilterSuite, right? What is the goal of this test case?

Member Author · @dongjoon-hyun · Sep 5, 2017

OrcFilterSuite and ParquetFilterSuite do not test stripe-level filtering within a single file. They only cover file-level filtering, don't they?

This test suite is designed to prove that PPD on the ORC data source does not filter rows out incorrectly. Since ORC does not filter row-by-row, this PR checks all the values of the returned stripe, and this test case verifies that every value in that stripe is present and correct.

IIRC, this is what you requested in SPARK-21783 (Turn on ORC filter push-down by default). If OrcFilterSuite is already complete, I'm wondering what the reason is for blocking ORC PPD from being turned on by default.

Member

I do not know why we did not turn it on by default. To me, before turning it on, we need to improve the coverage.

So far, the coverage of OrcFilterSuite and ParquetFilterSuite is not good; they only have very basic checks. Could you improve them?

For example, how about adding boundary values on both sides for these predicate pushdowns? We need to ensure that the predicates are pushed down, executed in the underlying data sources, and that the filters work properly (value comparison).
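
To make that request concrete, here is a hedged sketch (not part of this PR's diff) of what such boundary-value coverage could look like inside this suite, reusing only helpers already in scope here (testImplicits, checkAnswer, withTempPath, withSQLConf); the test name and the chosen boundary values are illustrative assumptions.

  // Sketch only: boundary-value predicate pushdown coverage for Int limits.
  Seq("parquet", "orc").foreach { dataSource =>
    test(s"$dataSource - predicate push down with boundary values (sketch)") {
      withSQLConf(
        SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
        withTempPath { dir =>
          // Rows that include Int.MinValue/Int.MaxValue so pushed filters must compare
          // correctly against both extremes.
          Seq(Int.MinValue, -1, 0, 1, Int.MaxValue).toDF("i")
            .write.format(dataSource).save(dir.getCanonicalPath)

          val df = spark.read.format(dataSource).load(dir.getCanonicalPath)

          // Predicates sitting exactly on both boundaries.
          checkAnswer(df.where(s"i = ${Int.MinValue}"), Row(Int.MinValue))
          checkAnswer(df.where(s"i = ${Int.MaxValue}"), Row(Int.MaxValue))
          checkAnswer(df.where(s"i > ${Int.MinValue}"),
            Seq(-1, 0, 1, Int.MaxValue).map(Row(_)))
          checkAnswer(df.where(s"i < ${Int.MaxValue}"),
            Seq(Int.MinValue, -1, 0, 1).map(Row(_)))
        }
      }
    }
  }

The same pattern could be repeated for the other numeric, date, and timestamp boundaries that this suite already builds in beforeAll().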

Member Author

I agree with you on improving the coverage, @gatorsmile. So I've started this and have been trying to improve it in order to find an approach that matches what you want.

Given that the situation is exactly the same for Parquet and ORC, what I suggested on #18991 is to turn on ORC PPD by default, at least before the 2.3.0 release. What do you think about that?

Member

Parquet is the default format, and it is used by most of our Spark users. We have already received many related JIRA issues and fixed or blocked them; you were also involved in some of those PRs.

To avoid repeating the same issues with ORC, we should improve the coverage before turning on ORC predicate pushdown by default.

Member Author

  1. Every JIRA issue in Apache Spark should have a test case in order to be resolved. Given that, we can focus on ORC test-case parity with Parquet; adding more test cases is beyond that scope.

     We have already received many related JIRA issues and fixed or blocked them.

  2. Yes, ORC is not the default format. Even worse, even when a user tries to use `.orc(...)`, PPD is not enabled by default either, so we cannot find any issues with PPD. It does not form a virtuous cycle at all. To be honest with you, I trust the efforts of the Apache ORC community.

Member

I have a different philosophy. I have worked on mission-critical enterprise software, and I do not trust anybody's code unless it passes the corresponding tests.

The initial ORC PPD does not have enough test coverage. To enable it by default, we should improve the test coverage now. The new test framework for PPD should cover all the sources; we can add source-specific cases when something does not apply to the other sources.

Member Author

Yep. This PR is based on the following guidelines of yours, and I hope it will continue to evolve. :)

  • Handle all data sources (for value ranges, I tested against Spark min/max values instead of data-source min/max values)
  • Use high-level end-to-end test cases

For PPD, Parquet and ORC are the only sources supporting it, aren't they?

Member Author

The only thing we didn't agree on is the corresponding tests.

Member

So far, Parquet and ORC are the only built-in sources that support PPD. From a software-development point of view, we should make the test framework easily extensible.

Sorry, I do not think this covers what we need. First, to verify that the underlying data source works properly, we need the corresponding (boundary) data inserted into the sources and predicates that use those boundary values, and we also need to ensure these predicates are pushed down and not evaluated by Spark.

(A hedged sketch along these lines is appended after the diff below.)

          // ORC skips data at row-group granularity rather than row-by-row, so with the
          // Spark-side filter stripped we should get back every row of the matching
          // row group, i.e. rows 1000 to 1999.
          val answer = spark.range(1000, 2000).map(i => (i, s"$i")).toDF("i", "s")
          checkAnswer(stripSparkFilter(df), answer)
        }
      }
    }
  }
}
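
On the reviewer's point about ensuring that predicates are "pushed down and not evaluated by Spark", the following is a hedged sketch of one possible check, not part of this PR. It assumes the physical scan node reports its pushed filters in the plan's string form (as FileSourceScanExec does through its PushedFilters metadata) and reuses the suite's existing helpers; the test name is illustrative.

  // Sketch only: assert that the predicate shows up as a pushed filter on the scan.
  Seq("orc", "parquet").foreach { dataSource =>
    test(s"$dataSource - predicate reaches the data source (sketch)") {
      withSQLConf(
        SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true",
        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
        withTempPath { dir =>
          spark.range(100).toDF("i").write.format(dataSource).save(dir.getCanonicalPath)
          val df = spark.read.format(dataSource).load(dir.getCanonicalPath).where("i = 42")

          // 1. The scan should carry the predicate as a pushed filter
          //    (assumption: the plan string contains the scan's PushedFilters metadata).
          val plan = df.queryExecution.executedPlan.toString
          assert(plan.contains("PushedFilters") && plan.contains("EqualTo(i,42)"))

          // 2. The query result must still be correct with the pushed filter in place.
          checkAnswer(df, Row(42L))
        }
      }
    }
  }

A stricter variant could combine this with the row-group trick used in the diff above (a small row index stride plus stripSparkFilter) so that the data skipped by the source is observable in the returned rows as well.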