Commit d9670f8

chenghao-intel authored and rxin committed

[SPARK-13894][SQL] SQLContext.range return type from DataFrame to Dataset
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13894

Change the return type of the `SQLContext.range` API from `DataFrame` to `Dataset[Long]`.

## How was this patch tested?

No additional unit tests are required; the existing suites were updated to work with the new return type.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #11730 from chenghao-intel/range.
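For orientation, here is a minimal sketch of the user-visible effect of the change. It assumes a Spark 2.0-era `SQLContext` in scope as `sqlContext`; the variable names are illustrative, not taken from the patch:

    import org.apache.spark.sql.{DataFrame, Dataset}
    import sqlContext.implicits._  // supplies the encoders needed by typed operations

    // range now returns a typed Dataset[Long] instead of a DataFrame
    val ds: Dataset[Long] = sqlContext.range(0, 10)

    // typed operations work directly on the Long elements...
    val doubled: Dataset[Long] = ds.map(_ * 2)

    // ...while call sites that still need the untyped DataFrame API convert
    // explicitly; this is the `.toDF()` pattern applied throughout the test
    // suites below
    val df: DataFrame = ds.toDF()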
1 parent d9e8f26 commit d9670f8

File tree: 12 files changed (+38, -37 lines)


mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ class StringIndexerSuite
     val indexerModel = new StringIndexerModel("indexer", Array("a", "b", "c"))
       .setInputCol("label")
       .setOutputCol("labelIndex")
-    val df = sqlContext.range(0L, 10L)
+    val df = sqlContext.range(0L, 10L).toDF()
     assert(indexerModel.transform(df).eq(df))
   }

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 17 additions & 16 deletions
@@ -60,6 +60,7 @@ import org.apache.spark.util.Utils
  * @groupname specificdata Specific Data Sources
  * @groupname config Configuration
  * @groupname dataframes Custom DataFrame Creation
+ * @groupname dataset Custom Dataset Creation
  * @groupname Ungrouped Support functions for language integrated queries
  * @since 1.0.0
  */
@@ -716,53 +717,53 @@ class SQLContext private[sql](
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from 0 to `end` (exclusive) with step value 1.
    *
-   * @since 1.4.1
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(end: Long): DataFrame = range(0, end)
+  def range(end: Long): Dataset[Long] = range(0, end)
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from `start` to `end` (exclusive) with step value 1.
    *
-   * @since 1.4.0
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long): DataFrame = {
+  def range(start: Long, end: Long): Dataset[Long] = {
     range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism)
   }
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from `start` to `end` (exclusive) with a step value.
    *
    * @since 2.0.0
-   * @group dataframe
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long): DataFrame = {
+  def range(start: Long, end: Long, step: Long): Dataset[Long] = {
     range(start, end, step, numPartitions = sparkContext.defaultParallelism)
   }
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from `start` to `end` (exclusive) with a step value, with partition number
    * specified.
    *
-   * @since 1.4.0
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
-    Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = {
+    new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder)
   }
 
   /**
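The substantive change is in the last overload: instead of `Dataset.newDataFrame`, the logical `Range` plan is now wrapped in a typed `Dataset` together with `implicits.newLongEncoder`. The attached encoder is what lets callers consume the `id` column as `scala.Long` values rather than `Row`s. A rough sketch of the difference, with illustrative values and the same assumed `sqlContext`:

    // untyped path: Row-based access to the single LongType column
    val first: Long = sqlContext.range(5).toDF().collect().head.getLong(0)

    // typed path: collect() on Dataset[Long] yields the Longs directly
    val evens: Array[Long] = sqlContext.range(0, 100, 2, numPartitions = 4).collect()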

sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java

Lines changed: 2 additions & 2 deletions
@@ -328,7 +328,7 @@ public void testTextLoad() {
 
   @Test
   public void testCountMinSketch() {
-    Dataset<Row> df = context.range(1000);
+    Dataset df = context.range(1000);
 
     CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
     Assert.assertEquals(sketch1.totalCount(), 1000);
@@ -353,7 +353,7 @@ public void testCountMinSketch() {
 
   @Test
   public void testBloomFilter() {
-    Dataset<Row> df = context.range(1000);
+    Dataset df = context.range(1000);
 
     BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
     Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -344,7 +344,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     // SPARK-12340: overstep the bounds of Int in SparkPlan.executeTake
     checkAnswer(
-      sqlContext.range(2).limit(2147483638),
+      sqlContext.range(2).toDF().limit(2147483638),
       Row(0) :: Row(1) :: Nil
     )
   }
@@ -1312,7 +1312,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
   test("reuse exchange") {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
-      val df = sqlContext.range(100)
+      val df = sqlContext.range(100).toDF()
       val join = df.join(df, "id")
       val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1718,7 +1718,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("run sql directly on files") {
-    val df = sqlContext.range(100)
+    val df = sqlContext.range(100).toDF()
     withTempPath(f => {
       df.write.json(f.getCanonicalPath)
       checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),

sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
   }
 
   test("Sort should be included in WholeStageCodegen") {
-    val df = sqlContext.range(3, 0, -1).sort(col("id"))
+    val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegen] &&

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -599,11 +599,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("null and non-null strings") {
     // Create a dataset where the first values are NULL and then some non-null values. The
     // number of non-nulls needs to be bigger than the ParquetReader batch size.
-    val data = sqlContext.range(200).rdd.map { i =>
-      if (i.getLong(0) < 150) Row(None)
-      else Row("a")
-    }
-    val df = sqlContext.createDataFrame(data, StructType(StructField("col", StringType) :: Nil))
+    val data: Dataset[String] = sqlContext.range(200).map (i =>
+      if (i < 150) null
+      else "a"
+    )
+    val df = data.toDF("col")
     assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
 
     withTempPath { dir =>
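The ParquetIOSuite hunk above shows the ergonomic gain most directly: the old code dropped to `RDD[Row]` and re-attached a schema via `createDataFrame`, whereas the typed `Dataset[Long]` supports a plain `map`. A condensed standalone equivalent, assuming `import sqlContext.implicits._` for the string encoder (the 150 threshold is the one used by the test):

    // 150 nulls followed by 50 non-null strings, built as a typed transformation
    val data: Dataset[String] = sqlContext.range(200).map(i => if (i < 150) null else "a")
    val df = data.toDF("col")  // single StringType column named "col"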

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -128,8 +128,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
     // TODO: update metrics in generated operators
-    val df = sqlContext.range(10).filter('id < 5)
-    testSparkPlanMetrics(df, 1, Map.empty)
+    val ds = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
   }
 
   test("TungstenAggregate metrics") {
@@ -157,8 +157,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   test("Sort metrics") {
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-    val df = sqlContext.range(10).sort('id)
-    testSparkPlanMetrics(df, 2, Map.empty)
+    val ds = sqlContext.range(10).sort('id)
+    testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
   }
 
   test("SortMergeJoin metrics") {

sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
 class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  private lazy val df = sqlContext.range(10).coalesce(1)
+  private lazy val df = sqlContext.range(10).coalesce(1).toDF()
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 2 additions & 2 deletions
@@ -1425,7 +1425,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("run sql directly on files") {
-    val df = sqlContext.range(100)
+    val df = sqlContext.range(100).toDF()
     withTempPath(f => {
       df.write.parquet(f.getCanonicalPath)
       checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
@@ -1582,7 +1582,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withView("v") {
       sql("CREATE VIEW v AS SELECT * FROM add_col")
       sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
-      checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+      checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
     }
   }
 }
