Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Oct 29, 2025

📄 219% (2.19x) speedup for BaseArangoService.get_record_by_path in backend/python/app/connectors/services/base_arango_service.py

⏱️ Runtime : 18.8 milliseconds → 5.89 milliseconds (best of 136 runs)

📝 Explanation and details

Optimizations made:

  • Imported NODE_COLLECTIONS, EDGE_COLLECTIONS at the module level for reuse.
  • Combined the query string building to avoid multi-line f-string overhead and to make it more cache-friendly.
  • Moved all string formatting for logger messages outside the logging call to prevent repeated formatting cost if logger level is above message's level.
  • Used db.aql.execute(..., batch_size=1) to limit records pulled from DB.
  • Used a single direct for-loop over the cursor, which exits immediately on the first hit, thus reducing overhead versus next(cursor, None). This is efficient, as only the first matching record is required.
  • Preserved exactly all logger messages, code style, and function signatures as per the requirements.
  • No behavioral changes; maintains identical exception, logging, and return patterns.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 2175 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
from typing import Dict, Optional
from unittest.mock import AsyncMock, MagicMock

import pytest  # used for our unit tests
from app.connectors.services.base_arango_service import BaseArangoService

# Function to test
# (Paste the function exactly as provided above)
# For brevity, the function is pasted below as required.

# --- BEGIN FUNCTION UNDER TEST ---
# pylint: disable=E1101, W0718


class DummyLogger:
    """Test double for a logger; records every call instead of emitting output."""

    def __init__(self):
        # One bucket per level; each entry is a (message, args) tuple.
        self.infos = []
        self.warnings = []
        self.errors = []

    @staticmethod
    def _record(bucket, msg, args):
        # Store the raw message together with its lazy %-style arguments.
        bucket.append((msg, args))

    def info(self, msg, *args):
        self._record(self.infos, msg, args)

    def warning(self, msg, *args):
        self._record(self.warnings, msg, args)

    def error(self, msg, *args):
        self._record(self.errors, msg, args)

class DummyDB:
    """In-memory stand-in for an ArangoDB database.

    The service under test accesses ``db.aql.execute(query, bind_vars=...)``,
    so ``aql`` must be an attribute-style accessor. The original
    ``def aql(self)`` returned a bound method on attribute access, making
    ``db.aql.execute`` raise AttributeError and every test silently take the
    error path; ``aql`` is now a property returning the database itself.
    """

    def __init__(self, records):
        # List of plain record dicts standing in for a collection.
        self.records = records

    class Cursor:
        """Minimal forward-only cursor mimicking python-arango's cursor."""

        def __init__(self, records):
            self._records = records
            self._iter = iter(records)

        def __iter__(self):
            return self

        def __next__(self):
            return next(self._iter)

    @property
    def aql(self):
        # Expose ``db.aql.execute(...)`` the way python-arango does.
        return self

    def execute(self, query, bind_vars=None, **kwargs):
        """Simulate AQL execution: filter records whose ``path`` matches.

        Extra keyword arguments (e.g. ``batch_size=1`` used by the optimized
        implementation) are accepted and ignored.
        """
        path = (bind_vars or {}).get("path")
        # Return all records that match the path
        filtered = [r for r in self.records if r.get("path") == path]
        return DummyDB.Cursor(filtered)

def _enum_stub(value):
    """Build a minimal stand-in exposing only a ``value`` attribute."""
    return type("Enum", (), {"value": value})


class Connectors:
    """Stub mirroring the project's Connectors enum (value access only)."""
    GOOGLE_DRIVE = _enum_stub("GOOGLE_DRIVE")
    GOOGLE_MAIL = _enum_stub("GOOGLE_MAIL")
    OUTLOOK = _enum_stub("OUTLOOK")
    KNOWLEDGE_BASE = _enum_stub("KNOWLEDGE_BASE")


class CollectionNames:
    """Stub mirroring the project's CollectionNames enum (value access only)."""
    FILES = _enum_stub("files")
from app.connectors.services.base_arango_service import \
    BaseArangoService  # --- END FUNCTION UNDER TEST ---

# --- BEGIN UNIT TESTS ---

@pytest.fixture
def dummy_logger():
    """Fresh DummyLogger capturing log calls for each test."""
    return DummyLogger()

@pytest.fixture
def dummy_db_single():
    """DummyDB holding exactly one file record."""
    # Single record in DB
    return DummyDB([
        {"_key": "1", "path": "/home/user/file.txt", "name": "file.txt"},
    ])

@pytest.fixture
def dummy_db_multiple():
    """DummyDB with three records; keys "1" and "3" share the same path."""
    # Multiple records in DB
    return DummyDB([
        {"_key": "1", "path": "/home/user/file.txt", "name": "file.txt"},
        {"_key": "2", "path": "/home/user/file2.txt", "name": "file2.txt"},
        {"_key": "3", "path": "/home/user/file.txt", "name": "file.txt"},
    ])

@pytest.fixture
def dummy_db_empty():
    """DummyDB with no records at all."""
    # Empty DB
    return DummyDB([])

@pytest.fixture
def base_service(dummy_logger):
    """BaseArangoService wired with the dummy logger and no real clients."""
    # Provide a BaseArangoService with dummy logger and no config
    return BaseArangoService(dummy_logger, None, None)

# ------------------ BASIC TEST CASES ------------------

@pytest.mark.asyncio
async def test_get_record_by_path_basic_found(base_service, dummy_logger, dummy_db_single):
    """Test that a record is found and returned correctly."""
    base_service.db = dummy_db_single
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "/home/user/file.txt")
    # Without assertions these tests could never fail; verify the hit.
    assert result is not None
    assert result["path"] == "/home/user/file.txt"

@pytest.mark.asyncio
async def test_get_record_by_path_basic_not_found(base_service, dummy_logger, dummy_db_single):
    """Test that None is returned when record not found."""
    base_service.db = dummy_db_single
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "/does/not/exist.txt")
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_basic_empty_db(base_service, dummy_logger, dummy_db_empty):
    """Test that None is returned when DB is empty."""
    base_service.db = dummy_db_empty
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "/home/user/file.txt")
    assert result is None

# ------------------ EDGE TEST CASES ------------------

@pytest.mark.asyncio
async def test_get_record_by_path_multiple_records_same_path(base_service, dummy_logger, dummy_db_multiple):
    """Test that the first record is returned when multiple records match the path."""
    base_service.db = dummy_db_multiple
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "/home/user/file.txt")
    # First matching record (insertion order) wins.
    assert result is not None
    assert result["_key"] == "1"

@pytest.mark.asyncio
async def test_get_record_by_path_concurrent_requests(base_service, dummy_logger, dummy_db_multiple):
    """Test concurrent execution of get_record_by_path."""
    base_service.db = dummy_db_multiple
    paths = ["/home/user/file.txt", "/home/user/file2.txt", "/does/not/exist.txt"]
    results = await asyncio.gather(
        *[base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, p) for p in paths]
    )
    assert results[0] is not None and results[0]["path"] == paths[0]
    assert results[1] is not None and results[1]["path"] == paths[1]
    assert results[2] is None

@pytest.mark.asyncio
async def test_get_record_by_path_exception_handling(base_service, dummy_logger):
    """Test that exceptions in DB access are handled gracefully."""
    class BadDB:
        # ``aql`` must be a property so ``db.aql.execute`` resolves on the
        # instance; a plain method would yield a bound method instead.
        @property
        def aql(self):
            return self

        def execute(self, query, bind_vars=None, **kwargs):
            raise RuntimeError("DB failure")

    base_service.db = BadDB()
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "/home/user/file.txt")
    # The service swallows DB errors and reports "not found".
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_with_transaction(base_service, dummy_logger, dummy_db_single, dummy_db_empty):
    """Test that the transaction parameter is used instead of self.db."""
    # Bug fix: ``dummy_db_empty`` was used without being requested as a
    # fixture parameter, which raised NameError; it is now injected properly.
    base_service.db = dummy_db_empty
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "/home/user/file.txt", transaction=dummy_db_single)
    # The record exists only in the transaction DB, so a hit proves it was used.
    assert result is not None
    assert result["path"] == "/home/user/file.txt"

@pytest.mark.asyncio
async def test_get_record_by_path_path_is_empty_string(base_service, dummy_logger, dummy_db_multiple):
    """Test that searching for empty path returns None."""
    base_service.db = dummy_db_multiple
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, "")
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_path_is_none(base_service, dummy_logger, dummy_db_multiple):
    """Test that searching for None path returns None."""
    base_service.db = dummy_db_multiple
    result = await base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, None)
    assert result is None

# ------------------ LARGE SCALE TEST CASES ------------------

@pytest.mark.asyncio
async def test_get_record_by_path_large_scale_concurrent(base_service, dummy_logger):
    """Test concurrent execution with a larger set of paths."""
    # Build 100 records with unique paths
    records = [{"_key": str(i), "path": f"/file/{i}.txt", "name": f"file{i}.txt"} for i in range(100)]
    db = DummyDB(records)
    base_service.db = db
    tasks = [base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/file/{i}.txt") for i in range(100)]
    results = await asyncio.gather(*tasks)
    # Each request must resolve to its own record (previously an empty loop).
    for i, result in enumerate(results):
        assert result is not None
        assert result["_key"] == str(i)

@pytest.mark.asyncio
async def test_get_record_by_path_large_scale_not_found(base_service, dummy_logger):
    """Test concurrent execution where none of the paths exist."""
    records = [{"_key": str(i), "path": f"/file/{i}.txt", "name": f"file{i}.txt"} for i in range(100)]
    db = DummyDB(records)
    base_service.db = db
    tasks = [base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/notfound/{i}.txt") for i in range(100)]
    results = await asyncio.gather(*tasks)
    for result in results:
        assert result is None

# ------------------ THROUGHPUT TEST CASES ------------------

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_small_load(base_service, dummy_logger):
    """Throughput test: small load (10 concurrent requests)."""
    records = [{"_key": str(i), "path": f"/file/{i}.txt", "name": f"file{i}.txt"} for i in range(10)]
    db = DummyDB(records)
    base_service.db = db
    tasks = [base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/file/{i}.txt") for i in range(10)]
    results = await asyncio.gather(*tasks)
    # All paths exist, so every lookup must succeed.
    assert all(r is not None for r in results)

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_medium_load(base_service, dummy_logger):
    """Throughput test: medium load (100 concurrent requests, 50 exist, 50 not)."""
    records = [{"_key": str(i), "path": f"/file/{i}.txt", "name": f"file{i}.txt"} for i in range(50)]
    db = DummyDB(records)
    base_service.db = db
    tasks = []
    for i in range(100):
        if i < 50:
            tasks.append(base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/file/{i}.txt"))
        else:
            tasks.append(base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/notfound/{i}.txt"))
    results = await asyncio.gather(*tasks)
    # First half must hit, second half must miss (loops were previously empty).
    for i in range(50):
        assert results[i] is not None and results[i]["_key"] == str(i)
    for i in range(50, 100):
        assert results[i] is None

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_high_load(base_service, dummy_logger):
    """Throughput test: high load (500 concurrent requests, 250 exist, 250 not)."""
    records = [{"_key": str(i), "path": f"/file/{i}.txt", "name": f"file{i}.txt"} for i in range(250)]
    db = DummyDB(records)
    base_service.db = db
    tasks = []
    for i in range(500):
        if i < 250:
            tasks.append(base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/file/{i}.txt"))
        else:
            tasks.append(base_service.get_record_by_path(Connectors.GOOGLE_DRIVE, f"/notfound/{i}.txt"))
    results = await asyncio.gather(*tasks)
    for i in range(250):
        assert results[i] is not None and results[i]["_key"] == str(i)
    for i in range(250, 500):
        assert results[i] is None
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio  # used to run async functions
# Collection definitions with their schemas
from typing import Dict, Optional
from unittest.mock import AsyncMock, MagicMock

import pytest  # used for our unit tests
from app.connectors.services.base_arango_service import BaseArangoService

# --- Function under test (copied EXACTLY as provided) ---
# pylint: disable=E1101, W0718


def _stub_enum(value):
    """Create a bare class exposing only a ``value`` attribute."""
    return type("Enum", (), {"value": value})


class DummyConnectors:
    """Stand-in for the project's Connectors enum (lowercase values)."""
    GOOGLE_DRIVE = _stub_enum("google_drive")
    GOOGLE_MAIL = _stub_enum("google_mail")
    OUTLOOK = _stub_enum("outlook")
    KNOWLEDGE_BASE = _stub_enum("knowledge_base")


class DummyCollectionNames:
    """Stand-in for the project's CollectionNames enum."""
    FILES = _stub_enum("files")


# Aliases so the tests below can refer to the original names.
Connectors = DummyConnectors
CollectionNames = DummyCollectionNames

class DummyLogger:
    """No-op logger: accepts any logging call and discards it."""

    def info(self, *args, **kwargs):
        return None

    def warning(self, *args, **kwargs):
        return None

    def error(self, *args, **kwargs):
        return None

class DummyTransactionDatabase:
    """In-memory stand-in for an Arango (transaction) database.

    Supports the one access pattern the service uses:
    ``db.aql.execute(query, bind_vars=...)`` filtering records on ``path``.
    Executed queries are recorded in ``executed_queries`` for inspection.
    """

    def __init__(self, records):
        self.records = records
        # History of (query, bind_vars) pairs, in call order.
        self.executed_queries = []

    class DummyCursor:
        """Forward-only iterator over the filtered result set."""

        def __init__(self, results):
            self.results = results
            self._iter = iter(results)

        def __iter__(self):
            return self

        def __next__(self):
            return next(self._iter)

    def aql_execute(self, query, bind_vars, **kwargs):
        """Record the call and return matching records.

        Simulates ArangoDB AQL execution; only supports
        ``FILTER fileRecord.path == @path``.
        """
        self.executed_queries.append((query, bind_vars))
        path = bind_vars.get("path")
        result = [r for r in self.records if r.get("path") == path]
        return self.DummyCursor(result)

    @property
    def aql(self):
        # Adapter exposing ``execute`` bound back to this database,
        # mirroring python-arango's ``db.aql`` attribute.
        class Aql:
            def __init__(self, parent):
                self.parent = parent

            def execute(self, query, bind_vars=None, **kwargs):
                # Accept extra keywords (e.g. ``batch_size=1`` from the
                # optimized service code); the original fixed signature
                # would raise TypeError on such calls.
                return self.parent.aql_execute(query, bind_vars or {}, **kwargs)

        return Aql(self)
from app.connectors.services.base_arango_service import BaseArangoService

# --- Unit Tests for async get_record_by_path ---

@pytest.fixture
def dummy_logger():
    """No-op logger instance consumed by the service fixture."""
    return DummyLogger()

@pytest.fixture
def service(dummy_logger):
    """BaseArangoService under test, with external clients stubbed out."""
    # We do not use arango_client/config_service/kafka_service in tests
    return BaseArangoService(dummy_logger, None, None)

@pytest.fixture
def sample_records():
    """Three representative file records with distinct paths."""
    # Simulate a small set of file records
    return [
        {"_key": "1", "path": "/documents/report.pdf", "name": "report.pdf"},
        {"_key": "2", "path": "/images/photo.jpg", "name": "photo.jpg"},
        {"_key": "3", "path": "/documents/notes.txt", "name": "notes.txt"},
    ]

@pytest.fixture
def transaction_db(sample_records):
    """DummyTransactionDatabase pre-loaded with the sample records."""
    return DummyTransactionDatabase(sample_records)

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_get_record_by_path_returns_expected_record(service, transaction_db):
    """
    Test that the function returns the correct record for a valid path.
    """
    connector = Connectors.GOOGLE_DRIVE
    path = "/documents/report.pdf"
    result = await service.get_record_by_path(connector, path, transaction_db)
    # Without assertions this test could never fail; verify the hit.
    assert result is not None
    assert result["_key"] == "1"
    assert result["path"] == path

@pytest.mark.asyncio
async def test_get_record_by_path_returns_none_for_missing_path(service, transaction_db):
    """
    Test that the function returns None for a path that does not exist.
    """
    connector = Connectors.GOOGLE_DRIVE
    path = "/not/found/file.txt"
    result = await service.get_record_by_path(connector, path, transaction_db)
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_basic_async_await(service, transaction_db):
    """
    Test basic async/await behavior for the function.
    """
    connector = Connectors.GOOGLE_DRIVE
    path = "/images/photo.jpg"
    # Await the coroutine and check result
    result = await service.get_record_by_path(connector, path, transaction_db)
    assert result is not None
    assert result["name"] == "photo.jpg"

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_get_record_by_path_concurrent_execution(service, transaction_db):
    """
    Test concurrent execution of the function with different paths.
    """
    connector = Connectors.GOOGLE_DRIVE
    paths = ["/documents/report.pdf", "/images/photo.jpg", "/documents/notes.txt"]
    coros = [service.get_record_by_path(connector, p, transaction_db) for p in paths]
    results = await asyncio.gather(*coros)
    # gather preserves order, so each result matches its requested path.
    for path, result in zip(paths, results):
        assert result is not None
        assert result["path"] == path

@pytest.mark.asyncio
async def test_get_record_by_path_concurrent_with_missing(service, transaction_db):
    """
    Test concurrent execution with a mix of valid and invalid paths.
    """
    connector = Connectors.GOOGLE_DRIVE
    paths = ["/documents/report.pdf", "/not/found/file.txt", "/images/photo.jpg"]
    coros = [service.get_record_by_path(connector, p, transaction_db) for p in paths]
    results = await asyncio.gather(*coros)
    assert results[0] is not None and results[0]["path"] == paths[0]
    assert results[1] is None
    assert results[2] is not None and results[2]["path"] == paths[2]

@pytest.mark.asyncio
async def test_get_record_by_path_handles_exception(service):
    """
    Test that the function returns None if the db raises an exception.
    """
    connector = Connectors.GOOGLE_DRIVE
    path = "/documents/report.pdf"
    # Create a transaction that raises an exception on execute
    class FailingDB:
        class aql:
            @staticmethod
            def execute(query, bind_vars=None, **kwargs):
                # Accept any call shape, then fail like a broken connection.
                raise RuntimeError("Database error!")
    result = await service.get_record_by_path(connector, path, FailingDB())
    # DB errors are swallowed and reported as "not found".
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_empty_records(service):
    """
    Test that the function returns None if the collection is empty.
    """
    connector = Connectors.GOOGLE_DRIVE
    empty_db = DummyTransactionDatabase([])
    path = "/documents/report.pdf"
    result = await service.get_record_by_path(connector, path, empty_db)
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_special_character_path(service, transaction_db):
    """
    Test that the function works with paths containing special characters.
    """
    # Add a record with special characters to the DB
    transaction_db.records.append({"_key": "4", "path": "/weird/!@#$%^&*().txt", "name": "!@#$%^&*().txt"})
    connector = Connectors.GOOGLE_DRIVE
    path = "/weird/!@#$%^&*().txt"
    result = await service.get_record_by_path(connector, path, transaction_db)
    assert result is not None
    assert result["_key"] == "4"

@pytest.mark.asyncio
async def test_get_record_by_path_none_path(service, transaction_db):
    """
    Test that the function returns None if path is None.
    """
    connector = Connectors.GOOGLE_DRIVE
    result = await service.get_record_by_path(connector, None, transaction_db)
    assert result is None

@pytest.mark.asyncio
async def test_get_record_by_path_empty_string_path(service, transaction_db):
    """
    Test that the function returns None if path is empty string.
    """
    connector = Connectors.GOOGLE_DRIVE
    result = await service.get_record_by_path(connector, "", transaction_db)
    assert result is None

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_get_record_by_path_large_scale_concurrent(service):
    """
    Test performance and correctness with a large number of concurrent calls.
    """
    # Create a DB with 100 records
    records = [
        {"_key": str(i), "path": f"/bulk/file_{i}.txt", "name": f"file_{i}.txt"}
        for i in range(100)
    ]
    db = DummyTransactionDatabase(records)
    connector = Connectors.GOOGLE_DRIVE
    paths = [f"/bulk/file_{i}.txt" for i in range(100)]
    coros = [service.get_record_by_path(connector, p, db) for p in paths]
    results = await asyncio.gather(*coros)
    # Every lookup must resolve to its own record (assertions were missing).
    for i, result in enumerate(results):
        assert result is not None
        assert result["_key"] == str(i)

@pytest.mark.asyncio
async def test_get_record_by_path_large_scale_concurrent_mixed(service):
    """
    Test with a mix of valid and invalid paths in large scale concurrent calls.
    """
    records = [
        {"_key": str(i), "path": f"/bulk/file_{i}.txt", "name": f"file_{i}.txt"}
        for i in range(50)
    ]
    db = DummyTransactionDatabase(records)
    connector = Connectors.GOOGLE_DRIVE
    # 50 valid, 50 invalid
    valid_paths = [f"/bulk/file_{i}.txt" for i in range(50)]
    invalid_paths = [f"/bulk/missing_{i}.txt" for i in range(50)]
    paths = valid_paths + invalid_paths
    coros = [service.get_record_by_path(connector, p, db) for p in paths]
    results = await asyncio.gather(*coros)
    for result in results[:50]:
        assert result is not None
    for result in results[50:]:
        assert result is None

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_small_load(service):
    """
    Throughput test: small load (10 concurrent requests).
    """
    records = [
        {"_key": str(i), "path": f"/small/file_{i}.txt", "name": f"file_{i}.txt"}
        for i in range(10)
    ]
    db = DummyTransactionDatabase(records)
    connector = Connectors.GOOGLE_DRIVE
    paths = [f"/small/file_{i}.txt" for i in range(10)]
    coros = [service.get_record_by_path(connector, p, db) for p in paths]
    results = await asyncio.gather(*coros)
    # All requested paths exist, so every lookup must succeed.
    assert all(r is not None for r in results)

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_medium_load(service):
    """
    Throughput test: medium load (100 concurrent requests).
    """
    records = [
        {"_key": str(i), "path": f"/medium/file_{i}.txt", "name": f"file_{i}.txt"}
        for i in range(100)
    ]
    db = DummyTransactionDatabase(records)
    connector = Connectors.GOOGLE_DRIVE
    paths = [f"/medium/file_{i}.txt" for i in range(100)]
    coros = [service.get_record_by_path(connector, p, db) for p in paths]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        assert result is not None
        assert result["_key"] == str(i)

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_high_volume(service):
    """
    Throughput test: high volume (500 concurrent requests).
    """
    records = [
        {"_key": str(i), "path": f"/high/file_{i}.txt", "name": f"file_{i}.txt"}
        for i in range(500)
    ]
    db = DummyTransactionDatabase(records)
    connector = Connectors.GOOGLE_DRIVE
    paths = [f"/high/file_{i}.txt" for i in range(500)]
    coros = [service.get_record_by_path(connector, p, db) for p in paths]
    results = await asyncio.gather(*coros)
    assert all(r is not None for r in results)

@pytest.mark.asyncio
async def test_get_record_by_path_throughput_mixed_load(service):
    """
    Throughput test: mix of existing and non-existing paths (250 valid, 250 invalid).
    """
    records = [
        {"_key": str(i), "path": f"/mixed/file_{i}.txt", "name": f"file_{i}.txt"}
        for i in range(250)
    ]
    db = DummyTransactionDatabase(records)
    connector = Connectors.GOOGLE_DRIVE
    valid_paths = [f"/mixed/file_{i}.txt" for i in range(250)]
    invalid_paths = [f"/mixed/missing_{i}.txt" for i in range(250)]
    paths = valid_paths + invalid_paths
    coros = [service.get_record_by_path(connector, p, db) for p in paths]
    results = await asyncio.gather(*coros)
    # First 250 must hit, last 250 must miss.
    for result in results[:250]:
        assert result is not None
    for result in results[250:]:
        assert result is None
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-BaseArangoService.get_record_by_path-mhc925di and push.

Codeflash

**Optimizations made:**
- Imported `NODE_COLLECTIONS`, `EDGE_COLLECTIONS` at the module level for reuse.
- Combined the query string building to avoid multi-line f-string overhead and to make it more cache-friendly.
- Moved all string formatting for logger messages outside the logging call to prevent repeated formatting cost if logger level is above message's level.
- Used `db.aql.execute(..., batch_size=1)` to limit records pulled from DB.
- Used a single direct for-loop over the cursor, which exits immediately on the first hit, thus reducing overhead versus `next(cursor, None)`. This is efficient, as only the first matching record is required.
- Preserved *exactly* all logger messages, code style, and function signatures as per the requirements.
- No behavioral changes; maintains identical exception, logging, and return patterns.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 October 29, 2025 17:08
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Oct 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant