Skip to content

Commit

Permalink
Add unit test for fuel/hci/zip_utils.py (NVIDIA#659)
Browse files Browse the repository at this point in the history
* Add unit test for fuel/hci/zip_utils.py

* Norm windows path first
  • Loading branch information
YuanTingHsieh authored Jun 15, 2022
1 parent 54c3fe6 commit 876f401
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 51 deletions.
97 changes: 51 additions & 46 deletions nvflare/fuel/hci/zip_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
import io
import json
import os
from pathlib import Path
from typing import Optional
from zipfile import ZipFile

from nvflare.apis.job_def import ALL_SITES, JobMetaKey

META_NAME = "meta.json"
# A format string for the dummy meta.json


def _get_default_meta(job_folder_name: str) -> str:
# A format string for the dummy meta.json
meta = f"""{{
"{JobMetaKey.JOB_NAME.value}": "{job_folder_name}",
"{JobMetaKey.JOB_FOLDER_NAME.value}": "{job_folder_name}",
Expand All @@ -36,29 +37,38 @@ def _get_default_meta(job_folder_name: str) -> str:
return meta


def _path_join(base: str, *parts: str) -> str:
path = os.path.normpath(os.path.join(base, *parts))
def normpath_for_zip(path):
"""Normalizes the path for zip file.
Args:
path (str): the path to be normalized
"""
path = os.path.normpath(path)
path = os.path.splitdrive(path)[1]
# ZIP spec requires forward slashes
return path.replace("\\", "/")


def remove_leading_dotdot(path: str) -> str:
while path.startswith("../"):
path = str(Path(path))
while path.startswith(f"..{os.path.sep}"):
path = path[3:]
return path


def split_path(path: str) -> (str, str):
"""Split a path into prefix and folder name
"""Splits a path into a pair of head and tail.
It removes trailing `os.path.sep` and call `os.path.split`
Args:
path: Path to split
Returns: A tuple of (prefix, folder_name)
Returns:
A tuple of `(head, tail)`
"""

if path.endswith("/"):
path = str(Path(path))
if path.endswith(os.path.sep):
full_path = path[:-1]
else:
full_path = path
Expand All @@ -67,36 +77,40 @@ def split_path(path: str) -> (str, str):


def get_all_file_paths(directory):
"""Get all file paths in the directory.
"""Gets all file paths in the directory.
Args:
directory: directory to get all paths for
Returns: all paths in the provided directory
Returns:
A list of paths of all the files in the provided directory
"""
file_paths = []

# crawling through directory and subdirectories
for root, directories, files in os.walk(directory):
for filename in files:
file_paths.append(_path_join(root, filename))
file_paths.append(normpath_for_zip(os.path.join(root, filename)))
for dir_name in directories:
file_paths.append(_path_join(root, dir_name))
file_paths.append(normpath_for_zip(os.path.join(root, dir_name)))

return file_paths


def _zip_directory(root_dir: str, folder_name: str, writer: io.BytesIO):
"""Create a zip archive file for the specified directory.
def _zip_directory(root_dir: str, folder_name: str, output_file):
"""Creates a zip archive file for the specified directory.
Args:
root_dir: root path that contains the folder to be zipped
folder_name: path to the folder to be zipped, relative to root_dir
writer: file to write to
output_file: file to write to
"""
dir_name = _path_join(root_dir, folder_name)
assert os.path.exists(dir_name), 'directory "{}" does not exist'.format(dir_name)
assert os.path.isdir(dir_name), '"{}" is not a valid directory'.format(dir_name)
dir_name = normpath_for_zip(os.path.join(root_dir, folder_name))
if not os.path.exists(dir_name):
raise FileNotFoundError(f'output directory "{dir_name}" does not exist')

if not os.path.isdir(dir_name):
raise NotADirectoryError(f'"{dir_name}" is not a valid directory')

file_paths = get_all_file_paths(dir_name)
if folder_name:
Expand All @@ -105,51 +119,42 @@ def _zip_directory(root_dir: str, folder_name: str, writer: io.BytesIO):
prefix_len = len(dir_name) + 1

# writing files to a zipfile
with ZipFile(writer, "w") as z:
with ZipFile(output_file, "w") as z:
# writing each file one by one
for full_path in file_paths:
rel_path = full_path[prefix_len:]
z.write(full_path, arcname=rel_path)


def zip_directory_to_bytes(root_dir: str, folder_name: str) -> bytes:
"""Compresses a directory and return the bytes value of it.
Args:
root_dir: root path that contains the folder to be zipped
folder_name: path to the folder to be zipped, relative to root_dir
"""
bio = io.BytesIO()
_zip_directory(root_dir, folder_name, bio)
return bio.getvalue()


def _unzip_all(reader, output_dir_name: str):
"""Decompress a zip archive file and extract all files to the specified output directory.
def unzip_all_from_bytes(zip_data: bytes, output_dir_name: str):
"""Decompresses a zip and extracts all files to the specified output directory.
Args:
reader: the input zip reader
zip_data: the input zip data
output_dir_name: the output directory for extracted content
"""
assert os.path.exists(output_dir_name), 'output directory "{}" does not exist'.format(output_dir_name)
if not os.path.exists(output_dir_name):
raise FileNotFoundError(f'output directory "{output_dir_name}" does not exist')

assert os.path.isdir(output_dir_name), '"{}" is not a valid directory'.format(output_dir_name)
if not os.path.isdir(output_dir_name):
raise NotADirectoryError(f'"{output_dir_name}" is not a valid directory')

with ZipFile(reader, "r") as z:
with ZipFile(io.BytesIO(zip_data), "r") as z:
z.extractall(output_dir_name)


def unzip_all_from_file(zip_file_name: str, output_dir_name: str):
"""Decompress a zip archive file and extract all files to the specified output directory.
Args:
zip_file_name: the input zip archive file
output_dir_name: the output directory for extracted content
"""
assert os.path.exists(zip_file_name), 'input zip file "{}" does not exist'.format(zip_file_name)
assert os.path.isfile(zip_file_name), '"{}" is not a valid file'.format(zip_file_name)

_unzip_all(zip_file_name, output_dir_name)


def unzip_all_from_bytes(data, output_dir_name: str):
_unzip_all(io.BytesIO(data), output_dir_name)


def convert_legacy_zip(zip_data: bytes) -> bytes:
"""Convert a legacy app in zip into job layout in memory.
Expand All @@ -165,7 +170,7 @@ def convert_legacy_zip(zip_data: bytes) -> bytes:
with ZipFile(reader, "r") as in_zip:
info_list = in_zip.infolist()
folder_name = info_list[0].filename.split("/")[0]
meta_path = _path_join(folder_name, META_NAME)
meta_path = normpath_for_zip(os.path.join(folder_name, META_NAME))
if next((info for info in info_list if info.filename == meta_path), None):
# Already in job layout
meta_data = in_zip.read(meta_path)
Expand All @@ -180,9 +185,9 @@ def convert_legacy_zip(zip_data: bytes) -> bytes:
if meta:
out_zip.writestr(meta_path, json.dumps(meta))
out_zip.comment = in_zip.comment # preserve the comment
for item in in_zip.infolist():
if item.filename != meta_path:
out_zip.writestr(item, in_zip.read(item.filename))
for info in info_list:
if info.filename != meta_path:
out_zip.writestr(info, in_zip.read(info.filename))
else:
out_zip.writestr(meta_path, _get_default_meta(folder_name))
# Push everything else to a sub folder with the same name:
Expand Down
Loading

0 comments on commit 876f401

Please sign in to comment.