|
| 1 | +#!/usr/bin/env python3 |
| 2 | +import logging |
| 3 | +import sys |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import atheris |
| 7 | +import structlog |
| 8 | + |
| 9 | + |
| 10 | +def set_unblob_log_level(level=logging.CRITICAL): |
| 11 | + logger = logging.getLogger("unblob") |
| 12 | + |
| 13 | + def logger_factory(): |
| 14 | + return logger |
| 15 | + |
| 16 | + structlog.configure(logger_factory=logger_factory) |
| 17 | + logger.setLevel(level) |
| 18 | + |
| 19 | + |
| 20 | +def extract(inpath: Path, outpath: Path): # noqa: ARG001 |
| 21 | + return |
| 22 | + |
| 23 | + |
| 24 | +with atheris.instrument_imports(include=["unblob"], exclude=["unblob_native"]): |
| 25 | + from unblob.extractors.command import Command |
| 26 | + from unblob.file_utils import File |
| 27 | + from unblob.finder import search_chunks |
| 28 | + from unblob.models import Task, TaskResult |
| 29 | + from unblob.processing import ExtractionConfig |
| 30 | + |
| 31 | + # NOTE: monkey patch Command extractor so we don't loose time executing subprocesses |
| 32 | + Command.extract = classmethod(extract) # type: ignore |
| 33 | + |
| 34 | + |
| 35 | +@atheris.instrument_func |
| 36 | +def test_search_chunks(data): |
| 37 | + config = ExtractionConfig( |
| 38 | + extract_root=Path("/dev/shm"), # noqa: S108 |
| 39 | + force_extract=True, |
| 40 | + entropy_depth=0, |
| 41 | + entropy_plot=False, |
| 42 | + skip_magic=[], |
| 43 | + skip_extension=[], |
| 44 | + skip_extraction=False, |
| 45 | + process_num=1, |
| 46 | + keep_extracted_chunks=True, |
| 47 | + verbose=0, |
| 48 | + ) |
| 49 | + |
| 50 | + if not len(data): |
| 51 | + return |
| 52 | + |
| 53 | + with File.from_bytes(data) as file: |
| 54 | + task = Task( |
| 55 | + path=Path("/dev/shm/nonexistent"), depth=0, blob_id="" # noqa: S108 |
| 56 | + ) |
| 57 | + result = TaskResult(task) |
| 58 | + search_chunks(file, len(data), config.handlers, result) |
| 59 | + |
| 60 | + |
| 61 | +if __name__ == "__main__": |
| 62 | + set_unblob_log_level() |
| 63 | + atheris.Setup(sys.argv, test_search_chunks) |
| 64 | + atheris.Fuzz() |
0 commit comments