-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Supports decompression of every LZSS format known so far. Has a decent command-line interface.
- Loading branch information
Showing
2 changed files
with
306 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,278 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import sys | ||
from sys import stdin, stdout, stderr, exit | ||
from os import SEEK_SET, SEEK_CUR, SEEK_END | ||
from errno import EPIPE | ||
from struct import pack, unpack | ||
|
||
# Names exported by a star-import of this module.
__all__ = (
    'decompress',
    'decompress_file',
    'decompress_bytes',
)
|
||
class DecompressionError(ValueError):
    """Raised when input cannot be decoded as LZSS-compressed data."""
|
||
def bits(byte):
    """Split a byte into its eight bits, most significant bit first.

    Returns a tuple of eight ints, each 0 or 1.
    """
    return tuple((byte >> shift) & 1 for shift in range(7, -1, -1))
|
||
def decompress_raw_lzss10(indata, decompressed_size, _overlay=False):
    """Decompress a headerless LZSS stream (type 0x10). Returns a bytearray.

    indata -- iterable of ints (bytes, bytearray, ...) holding the
        compressed stream without its 4-byte header.
    decompressed_size -- number of bytes the stream must expand to.
    _overlay -- when true, back-reference displacements are biased by 3
        instead of 1.

    The stream is a series of groups: one flag byte followed by up to eight
    items, one per flag bit, most significant bit first.  A 0 bit is a
    literal byte.  A 1 bit is a big-endian 16-bit back-reference whose top
    4 bits hold (count - 3) and whose low 12 bits hold the displacement
    minus the bias.

    Raises DecompressionError if the output length does not end up equal to
    decompressed_size.  A back-reference that points before the start of the
    output raises IndexError.  If the input runs out while reading a literal
    in a group with a non-zero flag byte, the bytes decoded so far are
    returned as-is; running out of input anywhere else raises StopIteration.
    """
    data = bytearray()
    it = iter(indata)
    disp_extra = 3 if _overlay else 1

    while len(data) < decompressed_size:
        flag_byte = next(it)

        if flag_byte == 0:
            # Fast path: every item in this group is a literal.  The final
            # group may hold fewer than eight items, so never copy past
            # decompressed_size (the bit-by-bit path below stops there too).
            for _ in range(min(8, decompressed_size - len(data))):
                data.append(next(it))
            continue

        for flag in bits(flag_byte):
            if flag:
                # back-reference: 4-bit count, 12-bit displacement
                high = next(it)
                low = next(it)
                sh = (high << 8) | low
                count = (sh >> 0xc) + 3
                disp = (sh & 0xfff) + disp_extra

                # Copy one byte at a time: the source range may overlap the
                # bytes being appended.
                for _ in range(count):
                    data.append(data[-disp])
            else:
                try:
                    data.append(next(it))
                except StopIteration:
                    return data

            if decompressed_size <= len(data):
                break

    if len(data) != decompressed_size:
        raise DecompressionError("decompressed size does not match the expected size")

    return data
|
||
def decompress_raw_lzss11(indata, decompressed_size):
    """Decompress a headerless LZSS stream (type 0x11). Returns a bytearray.

    indata -- iterable of ints (bytes, bytearray, ...) holding the
        compressed stream without its 4-byte header.
    decompressed_size -- number of bytes the stream must expand to.

    The stream is a series of groups: one flag byte followed by up to eight
    items, one per flag bit, most significant bit first.  A 0 bit is a
    literal byte.  A 1 bit is a back-reference whose length encoding is
    chosen by the high nibble of its first byte:

      0     -- 8-bit count (+ 0x11), 12-bit displacement; 3 bytes
      1     -- 16-bit count (+ 0x111), 12-bit displacement; 4 bytes
      2..15 -- the nibble is the count (+ 1), 12-bit displacement; 2 bytes

    The stored displacement is one less than the distance back from the end
    of the output.

    Raises DecompressionError if the output length does not end up equal to
    decompressed_size.  A back-reference that points before the start of the
    output raises IndexError; running out of input raises StopIteration.
    """
    data = bytearray()
    it = iter(indata)

    while len(data) < decompressed_size:
        flag_byte = next(it)

        if flag_byte == 0:
            # Fast path: every item in this group is a literal.  The final
            # group may hold fewer than eight items, so never copy past
            # decompressed_size (the bit-by-bit path below stops there too).
            for _ in range(min(8, decompressed_size - len(data))):
                data.append(next(it))
            continue

        for flag in bits(flag_byte):
            if flag:
                b = next(it)
                indicator = b >> 4

                if indicator == 0:
                    # 8 bit count, 12 bit disp
                    # indicator is 0, so b needs no masking
                    count = b << 4
                    b = next(it)
                    count += (b >> 4) + 0x11
                elif indicator == 1:
                    # 16 bit count, 12 bit disp
                    count = ((b & 0xf) << 12) + (next(it) << 4)
                    b = next(it)
                    count += (b >> 4) + 0x111
                else:
                    # indicator is count (4 bits), 12 bit disp
                    count = indicator + 1

                # b is now the byte whose low nibble starts the displacement
                disp = ((b & 0xf) << 8) + next(it) + 1

                # Copy one byte at a time: the source range may overlap the
                # bytes being appended.
                for _ in range(count):
                    data.append(data[-disp])
            else:
                data.append(next(it))

            if decompressed_size <= len(data):
                break

    if len(data) != decompressed_size:
        raise DecompressionError("decompressed size does not match the expected size")

    return data
|
||
|
||
def decompress_overlay(f, out):
    """Decompress the overlay in file object f and write the result to out.

    f is read with seek()/tell()/read(); out only needs write().

    The last 8 bytes of f are two little-endian 32-bit words.  The low
    24 bits of the first word give the distance from the end of the file
    back to the start of the compressed region, and its top 8 bits give the
    number of bytes at the end of that region that are not compressed data.
    The expected decompressed size is the second word plus that 24-bit
    distance.

    The compressed bytes are decoded back to front.  out receives the part
    of f that precedes the compressed region, followed by the decompressed
    data.
    """
    # the compression header is at the end of the file
    f.seek(-8, SEEK_END)
    trailer = f.read(8)

    # decompression goes backwards.
    # end < here < start
    #
    # first word  == here - decompression end address (plus padding byte)
    # start_delta == decompression start address - here
    packed_end, start_delta = unpack("<LL", trailer)

    # reading the trailer left the position at the end of the file
    filelen = f.tell()

    padding = packed_end >> 0x18
    end_delta = packed_end & 0xFFFFFF
    decompressed_size = start_delta + end_delta

    f.seek(-end_delta, SEEK_END)
    compressed = bytearray(f.read(end_delta - padding))
    compressed.reverse()

    uncompressed_data = decompress_raw_lzss10(compressed, decompressed_size,
                                              _overlay=True)
    uncompressed_data.reverse()

    # first we write up to the portion of the file which was "overwritten" by
    # the decompressed data, then the decompressed data itself.
    # (open question: can decompression overtake the compressed data, so
    # that the decompression code ends up reading its own output?)
    f.seek(0, SEEK_SET)
    untouched = f.read(filelen - end_delta)
    out.write(untouched)
    out.write(uncompressed_data)
|
||
def decompress(obj):
    """Decompress LZSS-compressed bytes or a file-like object.

    Objects with a 'read' attribute are handed to decompress_file();
    everything else is handed to decompress_bytes().

    Returns a bytearray.
    """
    handler = decompress_file if hasattr(obj, 'read') else decompress_bytes
    return handler(obj)
|
||
def decompress_bytes(data):
    """Decompress LZSS-compressed bytes. Returns a bytearray.

    data must begin with the 4-byte header: a type byte (0x10 or 0x11)
    followed by the decompressed size as a 24-bit little-endian integer.
    The type byte selects decompress_raw_lzss10() or decompress_raw_lzss11()
    for the remaining bytes.

    Raises DecompressionError if the header is shorter than 4 bytes, if the
    type byte is not recognised, or if the payload does not decode to the
    size given in the header.
    """
    header = data[:4]
    if len(header) < 4:
        # too short to even hold a header
        raise DecompressionError("not as lzss-compressed file")

    # The type is always the FIRST header byte.
    if header[0] == 0x10:
        decompress_raw = decompress_raw_lzss10
    elif header[0] == 0x11:
        decompress_raw = decompress_raw_lzss11
    else:
        raise DecompressionError("not as lzss-compressed file")

    # Pad the 3-byte size field with a zero high byte so that it unpacks as
    # a little-endian 32-bit integer.
    decompressed_size, = unpack("<L", header[1:] + b'\x00')

    return decompress_raw(data[4:], decompressed_size)
|
||
def decompress_file(f):
    """Decompress an LZSS-compressed file. Returns a bytearray.

    f is a binary file object positioned at the 4-byte header: a type byte
    (0x10 or 0x11) followed by the decompressed size as a 24-bit
    little-endian integer.  The type byte selects decompress_raw_lzss10()
    or decompress_raw_lzss11() for the rest of the file.

    This isn't any more efficient than decompress_bytes, as it reads
    the entire file into memory.  It is offered as a convenience.

    Raises DecompressionError if fewer than 4 header bytes can be read, if
    the type byte is not recognised, or if the payload does not decode to
    the size given in the header.
    """
    header = f.read(4)
    if len(header) < 4:
        # too short to even hold a header
        raise DecompressionError("not as lzss-compressed file")

    # The type is always the FIRST header byte.
    if header[0] == 0x10:
        decompress_raw = decompress_raw_lzss10
    elif header[0] == 0x11:
        decompress_raw = decompress_raw_lzss11
    else:
        raise DecompressionError("not as lzss-compressed file")

    # Pad the 3-byte size field with a zero high byte so that it unpacks as
    # a little-endian 32-bit integer.
    decompressed_size, = unpack("<L", header[1:] + b'\x00')

    return decompress_raw(f.read(), decompressed_size)
|
||
def main(args=None):
    """Command-line entry point. Returns the process exit status.

    args -- sequence of argument strings; defaults to sys.argv[1:].  The
        caller's sequence is never modified.

    Recognised arguments: an optional '--overlay' flag and an optional input
    file name.  With no file name, or with '-', the input is read from
    standard input, which is not possible together with '--overlay'.  The
    decompressed data is written to standard output as binary.

    Returns 0 on success (a broken output pipe also counts as success),
    1 if the input could not be decompressed, and 2 if the arguments are
    unusable or the input file cannot be opened.
    """
    # Work on a private copy so that dropping '--overlay' below does not
    # mutate a list owned by the caller.
    args = sys.argv[1:] if args is None else list(args)

    overlay = '--overlay' in args
    if overlay:
        args.remove('--overlay')

    opened = None  # set only for a file we opened and therefore must close
    if len(args) < 1 or args[0] == '-':
        if overlay:
            print("Can't decompress overlays from stdin", file=stderr)
            return 2

        # grab the underlying binary stream where there is one
        f = stdin.detach() if hasattr(stdin, 'detach') else stdin
    else:
        try:
            f = opened = open(args[0], "rb")
        except IOError as e:
            print(e, file=stderr)
            return 2

    try:
        stdout = sys.stdout
        if hasattr(stdout, 'detach'):
            # grab the underlying binary stream
            stdout = stdout.detach()

        try:
            if overlay:
                decompress_overlay(f, stdout)
            else:
                stdout.write(decompress(f.read()))
            # Flush here so that a broken pipe is reported inside this
            # handler instead of when the stream is finalised later.
            stdout.flush()
        except IOError as e:
            # don't complain about a broken pipe
            if e.errno != EPIPE:
                raise
        except DecompressionError as e:
            print(e, file=stderr)
            return 1
    finally:
        if opened is not None:
            opened.close()

    return 0
|
||
|
||
|
||
if __name__ == '__main__':
    # Hand main()'s return value to the shell as the exit status.
    status = main()
    exit(status)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
#!/usr/bin/env python3 | ||
|
||
from lzss3 import (decompress_raw_lzss10, decompress_raw_lzss11, | ||
decompress_overlay) | ||
|
||
def test_lzss10():
    """Check decompress_raw_lzss10 against known vectors."""
    vectors = (
        # (compressed, decompressed size, expected output)
        # nothing to decode
        (b'\x00', 0, b''),
        # one group of eight literals
        (b'\x00abcdefgh', 8, b'abcdefgh'),
        # four literals, then a 16-byte back-reference at distance 4
        (b'\x08abcd\xd0\x03', 20, b'abcd' * 5),
    )
    for compressed, size, expected in vectors:
        assert decompress_raw_lzss10(compressed, size) == expected
|
||
def test_lzss11():
    """Check decompress_raw_lzss11 against known vectors."""
    vectors = (
        # (compressed, decompressed size, expected output)
        # nothing to decode
        (b'\x00', 0, b''),
        # one group of eight literals
        (b'\x00abcdefgh', 8, b'abcdefgh'),
        # four literals + back-reference with a 4-bit count (16 bytes)
        (b'\x08abcd\xf0\x03', 20, b'abcd' * 5),
        # four literals + back-reference with an 8-bit count (36 bytes)
        (b'\x08abcd\x01\x30\x03', 40, b'abcd' * 10),
        # four literals + back-reference with a 16-bit count (396 bytes)
        (b'\x08abcd\x10\x07\xb0\x03', 400, b'abcd' * 100),
    )
    for compressed, size, expected in vectors:
        assert decompress_raw_lzss11(compressed, size) == expected
|
||
def test_overlay():
    """Check decompress_overlay on a small in-memory overlay."""
    from io import BytesIO

    # Layout: 7 bytes of compressed data, 1 filler byte (0xff), then the
    # 8-byte trailer (first word 0x09000010, second word 4).
    source = BytesIO(b'\x01\xd0abcd\x08\xff\x10\x00\x00\x09\x04\x00\x00\x00')
    sink = BytesIO()

    decompress_overlay(source, sink)

    expected = b'abcd' * 5
    assert sink.getvalue() == expected
|
||
if __name__ == '__main__':
    # Run every test in order; the first failing assert aborts the script.
    for run_test in (test_lzss10, test_lzss11, test_overlay):
        run_test()