Skip to content

Introduce reset_local_changes() #189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ssl
from enum import Enum, auto
import re
import typing
import warnings

from .common import ClientError, LoginError, InvalidProject
Expand All @@ -22,6 +23,8 @@
download_file_finalize,
download_project_async,
download_file_async,
download_files_async,
download_files_finalize,
download_diffs_async,
download_project_finalize,
download_project_wait,
Expand Down Expand Up @@ -1127,3 +1130,63 @@ def has_writing_permissions(self, project_path):
"""
info = self.project_info(project_path)
return info["permissions"]["upload"]

def reset_local_changes(self, directory: str, files_to_reset: typing.List[str] = None) -> None:
"""
Reset local changes to either all files or only listed files.
Added files are removed, removed files are brought back and updates are discarded.

:param directory: Project's directory
:type directory: String
:param files_to_reset List of files to reset, relative paths of file
:type files_to_reset: List of strings, default None
"""
all_files = files_to_reset is None

mp = MerginProject(directory)

current_version = mp.version()

push_changes = mp.get_push_changes()

files_download = []

# remove all added files
for file in push_changes["added"]:
if all_files or file["path"] in files_to_reset:
os.remove(mp.fpath(file["path"]))

# update files get override with previous version
for file in push_changes["updated"]:
if all_files or file["path"] in files_to_reset:
if mp.is_versioned_file(file["path"]):
mp.geodiff.make_copy_sqlite(mp.fpath_meta(file["path"]), mp.fpath(file["path"]))
else:
files_download.append(file["path"])

# removed files are redownloaded
for file in push_changes["removed"]:
if all_files or file["path"] in files_to_reset:
files_download.append(file["path"])

if files_download:
self.download_files(directory, files_download, version=current_version)

def download_files(
self, project_dir: str, file_paths: typing.List[str], output_paths: typing.List[str] = None, version: str = None
):
"""
Download project files at specified version. Get the latest if no version specified.

:param project_dir: project local directory
:type project_dir: String
:param file_path: List of relative paths of files to download in the project directory
:type file_path: List[String]
:param output_paths: List of paths for files to download to. Should be same length of as file_path. Default is `None` which means that files are downloaded into MerginProject at project_dir.
:type output_paths: List[String]
:param version: optional version tag for downloaded file
:type version: String
"""
job = download_files_async(self, project_dir, file_paths, output_paths, version=version)
pull_project_wait(job)
download_files_finalize(job)
168 changes: 103 additions & 65 deletions mergin/client_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pprint
import shutil
import tempfile
import typing

import concurrent.futures

Expand Down Expand Up @@ -621,77 +622,14 @@ def download_file_async(mc, project_dir, file_path, output_file, version):
Starts background download project file at specified version.
Returns handle to the pending download.
"""
mp = MerginProject(project_dir)
project_path = mp.project_full_name()
ver_info = f"at version {version}" if version is not None else "at latest version"
mp.log.info(f"Getting {file_path} {ver_info}")
latest_proj_info = mc.project_info(project_path)
if version:
project_info = mc.project_info(project_path, version=version)
else:
project_info = latest_proj_info
mp.log.info(f"Got project info. version {project_info['version']}")

# set temporary directory for download
temp_dir = tempfile.mkdtemp(prefix="mergin-py-client-")

download_list = []
update_tasks = []
total_size = 0
# None can not be used to indicate latest version of the file, so
# it is necessary to pass actual version.
if version is None:
version = latest_proj_info["version"]
for file in project_info["files"]:
if file["path"] == file_path:
file["version"] = version
items = _download_items(file, temp_dir)
is_latest_version = version == latest_proj_info["version"]
task = UpdateTask(file["path"], items, output_file, latest_version=is_latest_version)
download_list.extend(task.download_queue_items)
for item in task.download_queue_items:
total_size += item.size
update_tasks.append(task)
break
if not download_list:
warn = f"No {file_path} exists at version {version}"
mp.log.warning(warn)
shutil.rmtree(temp_dir)
raise ClientError(warn)

mp.log.info(f"will download file {file_path} in {len(download_list)} chunks, total size {total_size}")
job = DownloadJob(project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info)
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
job.futures = []
for item in download_list:
future = job.executor.submit(_do_download, item, mc, mp, project_path, job)
job.futures.append(future)

return job
return download_files_async(mc, project_dir, [file_path], [output_file], version)


def download_file_finalize(job):
"""
To be called when download_file_async is finished
"""
job.executor.shutdown(wait=True)

# make sure any exceptions from threads are not lost
for future in job.futures:
if future.exception() is not None:
raise future.exception()

job.mp.log.info("--- download finished")

temp_dir = None
for task in job.update_tasks:
task.apply(job.directory, job.mp)
if task.download_queue_items:
temp_dir = os.path.dirname(task.download_queue_items[0].download_file_path)

# Remove temporary download directory
if temp_dir is not None:
shutil.rmtree(temp_dir)
download_files_finalize(job)


def download_diffs_async(mc, project_directory, file_path, versions):
Expand Down Expand Up @@ -804,3 +742,103 @@ def download_diffs_finalize(job):

job.mp.log.info("--- diffs pull finished")
return diffs


def download_files_async(
mc, project_dir: str, file_paths: typing.List[str], output_paths: typing.List[str], version: str
):
"""
Starts background download project files at specified version.
Returns handle to the pending download.
"""
mp = MerginProject(project_dir)
project_path = mp.project_full_name()
ver_info = f"at version {version}" if version is not None else "at latest version"
mp.log.info(f"Getting [{', '.join(file_paths)}] {ver_info}")
latest_proj_info = mc.project_info(project_path)
if version:
project_info = mc.project_info(project_path, version=version)
else:
project_info = latest_proj_info
mp.log.info(f"Got project info. version {project_info['version']}")

# set temporary directory for download
temp_dir = tempfile.mkdtemp(prefix="mergin-py-client-")

if output_paths is None:
output_paths = []
for file in file_paths:
output_paths.append(mp.fpath(file))

if len(output_paths) != len(file_paths):
warn = "Output file paths are not of the same length as file paths. Cannot store required files."
mp.log.warning(warn)
shutil.rmtree(temp_dir)
raise ClientError(warn)

download_list = []
update_tasks = []
total_size = 0
# None can not be used to indicate latest version of the file, so
# it is necessary to pass actual version.
if version is None:
version = latest_proj_info["version"]
for file in project_info["files"]:
if file["path"] in file_paths:
index = file_paths.index(file["path"])
file["version"] = version
items = _download_items(file, temp_dir)
is_latest_version = version == latest_proj_info["version"]
task = UpdateTask(file["path"], items, output_paths[index], latest_version=is_latest_version)
download_list.extend(task.download_queue_items)
for item in task.download_queue_items:
total_size += item.size
update_tasks.append(task)

missing_files = []
files_to_download = []
project_file_paths = [file["path"] for file in project_info["files"]]
for file in file_paths:
if file not in project_file_paths:
missing_files.append(file)
else:
files_to_download.append(file)

if not download_list or missing_files:
warn = f"No [{', '.join(missing_files)}] exists at version {version}"
mp.log.warning(warn)
shutil.rmtree(temp_dir)
raise ClientError(warn)

mp.log.info(
f"will download files [{', '.join(files_to_download)}] in {len(download_list)} chunks, total size {total_size}"
)
job = DownloadJob(project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info)
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
job.futures = []
for item in download_list:
future = job.executor.submit(_do_download, item, mc, mp, project_path, job)
job.futures.append(future)

return job


def download_files_finalize(job):
"""
To be called when download_file_async is finished
"""
job.executor.shutdown(wait=True)

# make sure any exceptions from threads are not lost
for future in job.futures:
if future.exception() is not None:
raise future.exception()

job.mp.log.info("--- download finished")

for task in job.update_tasks:
task.apply(job.directory, job.mp)

# Remove temporary download directory
if job.directory is not None and os.path.exists(job.directory):
shutil.rmtree(job.directory)
Loading