@@ -22,7 +22,10 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 
@@ -179,6 +182,44 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   }
 }
 
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    tableCols: Seq[StructField],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val clazz: Class[_] = try loader.loadClass(provider) catch {
+      case cnf: java.lang.ClassNotFoundException =>
+        try loader.loadClass(provider + ".DefaultSource") catch {
+          case cnf: java.lang.ClassNotFoundException =>
+            sys.error(s"Failed to load class for data source: $provider")
+        }
+    }
+    val relation = clazz.newInstance match {
+      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+        dataSource
+          .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
+          .createRelation(sqlContext, new CaseInsensitiveMap(options))
+      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+        if (tableCols.isEmpty) {
+          dataSource
+            .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
+            .createRelation(sqlContext, new CaseInsensitiveMap(options))
+        } else {
+          dataSource
+            .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
+            .createRelation(
+              sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
+        }
+    }
+
+    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+    Seq.empty
+  }
+}
+
 /**
  * Builds a map in which keys are case insensitive
  */
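
For context, here is a small, self-contained sketch of how the new command would be exercised end to end. It assumes the `CREATE TEMPORARY TABLE ... USING ... OPTIONS` syntax handled by the DDLParser above; the data source class org.example.MyDataSource, the table name, and the path option are hypothetical placeholders, not part of this patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only: org.example.MyDataSource stands in for a real RelationProvider
// (or a package whose DefaultSource implements one); swap in an actual provider to run.
object CreateTableUsingExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ddl-example").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // DDLParser turns this statement into CreateTableUsing(tableName, tableCols, provider, options);
    // run() then loads the provider class, calls createRelation, and registers a temp table.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE users
        |USING org.example.MyDataSource
        |OPTIONS (path '/tmp/users.json')
      """.stripMargin)

    // The registered temporary table can now be queried like any other table.
    sqlContext.sql("SELECT * FROM users").collect().foreach(println)

    sc.stop()
  }
}

The fallback to provider + ".DefaultSource" in run() is what lets the USING clause name either the provider class itself or the package that contains its DefaultSource.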