Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions tests/handlers/filesystem/test_jffs2.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import binascii

import pytest
from dissect.cstruct import Instance
from helpers import unhex

from unblob.file_utils import Endian, File
Expand Down Expand Up @@ -59,7 +58,7 @@ def get_valid_jffs2_old_be_header():
)


def calculate_crc(header: Instance):
def calculate_crc(header):
return (binascii.crc32(header.dumps()[:-4], -1) ^ -1) & 0xFFFFFFFF


Expand Down Expand Up @@ -188,9 +187,7 @@ def calculate_crc(header: Instance):
),
],
)
def test_valid_header_new(
header: Instance, node_start_offset: int, eof: int, expected: bool
):
def test_valid_header_new(header, node_start_offset: int, eof: int, expected: bool):
header.hdr_crc = calculate_crc(header)
assert new_handler.valid_header(header, node_start_offset, eof) == expected

Expand Down Expand Up @@ -270,9 +267,7 @@ def test_valid_header_new(
),
],
)
def test_valid_header_old(
header: Instance, node_start_offset: int, eof: int, expected: bool
):
def test_valid_header_old(header, node_start_offset: int, eof: int, expected: bool):
header.hdr_crc = calculate_crc(header)
assert old_handler.valid_header(header, node_start_offset, eof) == expected

Expand Down
4 changes: 2 additions & 2 deletions unblob/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pathlib import Path
from typing import Iterable, Iterator, List, Literal, Optional, Tuple, Union

from dissect.cstruct import Instance, cstruct
from dissect.cstruct import cstruct
from structlog import get_logger

from .logging import format_hex
Expand Down Expand Up @@ -336,7 +336,7 @@ def parse(
struct_name: str,
file: Union[File, bytes],
endian: Endian,
) -> Instance:
):
cparser = self.cparser_le if endian is Endian.LITTLE else self.cparser_be
struct_parser = getattr(cparser, struct_name)
return struct_parser(file)
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/arc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractors.command import Command
Expand Down Expand Up @@ -54,7 +53,7 @@ def valid_name(self, name: bytes) -> bool:
except UnicodeDecodeError:
return False

def valid_header(self, header: Instance) -> bool:
def valid_header(self, header) -> bool:
if header.archive_marker != 0x1A:
return False
if header.header_type > 0x07:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/cpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Optional, Type

import attr
from dissect.cstruct import Instance
from structlog import get_logger

from ...file_utils import (
Expand Down Expand Up @@ -252,7 +251,7 @@ def _pad_file(self, end_offset: int) -> int:
return end_offset

@classmethod
def _pad_header(cls, header: Instance, c_namesize: int) -> int:
def _pad_header(cls, header, c_namesize: int) -> int:
return round_up(len(header) + c_namesize, cls._PAD_ALIGN)

@classmethod
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/dlink/encrpted_img.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Optional

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import File, InvalidInputFormat
Expand Down Expand Up @@ -53,7 +52,7 @@ class EncrptedHandler(StructHandler):
HEADER_STRUCT = "dlink_header_t"
EXTRACTOR = EncrptedExtractor()

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.size < len(header):
return False
return True
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/dlink/shrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import Optional

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import File, InvalidInputFormat
Expand Down Expand Up @@ -70,7 +69,7 @@ class SHRSHandler(StructHandler):
HEADER_STRUCT = "dlink_header_t"
EXTRACTOR = SHRSExtractor()

def is_valid_header(self, header: Instance, file: File) -> bool:
def is_valid_header(self, header, file: File) -> bool:
if header.file_size < len(header):
return False
# we're exactly past the header, we compute the digest
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/engeniustech/engenius.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import Endian, File, InvalidInputFormat, StructParser
Expand Down Expand Up @@ -81,7 +80,7 @@ class EngeniusHandler(StructHandler):
EXTRACTOR = EngeniusExtractor()
PATTERN_MATCH_OFFSET = -0x5C

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.length <= len(header):
return False
try:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/hp/bdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -35,7 +34,7 @@
"""


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
if header.toc_offset == 0 or header.toc_entries == 0:
return False
try:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/hp/ipkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import (
Expand Down Expand Up @@ -48,7 +47,7 @@
"""


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
if header.toc_offset == 0 or header.toc_entries == 0:
return False
try:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/instar/bneg.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -70,7 +69,7 @@ class BNEGHandler(StructHandler):
HEADER_STRUCT = "bneg_header_t"
EXTRACTOR = BNEGExtractor()

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.partition_1_size == 0:
return False
if header.partition_2_size == 0:
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/netgear/chk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -59,7 +58,7 @@ class NetgearCHKHandler(StructHandler):
HEADER_STRUCT = "chk_header_t"
EXTRACTOR = CHKExtractor()

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.header_len != len(header):
return False
try:
Expand Down
5 changes: 2 additions & 3 deletions unblob/handlers/archive/netgear/trx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path
from typing import Iterable, Optional, cast

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -75,12 +74,12 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]
start_offset=start_offset, end_offset=start_offset + header.len
)

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
if header.len < len(header):
return False
return True

def _is_crc_valid(self, file: File, start_offset: int, header: Instance) -> bool:
def _is_crc_valid(self, file: File, start_offset: int, header) -> bool:
file.seek(start_offset + CRC_CONTENT_OFFSET)
content = bytearray(file.read(header.len - CRC_CONTENT_OFFSET))
computed_crc = (binascii.crc32(content) ^ -1) & 0xFFFFFFFF
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/archive/qnap/qnap_nas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Optional

import attr
from dissect.cstruct import Instance
from pyperscan import Flag, Pattern, Scan, StreamDatabase
from structlog import get_logger

Expand Down Expand Up @@ -44,7 +43,7 @@ class QTSSearchContext:
end_offset: int


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
try:
header.device_id.decode("utf-8")
header.file_version.decode("utf-8")
Expand Down
7 changes: 3 additions & 4 deletions unblob/handlers/archive/xiaomi/hdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path
from typing import Iterable, Optional, Tuple, cast

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import (
Expand Down Expand Up @@ -74,7 +73,7 @@ def calculate_crc(file: File, start_offset: int, size: int) -> int:
return (digest ^ -1) & 0xFFFFFFFF


def is_valid_blob_header(blob_header: Instance) -> bool:
def is_valid_blob_header(blob_header) -> bool:
if blob_header.magic == BLOB_MAGIC:
return False
if not blob_header.size:
Expand All @@ -86,7 +85,7 @@ def is_valid_blob_header(blob_header: Instance) -> bool:
return True


def is_valid_header(header: Instance) -> bool:
def is_valid_header(header) -> bool:
if header.signature_offset < len(header):
return False
if not header.blob_offsets[0]:
Expand Down Expand Up @@ -147,7 +146,7 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]
return ValidChunk(start_offset=start_offset, end_offset=end_offset)

def _is_crc_valid(
self, file: File, header: Instance, start_offset: int, end_offset: int
self, file: File, header, start_offset: int, end_offset: int
) -> bool:
computed_crc = calculate_crc(
file,
Expand Down
9 changes: 4 additions & 5 deletions unblob/handlers/archive/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import struct
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from ...extractors import Command
Expand Down Expand Up @@ -90,7 +89,7 @@ def has_encrypted_files(
self,
file: File,
start_offset: int,
end_of_central_directory: Instance,
end_of_central_directory,
) -> bool:
file.seek(start_offset + end_of_central_directory.offset_of_cd, io.SEEK_SET)
for _ in range(end_of_central_directory.total_entries):
Expand All @@ -104,7 +103,7 @@ def has_encrypted_files(
return False

@staticmethod
def is_zip64_eocd(end_of_central_directory: Instance):
def is_zip64_eocd(end_of_central_directory):
# see https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.1.TXT section J
return (
end_of_central_directory.disk_number == 0xFFFF
Expand All @@ -116,14 +115,14 @@ def is_zip64_eocd(end_of_central_directory: Instance):
)

@staticmethod
def is_zip64_cd_file(file_header: Instance):
def is_zip64_cd_file(file_header):
# see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.2
return (
file_header.file_size == 0xFFFFFFFF
or file_header.compress_size == 0xFFFFFFFF
)

def _parse_zip64(self, file: File, start_offset: int, offset: int) -> Instance:
def _parse_zip64(self, file: File, start_offset: int, offset: int):
file.seek(start_offset, io.SEEK_SET)
for eocd_locator_offset in iterate_patterns(
file, struct.pack("<I", self.ZIP64_EOCD_LOCATOR_HEADER)
Expand Down
7 changes: 2 additions & 5 deletions unblob/handlers/executable/elf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import attr
import lief
from dissect.cstruct import Instance
from structlog import get_logger

from unblob.extractor import carve_chunk_to_file
Expand Down Expand Up @@ -138,7 +137,7 @@ class _ELFBase(StructHandler):
SECTION_HEADER_STRUCT = "elf_shdr_t"
PROGRAM_HEADER_STRUCT = "elf_phdr_t"

def is_valid_header(self, header: Instance) -> bool:
def is_valid_header(self, header) -> bool:
# check that header fields have valid values
try:
lief.ELF.E_TYPE(header.e_type)
Expand Down Expand Up @@ -197,9 +196,7 @@ def get_last_program_end(

return last_program_end

def get_end_offset(
self, file: File, start_offset: int, header: Instance, endian
) -> int:
def get_end_offset(self, file: File, start_offset: int, header, endian) -> int:
# Usually the section header is the last, but in some cases the program headers are
# put to the end of the file, and in some cases sections header and actual sections
# can be also intermixed, so we need also to check the end of the last section and
Expand Down
4 changes: 1 addition & 3 deletions unblob/handlers/filesystem/cramfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import struct
from typing import Optional

from dissect.cstruct import Instance

from unblob.extractors import Command

from ...file_utils import Endian, convert_int32, get_endian
Expand Down Expand Up @@ -59,7 +57,7 @@ def _is_crc_valid(
self,
file: File,
start_offset: int,
header: Instance,
header,
endian: Endian,
) -> bool:
# old cramfs format do not support crc
Expand Down
3 changes: 1 addition & 2 deletions unblob/handlers/filesystem/jffs2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import io
from typing import Optional

from dissect.cstruct import Instance
from structlog import get_logger

from unblob.file_utils import (
Expand Down Expand Up @@ -61,7 +60,7 @@ def guess_endian(self, file: File) -> Endian:
file.seek(-2, io.SEEK_CUR)
return endian

def valid_header(self, header: Instance, node_start_offset: int, eof: int) -> bool:
def valid_header(self, header, node_start_offset: int, eof: int) -> bool:
header_crc = (binascii.crc32(header.dumps()[:-4], -1) ^ -1) & 0xFFFFFFFF
check_crc = True

Expand Down
Loading