@@ -1405,6 +1405,74 @@ class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
14051405class LzmaStreamWriteTest (LzmaTest , StreamWriteTest ):
14061406 decompressor = lzma .LZMADecompressor if lzma else None
14071407
1408+ class _CompressedWriteTest (TarTest ):
1409+ # This is not actually a standalone test.
1410+ # It does not inherit WriteTest because it only makes sense with gz,bz2
1411+ source = (b"And we move to Bristol where they have a special, " +
1412+ b"Very Silly candidate" )
1413+
1414+ def _compressed_tar (self , compresslevel ):
1415+ fobj = io .BytesIO ()
1416+ with tarfile .open (tmpname , self .mode , fobj ,
1417+ compresslevel = compresslevel ) as tarfl :
1418+ tarfl .addfile (tarfile .TarInfo ("foo" ), io .BytesIO (self .source ))
1419+ return fobj
1420+
1421+ def _test_bz2_header (self , compresslevel ):
1422+ fobj = self ._compressed_tar (compresslevel )
1423+ self .assertEqual (fobj .getvalue ()[0 :10 ],
1424+ b"BZh%d1AY&SY" % compresslevel )
1425+
1426+ def _test_gz_header (self , compresslevel ):
1427+ fobj = self ._compressed_tar (compresslevel )
1428+ self .assertEqual (fobj .getvalue ()[:3 ], b"\x1f \x8b \x08 " )
1429+
1430+ class Bz2CompressWriteTest (Bz2Test , _CompressedWriteTest , unittest .TestCase ):
1431+ prefix = "w:"
1432+ def test_compression_levels (self ):
1433+ self ._test_bz2_header (1 )
1434+ self ._test_bz2_header (5 )
1435+ self ._test_bz2_header (9 )
1436+
1437+ class Bz2CompressStreamWriteTest (Bz2Test , _CompressedWriteTest ,
1438+ unittest .TestCase ):
1439+ prefix = "w|"
1440+ def test_compression_levels (self ):
1441+ self ._test_bz2_header (1 )
1442+ self ._test_bz2_header (5 )
1443+ self ._test_bz2_header (9 )
1444+
1445+ class GzCompressWriteTest (GzipTest , _CompressedWriteTest , unittest .TestCase ):
1446+ prefix = "w:"
1447+ def test_compression_levels (self ):
1448+ self ._test_gz_header (1 )
1449+ self ._test_gz_header (5 )
1450+ self ._test_gz_header (9 )
1451+
1452+ class GzCompressStreamWriteTest (GzipTest , _CompressedWriteTest ,
1453+ unittest .TestCase ):
1454+ prefix = "w|"
1455+ def test_compression_levels (self ):
1456+ self ._test_gz_header (1 )
1457+ self ._test_gz_header (5 )
1458+ self ._test_gz_header (9 )
1459+
1460+ class CompressLevelRaises (unittest .TestCase ):
1461+ def test_compresslevel_wrong_modes (self ):
1462+ compresslevel = 5
1463+ fobj = io .BytesIO ()
1464+ with self .assertRaises (TypeError ):
1465+ tarfile .open (tmpname , "w:" , fobj , compresslevel = compresslevel )
1466+
1467+ def test_wrong_compresslevels (self ):
1468+ # BZ2 checks that the compresslevel is in [1,9]. gz does not
1469+ fobj = io .BytesIO ()
1470+ with self .assertRaises (ValueError ):
1471+ tarfile .open (tmpname , "w:bz2" , fobj , compresslevel = 0 )
1472+ with self .assertRaises (ValueError ):
1473+ tarfile .open (tmpname , "w:bz2" , fobj , compresslevel = 10 )
1474+ with self .assertRaises (ValueError ):
1475+ tarfile .open (tmpname , "w|bz2" , fobj , compresslevel = 10 )
14081476
14091477class GNUWriteTest (unittest .TestCase ):
14101478 # This testcase checks for correct creation of GNU Longname
0 commit comments