Skip to content

Conversation

@LinoGiger
Copy link
Collaborator

No description provided.

@claude
Copy link

claude bot commented Jan 16, 2026

Pull Request Review: Asset Upload Deduplication with Futures

Summary

This PR adds deduplication for concurrent asset uploads using concurrent.futures.Future objects and locks. The goal is to prevent multiple threads from uploading the same asset simultaneously by having subsequent requests wait for the in-flight upload to complete.

Code Quality & Design

✅ Strengths

  1. Double-checked locking pattern: Correctly implemented to minimize lock contention with fast-path cache checks before acquiring locks
  2. Proper exception handling: Futures are properly set with exceptions via set_exception(), ensuring waiting threads receive the error
  3. Resource cleanup: finally blocks correctly remove in-flight entries from the tracking dictionaries
  4. Consistent pattern: The same approach is applied to both URL and file uploads

⚠️ Issues & Concerns

1. Critical: Cache Update Race Condition (High Priority)

Location: Lines 126-127 and 173-174

The cache is updated outside the lock after the upload completes:

result = response.file_name
self._url_memory_cache[url_key] = result  # ⚠️ Outside lock
in_flight.set_result(result)

Problem: If multiple threads call upload_asset() with the same asset:

  • Thread A starts the upload, creates a Future
  • Thread B arrives, sees the in-flight Future, waits on in_flight.result() (line 118)
  • Thread A completes, updates cache, sets Future result
  • Thread B wakes up from result(), returns immediately (line 118)
  • Race: Thread A might not have updated the cache yet when Thread B checks it next time

Impact: Low probability but possible cache miss on subsequent calls

Recommendation: Move cache updates inside the lock:

with self._url_cache_lock:
    self._url_memory_cache[url_key] = result
    in_flight.set_result(result)

2. Memory Leak Potential (Medium Priority)

Location: Lines 29-32

The _url_in_flight and _file_in_flight dictionaries are class-level variables that persist for the lifetime of the application.

Problem:

  • If an upload raises an exception that's caught externally (e.g., network timeout, keyboard interrupt between lines 107-115), the entry might never be removed
  • If in_flight.result() raises an exception (line 118 or 164), the calling thread propagates it, but the Future remains in the dictionary

Example scenario:

# Thread A acquires lock, creates Future, sets should_fetch=True
# Thread A releases lock (line 115)
# Thread A is interrupted/killed before reaching finally block
# Future remains in _url_in_flight forever

Recommendation: Add a timeout mechanism or periodic cleanup:

# Option 1: Add timeout to result()
return in_flight.result(timeout=300)  # 5 minutes

# Option 2: Store timestamp and periodically clean stale entries
_url_in_flight[url_key] = (Future(), time.time())

3. Missing Cache Cleanup in clear_cache() (Medium Priority)

Location: Lines 247-250

The clear_cache() method doesn't clear the new in-flight dictionaries:

def clear_cache(self):
    self._shared_upload_cache.clear()
    self._url_memory_cache.clear()
    # Missing: self._url_in_flight.clear()
    # Missing: self._file_in_flight.clear()
    logger.info("Upload cache cleared")

Impact: Stale in-flight entries could block future uploads if cache is cleared while uploads are in progress

Recommendation:

def clear_cache(self):
    with self._url_cache_lock:
        self._shared_upload_cache.clear()
        self._url_memory_cache.clear()
        self._url_in_flight.clear()
    with self._file_cache_lock:
        self._file_in_flight.clear()
    logger.info("Upload cache cleared")

4. DiskCache Thread Safety (Low Priority - Verification Needed)

Location: Lines 140, 147

The code uses self._shared_upload_cache.get() (a FanoutCache from diskcache) both outside and inside locks.

Question: Is FanoutCache.get() thread-safe for reads? The code assumes it is (checking outside the lock on line 140), but this should be verified.

Recommendation: Add a comment documenting the thread-safety assumptions, or consult the diskcache documentation.

5. Incomplete PR Title

The PR title is truncated: "added future locks to make it more efficient if" - the condition after "if" is missing.

Performance Considerations

✅ Positive Impact

  • Deduplication works: Multiple concurrent uploads of the same asset will only result in one actual upload
  • Fast path optimization: Unlocked cache checks minimize lock contention for cache hits
  • Efficient waiting: Using Futures is more efficient than polling or sleep-based waiting

⚠️ Considerations

  1. Lock granularity: Separate locks for URL and file uploads is good, prevents unnecessary blocking
  2. Lock hold time: Locks are held only for dictionary operations, not during I/O - excellent design
  3. Memory overhead: Small overhead from Future objects, but negligible compared to upload costs

Security Concerns

✅ No Major Issues

  • No injection vulnerabilities introduced
  • No new authentication/authorization concerns
  • Proper exception propagation maintains error handling security

⚠️ Minor Concern

Location: Line 118, 164

in_flight.result() blocks indefinitely if the uploading thread hangs or is killed.

Recommendation: Add a timeout to prevent indefinite blocking:

try:
    return in_flight.result(timeout=300)  # 5 minutes
except TimeoutError:
    logger.error("Upload timed out waiting for in-flight request")
    # Remove stale entry and retry
    with self._url_cache_lock:
        self._url_in_flight.pop(url_key, None)
    raise

Test Coverage

❌ Critical Gap

No tests found for the new concurrency logic.

Required tests:

  1. Concurrent uploads of same asset: Verify only one upload occurs
  2. Exception handling: Verify exceptions propagate correctly to waiting threads
  3. Race condition testing: Verify cache consistency under concurrent load
  4. Cleanup verification: Verify in-flight entries are removed after completion/failure
  5. Memory leak test: Verify no leaked Futures after interrupted uploads

Recommendation: Add a test file like tests/test_asset_uploader_concurrency.py with:

import pytest
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock, patch

def test_concurrent_duplicate_uploads_only_uploads_once():
    """Verify that multiple threads uploading the same asset only upload once."""
    # Use mock to count actual upload calls
    # Submit 10 concurrent uploads of the same file
    # Assert mock was called exactly once

def test_exception_propagates_to_all_waiters():
    """Verify that if an upload fails, all waiting threads receive the exception."""
    # Mock upload to raise an exception
    # Submit multiple concurrent requests
    # Assert all threads receive the same exception

def test_in_flight_cleanup_on_exception():
    """Verify in-flight entries are cleaned up even when exceptions occur."""
    # Trigger an exception during upload
    # Verify _url_in_flight is empty after exception

Recommendations Summary

High Priority (Must Fix)

  1. ✅ Move cache updates inside locks to prevent race conditions
  2. ⚠️ Add timeout to in_flight.result() calls to prevent indefinite blocking
  3. ✅ Add comprehensive concurrency tests

Medium Priority (Should Fix)

  1. ✅ Clear in-flight dictionaries in clear_cache() method
  2. ⚠️ Add memory leak prevention (timeout or cleanup mechanism for stale entries)
  3. ✅ Complete the PR title/description

Low Priority (Nice to Have)

  1. ✅ Document thread-safety assumptions for DiskCache
  2. ✅ Add code comments explaining the double-checked locking pattern

Conclusion

This is a solid implementation of concurrent upload deduplication with a well-thought-out design. The main concerns are:

  1. A subtle race condition in cache updates
  2. Potential memory leaks from stale Future objects
  3. Missing test coverage for the new concurrency logic

The double-checked locking pattern is correctly implemented, and the use of Futures for coordination is appropriate. With the recommended fixes, this will be a robust addition to the codebase.

Overall Assessment: Approve with changes requested


Generated by Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants