@codeflash-ai codeflash-ai bot commented Oct 29, 2025

📄 11% (0.11x) speedup for BookStackDataSource.destroy_recycle_bin_item in backend/python/app/sources/external/bookstack/bookstack.py

⏱️ Runtime : 2.59 milliseconds → 2.34 milliseconds (best of 230 runs)

📝 Explanation and details

The optimization achieves a 10% runtime improvement through three micro-optimizations that reduce object allocation overhead:

1. Conditional Header Merging in HTTPClient

  • Original: Always creates a new merged dictionary with {**self.headers, **request.headers} regardless of whether request headers exist
  • Optimized: Only creates a merged dictionary when request.headers is non-empty, otherwise directly uses self.headers
  • Impact: Eliminates unnecessary dictionary creation and copying when requests have no custom headers (common case)
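The header-merge change can be sketched as follows (`merged_headers` is an illustrative helper, not the actual HTTPClient code):

```python
def merged_headers(base_headers: dict, request_headers: dict) -> dict:
    """Merge per-request headers over the client's defaults.

    Only allocates a new dict when the request actually supplies headers;
    the common no-override case returns the client's dict untouched.
    """
    if request_headers:
        return {**base_headers, **request_headers}
    return base_headers


base = {"Accept": "application/json"}
# No custom headers: the exact same dict object comes back, no copy made.
assert merged_headers(base, {}) is base
# Custom headers: a fresh merged dict is created, base stays untouched.
merged = merged_headers(base, {"X-Token": "abc"})
assert merged == {"Accept": "application/json", "X-Token": "abc"}
```

Returning `base_headers` directly is safe only because the caller treats the merged dict as read-only.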

2. Direct Header Reference in BookStackDataSource

  • Original: headers = dict(self.http.headers) creates a defensive copy of headers
  • Optimized: headers = self.http.headers uses direct reference since no mutation occurs
  • Impact: Eliminates dict copying overhead per request
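The trade-off behind dropping the defensive copy can be shown in a few lines (a standalone sketch, not the datasource's actual code):

```python
base = {"Accept": "application/json"}

alias = base           # optimized path: share the reference, zero allocation
snapshot = dict(base)  # original path: defensive copy on every request

base["X-Extra"] = "1"

# The alias observes the later mutation; the snapshot does not. Sharing the
# reference is therefore only safe because the datasource never mutates the
# headers dict after taking it.
assert alias["X-Extra"] == "1"
assert "X-Extra" not in snapshot
```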

3. F-string URL Construction

  • Original: url = self.base_url + "/api/recycle-bin/{deletion_id}".format(deletion_id=deletion_id)
  • Optimized: url = f"{self.base_url}/api/recycle-bin/{deletion_id}"
  • Impact: Reduces string manipulation overhead by using more efficient f-string interpolation
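Both URL-building styles produce identical output; only the construction cost differs. A minimal comparison (the base URL value below is illustrative):

```python
base_url = "https://bookstack.example.com"  # illustrative value
deletion_id = 42

# Original: string concatenation plus a .format() method call
url_format = base_url + "/api/recycle-bin/{deletion_id}".format(deletion_id=deletion_id)

# Optimized: a single f-string, which CPython compiles to cheaper bytecode
url_fstring = f"{base_url}/api/recycle-bin/{deletion_id}"

assert url_format == url_fstring == "https://bookstack.example.com/api/recycle-bin/42"
```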

The line profiler shows the URL construction improvement (461μs → 268μs, 42% faster) and header handling optimization (230μs → 167μs, 27% faster). These optimizations are particularly effective for high-throughput scenarios where the function is called repeatedly, as evidenced by the throughput improvement from 157,776 to 159,160 operations/second (0.9% increase). The optimizations work best for typical API usage patterns where requests have minimal custom headers and simple URL templating.

Correctness verification report:

Test                           Status
⚙️ Existing Unit Tests          🔘 None Found
🌀 Generated Regression Tests   730 Passed
⏪ Replay Tests                 🔘 None Found
🔎 Concolic Coverage Tests      🔘 None Found
📊 Tests Coverage               88.9%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
# --- Function under test (EXACT COPY) ---
# Import necessary classes for mocking and testing
from typing import Any, Dict, Union

import pytest  # used for our unit tests
from app.sources.external.bookstack.bookstack import BookStackDataSource

# --- Minimal stub implementations for HTTPRequest, HTTPResponse, BookStackResponse, BookStackRESTClientViaToken, BookStackClient ---

class HTTPRequest:
    def __init__(self, method, url, headers, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.query_params = query_params
        self.body = body
        self.path_params = {}  # Not used in destroy_recycle_bin_item

# HTTPResponse stub: minimal wrapper around the payload returned by execute()
# (referenced below but missing from the original stubs)
class HTTPResponse:
    def __init__(self, data=None):
        self.data = data

    def json(self):
        return self.data

class BookStackResponse:
    def __init__(self, success: bool, data: Any = None, error: str = None):
        self.success = success
        self.data = data
        self.error = error

class BookStackRESTClientViaToken:
    def __init__(self, base_url: str, token_id: str, token_secret: str):
        self.base_url = base_url.rstrip('/')
        self.token_id = token_id
        self.token_secret = token_secret
        self.headers = {
            "Authorization": f"Token {token_id}:{token_secret}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def get_base_url(self):
        return self.base_url

    async def execute(self, request: HTTPRequest, **kwargs):
        # Simulate different behaviors based on deletion_id in URL
        if "/api/recycle-bin/" in request.url:
            try:
                deletion_id = int(request.url.split("/api/recycle-bin/")[1])
            except Exception:
                raise ValueError("Invalid deletion_id in URL")
            # Simulate not found
            if deletion_id == 404:
                raise Exception("Item not found")
            # Simulate forbidden
            if deletion_id == 403:
                raise Exception("Permission denied")
            # Simulate server error
            if deletion_id == 500:
                raise Exception("Internal server error")
            # Simulate success
            return HTTPResponse({"id": deletion_id, "deleted": True})
        raise Exception("Unknown endpoint")

class BookStackClient:
    def __init__(self, client: BookStackRESTClientViaToken):
        self.client = client

    def get_client(self):
        return self.client


# --- Fixtures for tests ---
@pytest.fixture
def valid_client():
    # Provide a valid BookStackClient with stubbed REST client
    return BookStackClient(BookStackRESTClientViaToken("http://localhost", "tokenid", "tokensecret"))

@pytest.fixture
def datasource(valid_client):
    # Provide a BookStackDataSource using the valid client
    return BookStackDataSource(valid_client)

# --- Basic Test Cases ---

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_basic_success(datasource):
    """Test basic successful deletion of a recycle bin item."""
    deletion_id = 123
    resp = await datasource.destroy_recycle_bin_item(deletion_id)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_basic_return_type(datasource):
    """Test that the function returns a BookStackResponse object."""
    resp = await datasource.destroy_recycle_bin_item(1)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_basic_async_behavior(datasource):
    """Test that the async function can be awaited and works as a coroutine."""
    deletion_id = 2
    # Await the coroutine directly
    resp = await datasource.destroy_recycle_bin_item(deletion_id)

# --- Edge Test Cases ---

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_not_found(datasource):
    """Test deletion with a non-existent deletion_id (should fail)."""
    deletion_id = 404  # Simulate not found
    resp = await datasource.destroy_recycle_bin_item(deletion_id)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_permission_denied(datasource):
    """Test deletion with a forbidden deletion_id (should fail)."""
    deletion_id = 403  # Simulate permission denied
    resp = await datasource.destroy_recycle_bin_item(deletion_id)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_server_error(datasource):
    """Test deletion with a server error deletion_id (should fail)."""
    deletion_id = 500  # Simulate server error
    resp = await datasource.destroy_recycle_bin_item(deletion_id)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_concurrent_success(datasource):
    """Test concurrent deletion requests for different IDs."""
    ids = [10, 20, 30, 40, 50]
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        assert resp.success, f"deletion_id {i} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_concurrent_mixed(datasource):
    """Test concurrent deletion requests with some failures."""
    ids = [10, 404, 20, 403, 500]
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    expected = [True, False, True, False, False]
    for resp, exp in zip(results, expected):
        assert resp.success == exp

# --- Large Scale Test Cases ---

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_large_scale_concurrency(datasource):
    """Test large scale concurrent deletion requests (up to 100)."""
    ids = list(range(1, 101))  # 100 items
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        assert resp.success, f"deletion_id {i} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_large_scale_with_errors(datasource):
    """Test large scale concurrent requests with some errors."""
    ids = list(range(1, 91)) + [404, 403, 500, 404, 403, 500]  # 96 items, 6 errors
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        if i in (404, 403, 500):
            assert not resp.success
        else:
            assert resp.success

# --- Throughput Test Cases ---

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_small_load(datasource):
    """Throughput test: small load of 10 requests."""
    ids = list(range(1000, 1010))
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        assert resp.success, f"deletion_id {i} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_medium_load(datasource):
    """Throughput test: medium load of 50 requests."""
    ids = list(range(2000, 2050))
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        assert resp.success, f"deletion_id {i} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_high_volume(datasource):
    """Throughput test: high volume load of 200 requests."""
    ids = list(range(3000, 3200))  # 200 items
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        assert resp.success, f"deletion_id {i} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_mixed_load(datasource):
    """Throughput test: mixed load with errors and successes."""
    ids = [10, 404, 20, 403, 500] * 10  # 50 requests, 30 errors, 20 successes
    results = await asyncio.gather(
        *(datasource.destroy_recycle_bin_item(i) for i in ids)
    )
    for i, resp in zip(ids, results):
        if i in (404, 403, 500):
            assert not resp.success
        else:
            assert resp.success
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio  # used to run async functions
# --- Function under test (EXACT COPY, DO NOT MODIFY) ---
from typing import Dict, Union

import pytest  # used for our unit tests
from app.sources.external.bookstack.bookstack import BookStackDataSource

# --- Minimal stubs for dependencies to isolate the function under test ---

# HTTPRequest stub
class HTTPRequest:
    def __init__(self, method, url, headers, query_params, body):
        self.method = method
        self.url = url
        self.headers = headers
        self.query_params = query_params
        self.body = body

# BookStackResponse stub
class BookStackResponse:
    def __init__(self, success, data=None, error=None):
        self.success = success
        self.data = data
        self.error = error

# HTTPResponse stub: minimal wrapper around the payload returned by execute()
# (referenced below but missing from the original stubs)
class HTTPResponse:
    def __init__(self, data=None):
        self.data = data

    def json(self):
        return self.data

# Minimal HTTP client stub for async execution
class DummyHTTPClient:
    def __init__(self, should_fail=False, response_data=None):
        self.headers = {"Authorization": "Token dummy:dummy", "Content-Type": "application/json", "Accept": "application/json"}
        self._should_fail = should_fail
        self._response_data = response_data if response_data is not None else {"deleted": True, "id": 123}

    def get_base_url(self):
        return "https://bookstack.example.com"

    async def execute(self, request, **kwargs):
        # Simulate async HTTP execution
        if self._should_fail:
            raise RuntimeError("Simulated HTTP error")
        # Simulate a successful HTTPResponse
        return HTTPResponse(self._response_data)

# BookStackClient stub
class BookStackClient:
    def __init__(self, http_client):
        self._client = http_client

    def get_client(self):
        return self._client

# --- Unit Tests ---

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_basic_success():
    """Basic: Test successful deletion with valid deletion_id."""
    client = BookStackClient(DummyHTTPClient(response_data={"deleted": True, "id": 42}))
    datasource = BookStackDataSource(client)
    result = await datasource.destroy_recycle_bin_item(42)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_basic_return_type():
    """Basic: Ensure return type is BookStackResponse and fields are correct."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    result = await datasource.destroy_recycle_bin_item(123)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_basic_async_behavior():
    """Basic: Ensure function is awaitable and works in async context."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    # Await the coroutine
    result = await datasource.destroy_recycle_bin_item(99)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_edge_invalid_id():
    """Edge: Test with invalid deletion_id (negative, zero, very large)."""
    client = BookStackClient(DummyHTTPClient(response_data={"deleted": True, "id": -1}))
    datasource = BookStackDataSource(client)
    # Negative ID
    result = await datasource.destroy_recycle_bin_item(-1)

    # Zero ID
    client = BookStackClient(DummyHTTPClient(response_data={"deleted": True, "id": 0}))
    datasource = BookStackDataSource(client)
    result = await datasource.destroy_recycle_bin_item(0)

    # Very large ID
    large_id = 999999999
    client = BookStackClient(DummyHTTPClient(response_data={"deleted": True, "id": large_id}))
    datasource = BookStackDataSource(client)
    result = await datasource.destroy_recycle_bin_item(large_id)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_edge_http_error():
    """Edge: Simulate HTTP error and ensure error is handled."""
    client = BookStackClient(DummyHTTPClient(should_fail=True))
    datasource = BookStackDataSource(client)
    result = await datasource.destroy_recycle_bin_item(123)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_edge_concurrent_execution():
    """Edge: Test concurrent execution with different deletion_ids."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    ids = [101, 102, 103, 104, 105]
    coros = [datasource.destroy_recycle_bin_item(deletion_id) for deletion_id in ids]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        assert result.success, f"deletion_id {ids[i]} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_edge_custom_response_data():
    """Edge: Test custom response data returned by HTTP client."""
    custom_data = {"deleted": False, "id": 77, "message": "Already deleted"}
    client = BookStackClient(DummyHTTPClient(response_data=custom_data))
    datasource = BookStackDataSource(client)
    result = await datasource.destroy_recycle_bin_item(77)

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_edge_http_client_not_initialized():
    """Edge: Test ValueError when HTTP client is not initialized."""
    class BadBookStackClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError):
        BookStackDataSource(BadBookStackClient())

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_edge_http_client_missing_base_url():
    """Edge: Test ValueError when HTTP client does not have get_base_url()."""
    class BadHTTPClient:
        pass
    class BadBookStackClient:
        def get_client(self):
            return BadHTTPClient()
    with pytest.raises(ValueError):
        BookStackDataSource(BadBookStackClient())

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_large_scale_concurrent_load():
    """Large Scale: Test with many concurrent deletions."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    ids = list(range(200, 220))  # 20 concurrent deletions
    coros = [datasource.destroy_recycle_bin_item(deletion_id) for deletion_id in ids]
    results = await asyncio.gather(*coros)
    for result in results:
        assert result.success

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_large_scale_unique_ids():
    """Large Scale: Ensure each concurrent deletion returns correct unique id."""
    ids = list(range(300, 310))
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    # Patch DummyHTTPClient to return the correct ID for each call
    async def make_call(deletion_id):
        # Use a new client for each call with the correct response data
        ds = BookStackDataSource(BookStackClient(DummyHTTPClient(response_data={"deleted": True, "id": deletion_id})))
        return await ds.destroy_recycle_bin_item(deletion_id)
    coros = [make_call(deletion_id) for deletion_id in ids]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        assert result.success, f"deletion_id {ids[i]} should succeed"

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_small_load():
    """Throughput: Test function with small concurrent load."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    ids = [1, 2, 3, 4, 5]
    coros = [datasource.destroy_recycle_bin_item(deletion_id) for deletion_id in ids]
    results = await asyncio.gather(*coros)
    for result in results:
        assert result.success

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_medium_load():
    """Throughput: Test function with medium concurrent load."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    ids = list(range(10, 30))  # 20 concurrent deletions
    coros = [datasource.destroy_recycle_bin_item(deletion_id) for deletion_id in ids]
    results = await asyncio.gather(*coros)
    for result in results:
        assert result.success

@pytest.mark.asyncio
async def test_destroy_recycle_bin_item_throughput_high_volume():
    """Throughput: Test function with high volume concurrent load (but <1000)."""
    client = BookStackClient(DummyHTTPClient())
    datasource = BookStackDataSource(client)
    ids = list(range(100, 200))  # 100 concurrent deletions
    coros = [datasource.destroy_recycle_bin_item(deletion_id) for deletion_id in ids]
    results = await asyncio.gather(*coros)
    for result in results:
        assert result.success
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------

To edit these changes git checkout codeflash/optimize-BookStackDataSource.destroy_recycle_bin_item-mhbo4q9h and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 October 29, 2025 07:22
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Oct 29, 2025