
Commit 1c70b76

[SPARK-14145][SQL] Remove the untyped version of Dataset.groupByKey
## What changes were proposed in this pull request?

Dataset has two variants of `groupByKey`: one untyped and one typed. It does not make much sense to have an untyped API here, since applications that want untyped APIs should simply use the `groupBy` DataFrame API instead.

## How was this patch tested?

This patch removes a method, and removes the associated tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #11949 from rxin/SPARK-14145.
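To make the distinction concrete, here is a minimal sketch, not taken from the patch, of the two grouping styles that remain after this change (the SQLContext setup and sample data are illustrative):

```scala
import org.apache.spark.sql.SQLContext

// Assuming an existing SQLContext, as the test suites below do.
val sqlContext: SQLContext = ???
import sqlContext.implicits._

val ds = Seq(("a", 10), ("a", 20), ("b", 1)).toDS()

// Typed API: the key is computed by a function and its type is inferred.
val typed = ds.groupByKey(_._1)
  .mapGroups { case (key, values) => (key, values.map(_._2).sum) }

// Untyped API: group by Column expressions through the DataFrame interface,
// which is what callers of the removed groupByKey(Column*) overload should
// reach for instead.
val untyped = ds.toDF("key", "value").groupBy($"key").sum("value")
```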
1 parent: 3619fec

4 files changed: 1 addition & 99 deletions

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 0 additions & 26 deletions

@@ -1178,32 +1178,6 @@ class Dataset[T] private[sql](
       withGroupingKey.newColumns)
   }
 
-  /**
-   * :: Experimental ::
-   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given [[Column]]
-   * expressions.
-   *
-   * @group typedrel
-   * @since 2.0.0
-   */
-  @Experimental
-  @scala.annotation.varargs
-  def groupByKey(cols: Column*): KeyValueGroupedDataset[Row, T] = {
-    val withKeyColumns = logicalPlan.output ++ cols.map(_.expr).map(UnresolvedAlias(_))
-    val withKey = Project(withKeyColumns, logicalPlan)
-    val executed = sqlContext.executePlan(withKey)
-
-    val dataAttributes = executed.analyzed.output.dropRight(cols.size)
-    val keyAttributes = executed.analyzed.output.takeRight(cols.size)
-
-    new KeyValueGroupedDataset(
-      RowEncoder(keyAttributes.toStructType),
-      encoderFor[T],
-      executed,
-      dataAttributes,
-      keyAttributes)
-  }
-
   /**
    * :: Experimental ::
    * (Java-specific)

sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java

Lines changed: 0 additions & 23 deletions

@@ -245,29 +245,6 @@ public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Intege
     Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList()));
   }
 
-  @Test
-  public void testGroupByColumn() {
-    List<String> data = Arrays.asList("a", "foo", "bar");
-    Dataset<String> ds = context.createDataset(data, Encoders.STRING());
-    KeyValueGroupedDataset<Integer, String> grouped =
-      ds.groupByKey(length(col("value"))).keyAs(Encoders.INT());
-
-    Dataset<String> mapped = grouped.mapGroups(
-      new MapGroupsFunction<Integer, String, String>() {
-        @Override
-        public String call(Integer key, Iterator<String> data) throws Exception {
-          StringBuilder sb = new StringBuilder(key.toString());
-          while (data.hasNext()) {
-            sb.append(data.next());
-          }
-          return sb.toString();
-        }
-      },
-      Encoders.STRING());
-
-    Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList()));
-  }
-
   @Test
   public void testSelect() {
     List<Integer> data = Arrays.asList(2, 6);
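The deleted testGroupByColumn scenario maps directly onto the surviving function-based groupByKey. A sketch, not part of the commit, written in Scala with the implicits used by the Scala suites below:

```scala
val ds = Seq("a", "foo", "bar").toDS()

// Key each string by its length; no Column expression, keyAs, or key encoder
// is needed, because the key type is inferred from the function.
val mapped = ds.groupByKey(_.length)
  .mapGroups { case (len, strs) => len.toString + strs.mkString }

// Mirrors the removed assertion: "a" -> "1a", "foo" and "bar" -> "3foobar".
assert(mapped.collect().toSet == Set("1a", "3foobar"))
```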

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
 
   test("persist and then groupBy columns asKey, map") {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
-    val grouped = ds.groupByKey($"_1").keyAs[String]
+    val grouped = ds.groupByKey(_._1)
     val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
     agged.persist()

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 0 additions & 49 deletions

@@ -322,55 +322,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     )
   }
 
-  test("groupBy columns, map") {
-    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
-    val grouped = ds.groupByKey($"_1")
-    val agged = grouped.mapGroups { case (g, iter) => (g.getString(0), iter.map(_._2).sum) }
-
-    checkDataset(
-      agged,
-      ("a", 30), ("b", 3), ("c", 1))
-  }
-
-  test("groupBy columns, count") {
-    val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()
-    val count = ds.groupByKey($"_1").count()
-
-    checkDataset(
-      count,
-      (Row("a"), 2L), (Row("b"), 1L))
-  }
-
-  test("groupBy columns asKey, map") {
-    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
-    val grouped = ds.groupByKey($"_1").keyAs[String]
-    val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
-
-    checkDataset(
-      agged,
-      ("a", 30), ("b", 3), ("c", 1))
-  }
-
-  test("groupBy columns asKey tuple, map") {
-    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
-    val grouped = ds.groupByKey($"_1", lit(1)).keyAs[(String, Int)]
-    val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
-
-    checkDataset(
-      agged,
-      (("a", 1), 30), (("b", 1), 3), (("c", 1), 1))
-  }
-
-  test("groupBy columns asKey class, map") {
-    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
-    val grouped = ds.groupByKey($"_1".as("a"), lit(1).as("b")).keyAs[ClassData]
-    val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
-
-    checkDataset(
-      agged,
-      (ClassData("a", 1), 30), (ClassData("b", 1), 3), (ClassData("c", 1), 1))
-  }
-
   test("typed aggregation: expr") {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
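The removed "groupBy columns, count" case likewise has a typed counterpart; a sketch, not in the commit, assuming the same suite helpers (toDS, checkDataset):

```scala
val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()

// With a function-derived key the result is a Dataset[(String, Long)], so the
// Row wrapper and RowEncoder from the removed variant disappear.
val counted = ds.groupByKey(_._1).count()

checkDataset(counted, ("a", 2L), ("b", 1L))
```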
