@@ -34,38 +34,41 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
34
34
conf.set(" spark.serializer" , " org.apache.spark.serializer.KryoSerializer" )
35
35
conf.set(" spark.kryo.registrator" , classOf [MyRegistrator ].getName)
36
36
37
- test(" configuration limits" ) {
38
- val conf1 = conf.clone()
37
+ test(" SPARK-7392 configuration limits" ) {
39
38
val kryoBufferProperty = " spark.kryoserializer.buffer"
40
39
val kryoBufferMaxProperty = " spark.kryoserializer.buffer.max"
41
- conf1.set(kryoBufferProperty, " 64k" )
42
- conf1.set(kryoBufferMaxProperty, " 64m" )
43
- new KryoSerializer (conf1).newInstance()
40
+
41
+ def newKryoInstance (
42
+ conf : SparkConf ,
43
+ bufferSize : String = " 64k" ,
44
+ maxBufferSize : String = " 64m" ): SerializerInstance = {
45
+ val kryoConf = conf.clone()
46
+ kryoConf.set(kryoBufferProperty, bufferSize)
47
+ kryoConf.set(kryoBufferMaxProperty, maxBufferSize)
48
+ new KryoSerializer (kryoConf).newInstance()
49
+ }
50
+
51
+ // test default values
52
+ newKryoInstance(conf, " 64k" , " 64m" )
44
53
// 2048m = 2097152k
45
- conf1.set(kryoBufferProperty, " 2097151k" )
46
- conf1.set(kryoBufferMaxProperty, " 64m" )
47
54
// should not throw exception when kryoBufferMaxProperty < kryoBufferProperty
48
- new KryoSerializer (conf1).newInstance()
49
- conf1.set(kryoBufferMaxProperty, " 2097151k" )
50
- new KryoSerializer (conf1).newInstance()
51
- val conf2 = conf.clone()
52
- conf2.set(kryoBufferProperty, " 2048m" )
53
- val thrown1 = intercept[IllegalArgumentException ](new KryoSerializer (conf2).newInstance())
55
+ newKryoInstance(conf, " 2097151k" , " 64m" )
56
+ // test maximum size with unit of KiB
57
+ newKryoInstance(conf, " 2097151k" , " 2097151k" )
58
+ // should throw exception with bufferSize out of bound
59
+ val thrown1 = intercept[IllegalArgumentException ](newKryoInstance(conf, " 2048m" ))
54
60
assert(thrown1.getMessage.contains(kryoBufferProperty))
55
- val conf3 = conf.clone()
56
- conf3.set(kryoBufferMaxProperty, " 2048m " )
57
- val thrown2 = intercept[ IllegalArgumentException ]( new KryoSerializer (conf3).newInstance( ))
61
+ // should throw exception with maxBufferSize out of bound
62
+ val thrown2 = intercept[ IllegalArgumentException ](
63
+ newKryoInstance(conf, maxBufferSize = " 2048m " ))
58
64
assert(thrown2.getMessage.contains(kryoBufferMaxProperty))
59
- val conf4 = conf.clone()
60
- conf4.set(kryoBufferProperty, " 2g" )
61
- conf4.set(kryoBufferMaxProperty, " 3g" )
62
- val thrown3 = intercept[IllegalArgumentException ](new KryoSerializer (conf4).newInstance())
65
+ // should throw exception when both bufferSize and maxBufferSize out of bound
66
+ // exception should only contain "spark.kryoserializer.buffer"
67
+ val thrown3 = intercept[IllegalArgumentException ](newKryoInstance(conf, " 2g" , " 3g" ))
63
68
assert(thrown3.getMessage.contains(kryoBufferProperty))
64
69
assert(! thrown3.getMessage.contains(kryoBufferMaxProperty))
65
- val conf5 = conf.clone()
66
- conf5.set(kryoBufferProperty, " 8m" )
67
- conf5.set(kryoBufferMaxProperty, " 9m" )
68
- new KryoSerializer (conf5).newInstance()
70
+ // test configuration with mb is supported properly
71
+ newKryoInstance(conf, " 8m" , " 9m" )
69
72
}
70
73
71
74
test(" basic types" ) {
0 commit comments