Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 157 additions & 15 deletions server/mergin/sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
from __future__ import annotations
from contextlib import contextmanager
import json
import logging
import os
import threading
import time
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional, List, Dict, Set, Tuple
from dataclasses import dataclass, asdict
Expand All @@ -17,11 +19,11 @@
from flask_login import current_user
from pygeodiff import GeoDiff
from sqlalchemy import text, null, desc, nullslast, tuple_
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert
from sqlalchemy.types import String
from sqlalchemy.ext.hybrid import hybrid_property
from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError
from flask import current_app
from flask import Flask, current_app

from .files import (
DeltaChangeMerged,
Expand All @@ -44,7 +46,6 @@
LOG_BASE,
Checkpoint,
generate_checksum,
Toucher,
get_chunk_location,
get_project_path,
is_supported_type,
Expand Down Expand Up @@ -1805,6 +1806,9 @@ class Upload(db.Model):
db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True
)
created = db.Column(db.DateTime, default=datetime.utcnow)
# last ping time to determine if upload is still active
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
transaction_id = db.Column(db.String, unique=True, nullable=False, index=True)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add UUID type


user = db.relationship("User")
project = db.relationship(
Expand All @@ -1822,28 +1826,166 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int):
self.version = version
self.changes = ChangesSchema().dump(changes)
self.user_id = user_id
self.transaction_id = str(uuid.uuid4())

@property
def upload_dir(self):
return os.path.join(self.project.storage.project_dir, "tmp", self.id)
@classmethod
def create_upload(
cls, project_id: str, version: int, changes: dict, user_id: int
) -> Upload | None:
"""Create upload session, it can either create a new record or handover an existing one but with new transaction id
Old transaction folder is removed and new one is created.
"""
now = datetime.now(timezone.utc)
expiration = current_app.config["LOCKFILE_EXPIRATION"]
new_tx_id = str(uuid.uuid4())

# CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot)
# NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload
existing_cte = (
db.select(Upload.transaction_id)
.where(
Upload.project_id == project_id,
Upload.version == version,
)
.cte("existing")
)

stmt = (
insert(Upload)
.values(
id=str(uuid.uuid4()),
transaction_id=new_tx_id,
project_id=project_id,
version=version,
user_id=user_id,
last_ping=now,
changes=ChangesSchema().dump(changes),
)
.add_cte(existing_cte)
)

upsert_stmt = stmt.on_conflict_do_update(
constraint="uq_upload_project_id",
set_={
"transaction_id": new_tx_id,
"user_id": user_id,
"last_ping": now,
"changes": ChangesSchema().dump(changes),
},
# ONLY update if the existing row is stale
where=(Upload.last_ping < (now - timedelta(seconds=expiration))),
)
Comment on lines +1838 to +1877
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last_ping is stored in a db.DateTime column without timezone=True (naive timestamps), but create_upload() binds an aware datetime (datetime.now(timezone.utc)) into last_ping and uses it in the staleness predicate. With PostgreSQL/psycopg this can lead to inconsistent comparisons/casts (or runtime errors) depending on server/session timezone settings. Consider storing UTC as naive consistently (e.g., datetime.utcnow() everywhere + UPDATE ... SET last_ping = timezone('utc', now())) or switch the column/migration to DateTime(timezone=True) and keep all values aware.

Copilot uses AI. Check for mistakes.

upsert_stmt = upsert_stmt.returning(
Upload,
db.select(existing_cte.c.transaction_id)
.scalar_subquery()
.label("old_transaction_id"),
)

result = db.session.execute(upsert_stmt).fetchone()
db.session.commit()

# if nothing returned, it means the WHERE clause failed (active upload)
if not result:
return

upload = result.Upload
old_transaction_id = result.old_transaction_id
os.makedirs(upload.upload_dir)

# old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload
if old_transaction_id:
upload.project.sync_failed(
"", "push_lost", "Push artefact removed by subsequent push", user_id
)
if os.path.exists(
os.path.join(
upload.project.storage.project_dir, "tmp", old_transaction_id
)
):
move_to_tmp(
os.path.join(
upload.project.storage.project_dir, "tmp", old_transaction_id
),
old_transaction_id,
)

Comment on lines +1886 to +1913
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_upload() commits the DB transaction before creating/moving the on-disk upload directories. If os.makedirs() or move_to_tmp() fails, the DB row (and new transaction_id) remains committed but the filesystem state is incomplete, potentially blocking future uploads until it becomes stale. Wrap the filesystem operations in a try/except and, on failure, delete the upload row (or rollback/compensate) so the system can recover immediately.

Copilot uses AI. Check for mistakes.
return upload

@property
def lockfile(self):
return os.path.join(self.upload_dir, "lockfile")
def upload_dir(self):
return os.path.join(
self.project.storage.project_dir, "tmp", self.transaction_id
)

def is_active(self):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if we still need this method.

"""Check if upload is still active because there was a ping (lockfile update) from underlying process"""
return os.path.exists(self.lockfile) and (
time.time() - os.path.getmtime(self.lockfile)
< current_app.config["LOCKFILE_EXPIRATION"]
"""Check if upload is still active because there was a ping from underlying process"""
return datetime.now(tz=timezone.utc) < self.last_ping.replace(
tzinfo=timezone.utc
) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
Comment on lines +1924 to +1926
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_active() currently forces tz-awareness via self.last_ping.replace(tzinfo=timezone.utc). If last_ping is persisted as a naive timestamp (as per the column definition/migration), this can silently misinterpret values if the DB/session timezone is not UTC. Once timestamp handling is made consistent (UTC-naive or timezone-aware), simplify this to compare like-for-like (naive↔naive or aware↔aware) without replace(tzinfo=...).

Suggested change
return datetime.now(tz=timezone.utc) < self.last_ping.replace(
tzinfo=timezone.utc
) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
return datetime.utcnow() < self.last_ping + timedelta(
seconds=current_app.config["LOCKFILE_EXPIRATION"]
)

Copilot uses AI. Check for mistakes.

def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int):
"""
Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _heartbeat_task docstring says it "Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type", but the implementation always starts a threading.Thread. Either adjust the docstring to reflect the actual behavior, or implement a gevent-friendly path (e.g., spawn a greenlet when running under gevent) to avoid misleading future maintainers.

Suggested change
Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
Background heartbeat task intended to run in a background thread.

Copilot uses AI. Check for mistakes.
Uses a fresh engine connection to stay pool-efficient.
"""
# manual context push is required for background execution
with app.app_context():
while not stop_event.is_set():
try:
# db.engine.begin() is efficient and isolated, it immediately returns a connection to the pool
with db.engine.begin() as conn:
conn.execute(
db.text(
"UPDATE upload SET last_ping = NOW() WHERE id = :id"
),
{"id": self.id},
)
except Exception as e:
logging.exception(
f"Upload heartbeat failed for ID {self.project_id} and version {self.version}: {e}"
)

# wait for x seconds, but wake up immediately if stop_event is set
stop_event.wait(timeout)

@contextmanager
def heartbeat(self, timeout: int = 5):
    """
    Context manager that keeps this upload's ``last_ping`` fresh while a
    slow operation runs inside a Flask route.

    Example of usage:
    -----------------
    with upload.heartbeat(interval):
        do_something_slow
    """
    # the background worker must receive the real application object,
    # not the request-bound proxy
    flask_app = current_app._get_current_object()
    cancel = threading.Event()

    worker = threading.Thread(
        target=self._heartbeat_task,
        args=(flask_app, cancel, timeout),
        daemon=True,
    )
    worker.start()

    try:
        yield
    finally:
        # ask the ping loop to terminate
        cancel.set()

        # give it up to 2s to finish its last SQL call so no zombie
        # background workers are left behind; under gevent this join
        # yields to other requests (non-blocking), while under sync
        # workers it blocks the current thread for up to 2s
        worker.join(timeout=2)

def clear(self):
"""Clean up pending upload.
Uploaded files and table records are removed, and another upload can start.
"""
try:
move_to_tmp(self.upload_dir, self.id)
move_to_tmp(self.upload_dir, self.transaction_id)
db.session.delete(self)
db.session.commit()
except Exception:
Expand All @@ -1864,7 +2006,7 @@ def process_chunks(
to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
current_files = [f for f in self.project.files if f.path not in to_remove]

with Toucher(self.lockfile, 5):
with self.heartbeat(5):
for f in file_changes:
if f.change == PushChangeType.DELETE:
continue
Expand Down
7 changes: 3 additions & 4 deletions server/mergin/sync/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,16 @@ def check_project_permissions(
return None


def get_upload(transaction_id):
upload = Upload.query.get_or_404(transaction_id)
def get_upload_or_fail(transaction_id: str) -> Upload:
upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404()
# upload to 'removed' projects is forbidden
if upload.project.removed_at:
abort(404)

if upload.user_id != current_user.id:
abort(403, "You do not have permissions for ongoing upload")

upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id)
return upload, upload_dir
return upload


def projects_query(permission, as_admin=True, public=True):
Expand Down
2 changes: 1 addition & 1 deletion server/mergin/sync/public_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ paths:
- do integrity check comparing uploaded file sizes with what was expected
- move uploaded files to new version dir and apply sync changes (e.g. geodiff apply_changeset)
- bump up version in database
- remove artifacts (chunks, lockfile) by moving them to tmp directory"
- remove artifacts (chunks) by moving them to tmp directory"
operationId: push_finish
parameters:
- name: transaction_id
Expand Down
Loading
Loading