File tree Expand file tree Collapse file tree 1 file changed +8
-2
lines changed
sql/core/src/main/scala/org/apache/spark/sql Expand file tree Collapse file tree 1 file changed +8
-2
lines changed Original file line number Diff line number Diff line change @@ -326,8 +326,14 @@ class SparkSession private(
326
326
* @since 2.0.0
327
327
*/
328
328
def createDataFrame (rdd : RDD [_], beanClass : Class [_]): DataFrame = {
329
- val encoder = Encoders .bean(beanClass).asInstanceOf [Encoder [AnyRef ]]
330
- Dataset .ofRows(self, ExistingRDD (rdd.asInstanceOf [RDD [AnyRef ]])(self)(encoder))
329
+ val attributeSeq : Seq [AttributeReference ] = getSchema(beanClass)
330
+ val className = beanClass.getName
331
+ val rowRdd = rdd.mapPartitions { iter =>
332
+ // BeanInfo is not serializable so we must rediscover it remotely for each partition.
333
+ val localBeanInfo = Introspector .getBeanInfo(Utils .classForName(className))
334
+ SQLContext .beansToRows(iter, localBeanInfo, attributeSeq)
335
+ }
336
+ Dataset .ofRows(self, LogicalRDD (attributeSeq, rowRdd)(self))
331
337
}
332
338
333
339
/**
You can’t perform that action at this time.
0 commit comments