diff --git a/defender2yara/defender/download.py b/defender2yara/defender/download.py index e7322ad3..b11450d5 100644 --- a/defender2yara/defender/download.py +++ b/defender2yara/defender/download.py @@ -1,13 +1,20 @@ from typing import Tuple +import platform import httpx import re +import time from tqdm import tqdm import os import glob import shutil -import libarchive + +from defender2yara.util.cabarchive import expand_mpam_fe from defender2yara.defender.vdm import Vdm +import logging + +logger = logging.getLogger(__package__) + DOWNLOAD_URL = "https://go.microsoft.com/fwlink/?LinkID=121721&arch=x86" URL_PATTERN = r'https://definitionupdates\.microsoft\.com/download/DefinitionUpdates/versionedsignatures/[aA][mM]/[0-9.]+/[0-9.]+/x86/mpam-fe\.exe' UPDATE_CATALOG = "https://www.microsoft.com/en-us/wdsi/definitions/antimalware-definition-release-notes?requestVersion={signature_version}" @@ -78,7 +85,7 @@ def create_cache_dir(signature_version, engine_version, cache_path='cache')->Non return -def move_files(signature_version,engine_version,cache_path) -> None: +def move_files(signature_version,engine_version,source_dir,cache_path) -> None: major_version = ".".join(signature_version.split('.')[0:2]) minor_version = ".".join(signature_version.split('.')[2:4]) @@ -87,15 +94,25 @@ def move_files(signature_version,engine_version,cache_path) -> None: engine_dir = os.path.join(cache_path,"engine",engine_version) # move base signature vmd - shutil.move("mpasbase.vdm",os.path.join(base_vdm_dir,"mpasbase.vdm")) - shutil.move("mpavbase.vdm",os.path.join(base_vdm_dir,"mpavbase.vdm")) + shutil.move(os.path.join(source_dir,"mpasbase.vdm"),os.path.join(base_vdm_dir,"mpasbase.vdm")) + shutil.move(os.path.join(source_dir,"mpavbase.vdm"),os.path.join(base_vdm_dir,"mpavbase.vdm")) # move delta signature vmd - shutil.move("mpasdlta.vdm",os.path.join(delta_vdm_dir,"mpasdlta.vdm")) - shutil.move("mpavdlta.vdm",os.path.join(delta_vdm_dir,"mpavdlta.vdm")) + shutil.move(os.path.join(source_dir,"mpasdlta.vdm"),os.path.join(delta_vdm_dir,"mpasdlta.vdm")) + shutil.move(os.path.join(source_dir,"mpavdlta.vdm"),os.path.join(delta_vdm_dir,"mpavdlta.vdm")) # move engine - shutil.move("mpengine.dll",os.path.join(engine_dir,"mpengine.dll")) + retry_count = 0 + while retry_count < 5: + try: + shutil.move(os.path.join(source_dir,"mpengine.dll"),os.path.join(engine_dir,"mpengine.dll")) + break + except PermissionError: + time.sleep(1) + retry_count += 1 + if retry_count == 5: + logger.warning(f"Failed to move {os.path.join(source_dir,'mpengine.dll')}: PermissionError. (maybe due to antivirus scanning?)") + #os.remove(os.path.join(source_dir,'mpengine.dll')) return @@ -113,12 +130,18 @@ def get_latest_signature_vdm(proxy)->Tuple[str,str,str]: def parse_full_engine_exe(full_engine_path:str,cache_path:str,rm_full_engine:bool) -> Tuple[str,str]: if not os.path.exists(full_engine_path): raise FileNotFoundError(f"mpam-fe.exe file not found: {full_engine_path}") - # extract cabinet file - libarchive.extract_file(full_engine_path) + if platform.system() == 'Windows': + # extract cabarchive with windows expand command + expand_mpam_fe(full_engine_path) + source_dir = os.path.dirname(full_engine_path) + else: + # extract cabarchive with libarchive + import libarchive + libarchive.extract_file(full_engine_path) + source_dir = os.path.curdir - # get signature version (libarchive extract files to current directory) - vdm_path = os.path.join(os.path.curdir,"mpavdlta.vdm") - engine_path = os.path.join(os.path.curdir,"mpengine.dll") + vdm_path = os.path.join(source_dir,"mpavdlta.vdm") + engine_path = os.path.join(source_dir,"mpengine.dll") _, signature_version = Vdm.get_meta_info(vdm_path) _, engine_version = Vdm.get_meta_info(engine_path) @@ -127,12 +150,12 @@ def parse_full_engine_exe(full_engine_path:str,cache_path:str,rm_full_engine:boo create_cache_dir(signature_version,engine_version,cache_path) # move files - move_files(signature_version,engine_version,cache_path) + move_files(signature_version,engine_version,source_dir,cache_path) # clean up - files_to_remove = glob.glob("M?SigStub.exe",root_dir=os.path.dirname(os.path.curdir)) + files_to_remove = glob.glob("M?SigStub.exe",root_dir=source_dir) for file_path in files_to_remove: - os.remove(file_path) + os.remove(os.path.join(source_dir,file_path)) if rm_full_engine: os.remove(full_engine_path) diff --git a/defender2yara/util/cabarchive.py b/defender2yara/util/cabarchive.py new file mode 100644 index 00000000..0885caf7 --- /dev/null +++ b/defender2yara/util/cabarchive.py @@ -0,0 +1,49 @@ +import os +import subprocess +from defender2yara.util.pe import parse_pe_resources + +CAB_SIGNATURE = b'MSCF' + + +def expand_cab(cab_file_path): + """ + Expands a CAB file using the Windows 'expand' command. + + Args: + cab_file_path (str): The path to the CAB file. + + Raises: + FileNotFoundError: If the CAB file does not exist. + subprocess.CalledProcessError: If the expand command fails. + """ + if not os.path.isfile(cab_file_path): + raise FileNotFoundError(f"The CAB file '{cab_file_path}' does not exist.") + + output_dir = os.path.dirname(cab_file_path) + + command = ['expand', cab_file_path, '-F:*', output_dir] + + try: + result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # print(result.stdout.decode("shift-jis")) + except subprocess.CalledProcessError as e: + print(f"Error expanding CAB file: {e.stderr.decode()}") + raise e + +def expand_mpam_fe(path): + """ + Expand mpam-fe.exe using the Windows 'expand' command. + + Args: + path (str): The path to the mpam-fe.exe file. + """ + resources = parse_pe_resources(path) + for resource in resources.values(): + for res in resource: + if res['Data'][:4] == CAB_SIGNATURE: + cab_data = res['Data'] + tmp_cab = os.path.join(os.path.dirname(path),"tmp.cab") + with open(os.path.join(os.path.dirname(path),"tmp.cab"),"wb") as f: + f.write(cab_data) + expand_cab(tmp_cab) + os.remove(tmp_cab) diff --git a/defender2yara/util/pe.py b/defender2yara/util/pe.py index 112859cb..6a10c89f 100644 --- a/defender2yara/util/pe.py +++ b/defender2yara/util/pe.py @@ -1,4 +1,5 @@ from typing import Tuple +import os import pefile import logging @@ -65,4 +66,49 @@ def parse_pe_meta_info(pe_file_path: str) -> Tuple[str,str]: return original_filename, product_version except Exception as e: logger.warning(f"Failed to parse metadata from PE file. Error: {e}") - raise e \ No newline at end of file + raise e + +def parse_pe_resources(pe_file_path): + """ + Parses the resources section of a PE file using the pefile library. + + Args: + pe_file_path (str): The path to the PE file. + + Returns: + dict: A dictionary containing the parsed resources. + + Raises: + FileNotFoundError: If the PE file does not exist. + pefile.PEFormatError: If the file is not a valid PE file. + """ + resources = {} + + if not os.path.isfile(pe_file_path): + raise FileNotFoundError(f"The PE file '{pe_file_path}' does not exist.") + + try: + pe = pefile.PE(pe_file_path) + except pefile.PEFormatError as e: + raise pefile.PEFormatError(f"Error parsing PE file: {e}") + + if hasattr(pe, 'DIRECTORY_ENTRY_RESOURCE'): + for resource_type in pe.DIRECTORY_ENTRY_RESOURCE.entries: + type_name = pefile.RESOURCE_TYPE.get(resource_type.struct.Id, str(resource_type.struct.Id)) + resources[type_name] = [] + + if hasattr(resource_type, 'directory'): + for resource_id in resource_type.directory.entries: + if hasattr(resource_id, 'directory'): + for resource_lang in resource_id.directory.entries: + data_rva = resource_lang.data.struct.OffsetToData + size = resource_lang.data.struct.Size + data = pe.get_memory_mapped_image()[data_rva:data_rva+size] + resources[type_name].append({ + 'ResourceId': resource_id.struct.Id, + 'Language': resource_lang.struct.Id, + 'Data': data + }) + else: + raise ValueError("No resources found in the PE file.") + return resources \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b8e3ce7a..a8cb8d2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "defender2yara" -version = "1.0.1" +version = "1.0.2" description = "Convert Microsoft Defender Antivirus Signatures(VDM) to YARA rules." authors = ["Tomoaki Tani"] readme = "README.md"