|
| 1 | +# Licensed under the GPL-3.0 License. |
| 2 | +# Created for TagStudio: https://github.com/CyanVoxel/TagStudio |
| 3 | + |
| 4 | +import shutil |
| 5 | +import typing |
| 6 | +from pathlib import Path |
| 7 | + |
| 8 | +from PySide6.QtCore import QThreadPool |
| 9 | +from PySide6.QtGui import QDragEnterEvent, QDragMoveEvent, QDropEvent |
| 10 | +from PySide6.QtWidgets import QMessageBox |
| 11 | +from src.qt.helpers.custom_runnable import CustomRunnable |
| 12 | +from src.qt.helpers.function_iterator import FunctionIterator |
| 13 | +from src.qt.widgets.progress import ProgressWidget |
| 14 | + |
| 15 | +if typing.TYPE_CHECKING: |
| 16 | + from src.qt.ts_qt import QtDriver |
| 17 | + |
| 18 | +import logging |
| 19 | + |
| 20 | + |
| 21 | +class DropImport: |
| 22 | + def __init__(self, driver: "QtDriver"): |
| 23 | + self.driver = driver |
| 24 | + |
| 25 | + def dropEvent(self, event: QDropEvent): # noqa: N802 |
| 26 | + if ( |
| 27 | + event.source() is self.driver |
| 28 | + ): # change that if you want to drop something originating from tagstudio, for moving or so |
| 29 | + return |
| 30 | + |
| 31 | + if not event.mimeData().hasUrls(): |
| 32 | + return |
| 33 | + |
| 34 | + self.urls = event.mimeData().urls() |
| 35 | + self.import_files() |
| 36 | + |
| 37 | + def dragEnterEvent(self, event: QDragEnterEvent): # noqa: N802 |
| 38 | + if event.mimeData().hasUrls(): |
| 39 | + event.accept() |
| 40 | + else: |
| 41 | + event.ignore() |
| 42 | + |
| 43 | + def dragMoveEvent(self, event: QDragMoveEvent): # noqa: N802 |
| 44 | + if event.mimeData().hasUrls(): |
| 45 | + event.accept() |
| 46 | + else: |
| 47 | + logging.info(self.driver.selected) |
| 48 | + event.ignore() |
| 49 | + |
| 50 | + def import_files(self): |
| 51 | + self.files: list[Path] = [] |
| 52 | + self.dirs_in_root: list[Path] = [] |
| 53 | + self.duplicate_files: list[Path] = [] |
| 54 | + |
| 55 | + def displayed_text(x): |
| 56 | + text = f"Searching New Files...\n{x[0] + 1} File{'s' if x[0] + 1 != 1 else ''} Found." |
| 57 | + if x[1] == 0: |
| 58 | + return text |
| 59 | + return text + f" {x[1]} Already exist in the library folders" |
| 60 | + |
| 61 | + create_progress_bar( |
| 62 | + self.collect_files_to_import, |
| 63 | + "Searching Files", |
| 64 | + "Searching New Files...\nPreparing...", |
| 65 | + displayed_text, |
| 66 | + self.ask_user, |
| 67 | + ) |
| 68 | + |
| 69 | + def collect_files_to_import(self): |
| 70 | + for url in self.urls: |
| 71 | + if not url.isLocalFile(): |
| 72 | + continue |
| 73 | + |
| 74 | + file = Path(url.toLocalFile()) |
| 75 | + |
| 76 | + if file.is_dir(): |
| 77 | + for f in self.get_files_in_folder(file): |
| 78 | + if f.is_dir(): |
| 79 | + continue |
| 80 | + self.files.append(f) |
| 81 | + if (self.driver.lib.library_dir / self.get_relative_path(file)).exists(): |
| 82 | + self.duplicate_files.append(f) |
| 83 | + yield [len(self.files), len(self.duplicate_files)] |
| 84 | + |
| 85 | + self.dirs_in_root.append(file.parent) |
| 86 | + else: |
| 87 | + self.files.append(file) |
| 88 | + |
| 89 | + if file.parent not in self.dirs_in_root: |
| 90 | + self.dirs_in_root.append( |
| 91 | + file.parent |
| 92 | + ) # to create relative path of files not in folder |
| 93 | + |
| 94 | + if (Path(self.driver.lib.library_dir) / file.name).exists(): |
| 95 | + self.duplicate_files.append(file) |
| 96 | + |
| 97 | + yield [len(self.files), len(self.duplicate_files)] |
| 98 | + |
| 99 | + def copy_files(self): |
| 100 | + file_count = 0 |
| 101 | + duplicated_files_progress = 0 |
| 102 | + for file in self.files: |
| 103 | + if file.is_dir(): |
| 104 | + continue |
| 105 | + |
| 106 | + dest_file = self.get_relative_path(file) |
| 107 | + |
| 108 | + if file in self.duplicate_files: |
| 109 | + duplicated_files_progress += 1 |
| 110 | + if self.choice == 1: # overwrite |
| 111 | + pass |
| 112 | + elif self.choice == 2: # rename |
| 113 | + new_name = self.get_renamed_duplicate_filename_in_lib(dest_file) |
| 114 | + dest_file = dest_file.with_name(new_name) |
| 115 | + else: # skip |
| 116 | + continue |
| 117 | + |
| 118 | + (self.driver.lib.library_dir / dest_file).parent.mkdir(parents=True, exist_ok=True) |
| 119 | + shutil.copyfile(file, self.driver.lib.library_dir / dest_file) |
| 120 | + |
| 121 | + file_count += 1 |
| 122 | + yield [file_count, duplicated_files_progress] |
| 123 | + |
| 124 | + def ask_user(self): |
| 125 | + self.choice = -1 |
| 126 | + |
| 127 | + if len(self.duplicate_files) > 0: |
| 128 | + self.duplicates_choice() |
| 129 | + else: |
| 130 | + self.begin_transfer() |
| 131 | + |
| 132 | + def duplicate_prompt_callback(self, button): |
| 133 | + match button: |
| 134 | + case self.skip_button: |
| 135 | + self.choice = 0 |
| 136 | + case self.overwrite_button: |
| 137 | + self.choice = 1 |
| 138 | + case self.rename_button: |
| 139 | + self.choice = 2 |
| 140 | + case _: |
| 141 | + # skip on unknown state |
| 142 | + self.choice = 0 |
| 143 | + |
| 144 | + self.begin_transfer() |
| 145 | + |
| 146 | + def begin_transfer(self): |
| 147 | + def displayed_text(x): |
| 148 | + dupes_choice_text = ( |
| 149 | + "Skipped" if self.choice == 0 else ("Overridden" if self.choice == 1 else "Renamed") |
| 150 | + ) |
| 151 | + |
| 152 | + text = ( |
| 153 | + f"Importing New Files...\n{x[0] + 1} File{'s' if x[0] + 1 != 1 else ''} Imported." |
| 154 | + ) |
| 155 | + if x[1] == 0: |
| 156 | + return text |
| 157 | + return text + f" {x[1]} {dupes_choice_text}" |
| 158 | + |
| 159 | + create_progress_bar( |
| 160 | + self.copy_files, |
| 161 | + "Import Files", |
| 162 | + "Importing New Files...\nPreparing...", |
| 163 | + displayed_text, |
| 164 | + self.driver.add_new_files_callback, |
| 165 | + len(self.files), |
| 166 | + ) |
| 167 | + |
| 168 | + def duplicates_choice(self): |
| 169 | + display_limit: int = 5 |
| 170 | + self.msg_box = QMessageBox() |
| 171 | + self.msg_box.setWindowTitle(f"File Conflict{'s' if len(self.duplicate_files) > 1 else ''}") |
| 172 | + |
| 173 | + dupes_to_show = self.duplicate_files |
| 174 | + if len(self.duplicate_files) > display_limit: |
| 175 | + dupes_to_show = dupes_to_show[0:display_limit] |
| 176 | + |
| 177 | + dupe_file_names = "" |
| 178 | + for dupe in dupes_to_show: |
| 179 | + dupe_file_names += f" {self.get_relative_path(dupe)}\n" |
| 180 | + if len(self.duplicate_files) > display_limit: |
| 181 | + dupe_file_names += f" and {len(self.duplicate_files) - display_limit} more\n" |
| 182 | + |
| 183 | + self.msg_box.setText( |
| 184 | + f"The following files: \n{dupe_file_names}" |
| 185 | + + "have filenames that already exist in the library folder." |
| 186 | + ) |
| 187 | + |
| 188 | + self.skip_button = self.msg_box.addButton("Skip", QMessageBox.ButtonRole.YesRole) |
| 189 | + self.overwrite_button = self.msg_box.addButton( |
| 190 | + "Overwrite", QMessageBox.ButtonRole.DestructiveRole |
| 191 | + ) |
| 192 | + self.rename_button = self.msg_box.addButton( |
| 193 | + "Rename", QMessageBox.ButtonRole.DestructiveRole |
| 194 | + ) |
| 195 | + self.cancel_button = self.msg_box.setStandardButtons(QMessageBox.Cancel) # type: ignore[attr-defined] |
| 196 | + self.msg_box.buttonClicked.connect(lambda button: self.duplicate_prompt_callback(button)) |
| 197 | + |
| 198 | + self.msg_box.open() |
| 199 | + |
| 200 | + def get_files_exists_in_library(self, path: Path) -> list[Path]: |
| 201 | + exists: list[Path] = [] |
| 202 | + if not path.is_dir(): |
| 203 | + return exists |
| 204 | + |
| 205 | + files = self.get_files_in_folder(path) |
| 206 | + for file in files: |
| 207 | + if file.is_dir(): |
| 208 | + exists += self.get_files_exists_in_library(file) |
| 209 | + elif (self.driver.lib.library_dir / self.get_relative_path(file)).exists(): |
| 210 | + exists.append(file) |
| 211 | + return exists |
| 212 | + |
| 213 | + def get_relative_path(self, path: Path) -> Path: |
| 214 | + for dir in self.dirs_in_root: |
| 215 | + if path.is_relative_to(dir): |
| 216 | + return path.relative_to(dir) |
| 217 | + return Path(path.name) |
| 218 | + |
| 219 | + def get_files_in_folder(self, path: Path) -> list[Path]: |
| 220 | + files = [] |
| 221 | + for file in path.glob("**/*"): |
| 222 | + files.append(file) |
| 223 | + return files |
| 224 | + |
| 225 | + def get_renamed_duplicate_filename_in_lib(self, filepath: Path) -> str: |
| 226 | + index = 2 |
| 227 | + o_filename = filepath.name |
| 228 | + dot_idx = o_filename.index(".") |
| 229 | + while (self.driver.lib.library_dir / filepath).exists(): |
| 230 | + filepath = filepath.with_name( |
| 231 | + o_filename[:dot_idx] + f" ({index})" + o_filename[dot_idx:] |
| 232 | + ) |
| 233 | + index += 1 |
| 234 | + return filepath.name |
| 235 | + |
| 236 | + |
| 237 | +def create_progress_bar( |
| 238 | + function, title: str, text: str, update_label_callback, done_callback, max=0 |
| 239 | +): |
| 240 | + iterator = FunctionIterator(function) |
| 241 | + pw = ProgressWidget( |
| 242 | + window_title=title, |
| 243 | + label_text=text, |
| 244 | + cancel_button_text=None, |
| 245 | + minimum=0, |
| 246 | + maximum=max, |
| 247 | + ) |
| 248 | + pw.show() |
| 249 | + iterator.value.connect(lambda x: pw.update_progress(x[0] + 1)) |
| 250 | + iterator.value.connect(lambda x: pw.update_label(update_label_callback(x))) |
| 251 | + r = CustomRunnable(lambda: iterator.run()) |
| 252 | + r.done.connect(lambda: (pw.hide(), done_callback())) # type: ignore |
| 253 | + QThreadPool.globalInstance().start(r) |
0 commit comments