Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ jobs:
format:
# check formatting with ruff
runs-on: ubuntu-latest
# temporarily disable this job
if: false
steps:
- uses: actions/checkout@v5

Expand Down
131 changes: 43 additions & 88 deletions dtrx/dtrx.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import shutil
import signal
import stat
import string
import struct
import sys
import tempfile
Expand Down Expand Up @@ -225,9 +224,7 @@ def __init__(self, filename, encoding):
# option based on what's supported, since this behavior changed
if encoding in ("lrzip", "lrz"):
# need to check if this version of lrzip supports the -Q option
output = subprocess.check_output(
"lrzip --help", stderr=subprocess.STDOUT, shell=True
)
output = subprocess.check_output("lrzip --help", stderr=subprocess.STDOUT, shell=True)
if b"-Q" in output:
decoder = ["lrzcat", "-Q"]
else:
Expand Down Expand Up @@ -266,9 +263,7 @@ def add_process(self, processes, command, stdin, stdout):
try:
logger.debug("running command: {}".format(command))
processes.append(
subprocess.Popen(
command, stdin=stdin, stdout=stdout, stderr=subprocess.PIPE
)
subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=subprocess.PIPE)
)
except OSError as error:
if error.errno == errno.ENOENT:
Expand Down Expand Up @@ -302,9 +297,7 @@ def send_stdout_to_dev_null(self):
return True

def run_pipes(self, final_stdout=None):
has_output_target = (
True if final_stdout or self.send_stdout_to_dev_null() else False
)
has_output_target = True if final_stdout or self.send_stdout_to_dev_null() else False
if not self.pipes:
return
elif final_stdout is None:
Expand Down Expand Up @@ -345,9 +338,9 @@ def check_included_archives(self):
self.file_count += len(filenames)
path = path[start_index:]
for filename in filenames:
if ExtractorBuilder.try_by_mimetype(
if ExtractorBuilder.try_by_mimetype(filename) or ExtractorBuilder.try_by_extension(
filename
) or ExtractorBuilder.try_by_extension(filename):
):
self.included_archives.append(os.path.join(path, filename))

def check_contents(self):
Expand Down Expand Up @@ -403,12 +396,8 @@ def first_bad_exit_code(self):

def check_success(self, got_files):
error_index, error_code = self.first_bad_exit_code()
logger.debug(
"success results: %s %s %s" % (got_files, error_index, self.exit_codes)
)
if self.is_fatal_error(error_code) or (
(not got_files) and (error_code is not None)
):
logger.debug("success results: %s %s %s" % (got_files, error_index, self.exit_codes))
if self.is_fatal_error(error_code) or ((not got_files) and (error_code is not None)):
command = " ".join(self.pipes[error_index][0])
self.pw_prompt = False # Don't silently fail with wrong password
raise ExtractorError(
Expand Down Expand Up @@ -486,9 +475,7 @@ def get_filenames(self):
# compression extensions, even if those files shouldn't actually be
# handled this way. So, we call out to the file command to do a quick
# check and make sure this actually looks like a compressed file.
if "compress" not in [
match[0] for match in ExtractorBuilder.try_by_magic(self.filename)
]:
if "compress" not in [match[0] for match in ExtractorBuilder.try_by_magic(self.filename)]:
raise ExtractorError("doesn't look like a compressed file")
yield self.basename()

Expand Down Expand Up @@ -573,9 +560,7 @@ def prepare(self):
encoding = mimetypes.guess_type(data_filename)[1]
if not encoding:
raise ExtractorError("data.tar file has unrecognized encoding")
self.pipe(
["ar", "p", self.filename, data_filename], "extracting data.tar from .deb"
)
self.pipe(["ar", "p", self.filename, data_filename], "extracting data.tar from .deb")
self.pipe(self.decoders[encoding], "decoding data.tar")

def basename(self):
Expand All @@ -594,9 +579,7 @@ def check_contents(self):

class DebMetadataExtractor(DebExtractor):
def prepare(self):
self.pipe(
["ar", "p", self.filename, "control.tar.gz"], "control.tar.gz extraction"
)
self.pipe(["ar", "p", self.filename, "control.tar.gz"], "control.tar.gz extraction")
self.pipe(["zcat"], "control.tar.gz decompression")


Expand Down Expand Up @@ -646,9 +629,7 @@ def extract_archive(self):
extract_fmt_args = {
"OUTPUT_FILE": os.path.splitext(os.path.basename(self.filename))[0],
}
formatted_extract_commands = [
x.format(**extract_fmt_args) for x in self.extract_command
]
formatted_extract_commands = [x.format(**extract_fmt_args) for x in self.extract_command]

self.extract_pipe = formatted_extract_commands + [self.filename]
BaseExtractor.extract_archive(self)
Expand Down Expand Up @@ -722,7 +703,6 @@ class SevenExtractor(NoPipeExtractor):
file_type = "7z file"
list_command = ["7z", "l", "-ba"]
border_re = re.compile("^[- ]+$")
extract_command = ["7z", "x"]
space_re = re.compile(" ")

@property
Expand Down Expand Up @@ -927,19 +907,17 @@ def __init__(self, extractor, options):

def handle(self):
command = "find"
status = subprocess.call(
[
"find",
self.extractor.target,
"-type",
"d",
"-exec",
"chmod",
"u+rwx",
"{}",
";",
]
)
status = subprocess.call([
"find",
self.extractor.target,
"-type",
"d",
"-exec",
"chmod",
"u+rwx",
"{}",
";",
])
if status == 0:
command = "chmod"
status = subprocess.call(["chmod", "-R", "u+rwX", self.extractor.target])
Expand All @@ -950,9 +928,7 @@ def handle(self):
def set_target(self, target, checker):
self.target = checker(target).check()
if self.target != target:
logger.warning(
"extracting %s to %s" % (self.extractor.filename, self.target)
)
logger.warning("extracting %s to %s" % (self.extractor.filename, self.target))


# The "where to extract" table, with options and archive types.
Expand Down Expand Up @@ -983,9 +959,7 @@ def organize(self):
if not os.path.isdir(newdir):
os.makedirs(newdir)
for filename in filenames:
os.rename(
os.path.join(curdir, filename), os.path.join(newdir, filename)
)
os.rename(os.path.join(curdir, filename), os.path.join(newdir, filename))
os.rmdir(curdir)


Expand All @@ -1011,9 +985,7 @@ def can_handle(contents, options):
)

def organize(self):
source = os.path.join(
self.extractor.target, os.listdir(self.extractor.target)[0]
)
source = os.path.join(self.extractor.target, os.listdir(self.extractor.target)[0])
if os.path.isdir(source):
checker = DirectoryChecker
else:
Expand Down Expand Up @@ -1059,9 +1031,7 @@ def organize(self):
@total_ordering
class BasePolicy(object):
try:
size = fcntl.ioctl(
sys.stdout.fileno(), termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)
)
size = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0))
width = struct.unpack("HHHH", size)[1]
except IOError:
width = 80
Expand Down Expand Up @@ -1155,9 +1125,7 @@ def prep(self, archive_filename, extractor):
question.append(" Expected: " + extractor.basename())
question.append(" Actual: " + extractor.content_name)
choice_vars = (extractor.content_type, extractor.basename())
self.choices = [
text % choice_vars[: text.count("%s")] for text in self.choice_template
]
self.choices = [text % choice_vars[: text.count("%s")] for text in self.choice_template]
self.current_policy = self.permanent_policy or self.ask_question(question)

def ok_for_match(self):
Expand Down Expand Up @@ -1191,9 +1159,7 @@ def __init__(self, options):

def prep(self, current_filename, target, extractor):
archive_count = len(extractor.included_archives)
if (self.permanent_policy is not None) or (
(archive_count * 10) <= extractor.file_count
):
if (self.permanent_policy is not None) or ((archive_count * 10) <= extractor.file_count):
self.current_policy = self.permanent_policy or RECURSE_NOT_NOW
return
question = self.wrap(
Expand All @@ -1213,12 +1179,10 @@ def prep(self, current_filename, target, extractor):
break
print(
"\n%s\n"
% "\n".join(
[
os.path.join(target, included_root, filename)
for filename in extractor.included_archives
]
)
% "\n".join([
os.path.join(target, included_root, filename)
for filename in extractor.included_archives
])
)
if self.current_policy in (RECURSE_ALWAYS, RECURSE_NEVER):
self.permanent_policy = self.current_policy
Expand Down Expand Up @@ -1427,9 +1391,7 @@ def magic_map_matches(self, output, magic_map):

def try_by_magic(self, filename):
try:
process = subprocess.Popen(
["file", "-zL", filename], stdout=subprocess.PIPE
)
process = subprocess.Popen(["file", "-zL", filename], stdout=subprocess.PIPE)
status = process.wait()
if status != 0:
return []
Expand Down Expand Up @@ -1524,9 +1486,9 @@ def reverser(x, y):
print("%s/" % (filename,))
new_filenames = os.listdir(filename)
new_filenames = sorted(new_filenames, key=cmp_to_key(reverser))
filenames.extend(
[pathjoin(filename, new_filename) for new_filename in new_filenames]
)
filenames.extend([
pathjoin(filename, new_filename) for new_filename in new_filenames
])
else:
print(filename)

Expand Down Expand Up @@ -1610,13 +1572,11 @@ def get_supported_extensions():
# get the lists of built-in extensions and combine them
ext_map_base = set(ExtractorBuilder.extension_map.keys())
ext_map = set(
itertools.chain(
*[
x["extensions"]
for x in ExtractorBuilder.extractor_map.values()
if "extensions" in x
]
)
itertools.chain(*[
x["extensions"]
for x in ExtractorBuilder.extractor_map.values()
if "extensions" in x
])
)
ext_map = ext_map_base.union(ext_map)

Expand Down Expand Up @@ -1670,10 +1630,7 @@ def parse_options(self, arguments):
"--one-entry",
dest="one_entry_default",
default=None,
help=(
"specify extraction policy for one-entry "
+ "archives: inside/rename/here"
),
help=("specify extraction policy for one-entry " + "archives: inside/rename/here"),
)
parser.add_option(
"-n",
Expand Down Expand Up @@ -1795,9 +1752,7 @@ def try_extractors(self, filename, builder):
self.current_extractor = extractor # For the abort() method.
error = self.action.run(filename, extractor)
if error:
errors.append(
(extractor.file_type, extractor.encoding, error, extractor.stderr)
)
errors.append((extractor.file_type, extractor.encoding, error, extractor.stderr))
if extractor.target is not None:
self.clean_destination(extractor.target)
else:
Expand Down
24 changes: 7 additions & 17 deletions tests/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def __init__(self, **kwargs):
self.input = self.input + "\n"

def start_proc(self, command, stdin=None, output=None):
process = subprocess.Popen(
command, stdin=subprocess.PIPE, stdout=output, stderr=output
)
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=output, stderr=output)
if stdin:
process.stdin.write(bytes(str(stdin).encode("utf-8")))
process.stdin.close()
Expand Down Expand Up @@ -194,9 +192,7 @@ def clean(self):
raise ExtractorTestError("cleanup exited with status code %s" % (status,))

def show_pass(self):
self.status_writer.show(
"Passed %i/%i: %s" % (self.test_num, NUM_TESTS, self.name)
)
self.status_writer.show("Passed %i/%i: %s" % (self.test_num, NUM_TESTS, self.name))
return "passed"

def show_report(self, status, message=None):
Expand Down Expand Up @@ -295,9 +291,7 @@ def __init__(self):
Loader=yaml.FullLoader,
)
self.name_regexps = [re.compile(s) for s in sys.argv[1:]]
self.tests = [
ExtractorTest(**data) for data in self.test_data if self.wanted_test(data)
]
self.tests = [ExtractorTest(**data) for data in self.test_data if self.wanted_test(data)]
self.add_subdir_tests()

def wanted_test(self, data):
Expand All @@ -307,18 +301,14 @@ def wanted_test(self, data):

def add_subdir_tests(self):
for odata in self.test_data:
if (
(not self.wanted_test(odata))
or "directory" in odata
or ("baseline" not in odata)
):
if (not self.wanted_test(odata)) or "directory" in odata or ("baseline" not in odata):
continue
data = odata.copy()
data["name"] += " in .."
data["directory"] = "inside-dir"
data["filenames"] = " ".join(
["../%s" % filename for filename in data.get("filenames", "").split()]
)
data["filenames"] = " ".join([
"../%s" % filename for filename in data.get("filenames", "").split()
])
self.tests.append(ExtractorTest(**data))

def run(self):
Expand Down