From acddd29c053da7a98cd482202ec06bf82770d84a Mon Sep 17 00:00:00 2001 From: evertonstz Date: Thu, 5 Dec 2019 07:25:44 -0300 Subject: [PATCH] add checkers to make auto deleting .pkg files after extration securelly possible --- pynps.py | 143 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 110 insertions(+), 33 deletions(-) diff --git a/pynps.py b/pynps.py index 9943f11..7cd2fba 100644 --- a/pynps.py +++ b/pynps.py @@ -247,8 +247,14 @@ def dl_file(dict, system, DLFOLDER, WGET): if os.path.isfile(f"{dl_folder}/{filename}"): printft(HTML("[DOWNLOAD] file exists, wget will decide if the file is completely downloaded, if it's not the download will be resumed")) - process = subprocess.run([WGET, "-q", "--show-progress", "-c", "-P", - dl_folder, url], cwd=dl_folder) + try: + process = subprocess.run([WGET, "-q", "--show-progress", "-c", "-P", + dl_folder, url], cwd=dl_folder) + except KeyboardInterrupt: + printft(HTML("\n[DOWNLOAD] File was partially downloaded, you can resume this download by searching for same pkg again")) + printft(HTML(f"[DOWNLOAD] File location: {dl_folder}/{filename}")) + printft(HTML("Download interrupted by user")) + sys.exit(0) return True @@ -448,8 +454,49 @@ def check_pkg2zip(location, CONFIGFOLDER): return False -def run_pkg2zip(file, output_location, PKG2ZIP, args, zrif=False): # OK! +def run_pkg2zip(file, output_location, PKG2ZIP, args, extraction_folder, zrif=False): # OK! """this fuction is used to extract a pkg with pkg2zip""" + def runner( list, cwd): + + p = subprocess.Popen(list, cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + + r = re.compile('pkg2zip v\\d.\\d') + full_out = '' + for line in iter(p.stdout.readline, b''): + out = line.rstrip().decode() + full_out += f"{out}\n" + if out.startswith("ERROR") == False and r.match(out) is None: + print(out) + + # test if file exist + + # test for corrupted and file not being a pkg + if "ERROR: not a pkg file" in full_out: + # corrupted file and feeding inexistent file + if os.path.isfile(file): + if file.endswith(".pkg"): + printft(HTML("[PKG2ZIP] The provided file is is a .pkg, but seems to be corrupted")) + else: + printft(HTML("[PKG2ZIP] The provided file is is not a .pkg")) + else: + printft(HTML("[PKG2ZIP] Provided file doesn't exist")) + return False + elif "ERROR: pkg file is too small" in full_out: + # download not ended + printft(HTML("[PKG2ZIP] The provided file is too small, it's probably corrupted or didn't fully downloaded")) + return False + elif "ERROR: failed to read 256 bytes from file" in full_out: + # feeded a folder to pkg2zip + if os.path.isdir(file): + printft(HTML("[PKG2ZIP] The provided file seems to be a folder")) + else: + printft(HTML("[PKG2ZIP] Unknown extraction error")) + return False + else: + printft(HTML(f"[PKG2ZIP] File extracted to: {extraction_folder}")) + return True # create extraction folder create_folder(output_location) @@ -461,12 +508,14 @@ def run_pkg2zip(file, output_location, PKG2ZIP, args, zrif=False): # OK! run_lst = [PKG2ZIP, file] for x in args: run_lst.insert(1, x) - process = subprocess.run(run_lst, cwd=output_location) + process = runner(run_lst, cwd=output_location) else: run_lst = [PKG2ZIP, file, zrif] for x in args: run_lst.insert(1, x) - process = subprocess.run(run_lst, cwd=output_location) + process = runner(run_lst, cwd=output_location) + + return process def fix_folder_syntax(folder): @@ -576,13 +625,13 @@ def main(): # tests existence of pkg2zip PKG2ZIP = check_pkg2zip(PKG2ZIP, CONFIGFOLDER) if PKG2ZIP == False: - printft(HTML("[ERROR] you don't have a valid pkg2zip installation or binary in your system.")) + printft(HTML("[ERROR] you don't have a valid pkg2zip installation or binary in your system")) sys.exit(1) # tests existence of wget WGET = check_wget(WGET, CONFIGFOLDER) if WGET == False: - printft(HTML("[ERROR] you don't have a valid wget installation or binary in your system.")) + printft(HTML("[ERROR] you don't have a valid wget installation or binary in your system")) sys.exit(1) # makin dicts for links @@ -622,6 +671,8 @@ def main(): action="store_true") parser.add_argument("-E", "-dde", "--demos", help="to download PSV demos.", action="store_true") + parser.add_argument("-k", "--keepkg", help="using this flag will keep the pkg after the extraction", + action="store_true") parser.add_argument("-eb", "--eboot", help="use this argument to unpack PSP games as EBOOT.PBP", action="store_true") parser.add_argument("-cso", "--compress_cso", help="use this argument to unpack PSP games as a compressed .cso file. You can use any number beetween 1 and 9 for compression factors, were 1 is less compressed and 9 is more compressed.", @@ -633,13 +684,15 @@ def main(): args = parser.parse_args() + keepkg = args.keepkg + if args.console is not None: system = list({x.upper() for x in args.console}) else: system = ["PSV", "PSP", "PSX", "PSM"] if args.eboot is True and args.compress_cso is not None: - printft(HTML("[ERROR] you can't use --eboot and --compress_cso at the same time.")) + printft(HTML("[ERROR] you can't use --eboot and --compress_cso at the same time")) if args.compress_cso is not None: cso_factor = args.compress_cso @@ -684,7 +737,7 @@ def main(): sys.exit(0) elif args.update == False and args.search is None: - printft(HTML("[SEARCH] No search term provided, you need to search for something, exiting.")) + printft(HTML("[SEARCH] No search term provided, you need to search for something")) parser.print_help() sys.exit(1) @@ -771,10 +824,10 @@ def validate(self, document): index_to_download_raw = prompt("Enter the number for what you want to download, you can enter multiple numbers using commas: ", validator=Check_game_input()) except KeyboardInterrupt: - printft(HTML("Interrupted by user, exiting.")) + printft(HTML("Interrupted by user")) sys.exit(0) except: - printft(HTML("Interrupted by user, exiting.")) + printft(HTML("Interrupted by user")) sys.exit(0) else: index_to_download_raw = "1" @@ -860,10 +913,10 @@ def validate(self, document): if accept.lower() != "y": raise except KeyboardInterrupt: - printft(HTML("Interrupted by user, exiting.")) + printft(HTML("Interrupted by user")) sys.exit(0) except: - printft(HTML("Interrupted by user, exiting.")) + printft(HTML("Interrupted by user")) sys.exit(0) files_downloaded = [] @@ -877,7 +930,7 @@ def validate(self, document): if "SHA256" in i.keys(): printft(HTML(f"{fill_term()}")) if i["SHA256"] == "": - printft(HTML("[CHECKSUM] No checksum provided by NPS, skipping check.")) + printft(HTML("[CHECKSUM] No checksum provided by NPS, skipping check")) else: sha256_dl = checksum_file(downloaded_file_loc) @@ -896,7 +949,7 @@ def validate(self, document): printft(HTML("[CHECKSUM] downloaded is not corrupted!")) files_downloaded.append(i) else: - print("ERROR: skipping file, wget was unable to download, try again latter.") + print("ERROR: skipping file, wget was unable to download, try again latter") # autoextract with pkg2zip # PKG2ZIP = check_pkg2zip(PKG2ZIP) @@ -914,60 +967,84 @@ def validate(self, document): pass # expose the exact directory were the pkg was extracted! + extraction_folder = "" if i['System'] == "PSV": if i["Type"] in ["GAMES", "DEMOS"]: - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/app/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/app/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/app/{i['Title ID']}")) if i["Type"] == "UPDATES": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/patch/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/patch/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/patch/{i['Title ID']}")) if i["Type"] == "DLCS": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/addcont/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/addcont/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/addcont/{i['Title ID']}")) if i["Type"] == "THEMES": theme_folder_name = get_theme_folder_name(f"{DLFOLDER}/Extracted/bgdl/t/") - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/bgdl/t/{theme_folder_name}/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/bgdl/t/{theme_folder_name}/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/bgdl/t/{theme_folder_name}/{i['Title ID']}")) if i['System'] == "PSP": if i["Type"] == "GAMES": if cso_factor == False and args.eboot == False: - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/ISO/game_name [{i['Title ID']}].iso")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/ISO/game_name [{i['Title ID']}].iso" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/ISO/game_name [{i['Title ID']}].iso")) elif cso_factor in [str(x) for x in range(1, 10)]: - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/ISO/game_name [{i['Title ID']}].cso")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/ISO/game_name [{i['Title ID']}].cso" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/ISO/game_name [{i['Title ID']}].cso")) elif args.eboot == True: - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) if i["Type"] == "DLCS": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) if i["Type"] == "THEMES": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/THEME/theme_name.PTF")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/PSP/THEME/theme_name.PTF" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/THEME/theme_name.PTF")) if i["Type"] == "UPDATES": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) if i['System'] == "PSX": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/pspemu/PSP/GAME/{i['Title ID']}")) if i['System'] == "PSM": - printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/psm/{i['Title ID']}")) + extraction_folder = f"{DLFOLDER}/Extracted/psm/{i['Title ID']}" + # printft(HTML(f"[EXTRACTION] {i['Name']} ➔ {DLFOLDER}/Extracted/psm/{i['Title ID']}")) # -x is default argument to not create .zip files pkg2zip_args = ["-x"] if cso_factor != False and i["Type"] == "GAMES" and i['System'] == "PSP": pkg2zip_args.append("-c"+cso_factor) elif cso_factor != False and i["Type"] != "UPDATES" and i['System'] != "PSP": - printft(HTML(f"[EXTRACTION] cso is only supported for PSP games, since you're extracting a {i['System']} {i['Type'][:-1].lower()} the compression will be skipped.")) + printft(HTML(f"[EXTRACTION] cso is only supported for PSP games, since you're extracting a {i['System']} {i['Type'][:-1].lower()} the compression will be skipped")) if args.eboot == True and i["Type"] == "GAMES" and i['System'] == "PSP": pkg2zip_args.append("-p") # append more commands here if needed! + + printft(HTML("[PKG2ZIP] Attempting to extract .pkg file")) if i['System'] == "PSV" and zrif not in ["", "MISSING", None]: - run_pkg2zip(dl_dile_loc, dl_location, PKG2ZIP, pkg2zip_args, zrif) + delete = run_pkg2zip(dl_dile_loc, dl_location, PKG2ZIP, pkg2zip_args, extraction_folder, zrif) else: - run_pkg2zip(dl_dile_loc, dl_location, PKG2ZIP, pkg2zip_args) + delete = run_pkg2zip(dl_dile_loc, dl_location, PKG2ZIP, pkg2zip_args, extraction_folder) + + #testing if extraction was completion and delete file if needed + if delete == True and keepkg == False: + # delete file + printft(HTML("[EXTRACTION] Attempting to delete .pkg file")) + try: + os.remove(dl_dile_loc) + printft(HTML("[EXTRACTION] Success, the compressed .pkg was deleted")) + except: + printft(HTML(f"[EXTRACTION] Unable to delete {dl_dile_loc}")) else: - printft(HTML( - "[EXTRACTION] skipping extraction since there's no pkg2zip binary in your system.")) + printft(HTML("[EXTRACTION] skipping extraction since there's no pkg2zip binary in your system")) sys.exit(0) printft(HTML(f"{fill_term()}")) printft(HTML("Done!")) if __name__ == '__main__': - main() + main() \ No newline at end of file