
Commit 1f39a61

lazyman500 authored and marmbrus committed
[SPARK-5068][SQL] Fix bug querying data when path doesn't exist for HiveContext
This PR follows up apache#3907, apache#3891, and apache#4356. Following marmbrus's and liancheng's comments, it uses fs.globStatus to retrieve all FileStatus objects under the path(s), and then does the filtering locally:

[1]. Derive a pathPattern from each partition path and add it to pathPatternSet (e.g. hdfs://cluster/user/demo/2016/08/12 -> hdfs://cluster/user/demo/*/*/*).
[2]. Retrieve all matching FileStatus objects and cache them by updating existPathSet.
[3]. Do the filtering locally.
[4]. If a new pathPattern appears, repeat steps 1 and 2 (an external table may have more than one partition pathPattern).

cc chenghao-intel jeanlyn

Author: lazymam500 <lazyman500@gmail.com>
Author: lazyman <lazyman500@gmail.com>

Closes apache#5059 from lazyman500/SPARK-5068 and squashes the following commits:

5bfcbfd [lazyman] move spark.sql.hive.verifyPartitionPath to SQLConf, fix scala style
e1d6386 [lazymam500] fix scala style
f23133f [lazymam500] bug fix
47e0023 [lazymam500] fix scala style, add config flag, break the chaining
04c443c [lazyman] SPARK-5068: fix bug when partition path doesn't exist #2
41f60ce [lazymam500] Merge pull request #1 from apache/master
1 parent 2f53588 commit 1f39a61
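
In outline, steps [1]-[4] come down to the following. A minimal standalone sketch, assuming invented names (PartitionPruningSketch, toPathPattern, prunePartitionPaths); the committed code in TableReader.scala below differs in detail:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object PartitionPruningSketch {
  // Step [1]: replace the trailing `depth` path components with "*",
  // e.g. hdfs://cluster/user/demo/2016/08/12 (depth 3) -> hdfs://cluster/user/demo/*/*/*
  def toPathPattern(partPath: Path, depth: Int): String = {
    var base = partPath
    (1 to depth).foreach(_ => base = base.getParent)
    base.toString + (1 to depth).map(_ => "*").mkString("/", "/", "")
  }

  // Steps [2]-[4]: expand each distinct pattern once via globStatus, cache the
  // matches, and keep only the partition paths that actually exist on disk.
  def prunePartitionPaths(partPaths: Seq[Path], depth: Int, conf: Configuration): Seq[Path] = {
    val seenPatterns = collection.mutable.Set.empty[String]
    val existingPaths = collection.mutable.Set.empty[String]
    partPaths.filter { p =>
      val pattern = toPathPattern(p, depth)
      if (seenPatterns.add(pattern)) { // a new pattern: glob it once and cache the result
        val globPath = new Path(pattern)
        val fs = globPath.getFileSystem(conf)
        // globStatus returns null when nothing matches the pattern
        Option(fs.globStatus(globPath)).getOrElse(Array.empty[FileStatus])
          .foreach(status => existingPaths += status.getPath.toString)
      }
      existingPaths.contains(p.toString)
    }
  }
}

The point of the per-pattern cache is that each distinct glob is expanded at most once per table scan, even when many partitions share the same directory layout.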

File tree

3 files changed

+110 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 6 additions & 0 deletions
@@ -39,6 +39,8 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
   val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"

+  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
   val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"

@@ -119,6 +121,10 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean

+  /** When true, use verifyPartitionPath to prune partition paths that do not exist. */
+  private[spark] def verifyPartitionPath =
+    getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+
   /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
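
The flag defaults to true. As a usage sketch (sqlContext here is an assumed HiveContext instance, not part of this diff), it can be switched off programmatically or from SQL:

// Sketch only: sqlContext is an assumed HiveContext.
sqlContext.setConf("spark.sql.hive.verifyPartitionPath", "false")
// or equivalently:
sqlContext.sql("SET spark.sql.hive.verifyPartitionPath=false")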

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 40 additions & 1 deletion
@@ -142,7 +142,46 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition,
         Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
-    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+
+    // SPARK-5068: get the FileStatus objects and do the filtering locally when a path does not exist
+    def verifyPartitionPath(
+        partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
+        Map[HivePartition, Class[_ <: Deserializer]] = {
+      if (!sc.conf.verifyPartitionPath) {
+        partitionToDeserializer
+      } else {
+        var existPathSet = collection.mutable.Set[String]()
+        var pathPatternSet = collection.mutable.Set[String]()
+        partitionToDeserializer.filter {
+          case (partition, partDeserializer) =>
+            def updateExistPathSetByPathPattern(pathPatternStr: String) {
+              val pathPattern = new Path(pathPatternStr)
+              val fs = pathPattern.getFileSystem(sc.hiveconf)
+              val matches = fs.globStatus(pathPattern)
+              matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
+            }
+            // convert /demo/data/year/month/day to /demo/data/*/*/*/
+            def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+              var path = tempPath
+              for (i <- (1 to parNum)) path = path.getParent
+              val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
+              path.toString + tails
+            }
+
+            val partPath = HiveShim.getDataLocationPath(partition)
+            val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size()
+            val pathPatternStr = getPathPatternByPath(partNum, partPath)
+            if (!pathPatternSet.contains(pathPatternStr)) {
+              pathPatternSet += pathPatternStr
+              updateExistPathSetByPathPattern(pathPatternStr)
+            }
+            existPathSet.contains(partPath.toString)
+        }
+      }
+    }
+
+    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
+      .map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
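
For reference, a small self-contained sketch of the FileSystem.globStatus behavior this change relies on, run against the local filesystem (the directory names are invented):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object GlobStatusDemo extends App {
  val tmp = java.nio.file.Files.createTempDirectory("globdemo")
  java.nio.file.Files.createDirectory(tmp.resolve("ds=1"))
  java.nio.file.Files.createDirectory(tmp.resolve("ds=2"))
  // ds=3 is deliberately never created, like a partition whose directory was deleted

  val pattern = new Path(s"file:$tmp/*")
  val fs = pattern.getFileSystem(new Configuration())
  // Only directories that exist on disk come back; a partition registered in the
  // metastore but missing on disk simply produces no match and is filtered out.
  val matches = Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
  matches.foreach(status => println(status.getPath)) // prints .../ds=1 and .../ds=2 only
}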
sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala

Lines changed: 64 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File

import com.google.common.io.Files

import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.util.Utils
/* Implicits */
import org.apache.spark.sql.hive.test.TestHive._

class QueryPartitionSuite extends QueryTest {
  import org.apache.spark.sql.hive.test.TestHive.implicits._

  test("SPARK-5068: query data when path doesn't exist") {
    val testData = TestHive.sparkContext.parallelize(
      (1 to 10).map(i => TestData(i, i.toString))).toDF()
    testData.registerTempTable("testData")

    val tmpDir = Files.createTempDir()
    // create the table for the test
    sql(s"CREATE TABLE table_with_partition(key int, value string) " +
      s"PARTITIONED BY (ds string) LOCATION '${tmpDir.toURI.toString}'")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='1') SELECT key, value FROM testData")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='2') SELECT key, value FROM testData")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='3') SELECT key, value FROM testData")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='4') SELECT key, value FROM testData")

    // test with all partition paths present
    checkAnswer(sql("SELECT key, value FROM table_with_partition"),
      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
        ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect)

    // delete the path of one partition
    val folders = tmpDir.listFiles.filter(_.isDirectory)
    Utils.deleteRecursively(folders(0))

    // test again after deleting the path
    checkAnswer(sql("SELECT key, value FROM table_with_partition"),
      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
        ++ testData.toSchemaRDD.collect)

    sql("DROP TABLE table_with_partition")
    sql("DROP TABLE createAndInsertTest")
  }
}
