
Commit 1eeb769

Merge pull request #20 from yhuai/pr3431yin
Minor refactoring and test update.
2 parents: b621c8f + f5c22b0


7 files changed: +231 -234 lines


sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ private[sql] class DefaultSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType] = None): BaseRelation = {
+      schema: Option[StructType]): BaseRelation = {
     val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
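The same change is made to the Parquet source below and to the SchemaRelationProvider trait itself in interfaces.scala. As a hedged sketch of the effect on callers (the object, helper, and path here are hypothetical, not part of this commit): with the = None default gone, code that builds a relation programmatically must now pass the schema argument explicitly.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, SchemaRelationProvider}

// Hypothetical helper, not part of this commit: loads a relation through the trait.
object DemoLoad {
  def load(provider: SchemaRelationProvider, sqlContext: SQLContext): BaseRelation =
    provider.createRelation(
      sqlContext,
      Map("path" -> "/tmp/people.json"), // hypothetical path
      None)                              // the schema argument is now required
}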

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ class DefaultSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType] = None): BaseRelation = {
+      schema: Option[StructType]): BaseRelation = {
     val path =
       parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 0 additions & 61 deletions
This file was deleted; the CreateTableUsing command it housed moves into ddl.scala below.

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 41 additions & 7 deletions
@@ -22,7 +22,10 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
@@ -61,14 +64,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
 
   // Data types.
   protected val STRING = Keyword("STRING")
-  protected val FLOAT = Keyword("FLOAT")
-  protected val INT = Keyword("INT")
+  protected val BINARY = Keyword("BINARY")
+  protected val BOOLEAN = Keyword("BOOLEAN")
   protected val TINYINT = Keyword("TINYINT")
   protected val SMALLINT = Keyword("SMALLINT")
-  protected val DOUBLE = Keyword("DOUBLE")
+  protected val INT = Keyword("INT")
   protected val BIGINT = Keyword("BIGINT")
-  protected val BINARY = Keyword("BINARY")
-  protected val BOOLEAN = Keyword("BOOLEAN")
+  protected val FLOAT = Keyword("FLOAT")
+  protected val DOUBLE = Keyword("DOUBLE")
   protected val DECIMAL = Keyword("DECIMAL")
   protected val DATE = Keyword("DATE")
   protected val TIMESTAMP = Keyword("TIMESTAMP")
@@ -102,8 +105,8 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
     CREATE ~ TEMPORARY ~ TABLE ~> ident
       ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
       case tableName ~ columns ~ provider ~ opts =>
-        val tblColumns = if(columns.isEmpty) Seq.empty else columns.get
-        CreateTableUsing(tableName, tblColumns, provider, opts)
+        val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
+        CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
     }
   )
@@ -179,6 +182,37 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   }
 }
 
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val clazz: Class[_] = try loader.loadClass(provider) catch {
+      case cnf: java.lang.ClassNotFoundException =>
+        try loader.loadClass(provider + ".DefaultSource") catch {
+          case cnf: java.lang.ClassNotFoundException =>
+            sys.error(s"Failed to load class for data source: $provider")
+        }
+    }
+    val relation = clazz.newInstance match {
+      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+        dataSource
+          .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
+          .createRelation(sqlContext, new CaseInsensitiveMap(options))
+      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+        dataSource
+          .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
+          .createRelation(sqlContext, new CaseInsensitiveMap(options), userSpecifiedSchema)
+    }
+
+    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+    Seq.empty
+  }
+}
+
 /**
  * Builds a map in which keys are case insensitive
  */
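To make the new command concrete, a minimal usage sketch (the table name and JSON path are hypothetical; the DDL itself follows the grammar above). Omitting the column list leaves userSpecifiedSchema as None, so the source infers the schema, and the provider string resolves through the provider + ".DefaultSource" fallback in run:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CreateTableUsingDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ddl-demo").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    // No column list, so the JSON source infers the schema from the data.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE people
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/tmp/people.json')""".stripMargin) // hypothetical path

    sqlContext.sql("SELECT * FROM people").collect().foreach(println)
    sc.stop()
  }
}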

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ trait SchemaRelationProvider {
   def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType] = None): BaseRelation
+      schema: Option[StructType]): BaseRelation
 }
 
 /**
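For implementors, a toy provider written against the trait as it now stands. Everything here is a hypothetical sketch: the class names are invented, and it assumes the sources API of this period, in which TableScan extends BaseRelation. The point is that schema is now a required parameter, though a provider can still fall back to its own default when given None.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, SchemaRelationProvider, TableScan}

// Hypothetical source that emits the integers 0 until n.
class RangeSource extends SchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: Option[StructType]): BaseRelation = {
    val n = parameters.getOrElse("n", sys.error("Option 'n' not specified")).toInt
    // The argument can no longer be omitted at the call site, but a provider
    // may still supply a default when the user gave no schema.
    val resolved = schema.getOrElse(
      StructType(StructField("i", IntegerType, nullable = false) :: Nil))
    RangeRelation(n, resolved)(sqlContext)
  }
}

// Assumes TableScan extends BaseRelation, as in the sources API of this period.
case class RangeRelation(n: Int, schema: StructType)(
    @transient val sqlContext: SQLContext) extends TableScan {
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until n).map(Row(_))
}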

sql/core/src/test/scala/org/apache/spark/sql/sources/NewTableScanSuite.scala

Lines changed: 0 additions & 163 deletions
This file was deleted.
