-
Notifications
You must be signed in to change notification settings - Fork 68
rework concurrent upload #610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,12 +2,14 @@ | |||||||||||||
| # | ||||||||||||||
| # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial | ||||||||||||||
| from __future__ import annotations | ||||||||||||||
| from contextlib import contextmanager | ||||||||||||||
| import json | ||||||||||||||
| import logging | ||||||||||||||
| import os | ||||||||||||||
| import threading | ||||||||||||||
| import time | ||||||||||||||
| import uuid | ||||||||||||||
| from datetime import datetime, timedelta | ||||||||||||||
| from datetime import datetime, timedelta, timezone | ||||||||||||||
| from enum import Enum | ||||||||||||||
| from typing import Optional, List, Dict, Set, Tuple | ||||||||||||||
| from dataclasses import dataclass, asdict | ||||||||||||||
|
|
@@ -17,11 +19,11 @@ | |||||||||||||
| from flask_login import current_user | ||||||||||||||
| from pygeodiff import GeoDiff | ||||||||||||||
| from sqlalchemy import text, null, desc, nullslast, tuple_ | ||||||||||||||
| from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM | ||||||||||||||
| from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert | ||||||||||||||
| from sqlalchemy.types import String | ||||||||||||||
| from sqlalchemy.ext.hybrid import hybrid_property | ||||||||||||||
| from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError | ||||||||||||||
| from flask import current_app | ||||||||||||||
| from flask import Flask, current_app | ||||||||||||||
|
|
||||||||||||||
| from .files import ( | ||||||||||||||
| DeltaChangeMerged, | ||||||||||||||
|
|
@@ -44,7 +46,6 @@ | |||||||||||||
| LOG_BASE, | ||||||||||||||
| Checkpoint, | ||||||||||||||
| generate_checksum, | ||||||||||||||
| Toucher, | ||||||||||||||
| get_chunk_location, | ||||||||||||||
| get_project_path, | ||||||||||||||
| is_supported_type, | ||||||||||||||
|
|
@@ -1805,6 +1806,9 @@ class Upload(db.Model): | |||||||||||||
| db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True | ||||||||||||||
| ) | ||||||||||||||
| created = db.Column(db.DateTime, default=datetime.utcnow) | ||||||||||||||
| # last ping time to determine if upload is still active | ||||||||||||||
| last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) | ||||||||||||||
| transaction_id = db.Column(db.String, unique=True, nullable=False, index=True) | ||||||||||||||
|
|
||||||||||||||
| user = db.relationship("User") | ||||||||||||||
| project = db.relationship( | ||||||||||||||
|
|
@@ -1822,28 +1826,166 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int): | |||||||||||||
| self.version = version | ||||||||||||||
| self.changes = ChangesSchema().dump(changes) | ||||||||||||||
| self.user_id = user_id | ||||||||||||||
| self.transaction_id = str(uuid.uuid4()) | ||||||||||||||
|
|
||||||||||||||
| @property | ||||||||||||||
| def upload_dir(self): | ||||||||||||||
| return os.path.join(self.project.storage.project_dir, "tmp", self.id) | ||||||||||||||
| @classmethod | ||||||||||||||
| def create_upload( | ||||||||||||||
| cls, project_id: str, version: int, changes: dict, user_id: int | ||||||||||||||
| ) -> Upload | None: | ||||||||||||||
| """Create upload session, it can either create a new record or handover an existing one but with new transaction id | ||||||||||||||
| Old transaction folder is removed and new one is created. | ||||||||||||||
| """ | ||||||||||||||
| now = datetime.now(timezone.utc) | ||||||||||||||
| expiration = current_app.config["LOCKFILE_EXPIRATION"] | ||||||||||||||
| new_tx_id = str(uuid.uuid4()) | ||||||||||||||
|
|
||||||||||||||
| # CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot) | ||||||||||||||
| # NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload | ||||||||||||||
| existing_cte = ( | ||||||||||||||
| db.select(Upload.transaction_id) | ||||||||||||||
| .where( | ||||||||||||||
| Upload.project_id == project_id, | ||||||||||||||
| Upload.version == version, | ||||||||||||||
| ) | ||||||||||||||
| .cte("existing") | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| stmt = ( | ||||||||||||||
| insert(Upload) | ||||||||||||||
| .values( | ||||||||||||||
| id=str(uuid.uuid4()), | ||||||||||||||
| transaction_id=new_tx_id, | ||||||||||||||
| project_id=project_id, | ||||||||||||||
| version=version, | ||||||||||||||
| user_id=user_id, | ||||||||||||||
| last_ping=now, | ||||||||||||||
| changes=ChangesSchema().dump(changes), | ||||||||||||||
| ) | ||||||||||||||
| .add_cte(existing_cte) | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| upsert_stmt = stmt.on_conflict_do_update( | ||||||||||||||
| constraint="uq_upload_project_id", | ||||||||||||||
| set_={ | ||||||||||||||
| "transaction_id": new_tx_id, | ||||||||||||||
| "user_id": user_id, | ||||||||||||||
| "last_ping": now, | ||||||||||||||
| "changes": ChangesSchema().dump(changes), | ||||||||||||||
| }, | ||||||||||||||
| # ONLY update if the existing row is stale | ||||||||||||||
| where=(Upload.last_ping < (now - timedelta(seconds=expiration))), | ||||||||||||||
| ) | ||||||||||||||
|
Comment on lines
+1838
to
+1877
|
||||||||||||||
|
|
||||||||||||||
| upsert_stmt = upsert_stmt.returning( | ||||||||||||||
| Upload, | ||||||||||||||
| db.select(existing_cte.c.transaction_id) | ||||||||||||||
| .scalar_subquery() | ||||||||||||||
| .label("old_transaction_id"), | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| result = db.session.execute(upsert_stmt).fetchone() | ||||||||||||||
| db.session.commit() | ||||||||||||||
|
|
||||||||||||||
| # if nothing returned, it means the WHERE clause failed (active upload) | ||||||||||||||
| if not result: | ||||||||||||||
| return | ||||||||||||||
|
|
||||||||||||||
| upload = result.Upload | ||||||||||||||
| old_transaction_id = result.old_transaction_id | ||||||||||||||
| os.makedirs(upload.upload_dir) | ||||||||||||||
|
|
||||||||||||||
| # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload | ||||||||||||||
| if old_transaction_id: | ||||||||||||||
| upload.project.sync_failed( | ||||||||||||||
| "", "push_lost", "Push artefact removed by subsequent push", user_id | ||||||||||||||
| ) | ||||||||||||||
| if os.path.exists( | ||||||||||||||
| os.path.join( | ||||||||||||||
| upload.project.storage.project_dir, "tmp", old_transaction_id | ||||||||||||||
| ) | ||||||||||||||
| ): | ||||||||||||||
| move_to_tmp( | ||||||||||||||
| os.path.join( | ||||||||||||||
| upload.project.storage.project_dir, "tmp", old_transaction_id | ||||||||||||||
| ), | ||||||||||||||
| old_transaction_id, | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
|
Comment on lines
+1886
to
+1913
|
||||||||||||||
| return upload | ||||||||||||||
|
|
||||||||||||||
| @property | ||||||||||||||
| def lockfile(self): | ||||||||||||||
| return os.path.join(self.upload_dir, "lockfile") | ||||||||||||||
| def upload_dir(self): | ||||||||||||||
| return os.path.join( | ||||||||||||||
| self.project.storage.project_dir, "tmp", self.transaction_id | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| def is_active(self): | ||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check If we need this |
||||||||||||||
| """Check if upload is still active because there was a ping (lockfile update) from underlying process""" | ||||||||||||||
| return os.path.exists(self.lockfile) and ( | ||||||||||||||
| time.time() - os.path.getmtime(self.lockfile) | ||||||||||||||
| < current_app.config["LOCKFILE_EXPIRATION"] | ||||||||||||||
| """Check if upload is still active because there was a ping from underlying process""" | ||||||||||||||
| return datetime.now(tz=timezone.utc) < self.last_ping.replace( | ||||||||||||||
| tzinfo=timezone.utc | ||||||||||||||
| ) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"]) | ||||||||||||||
|
Comment on lines
+1924
to
+1926
|
||||||||||||||
| return datetime.now(tz=timezone.utc) < self.last_ping.replace( | |
| tzinfo=timezone.utc | |
| ) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"]) | |
| return datetime.utcnow() < self.last_ping + timedelta( | |
| seconds=current_app.config["LOCKFILE_EXPIRATION"] | |
| ) |
Copilot
AI
Apr 16, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _heartbeat_task docstring says it "Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type", but the implementation always starts a threading.Thread. Either adjust the docstring to reflect the actual behavior, or implement a gevent-friendly path (e.g., spawn a greenlet when running under gevent) to avoid misleading future maintainers.
| Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type. | |
| Background heartbeat task intended to run in a background thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add UUID type