@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
 
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "0")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("simple insert") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -57,7 +68,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("insert with collision") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -80,7 +91,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -125,7 +136,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("null keys and values") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -166,7 +177,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("simple aggregator") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
 
     // reduceByKey
@@ -181,7 +192,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("simple cogroup") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)
     val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
     val rdd2 = sc.parallelize(1 to 4).map(i => (i % 2, i))
@@ -199,7 +210,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -249,7 +260,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -304,7 +315,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with many hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -329,7 +340,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -347,7 +358,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }
 
   test("spilling with null keys and values") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 