
rework concurrent upload#610

Open
varmar05 wants to merge 4 commits into develop from rework_concurrent_upload

Conversation

Collaborator

@varmar05 varmar05 commented Apr 14, 2026

This PR contains two fixes / improvements:

  • Replace lockfile with upload last_ping db column
  • Rework concurrent upload using upsert strategy

This is a refactoring that replaces the filesystem-based Toucher/lockfile mechanism with a database-based heartbeat approach for tracking active uploads.
This eliminates the NFS-specific hack (os.access() + os.utime()) that was needed to bust NFS attribute caches in the old approach. The new approach is cleaner and more cloud-native.
Replace the old try/except IntegrityError + cleanup loop pattern with an atomic upsert in Upload.create_upload().
Decouple the upload directory name from the DB primary key via transaction_id. The upload directory is created at this stage, so there is no need to take care of it later.
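The atomic upsert with a staleness guard can be sketched with the stdlib sqlite3 module (PostgreSQL's `ON CONFLICT ... DO UPDATE ... WHERE` behaves the same way; the table and column names here are simplified stand-ins for the real schema, and `RETURNING` needs SQLite >= 3.35):

```python
import sqlite3
import uuid

con = sqlite3.connect(":memory:")
con.execute(
    """
    CREATE TABLE upload (
        project_id TEXT NOT NULL,
        version INTEGER NOT NULL,
        transaction_id TEXT NOT NULL,
        last_ping REAL NOT NULL,
        UNIQUE (project_id, version)
    )
    """
)

def create_upload(project_id, version, now, expiration=600):
    """Atomically insert an upload row, or take over a stale one.

    Returns the new transaction_id on success, or None when an active
    (non-stale) upload already holds the (project_id, version) slot.
    """
    tx_id = str(uuid.uuid4())
    cur = con.execute(
        """
        INSERT INTO upload (project_id, version, transaction_id, last_ping)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (project_id, version) DO UPDATE
            SET transaction_id = excluded.transaction_id,
                last_ping = excluded.last_ping
            -- only steal the slot if the existing row is stale
            WHERE upload.last_ping < excluded.last_ping - ?
        RETURNING transaction_id
        """,
        (project_id, version, tx_id, now, expiration),
    )
    row = cur.fetchone()  # no row: neither inserted nor updated
    return row[0] if row else None

first = create_upload("p1", 1, now=1000.0)    # fresh insert succeeds
blocked = create_upload("p1", 1, now=1100.0)  # active upload -> blocked
takeover = create_upload("p1", 1, now=2000.0) # stale upload -> taken over
```

Because the staleness check lives inside the single statement, two concurrent callers cannot both take over the same row; the database serializes the conflict resolution.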

With the upsert logic, the ObjectDeletedError scenario, which happened because a concurrent request could delete a stale upload row mid-operation, is eliminated:
  - During push_finish, the heartbeat context manager continuously updates last_ping, keeping the upload fresh throughout the operation
  - A concurrent request can only take over an upload whose last_ping has expired
  - Since the heartbeat prevents expiry, no other request can claim the row while push_finish is running
  - The upload object therefore stays valid for the full lifetime of the request; ObjectDeletedError becomes impossible
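The heartbeat described above can be sketched as a context manager (a simplified stand-in: the real implementation lives on the Upload model and persists last_ping through the DB session, while `update_last_ping` here is a hypothetical callable):

```python
import contextlib
import threading
import time
from datetime import datetime, timezone

@contextlib.contextmanager
def heartbeat(update_last_ping, interval=5.0):
    """Keep an upload row fresh while long-running work is in progress.

    `update_last_ping` is a hypothetical callable that persists the new
    timestamp (e.g. an UPDATE on upload.last_ping).
    """
    stop = threading.Event()

    def loop():
        # wait() returns False on timeout, True once stop is set
        while not stop.wait(interval):
            update_last_ping(datetime.now(timezone.utc))

    worker = threading.Thread(target=loop, daemon=True)
    worker.start()
    try:
        yield
    finally:
        stop.set()
        worker.join()

# usage: pings keep arriving while the body runs
pings = []
with heartbeat(pings.append, interval=0.05):
    time.sleep(0.2)  # simulate heavy push_finish work
```

As long as the interval is well below the expiration window, the upsert's staleness predicate can never fire against a live upload.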
@varmar05 varmar05 requested a review from MarcelGeo April 14, 2026 11:14
@coveralls

coveralls commented Apr 14, 2026

Coverage Report for CI Build 24507690430

Coverage decreased (-0.04%) to 93.207%

Details

  • Coverage decreased (-0.04%) from the base build.
  • Patch coverage: 9 uncovered changes across 3 files (156 of 165 lines covered, 94.55%).
  • 7 coverage regressions across 3 files.

Uncovered Changes

| File | Changed | Covered | % |
| --- | --- | --- | --- |
| server/mergin/sync/public_api_v2_controller.py | 24 | 20 | 83.33% |
| server/mergin/sync/models.py | 53 | 50 | 94.34% |
| server/mergin/sync/public_api_controller.py | 26 | 24 | 92.31% |

Coverage Regressions

7 previously-covered lines in 3 files lost coverage.

| File | Lines Losing Coverage | Coverage |
| --- | --- | --- |
| server/mergin/sync/models.py | 5 | 93.31% |
| server/mergin/sync/public_api_controller.py | 1 | 92.89% |
| server/mergin/sync/public_api_v2_controller.py | 1 | 89.78% |

Coverage Stats

Coverage Status
Relevant Lines: 9775
Covered Lines: 9111
Line Coverage: 93.21%
Coverage Strength: 0.93 hits per line

💛 - Coveralls

Contributor

Copilot AI left a comment

Pull request overview

This PR replaces filesystem lockfile-based upload coordination with database-driven liveness tracking (upload.last_ping) and introduces a stable external upload identifier (upload.transaction_id), enabling concurrent upload handling via an atomic upsert/takeover strategy.

Changes:

  • Add last_ping and transaction_id columns to upload (with backfills and constraints).
  • Rework upload session creation to use an upsert-based “stale takeover” approach and heartbeat updates instead of lockfiles.
  • Update API/controllers/schemas/tests to use transaction_id in routes and responses.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py | Adds upload.last_ping and backfills to support DB-based liveness detection. |
| server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py | Adds upload.transaction_id, backfills from id, enforces NOT NULL + unique index for stable external transaction IDs. |
| server/mergin/sync/models.py | Implements Upload.create_upload() upsert/takeover strategy, upload_dir based on transaction_id, and heartbeat thread updating last_ping. |
| server/mergin/sync/public_api_controller.py | Switches v1 push flow to Upload.create_upload(), replaces lockfile toucher with upload.heartbeat(), and returns/uses transaction_id. |
| server/mergin/sync/public_api_v2_controller.py | Switches v2 create version flow to Upload.create_upload() and uses upload.heartbeat() during heavy work. |
| server/mergin/sync/permissions.py | Replaces get_upload with get_upload_or_fail() lookup by transaction_id. |
| server/mergin/sync/schemas.py | Updates serialized "uploads" lists to expose transaction_id rather than internal id. |
| server/mergin/sync/utils.py | Removes the old lockfile Toucher helper. |
| server/mergin/sync/public_api.yaml | Updates push_finish description to remove lockfile mention. |
| server/mergin/tests/test_project_controller.py | Updates tests to use transaction_id everywhere and adds coverage for stale upload takeover + cleanup. |
| server/mergin/tests/test_public_api_v2.py | Updates concurrency-related test setup to use Upload.create_upload(). |
| server/mergin/tests/test_db_hooks.py | Updates upload creation in tests to use Upload.create_upload(). |


Comment on lines +1838 to +1877
```python
now = datetime.now(timezone.utc)
expiration = current_app.config["LOCKFILE_EXPIRATION"]
new_tx_id = str(uuid.uuid4())

# CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot)
# NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload
existing_cte = (
    db.select(Upload.transaction_id)
    .where(
        Upload.project_id == project_id,
        Upload.version == version,
    )
    .cte("existing")
)

stmt = (
    insert(Upload)
    .values(
        id=str(uuid.uuid4()),
        transaction_id=new_tx_id,
        project_id=project_id,
        version=version,
        user_id=user_id,
        last_ping=now,
        changes=ChangesSchema().dump(changes),
    )
    .add_cte(existing_cte)
)

upsert_stmt = stmt.on_conflict_do_update(
    constraint="uq_upload_project_id",
    set_={
        "transaction_id": new_tx_id,
        "user_id": user_id,
        "last_ping": now,
        "changes": ChangesSchema().dump(changes),
    },
    # ONLY update if the existing row is stale
    where=(Upload.last_ping < (now - timedelta(seconds=expiration))),
)
```

Copilot AI Apr 16, 2026


last_ping is stored in a db.DateTime column without timezone=True (naive timestamps), but create_upload() binds an aware datetime (datetime.now(timezone.utc)) into last_ping and uses it in the staleness predicate. With PostgreSQL/psycopg this can lead to inconsistent comparisons/casts (or runtime errors) depending on server/session timezone settings. Consider storing UTC as naive consistently (e.g., datetime.utcnow() everywhere + UPDATE ... SET last_ping = timezone('utc', now())) or switch the column/migration to DateTime(timezone=True) and keep all values aware.
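The naive/aware mismatch the comment warns about is easy to reproduce in plain Python (a minimal illustration, independent of SQLAlchemy or PostgreSQL):

```python
from datetime import datetime, timedelta, timezone

aware = datetime.now(timezone.utc)  # tz-aware value, as bound by create_upload()
naive = aware.replace(tzinfo=None)  # what a DateTime column without timezone=True hands back

# Mixing the two styles fails outright in plain Python...
try:
    _ = naive < aware
    mixed_comparable = True
except TypeError:  # "can't compare offset-naive and offset-aware datetimes"
    mixed_comparable = False

# ...while like-for-like comparisons work in either convention
naive_ok = naive < naive + timedelta(seconds=600)
aware_ok = aware < aware + timedelta(seconds=600)
```

On the database side the failure mode is quieter: the driver may cast or shift values depending on the session timezone, so picking one convention (all-naive UTC or all-aware) and sticking to it is the safer route.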

Comment on lines +1924 to +1926
```python
return datetime.now(tz=timezone.utc) < self.last_ping.replace(
    tzinfo=timezone.utc
) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
```

Copilot AI Apr 16, 2026


is_active() currently forces tz-awareness via self.last_ping.replace(tzinfo=timezone.utc). If last_ping is persisted as a naive timestamp (as per the column definition/migration), this can silently misinterpret values if the DB/session timezone is not UTC. Once timestamp handling is made consistent (UTC-naive or timezone-aware), simplify this to compare like-for-like (naive↔naive or aware↔aware) without replace(tzinfo=...).

Suggested change

```diff
-return datetime.now(tz=timezone.utc) < self.last_ping.replace(
-    tzinfo=timezone.utc
-) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
+return datetime.utcnow() < self.last_ping + timedelta(
+    seconds=current_app.config["LOCKFILE_EXPIRATION"]
+)
```

Comment on lines +1886 to +1913
```python
result = db.session.execute(upsert_stmt).fetchone()
db.session.commit()

# if nothing returned, it means the WHERE clause failed (active upload)
if not result:
    return

upload = result.Upload
old_transaction_id = result.old_transaction_id
os.makedirs(upload.upload_dir)

# old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload
if old_transaction_id:
    upload.project.sync_failed(
        "", "push_lost", "Push artefact removed by subsequent push", user_id
    )
    if os.path.exists(
        os.path.join(
            upload.project.storage.project_dir, "tmp", old_transaction_id
        )
    ):
        move_to_tmp(
            os.path.join(
                upload.project.storage.project_dir, "tmp", old_transaction_id
            ),
            old_transaction_id,
        )
```

Copilot AI Apr 16, 2026


create_upload() commits the DB transaction before creating/moving the on-disk upload directories. If os.makedirs() or move_to_tmp() fails, the DB row (and new transaction_id) remains committed but the filesystem state is incomplete, potentially blocking future uploads until it becomes stale. Wrap the filesystem operations in a try/except and, on failure, delete the upload row (or rollback/compensate) so the system can recover immediately.
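One way to implement the suggested compensation is a try/except around the filesystem step (a sketch under assumptions: `makedirs_or_compensate` and `delete_upload_row` are hypothetical names, and the real fix would need the same treatment around move_to_tmp):

```python
import os
import tempfile

def makedirs_or_compensate(upload_dir, delete_upload_row):
    """Create the on-disk upload directory after the DB commit.

    If the filesystem step fails, run a compensating action so the upload
    slot is freed immediately instead of blocking until it becomes stale.
    `delete_upload_row` is a hypothetical callable standing in for
    "DELETE the just-committed upload row" in the real code.
    """
    try:
        os.makedirs(upload_dir)
    except OSError:
        delete_upload_row()  # compensate: release the slot, then re-raise
        raise

# usage: the second call hits FileExistsError and triggers the compensation
compensations = []
base = tempfile.mkdtemp()
target = os.path.join(base, "tmp", "tx-1")
makedirs_or_compensate(target, lambda: compensations.append("row deleted"))
try:
    makedirs_or_compensate(target, lambda: compensations.append("row deleted"))
except OSError:
    pass
```

Re-raising after the compensation keeps the caller's error handling intact while guaranteeing the DB and filesystem cannot drift apart.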


```python
def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int):
    """
    Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
```

Copilot AI Apr 16, 2026


The _heartbeat_task docstring says it "Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type", but the implementation always starts a threading.Thread. Either adjust the docstring to reflect the actual behavior, or implement a gevent-friendly path (e.g., spawn a greenlet when running under gevent) to avoid misleading future maintainers.

Suggested change

```diff
-Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
+Background heartbeat task intended to run in a background thread.
```

Collaborator

@MarcelGeo MarcelGeo left a comment


We could try to run some concurrent push tests to check whether this really helps and does not just move the race conditions to different segments.

```python
created = db.Column(db.DateTime, default=datetime.utcnow)
# last ping time to determine if upload is still active
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
transaction_id = db.Column(db.String, unique=True, nullable=False, index=True)
```
Collaborator

Add UUID type

```python
        self.project.storage.project_dir, "tmp", self.transaction_id
    )

def is_active(self):
```
Collaborator

Check if we need this.

```python
# let's move uploaded files where they are expected to be
if to_be_added_files or to_be_updated_files:
    temp_files_dir = os.path.join(
        upload.upload_dir, "files", v_next_version
```
Collaborator

This might not be the same situation as with the previous solution, but a similar one in a different way:

  • somebody will create a version with different data in the v{n} folder?

```python
psycopg2.Error,
FileNotFoundError,
IntegrityError,
ObjectDeletedError,
```
Collaborator

This could still be the case, no? When somebody removes an Upload while another process is trying to use it (insert, upsert). Or?
