Fixed toDataFrame.
rxin committed Jan 28, 2015
1 parent 6545c42 commit e351cb2
Showing 7 changed files with 24 additions and 24 deletions.
@@ -81,7 +81,7 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")

// Convert input data to DataFrame explicitly.
- val df: DataFrame = origData.toDF
+ val df: DataFrame = origData.toDataFrame
println(s"Inferred schema:\n${df.schema.prettyJson}")
println(s"Converted to DataFrame with ${df.count()} records")

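The explicit call above only compiles because an implicit RDD-to-DataFrame view is in scope; toDataFrame itself does nothing except trigger that view and pin the static type (see the DataFrame.scala change below). A minimal, self-contained sketch of the idiom with toy types — none of these names (Collection, Frame, collectionToFrame) are Spark's:

import scala.language.implicitConversions

object ExplicitConversionSketch {
  class Collection(val rows: Seq[Int])    // plays the role of RDD
  class Frame(val rows: Seq[Int]) {       // plays the role of DataFrame
    def toFrame: Frame = this             // no-op; exists only to force the view
  }
  // The implicit view that makes collection.toFrame compile at all.
  implicit def collectionToFrame(c: Collection): Frame = new Frame(c.rows)

  def main(args: Array[String]): Unit = {
    val data = new Collection(Seq(1, 2, 3))
    // data has no toFrame method, so the compiler inserts
    // collectionToFrame(data), and the result is statically a Frame.
    val df: Frame = data.toFrame
    println(df.rows.sum) // 6
  }
}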
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -106,7 +106,7 @@ class DataFrame protected[sql](
* An implicit conversion function internal to this class for us to avoid doing
* "new DataFrame(...)" everywhere.
*/
- private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame = {
+ private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = {
new DataFrame(sqlContext, logicalPlan, true)
}

@@ -130,7 +130,7 @@ class DataFrame protected[sql](
/**
* Return the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
*/
- def toDF: DataFrame = this
+ def toDataFrame: DataFrame = this

/** Return the schema of this [[DataFrame]]. */
override def schema: StructType = queryExecution.analyzed.schema
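These two hunks are linked, and they are the point of the commit: the public no-arg method becomes toDataFrame, so the class-internal implicit conversion that was also named toDataFrame is renamed logicalPlanToDataFrame. Sharing one name between the two would be legal Scala overloading, but it is confusing to read and easy to misuse, so the commit keeps the roles distinct. A self-contained sketch of the resulting pattern with simplified, hypothetical types (not the actual Spark source):

import scala.language.implicitConversions

object RenameSketch {
  case class LogicalPlan(desc: String)

  class MiniFrame(val plan: LogicalPlan) {
    // Internal implicit view: lets methods return a LogicalPlan where a
    // MiniFrame is expected, avoiding "new MiniFrame(...)" everywhere.
    // Naming it toMiniFrame would collide with the public method below.
    private implicit def logicalPlanToMiniFrame(p: LogicalPlan): MiniFrame =
      new MiniFrame(p)

    // Public no-op, mirroring def toDataFrame: DataFrame = this.
    def toMiniFrame: MiniFrame = this

    // The compiler wraps the returned LogicalPlan with the implicit view.
    def limit(n: Int): MiniFrame = LogicalPlan(s"${plan.desc} LIMIT $n")
  }

  def main(args: Array[String]): Unit = {
    val df = new MiniFrame(LogicalPlan("scan"))
    println(df.limit(10).toMiniFrame.plan.desc) // scan LIMIT 10
  }
}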
@@ -33,7 +33,7 @@ class ColumnExpressionSuite extends QueryTest {

ignore("star qualified by data frame object") {
// This is not yet supported.
- val df = testData.toDF
+ val df = testData.toDataFrame
checkAnswer(df.select(df("*")), df.collect().toSeq)
}

@@ -106,13 +106,13 @@ class ColumnExpressionSuite extends QueryTest {

test("isNull") {
checkAnswer(
- nullStrings.toDF.where($"s".isNull),
+ nullStrings.toDataFrame.where($"s".isNull),
nullStrings.collect().toSeq.filter(r => r.getString(1) eq null))
}

test("isNotNull") {
checkAnswer(
- nullStrings.toDF.where($"s".isNotNull),
+ nullStrings.toDataFrame.where($"s".isNotNull),
nullStrings.collect().toSeq.filter(r => r.getString(1) ne null))
}

@@ -118,19 +118,19 @@ class DataFrameSuite extends QueryTest {

checkAnswer(
arrayData.orderBy('data.getItem(0).asc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq)

checkAnswer(
arrayData.orderBy('data.getItem(0).desc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq)

checkAnswer(
arrayData.orderBy('data.getItem(1).asc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq)

checkAnswer(
arrayData.orderBy('data.getItem(1).desc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
}

test("limit") {
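The sort keys in this suite are dense: _.getAs[Seq[Int]](0)(0) reads column 0 of a Row as a Seq[Int], then the trailing (0) indexes into that sequence. A toy sketch with a hypothetical Row class (not Spark's) unpacking the expression:

object GetAsSketch {
  // Minimal stand-in for a Row: positional values read back with a cast.
  class Row(values: Seq[Any]) {
    def getAs[T](i: Int): T = values(i).asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val row = new Row(Seq(Seq(10, 20, 30)))
    // getAs[Seq[Int]](0) reads column 0; the trailing (0) indexes into it.
    println(row.getAs[Seq[Int]](0)(0)) // 10
    // Sorting rows by that key, as the suite does after collect():
    val rows = Seq(new Row(Seq(Seq(3))), new Row(Seq(Seq(1))))
    println(rows.sortBy(_.getAs[Seq[Int]](0)(0)).map(_.getAs[Seq[Int]](0)))
    // List(List(1), List(3))
  }
}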
22 changes: 11 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -30,11 +30,11 @@ case class TestData(key: Int, value: String)

object TestData {
val testData = TestSQLContext.sparkContext.parallelize(
- (1 to 100).map(i => TestData(i, i.toString))).toDF
+ (1 to 100).map(i => TestData(i, i.toString))).toDataFrame
testData.registerTempTable("testData")

val negativeData = TestSQLContext.sparkContext.parallelize(
- (1 to 100).map(i => TestData(-i, (-i).toString))).toDF
+ (1 to 100).map(i => TestData(-i, (-i).toString))).toDataFrame
negativeData.registerTempTable("negativeData")

case class LargeAndSmallInts(a: Int, b: Int)
@@ -45,7 +45,7 @@ object TestData {
LargeAndSmallInts(2147483645, 1) ::
LargeAndSmallInts(2, 2) ::
LargeAndSmallInts(2147483646, 1) ::
- LargeAndSmallInts(3, 2) :: Nil).toDF
+ LargeAndSmallInts(3, 2) :: Nil).toDataFrame
largeAndSmallInts.registerTempTable("largeAndSmallInts")

case class TestData2(a: Int, b: Int)
@@ -56,7 +56,7 @@ object TestData {
TestData2(2, 1) ::
TestData2(2, 2) ::
TestData2(3, 1) ::
- TestData2(3, 2) :: Nil, 2).toDF
+ TestData2(3, 2) :: Nil, 2).toDataFrame
testData2.registerTempTable("testData2")

case class DecimalData(a: BigDecimal, b: BigDecimal)
@@ -68,7 +68,7 @@ object TestData {
DecimalData(2, 1) ::
DecimalData(2, 2) ::
DecimalData(3, 1) ::
- DecimalData(3, 2) :: Nil).toDF
+ DecimalData(3, 2) :: Nil).toDataFrame
decimalData.registerTempTable("decimalData")

case class BinaryData(a: Array[Byte], b: Int)
@@ -78,14 +78,14 @@ object TestData {
BinaryData("22".getBytes(), 5) ::
BinaryData("122".getBytes(), 3) ::
BinaryData("121".getBytes(), 2) ::
BinaryData("123".getBytes(), 4) :: Nil).toDF
BinaryData("123".getBytes(), 4) :: Nil).toDataFrame
binaryData.registerTempTable("binaryData")

case class TestData3(a: Int, b: Option[Int])
val testData3 =
TestSQLContext.sparkContext.parallelize(
TestData3(1, None) ::
- TestData3(2, Some(2)) :: Nil).toDF
+ TestData3(2, Some(2)) :: Nil).toDataFrame
testData3.registerTempTable("testData3")

val emptyTableData = logical.LocalRelation($"a".int, $"b".int)
@@ -98,7 +98,7 @@ object TestData {
UpperCaseData(3, "C") ::
UpperCaseData(4, "D") ::
UpperCaseData(5, "E") ::
- UpperCaseData(6, "F") :: Nil).toDF
+ UpperCaseData(6, "F") :: Nil).toDataFrame
upperCaseData.registerTempTable("upperCaseData")

case class LowerCaseData(n: Int, l: String)
@@ -107,7 +107,7 @@ object TestData {
LowerCaseData(1, "a") ::
LowerCaseData(2, "b") ::
LowerCaseData(3, "c") ::
- LowerCaseData(4, "d") :: Nil).toDF
+ LowerCaseData(4, "d") :: Nil).toDataFrame
lowerCaseData.registerTempTable("lowerCaseData")

case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]])
@@ -161,7 +161,7 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
NullStrings(1, "abc") ::
NullStrings(2, "ABC") ::
- NullStrings(3, null) :: Nil).toDF
+ NullStrings(3, null) :: Nil).toDataFrame
nullStrings.registerTempTable("nullStrings")

case class TableName(tableName: String)
@@ -201,6 +201,6 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true)
:: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false)
- :: Nil).toDF
+ :: Nil).toDataFrame
complexData.registerTempTable("complexData")
}
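Every block in TestData.scala above follows the same shape: parallelize a list of case-class rows, convert with toDataFrame, then register the frame under a name so SQL tests can query it. A stripped-down sketch of that registry pattern (toy Frame and Catalog types, not Spark's):

import scala.collection.mutable

object TempTableSketch {
  case class TestData(key: Int, value: String)

  // Toy catalog: registerTempTable boils down to a name-to-frame mapping.
  object Catalog { val tables = mutable.Map.empty[String, Frame] }

  class Frame(val rows: Seq[Product]) {
    def registerTempTable(name: String): Unit = Catalog.tables(name) = this
  }

  def main(args: Array[String]): Unit = {
    val testData = new Frame((1 to 100).map(i => TestData(i, i.toString)))
    testData.registerTempTable("testData")
    println(Catalog.tables("testData").rows.size) // 100
  }
}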
@@ -821,7 +821,7 @@ class JsonSuite extends QueryTest {

val df1 = applySchema(rowRDD1, schema1)
df1.registerTempTable("applySchema1")
- val df2 = df1.toDF
+ val df2 = df1.toDataFrame
val result = df2.toJSON.collect()
assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
@@ -842,7 +842,7 @@ class JsonSuite extends QueryTest {

val df3 = applySchema(rowRDD2, schema2)
df3.registerTempTable("applySchema2")
- val df4 = df3.toDF
+ val df4 = df3.toDataFrame
val result2 = df4.toJSON.collect()

assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
@@ -52,7 +52,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
// Make sure the table has been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq
+ testData.toDataFrame.collect().toSeq ++ testData.toDataFrame.collect().toSeq
)

// Now overwrite.
