@@ -28,7 +28,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
-import org.apache.hadoop.io.compress.DefaultCodec
+import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
@@ -113,25 +113,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
-  test("SequenceFile (compressed)") {
-    sc = new SparkContext("local", "test")
-    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
-    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
-    val codec = new DefaultCodec()
+  def runSequenceFileCodecTest(codec: CompressionCodec, codecName: String): Unit = {
+    test(s"SequenceFile (compressed) - $codecName") {
+      sc = new SparkContext("local", "test")
+      val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+      val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
 
-    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
-    data.saveAsSequenceFile(normalDir)
-    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+      val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+      data.saveAsSequenceFile(normalDir)
+      data.saveAsSequenceFile(compressedOutputDir, Some(codec.getClass))
 
-    val normalFile = new File(normalDir, "part-00000")
-    val normalContent = sc.sequenceFile[String, String](normalDir).collect
-    assert(normalContent === Array.fill(100)(("abc", "abc")))
+      val normalFile = new File(normalDir, "part-00000")
+      val normalContent = sc.sequenceFile[String, String](normalDir).collect
+      assert(normalContent === Array.fill(100)(("abc", "abc")))
 
-    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
-    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
-    assert(compressedContent === Array.fill(100)(("abc", "abc")))
+      val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+      val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+      assert(compressedContent === Array.fill(100)(("abc", "abc")))
 
-    assert(compressedFile.length < normalFile.length)
+      assert(compressedFile.length < normalFile.length)
+    }
+  }
+
+  // Hadoop "gzip" and "zstd" codecs require a native library to be installed for sequence files;
+  // the "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
+  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
+    case (codec, codecName) =>
+      runSequenceFileCodecTest(codec, codecName)
   }
 
   test("SequenceFile with writable key") {
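
Note on the pattern above: ScalaTest registers a test each time test(...) is called while the suite class is being constructed, so invoking runSequenceFileCodecTest from the class body yields one independently named test per codec. The standalone sketch below (not part of the patch; the object name, temp-directory handling, and printed output are illustrative assumptions) performs the same round-trip outside a test suite: write a SequenceFile uncompressed and with a codec, read the compressed copy back, and compare file sizes.

import java.io.File
import java.nio.file.Files

import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
import org.apache.spark.SparkContext

object SequenceFileCodecDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "codec-demo")
    val base = Files.createTempDirectory("codec-demo").toFile
    try {
      // Same codec list as the patch: gzip/zstd need native libraries, and
      // snappy/lz4 are excluded per SPARK-36669 / SPARK-36681.
      Seq[(CompressionCodec, String)](
        (new DefaultCodec(), "default"),
        (new BZip2Codec(), "bzip2")
      ).foreach { case (codec, name) =>
        val plainDir = new File(base, s"plain-$name").getAbsolutePath
        val packedDir = new File(base, s"packed-$name").getAbsolutePath
        val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))

        data.saveAsSequenceFile(plainDir)                         // uncompressed
        data.saveAsSequenceFile(packedDir, Some(codec.getClass))  // compressed

        // The compressed part file carries the codec's extension, e.g. ".bz2".
        val plain = new File(plainDir, "part-00000")
        val packed = new File(packedDir, "part-00000" + codec.getDefaultExtension)

        // Reading back the compressed directory must reproduce the original pairs.
        val expected = ("abc", "abc")
        val roundTrip = sc.sequenceFile[String, String](packedDir).collect()
        assert(roundTrip.length == 100 && roundTrip.forall(_ == expected))
        println(s"$name: ${packed.length} bytes compressed vs ${plain.length} uncompressed")
      }
    } finally {
      sc.stop()
    }
  }
}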