@@ -43,8 +43,6 @@ trait CompressionCodec {
43
43
def compressedOutputStream (s : OutputStream ): OutputStream
44
44
45
45
def compressedInputStream (s : InputStream ): InputStream
46
-
47
- def isAvailable () : Boolean = true
48
46
}
49
47
50
48
private [spark] object CompressionCodec extends Logging {
@@ -67,9 +65,9 @@ private[spark] object CompressionCodec extends Logging {
67
65
Some (ctor.newInstance(conf).asInstanceOf [CompressionCodec ])
68
66
} catch {
69
67
case e : ClassNotFoundException => None
68
+ case e : IllegalArgumentException => None
70
69
}
71
- codec.filter(_.isAvailable())
72
- .getOrElse(throw new IllegalArgumentException (s " Codec [ $codecName] is not available. " +
70
+ codec.getOrElse(throw new IllegalArgumentException (s " Codec [ $codecName] is not available. " +
73
71
s " Consider setting $configKey= $FALLBACK_COMPRESSION_CODEC" ))
74
72
}
75
73
@@ -131,19 +129,16 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
131
129
@ DeveloperApi
132
130
class SnappyCompressionCodec (conf : SparkConf ) extends CompressionCodec {
133
131
132
+ try {
133
+ Snappy .getNativeLibraryVersion
134
+ } catch {
135
+ case e : Error => throw new IllegalArgumentException
136
+ }
137
+
134
138
override def compressedOutputStream (s : OutputStream ): OutputStream = {
135
139
val blockSize = conf.getInt(" spark.io.compression.snappy.block.size" , 32768 )
136
140
new SnappyOutputStream (s, blockSize)
137
141
}
138
142
139
143
override def compressedInputStream (s : InputStream ): InputStream = new SnappyInputStream (s)
140
-
141
- override def isAvailable () = {
142
- try {
143
- Snappy .getNativeLibraryVersion
144
- true
145
- } catch {
146
- case e : Error => false
147
- }
148
- }
149
144
}
0 commit comments