This refactoring replaces the filesystem-based Toucher/lockfile mechanism with a database-based heartbeat approach for tracking active uploads. It eliminates the NFS-specific hack (os.access() + os.utime()) that was needed to bust NFS attribute caches in the old approach, and is cleaner and more cloud-native.
The old try/except IntegrityError + cleanup loop pattern is replaced with an atomic upsert in Upload.create_upload(). The upload directory name is decoupled from the DB primary key via transaction_id, and the upload directory is created at this stage, so nothing needs to take care of it later. With the upsert logic, the ObjectDeletedError scenario, which happened because a concurrent request could delete a stale upload row mid-operation, is eliminated:
- During push_finish, the heartbeat context manager continuously updates last_ping, keeping the upload fresh throughout the operation.
- A concurrent request can only take over an upload whose last_ping has expired.
- Since the heartbeat prevents expiry, no other request can claim the row while push_finish is running.
- The upload object therefore stays valid for the full lifetime of the request, so ObjectDeletedError becomes impossible.
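The heartbeat idea can be sketched generically (the class and method names here are illustrative stand-ins, not the project's actual API): a background thread periodically refreshes last_ping until the context manager exits, so the row never looks stale while work is in progress.

```python
import threading
import time
from contextlib import contextmanager
from datetime import datetime


class Upload:
    """Minimal stand-in for the ORM model; only last_ping matters here."""

    def __init__(self):
        self.last_ping = datetime.utcnow()

    @contextmanager
    def heartbeat(self, interval: float = 0.05):
        """Keep last_ping fresh in a background thread for the duration of the block."""
        stop = threading.Event()

        def _beat():
            # real code would issue an UPDATE ... SET last_ping = now() per tick
            while not stop.wait(interval):
                self.last_ping = datetime.utcnow()

        t = threading.Thread(target=_beat, daemon=True)
        t.start()
        try:
            yield
        finally:
            stop.set()
            t.join()


upload = Upload()
before = upload.last_ping
with upload.heartbeat():
    time.sleep(0.2)  # stands in for the heavy push_finish work
after = upload.last_ping
print(after > before)  # True: the upload stayed "fresh" during the work
```

Because the thread stops in the `finally` clause, the row naturally expires after LOCKFILE_EXPIRATION if the process crashes mid-push, which is exactly what allows a later request to take over.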
Coverage Report for CI Build 24507690430: coverage decreased (-0.04%) to 93.207%. 7 previously-covered lines in 3 files lost coverage.
💛 - Coveralls
Pull request overview
This PR replaces filesystem lockfile-based upload coordination with database-driven liveness tracking (upload.last_ping) and introduces a stable external upload identifier (upload.transaction_id), enabling concurrent upload handling via an atomic upsert/takeover strategy.
Changes:
- Add `last_ping` and `transaction_id` columns to `upload` (with backfills and constraints).
- Rework upload session creation to use an upsert-based "stale takeover" approach and heartbeat updates instead of lockfiles.
- Update API/controllers/schemas/tests to use `transaction_id` in routes and responses.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py | Adds upload.last_ping and backfills to support DB-based liveness detection. |
| server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py | Adds upload.transaction_id, backfills from id, enforces NOT NULL + unique index for stable external transaction IDs. |
| server/mergin/sync/models.py | Implements Upload.create_upload() upsert/takeover strategy, upload_dir based on transaction_id, and heartbeat thread updating last_ping. |
| server/mergin/sync/public_api_controller.py | Switches v1 push flow to Upload.create_upload(), replaces lockfile toucher with upload.heartbeat(), and returns/uses transaction_id. |
| server/mergin/sync/public_api_v2_controller.py | Switches v2 create version flow to Upload.create_upload() and uses upload.heartbeat() during heavy work. |
| server/mergin/sync/permissions.py | Replaces get_upload with get_upload_or_fail() lookup by transaction_id. |
| server/mergin/sync/schemas.py | Updates serialized “uploads” lists to expose transaction_id rather than internal id. |
| server/mergin/sync/utils.py | Removes the old lockfile Toucher helper. |
| server/mergin/sync/public_api.yaml | Updates push_finish description to remove lockfile mention. |
| server/mergin/tests/test_project_controller.py | Updates tests to use transaction_id everywhere and adds coverage for stale upload takeover + cleanup. |
| server/mergin/tests/test_public_api_v2.py | Updates concurrency-related test setup to use Upload.create_upload(). |
| server/mergin/tests/test_db_hooks.py | Updates upload creation in tests to use Upload.create_upload(). |
```python
now = datetime.now(timezone.utc)
expiration = current_app.config["LOCKFILE_EXPIRATION"]
new_tx_id = str(uuid.uuid4())

# CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot)
# NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload
existing_cte = (
    db.select(Upload.transaction_id)
    .where(
        Upload.project_id == project_id,
        Upload.version == version,
    )
    .cte("existing")
)

stmt = (
    insert(Upload)
    .values(
        id=str(uuid.uuid4()),
        transaction_id=new_tx_id,
        project_id=project_id,
        version=version,
        user_id=user_id,
        last_ping=now,
        changes=ChangesSchema().dump(changes),
    )
    .add_cte(existing_cte)
)

upsert_stmt = stmt.on_conflict_do_update(
    constraint="uq_upload_project_id",
    set_={
        "transaction_id": new_tx_id,
        "user_id": user_id,
        "last_ping": now,
        "changes": ChangesSchema().dump(changes),
    },
    # ONLY update if the existing row is stale
    where=(Upload.last_ping < (now - timedelta(seconds=expiration))),
)
```
last_ping is stored in a db.DateTime column without timezone=True (naive timestamps), but create_upload() binds an aware datetime (datetime.now(timezone.utc)) into last_ping and uses it in the staleness predicate. With PostgreSQL/psycopg this can lead to inconsistent comparisons/casts (or runtime errors) depending on server/session timezone settings. Consider storing UTC as naive consistently (e.g., datetime.utcnow() everywhere + UPDATE ... SET last_ping = timezone('utc', now())) or switch the column/migration to DateTime(timezone=True) and keep all values aware.
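To illustrate the reviewer's point with a generic sketch (not the project's code): a value read back from a naive DateTime column cannot be compared directly with an aware one, and re-tagging it with `replace(tzinfo=...)` is only correct if the stored value really is UTC wall time.

```python
from datetime import datetime, timezone, timedelta

# a value as read back from a naive DateTime column (UTC wall time, no tzinfo)
stored_naive = datetime.utcnow()

# mixing naive and aware datetimes in a comparison raises TypeError
raised = False
try:
    datetime.now(timezone.utc) < stored_naive
except TypeError:
    raised = True

# consistent option A: keep everything naive UTC
is_fresh_naive = datetime.utcnow() < stored_naive + timedelta(seconds=30)

# consistent option B: keep everything aware (needs DateTime(timezone=True) in the column)
stored_aware = stored_naive.replace(tzinfo=timezone.utc)  # safe ONLY if the stored value is UTC
is_fresh_aware = datetime.now(timezone.utc) < stored_aware + timedelta(seconds=30)

print(raised, is_fresh_naive, is_fresh_aware)  # True True True
```

Whichever convention is chosen, the key is that the Python default, the column definition, and the SQL staleness predicate all agree on it.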
```python
return datetime.now(tz=timezone.utc) < self.last_ping.replace(
    tzinfo=timezone.utc
) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
```
is_active() currently forces tz-awareness via self.last_ping.replace(tzinfo=timezone.utc). If last_ping is persisted as a naive timestamp (as per the column definition/migration), this can silently misinterpret values if the DB/session timezone is not UTC. Once timestamp handling is made consistent (UTC-naive or timezone-aware), simplify this to compare like-for-like (naive↔naive or aware↔aware) without replace(tzinfo=...).
Suggested change:
```python
return datetime.utcnow() < self.last_ping + timedelta(
    seconds=current_app.config["LOCKFILE_EXPIRATION"]
)
```
```python
result = db.session.execute(upsert_stmt).fetchone()
db.session.commit()

# if nothing returned, it means the WHERE clause failed (active upload)
if not result:
    return

upload = result.Upload
old_transaction_id = result.old_transaction_id
os.makedirs(upload.upload_dir)

# old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload
if old_transaction_id:
    upload.project.sync_failed(
        "", "push_lost", "Push artefact removed by subsequent push", user_id
    )
    if os.path.exists(
        os.path.join(
            upload.project.storage.project_dir, "tmp", old_transaction_id
        )
    ):
        move_to_tmp(
            os.path.join(
                upload.project.storage.project_dir, "tmp", old_transaction_id
            ),
            old_transaction_id,
        )
```
create_upload() commits the DB transaction before creating/moving the on-disk upload directories. If os.makedirs() or move_to_tmp() fails, the DB row (and new transaction_id) remains committed but the filesystem state is incomplete, potentially blocking future uploads until it becomes stale. Wrap the filesystem operations in a try/except and, on failure, delete the upload row (or rollback/compensate) so the system can recover immediately.
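A generic sketch of the compensation the reviewer suggests (hypothetical helper and an in-memory dict standing in for the DB table, not the project's actual code): commit first, then attempt the filesystem work, and delete the just-committed row if the filesystem step fails so a retry is not blocked until the row goes stale.

```python
import os
import shutil
import tempfile


def create_with_compensation(rows: dict, key: str, base_dir: str) -> str:
    """Commit a 'row', then create its directory; delete the row again on FS failure."""
    rows[key] = {"status": "committed"}   # stands in for the committed upload row
    target = os.path.join(base_dir, key)
    try:
        os.makedirs(target)               # the filesystem step that may fail
    except OSError:
        del rows[key]                     # compensate so a retry can proceed immediately
        raise
    return target


base = tempfile.mkdtemp()
rows = {}
path = create_with_compensation(rows, "tx-1", base)

# simulate a filesystem failure: the "base dir" is actually a regular file
blocked = os.path.join(base, "blocked")
open(blocked, "w").close()
try:
    create_with_compensation(rows, "tx-2", blocked)
except OSError:
    pass

created_ok = os.path.isdir(path)
print(created_ok, "tx-1" in rows, "tx-2" in rows)  # True True False
shutil.rmtree(base)
```

In the real code the `del rows[key]` step would be a `DELETE` of the freshly inserted upload row (or a rollback of a compensating transaction), executed before re-raising the filesystem error.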
```python
def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int):
    """
    Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
```
The _heartbeat_task docstring says it "Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type", but the implementation always starts a threading.Thread. Either adjust the docstring to reflect the actual behavior, or implement a gevent-friendly path (e.g., spawn a greenlet when running under gevent) to avoid misleading future maintainers.
Suggested change:
```python
    Background heartbeat task intended to run in a background thread.
```
MarcelGeo left a comment:
We could try to run some concurrent push tests to check whether this really helps and does not just move the race conditions to different segments.
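A minimal, self-contained simulation of the invariant such a concurrency test could assert (pure Python standing in for the SQL upsert; all names here are illustrative): while an upload's last_ping is fresh, every competing `create_upload` for the same (project, version) must be rejected.

```python
import threading
from datetime import datetime, timedelta

EXPIRATION = timedelta(seconds=30)


class UploadTable:
    """In-memory stand-in for the upsert: insert, or take over only if stale."""

    def __init__(self):
        self._lock = threading.Lock()   # plays the role of the DB's row-level locking
        self._rows = {}                 # (project, version) -> (tx_id, last_ping)

    def create_upload(self, key, tx_id):
        now = datetime.utcnow()
        with self._lock:
            row = self._rows.get(key)
            if row is not None and now - row[1] < EXPIRATION:
                return None             # active upload: the WHERE clause rejects us
            self._rows[key] = (tx_id, now)
            return tx_id


table = UploadTable()
assert table.create_upload(("p1", 1), "tx-a") == "tx-a"  # first request wins

results = []


def worker(i):
    results.append(table.create_upload(("p1", 1), f"tx-{i}"))


threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results.count(None))  # all 8 rejected: nobody can take over a fresh upload
```

A real concurrent push test would do the same with actual HTTP requests against two workers and additionally age last_ping past LOCKFILE_EXPIRATION to verify that exactly one competitor then takes over.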
```python
created = db.Column(db.DateTime, default=datetime.utcnow)
# last ping time to determine if upload is still active
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
transaction_id = db.Column(db.String, unique=True, nullable=False, index=True)
```
```python
        self.project.storage.project_dir, "tmp", self.transaction_id
    )

def is_active(self):
```
```python
# let's move uploaded files where they are expected to be
if to_be_added_files or to_be_updated_files:
    temp_files_dir = os.path.join(
        upload.upload_dir, "files", v_next_version
```
Couldn't the same situation as with the previous solution occur here, just in a different way: somebody creates a version with different data in the v{n} folder?
```python
psycopg2.Error,
FileNotFoundError,
IntegrityError,
ObjectDeletedError,
```
Couldn't this still be the case? E.g. when somebody removes an Upload while another process is trying to use it (insert, upsert)?
This PR contains two fixes / improvements