@@ -22,37 +22,37 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+
+import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.{SQLConf, SQLContext}
 
 import scala.collection.JavaConversions._
 
-
 /**
  * Allows creation of parquet based tables using the syntax
  * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
-class DefaultSource extends SchemaRelationProvider {
+class DefaultSource extends RelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      parameters: Map[String, String]): BaseRelation = {
     val path =
       parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
 
-    ParquetRelation2(path, schema)(sqlContext)
+    ParquetRelation2(path)(sqlContext)
   }
 }
 
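The Scaladoc in the hunk above describes the DDL this data source is meant to be used with. A minimal usage sketch, assuming the standard Spark SQL data source OPTIONS clause for passing the required `path` key; the table name and path below are placeholders, not part of this change:

    // Hypothetical usage of the data source above; table name and path are placeholders.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE parquetTable
        |USING org.apache.spark.sql.parquet
        |OPTIONS (path '/path/to/parquet/files')""".stripMargin)
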
@@ -82,9 +82,7 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
  * discovery.
  */
 @DeveloperApi
-case class ParquetRelation2(
-    path: String,
-    userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext)
+case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
   extends CatalystScan with Logging {
 
   def sparkContext = sqlContext.sparkContext
@@ -135,13 +133,12 @@ case class ParquetRelation2(
 
   override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
 
-  val dataSchema = userSpecifiedSchema.getOrElse(
-    StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
-      ParquetTypesConverter.readSchemaFromFile(
-        partitions.head.files.head.getPath,
-        Some(sparkContext.hadoopConfiguration),
-        sqlContext.isParquetBinaryAsString))
-  )
+  val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
+    ParquetTypesConverter.readSchemaFromFile(
+      partitions.head.files.head.getPath,
+      Some(sparkContext.hadoopConfiguration),
+      sqlContext.isParquetBinaryAsString))
+
   val dataIncludesKey =
     partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
 