Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/autotests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:

- uses: actions/setup-python@v2
with:
python-version: '3.x'
python-version: '3.8'

- name: Install python package dependencies
run: |
Expand Down
2 changes: 2 additions & 0 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,8 @@ def sync_project_generator(self, project_directory):

def sync_project(self, project_directory):
"""
Syncs project by pulling server changes and pushing local changes. A retry mechanism is introduced
for handling server conflicts (when server has changes that we do not have yet or somebody else is syncing).
See description of _sync_project_generator().

:param project_directory: Project's directory
Expand Down
46 changes: 29 additions & 17 deletions mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
import time
from typing import List, Tuple, Optional, ByteString

from .local_changes import FileChange, LocalProjectChanges

from .common import UPLOAD_CHUNK_ATTEMPT_WAIT, UPLOAD_CHUNK_ATTEMPTS, UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
from .local_changes import ChangesValidationError, FileChange, LocalProjectChanges

from .common import (
MAX_UPLOAD_VERSIONED_SIZE,
UPLOAD_CHUNK_ATTEMPT_WAIT,
UPLOAD_CHUNK_ATTEMPTS,
UPLOAD_CHUNK_SIZE,
MAX_UPLOAD_MEDIA_SIZE,
ClientError,
)
from .merginproject import MerginProject
from .editor import filter_changes
from .utils import get_data_checksum
Expand Down Expand Up @@ -176,7 +183,7 @@ def start(self, items: List[UploadQueueItem]):

def update_chunks_from_items(self):
"""
Update chunks in LocalChanges from the upload queue items.
Update chunks in LocalProjectChanges from the upload queue items.
Used just before finalizing the transaction to set the server_chunk_id in v2 API.
"""
self.changes.update_chunk_ids([(item.chunk_id, item.server_chunk_id) for item in self.upload_queue_items])
Expand Down Expand Up @@ -301,28 +308,23 @@ def push_project_async(mc, directory) -> Optional[UploadJob]:
mp.log.info(f"--- push {project_path} - nothing to do")
return

mp.log.debug("push changes:\n" + pprint.pformat(changes))
mp.log.debug("push changes:\n" + pprint.pformat(asdict(changes)))
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")

# If there are any versioned files (aka .gpkg) that are not updated through a diff,
# we need to make a temporary copy somewhere to be sure that we are uploading full content.
# That's because if there are pending transactions, checkpointing or switching from WAL mode
# won't work, and we would end up with some changes left in -wal file which do not get
# uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
for f in changes["updated"]:
if mp.is_versioned_file(f["path"]) and "diff" not in f:
for f in changes.updated:
if mp.is_versioned_file(f.path) and not f.diff:
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

for f in changes["added"]:
if mp.is_versioned_file(f["path"]):
for f in changes.added:
if mp.is_versioned_file(f.path):
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

local_changes = LocalProjectChanges(
added=[FileChange(**change) for change in changes["added"]],
updated=[FileChange(**change) for change in changes["updated"]],
removed=[FileChange(**change) for change in changes["removed"]],
)
job = create_upload_job(mc, mp, local_changes, tmp_dir)
job = create_upload_job(mc, mp, changes, tmp_dir)
return job


Expand Down Expand Up @@ -477,12 +479,22 @@ def remove_diff_files(job: UploadJob) -> None:
os.remove(diff_file)


def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[dict, int]:
def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalProjectChanges, int]:
"""
Get changes that need to be pushed to the server.
"""
changes = mp.get_push_changes()
project_role = mp.project_role()
changes = filter_changes(mc, project_role, changes)

return changes, sum(len(v) for v in changes.values())
try:
local_changes = LocalProjectChanges(
added=[FileChange(**change) for change in changes["added"]],
updated=[FileChange(**change) for change in changes["updated"]],
removed=[FileChange(**change) for change in changes["removed"]],
)
except ChangesValidationError as e:
raise ClientError(
f"Some files exceeded maximum upload size. Files: {', '.join([c.path for c in e.invalid_changes])}. Maximum size for media files is {MAX_UPLOAD_MEDIA_SIZE / (1024**3)} GB and for geopackage files {MAX_UPLOAD_VERSIONED_SIZE / (1024**3)} GB."
)
return local_changes, sum(len(v) for v in changes.values())
6 changes: 6 additions & 0 deletions mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
# seconds to wait between sync callback calls
SYNC_CALLBACK_WAIT = 0.01

# maximum size of media file able to upload in one push (in bytes)
MAX_UPLOAD_MEDIA_SIZE = 10 * (1024**3)

# maximum size of GPKG file able to upload in one push (in bytes)
MAX_UPLOAD_VERSIONED_SIZE = 5 * (1024**3)

# default URL for submitting logs
MERGIN_DEFAULT_LOGS_URL = "https://g4pfq226j0.execute-api.eu-west-1.amazonaws.com/mergin_client_log_submit"

Expand Down
2 changes: 1 addition & 1 deletion mergin/editor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from itertools import filterfalse
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file
from .utils import is_qgis_file

EDITOR_ROLE_NAME = "editor"

Expand Down
35 changes: 35 additions & 0 deletions mergin/local_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@
from datetime import datetime
from typing import Optional, List, Tuple

from .utils import is_versioned_file
from .common import MAX_UPLOAD_MEDIA_SIZE, MAX_UPLOAD_VERSIONED_SIZE

MAX_UPLOAD_CHANGES = 100


# The custom exception
class ChangesValidationError(Exception):
def __init__(self, message, invalid_changes=[]):
super().__init__(message)
self.invalid_changes = invalid_changes if invalid_changes is not None else []


@dataclass
class FileDiffChange:
Expand Down Expand Up @@ -68,6 +80,29 @@ class LocalProjectChanges:
updated: List[FileChange] = field(default_factory=list)
removed: List[FileChange] = field(default_factory=list)

def __post_init__(self):
"""
Enforce a limit of changes combined from `added` and `updated`.
"""
upload_changes = self.get_upload_changes()
total_changes = len(upload_changes)
oversize_changes = []
for change in upload_changes:
if not is_versioned_file(change.path) and change.size > MAX_UPLOAD_MEDIA_SIZE:
oversize_changes.append(change)
elif not change.diff and change.size > MAX_UPLOAD_VERSIONED_SIZE:
oversize_changes.append(change)
if oversize_changes:
error = ChangesValidationError("Some files exceed the maximum upload size", oversize_changes)
raise error

if total_changes > MAX_UPLOAD_CHANGES:
# Calculate how many changes to keep from `added` and `updated`
added_limit = min(len(self.added), MAX_UPLOAD_CHANGES)
updated_limit = MAX_UPLOAD_CHANGES - added_limit
self.added = self.added[:added_limit]
self.updated = self.updated[:updated_limit]

def to_server_payload(self) -> dict:
return {
"added": [change.to_server_data() for change in self.added],
Expand Down
14 changes: 7 additions & 7 deletions mergin/merginproject.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
conflicted_copy_file_name,
edit_conflict_file_name,
)

from .local_changes import FileChange

this_dir = os.path.dirname(os.path.realpath(__file__))

Expand Down Expand Up @@ -470,20 +470,20 @@ def get_push_changes(self):
changes["updated"] = [f for f in changes["updated"] if f not in not_updated]
return changes

def copy_versioned_file_for_upload(self, f, tmp_dir):
def copy_versioned_file_for_upload(self, f: FileChange, tmp_dir: str) -> str:
"""
Make a temporary copy of the versioned file using geodiff, to make sure that we have full
content in a single file (nothing left in WAL journal)
"""
path = f["path"]
path = f.path
self.log.info("Making a temporary copy (full upload): " + path)
tmp_file = os.path.join(tmp_dir, path)
os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
self.geodiff.make_copy_sqlite(self.fpath(path), tmp_file)
f["size"] = os.path.getsize(tmp_file)
f["checksum"] = generate_checksum(tmp_file)
f["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(f["size"] / UPLOAD_CHUNK_SIZE))]
f["upload_file"] = tmp_file
f.size = os.path.getsize(tmp_file)
f.checksum = generate_checksum(tmp_file)
f.chunks = [str(uuid.uuid4()) for i in range(math.ceil(f.size / UPLOAD_CHUNK_SIZE))]
f.upload_file = tmp_file
return tmp_file

def get_list_of_push_changes(self, push_changes):
Expand Down
23 changes: 22 additions & 1 deletion mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2328,8 +2328,10 @@ def test_clean_diff_files(mc):
shutil.copy(mp.fpath("inserted_1_A.gpkg"), mp.fpath(f_updated))
mc.push_project(project_dir)

diff_files = glob.glob("*-diff-*", root_dir=os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0])
directory = os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0]
diff_files = [f for f in os.listdir(directory) if "-diff-" in f]

# Assert that no matching files are found
assert diff_files == []


Expand Down Expand Up @@ -3214,3 +3216,22 @@ def test_client_project_sync_retry(mc):
with pytest.raises(ClientError):
mc.sync_project(project_dir)
assert mock_push_project_async.call_count == 2


def test_push_file_limits(mc):
test_project = "test_push_file_limits"
project = API_USER + "/" + test_project
project_dir = os.path.join(TMP_DIR, test_project)
cleanup(mc, project, [project_dir])
mc.create_project(test_project)
mc.download_project(project, project_dir)
shutil.copy(os.path.join(TEST_DATA_DIR, "base.gpkg"), project_dir)
# setting to some minimal value to mock limit hit
with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", 1):
with pytest.raises(ClientError, match=f"Some files exceeded maximum upload size. Files: base.gpkg."):
mc.push_project(project_dir)

shutil.copy(os.path.join(TEST_DATA_DIR, "test.txt"), project_dir)
with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", 1):
with pytest.raises(ClientError, match=f"Some files exceeded maximum upload size. Files: test.txt."):
mc.push_project(project_dir)
94 changes: 93 additions & 1 deletion mergin/test/test_local_changes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime
import pytest
from unittest.mock import patch

from ..local_changes import FileChange, LocalProjectChanges
from ..local_changes import ChangesValidationError, FileChange, LocalProjectChanges, MAX_UPLOAD_CHANGES


def test_local_changes_from_dict():
Expand Down Expand Up @@ -118,3 +120,93 @@ def test_local_changes_get_upload_changes():
assert len(upload_changes) == 2 # Only added and updated should be included
assert upload_changes[0].path == "file1.txt" # First change is from added
assert upload_changes[1].path == "file2.txt" # Second change is from updated


def test_local_changes_post_init_validation_media():
"""Test the get_media_upload_file method of LocalProjectChanges."""
# Define constants
SIZE_LIMIT_MB = 5
SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024
SMALL_FILE_SIZE = 1024
LARGE_FILE_SIZE = 15 * 1024 * 1024

# Create sample LocalChange instances
added = [
FileChange(path="file1.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
FileChange(path="file2.jpg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now()), # Over limit
]
updated = [
FileChange(path="file3.mp4", checksum="lmn456", size=5 * 1024 * 1024, mtime=datetime.now()),
FileChange(path="file4.gpkg", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
]

# Initialize LocalProjectChanges
with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", SIZE_LIMIT_BYTES):
with pytest.raises(ChangesValidationError, match="Some files exceed") as err:
LocalProjectChanges(added=added, updated=updated)
print(err.value.invalid_changes)
assert len(err.value.invalid_changes) == 1
assert "file2.jpg" == err.value.invalid_changes[0].path
assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE


def test_local_changes_post_init_validation_gpgkg():
"""Test the get_gpgk_upload_file method of LocalProjectChanges."""
# Define constants
SIZE_LIMIT_MB = 10
SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024
SMALL_FILE_SIZE = 1024
LARGE_FILE_SIZE = 15 * 1024 * 1024

# Create sample LocalChange instances
added = [
FileChange(path="file1.gpkg", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
FileChange(
path="file2.gpkg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now(), diff=None
), # Over limit
]
updated = [
FileChange(
path="file3.gpkg",
checksum="lmn456",
size=SIZE_LIMIT_BYTES + 1,
mtime=datetime.now(),
diff={"path": "file3-diff.gpkg", "checksum": "diff123", "size": 1024, "mtime": datetime.now()},
),
FileChange(path="file4.txt", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
]

# Initialize LocalProjectChanges
with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", SIZE_LIMIT_BYTES):
with pytest.raises(ChangesValidationError) as err:
LocalProjectChanges(added=added, updated=updated)
assert len(err.value.invalid_changes) == 1
assert "file2.gpkg" == err.value.invalid_changes[0].path
assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE


def test_local_changes_post_init():
    """Test the __post_init__ method of LocalProjectChanges: the combined
    number of added/updated changes is capped at MAX_UPLOAD_CHANGES."""
    num_added = 80
    num_updated = 21
    small_size = 1024
    large_size = 2048

    # Build more than MAX_UPLOAD_CHANGES changes in total (80 + 21 = 101)
    added = [
        FileChange(path=f"file{i}.txt", checksum="abc123", size=small_size, mtime=datetime.now())
        for i in range(num_added)
    ]
    updated = [
        FileChange(path=f"file{i}.txt", checksum="xyz789", size=large_size, mtime=datetime.now())
        for i in range(num_updated)
    ]

    local_changes = LocalProjectChanges(added=added, updated=updated)

    # `added` fits entirely under the limit; `updated` is trimmed to the remaining budget
    assert len(local_changes.added) == num_added
    assert len(local_changes.updated) == MAX_UPLOAD_CHANGES - num_added
    assert len(local_changes.added) + len(local_changes.updated) == MAX_UPLOAD_CHANGES