Skip to content

Commit cc745c5

Browse files
committed
Tue Apr 5 13:16:56 PDT 2016
1 parent bc36df1 commit cc745c5

File tree

3 files changed

+118
-41
lines changed

3 files changed

+118
-41
lines changed

external/java8-tests/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@
5858
<type>test-jar</type>
5959
<scope>test</scope>
6060
</dependency>
61+
<dependency>
62+
<groupId>org.apache.spark</groupId>
63+
<artifactId>spark-sql_${scala.binary.version}</artifactId>
64+
<version>${project.version}</version>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.apache.spark</groupId>
68+
<artifactId>spark-sql_${scala.binary.version}</artifactId>
69+
<version>${project.version}</version>
70+
<type>test-jar</type>
71+
<scope>test</scope>
72+
</dependency>
6173
<dependency>
6274
<groupId>org.apache.spark</groupId>
6375
<artifactId>spark-test-tags_${scala.binary.version}</artifactId>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package test.org.apache.spark.sql.sources;
19+
20+
import java.util.Arrays;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
import scala.Tuple2;
25+
26+
import org.apache.spark.sql.Dataset;
27+
import org.apache.spark.sql.KeyValueGroupedDataset;
28+
import org.apache.spark.sql.expressions.java.typed;
29+
30+
/**
31+
* Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax.
32+
*/
33+
public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
34+
// Each test below builds its input via the inherited generateGroupedDataset() and passes a
// lambda to one of the `typed` aggregator factories, then compares the collected result
// against a hard-coded list of (key, value) tuples built with the inherited tuple2() helper.

// Aggregates (double)(value._2() * 2) with typed.avg; the expected result is
// ("a", 3.0) and ("b", 6.0).
@Test
35+
public void testTypedAggregationAverage() {
36+
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
37+
Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2)));
38+
Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
39+
}
40+
41+
// Aggregates with typed.count using the identity lambda; the expected result is
// ("a", 2) and ("b", 1).
@Test
42+
public void testTypedAggregationCount() {
43+
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
44+
Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(v -> v));
45+
// NOTE(review): the expected tuples are built from int literals (boxed as Integer) while the
// result is declared as Tuple2<String, Long>; this presumably relies on Tuple2 equality
// treating numerically equal Integer and Long values as equal -- confirm.
Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList());
46+
}
47+
48+
// Aggregates (double) value._2() with typed.sum; the expected result is
// ("a", 3.0) and ("b", 3.0).
@Test
49+
public void testTypedAggregationSumDouble() {
50+
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
51+
Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(v -> (double)v._2()));
52+
Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
53+
}
54+
55+
// Aggregates (long) value._2() with typed.sumLong; the expected result is
// ("a", 3) and ("b", 3).
@Test
56+
public void testTypedAggregationSumLong() {
57+
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
58+
Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(v -> (long)v._2()));
59+
// NOTE(review): as in the count test, the expected values are Integer-boxed literals compared
// against a Long-typed result -- confirm the equality semantics this depends on.
Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
60+
}
61+
}

sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaDatasetAggregatorSuite.java

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -41,46 +41,7 @@
4141
/**
4242
* Suite for testing the aggregate functionality of Datasets in Java.
4343
*/
44-
public class JavaDatasetAggregatorSuite implements Serializable {
45-
private transient JavaSparkContext jsc;
46-
private transient TestSQLContext context;
47-
48-
@Before
49-
public void setUp() {
50-
// Trigger static initializer of TestData
51-
SparkContext sc = new SparkContext("local[*]", "testing");
52-
jsc = new JavaSparkContext(sc);
53-
context = new TestSQLContext(sc);
54-
context.loadTestData();
55-
}
56-
57-
@After
58-
public void tearDown() {
59-
context.sparkContext().stop();
60-
context = null;
61-
jsc = null;
62-
}
63-
64-
private <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
65-
return new Tuple2<>(t1, t2);
66-
}
67-
68-
private KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
69-
Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
70-
List<Tuple2<String, Integer>> data =
71-
Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
72-
Dataset<Tuple2<String, Integer>> ds = context.createDataset(data, encoder);
73-
74-
return ds.groupByKey(
75-
new MapFunction<Tuple2<String, Integer>, String>() {
76-
@Override
77-
public String call(Tuple2<String, Integer> value) throws Exception {
78-
return value._1();
79-
}
80-
},
81-
Encoders.STRING());
82-
}
83-
44+
public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
8445
@Test
8546
public void testTypedAggregationAnonClass() {
8647
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
@@ -100,7 +61,6 @@ public void testTypedAggregationAnonClass() {
10061
}
10162

10263
static class IntSumOf extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
103-
10464
@Override
10565
public Integer zero() {
10666
return 0;
@@ -170,3 +130,47 @@ public Long call(Tuple2<String, Integer> value) throws Exception {
170130
Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
171131
}
172132
}
133+
134+
/**
135+
* Common test base shared by JavaDatasetAggregatorSuite and Java8DatasetAggregatorSuite.
136+
*/
137+
class JavaDatasetAggregatorSuiteBase implements Serializable {
138+
// The class is Serializable; both context handles are declared transient, which excludes them
// from Java serialization of a suite instance.
protected transient JavaSparkContext jsc;
139+
protected transient TestSQLContext context;
140+
141+
// Runs before each test: creates a SparkContext with master "local[*]" and app name "testing",
// wraps it in a JavaSparkContext and a TestSQLContext, then calls loadTestData() on the latter.
@Before
142+
public void setUp() {
143+
// NOTE(review): the comment below sits above the SparkContext construction; it presumably
// describes the loadTestData() call at the end of this method -- confirm.
// Trigger static initializer of TestData
144+
SparkContext sc = new SparkContext("local[*]", "testing");
145+
jsc = new JavaSparkContext(sc);
146+
context = new TestSQLContext(sc);
147+
context.loadTestData();
148+
}
149+
150+
// Runs after each test: stops the SparkContext reached through `context` and clears both fields.
@After
151+
public void tearDown() {
152+
// NOTE(review): dereferences `context` without a null check, so this throws
// NullPointerException if `context` was never assigned (e.g. setUp failed before the
// assignment) -- confirm whether a guard is wanted.
context.sparkContext().stop();
153+
context = null;
154+
jsc = null;
155+
}
156+
157+
// Convenience factory: returns a new scala Tuple2 holding the two arguments.
protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
158+
return new Tuple2<>(t1, t2);
159+
}
160+
161+
// Builds the shared test input: a Dataset of the three pairs ("a", 1), ("a", 2), ("b", 3),
// created through `context` with a (String, Integer) tuple encoder, and grouped by the first
// tuple field (the String) using a String key encoder.
protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
162+
Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
163+
List<Tuple2<String, Integer>> data =
164+
Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
165+
Dataset<Tuple2<String, Integer>> ds = context.createDataset(data, encoder);
166+
167+
// The key extractor is written as an anonymous MapFunction rather than a lambda.
return ds.groupByKey(
168+
new MapFunction<Tuple2<String, Integer>, String>() {
169+
@Override
170+
public String call(Tuple2<String, Integer> value) throws Exception {
171+
return value._1();
172+
}
173+
},
174+
Encoders.STRING());
175+
}
176+
}

0 commit comments

Comments
 (0)