Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Oct 29, 2025

📄 9% (0.09x) speedup for GZipMiddleware.__call__ in starlette/middleware/gzip.py

⏱️ Runtime : 6.18 milliseconds 5.67 milliseconds (best of 239 runs)

📝 Explanation and details

The optimization eliminates the overhead of creating a Headers object for every HTTP request by directly accessing the raw header data from the ASGI scope.

Key Performance Changes:

  1. Removed Headers Object Creation: The original code creates Headers(scope=scope) which involves parsing and processing all headers into a dictionary-like structure. The optimized version directly accesses scope.get("headers") which is already a list of (bytes, bytes) tuples.

  2. Direct Byte String Operations: Instead of converting headers to strings and using headers.get("Accept-Encoding", ""), the optimization searches for b"accept-encoding" directly in the byte tuples and checks for b"gzip" substring, avoiding string conversions entirely.

  3. Early Loop Termination: The header search stops immediately after finding the accept-encoding header with a break statement, rather than processing all headers through the Headers wrapper.

Why This is Faster:

  • Eliminates per-request object allocation overhead (~5.6% time reduction in header creation)
  • Avoids string encoding/decoding operations
  • Reduces memory allocations and garbage collection pressure
  • Uses more efficient byte string operations instead of string manipulation

Performance Context:
The line profiler shows the Headers(scope=scope) line took 5.6% of total execution time in the original version, which is completely eliminated. The optimization is particularly effective for high-throughput web applications where this middleware processes many requests per second, as evidenced by the 2.1% throughput improvement (142,506 → 145,551 operations/second) and 9% runtime reduction.

This optimization works well across all test scenarios - whether headers contain gzip encoding, use different encodings, or are missing entirely.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 404 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions

import pytest  # used for our unit tests
from starlette.middleware.gzip import GZipMiddleware


# Minimal stub implementations for dependencies
class Headers:
    # Simulates starlette.datastructures.Headers
    def __init__(self, scope):
        self.scope = scope

    def get(self, key, default=None):
        # Looks up headers in the scope dict (scope['headers'] is a list of (bytes, bytes))
        headers = self.scope.get("headers", [])
        key_bytes = key.lower().encode("latin1")
        for k, v in headers:
            if k.lower() == key_bytes:
                return v.decode("latin1")
        return default

# Dummy ASGIApp implementation for testing
async def dummy_app(scope, receive, send):
    # Simulate a simple ASGI response
    await send({"type": "http.response.start", "status": 200, "headers": [(b"content-type", b"text/plain")]})
    await send({"type": "http.response.body", "body": b"Hello, world!"})

# Dummy responder classes for testing
class GZipResponder:
    def __init__(self, app, minimum_size, compresslevel=9):
        self.app = app
        self.minimum_size = minimum_size
        self.compresslevel = compresslevel
        self.called = False

    async def __call__(self, scope, receive, send):
        self.called = True
        # Simulate gzip response
        await send({"type": "http.response.start", "status": 200, "headers": [(b"content-encoding", b"gzip")]})
        await send({"type": "http.response.body", "body": b"\x1f\x8bHello, world!"})

class IdentityResponder:
    def __init__(self, app, minimum_size):
        self.app = app
        self.minimum_size = minimum_size
        self.called = False

    async def __call__(self, scope, receive, send):
        self.called = True
        # Simulate identity response
        await send({"type": "http.response.start", "status": 200, "headers": [(b"content-encoding", b"identity")]})
        await send({"type": "http.response.body", "body": b"Hello, world!"})
from starlette.middleware.gzip import GZipMiddleware


# Helper for ASGI receive/send
class DummySend:
    def __init__(self):
        self.messages = []

    async def __call__(self, message):
        self.messages.append(message)

class DummyReceive:
    def __init__(self, messages=None):
        self.messages = messages or []
        self.index = 0

    async def __call__(self):
        if self.index < len(self.messages):
            msg = self.messages[self.index]
            self.index += 1
            return msg
        return {}

# ------------------- UNIT TESTS -------------------

@pytest.mark.asyncio

async def test___call___basic_identity_header():
    # Test that IdentityResponder is used when Accept-Encoding does not include 'gzip'
    scope = {
        "type": "http",
        "headers": [(b"accept-encoding", b"deflate")]
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    # Check that the response includes identity encoding
    found = any(
        msg.get("type") == "http.response.start" and
        any(h[0] == b"content-encoding" and h[1] == b"identity" for h in msg.get("headers", []))
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___basic_no_accept_encoding():
    # Test that IdentityResponder is used when Accept-Encoding header is missing
    scope = {
        "type": "http",
        "headers": []
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    found = any(
        msg.get("type") == "http.response.start" and
        any(h[0] == b"content-encoding" and h[1] == b"identity" for h in msg.get("headers", []))
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___non_http_scope():
    # Test that app is called directly for non-http scope
    scope = {
        "type": "websocket",
        "headers": [(b"accept-encoding", b"gzip")]
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    # Should produce a normal dummy_app response (no content-encoding header)
    found = any(
        msg.get("type") == "http.response.start" and
        any(h[0] == b"content-type" for h in msg.get("headers", []))
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___case_insensitive_header_lookup():
    # Test that Accept-Encoding header lookup is case-insensitive
    scope = {
        "type": "http",
        "headers": [(b"AcCePt-EnCoDiNg", b"gzip")]
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    found = any(
        msg.get("type") == "http.response.start" and
        any(h[0] == b"content-encoding" and h[1] == b"gzip" for h in msg.get("headers", []))
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___empty_headers_list():
    # Test with empty headers list (should use IdentityResponder)
    scope = {
        "type": "http",
        "headers": []
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    found = any(
        msg.get("type") == "http.response.start" and
        any(h[0] == b"content-encoding" and h[1] == b"identity" for h in msg.get("headers", []))
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___concurrent_requests():
    # Test concurrent execution with mixed Accept-Encoding headers
    scopes = [
        {"type": "http", "headers": [(b"accept-encoding", b"gzip")]},
        {"type": "http", "headers": [(b"accept-encoding", b"deflate")]},
        {"type": "http", "headers": []},
        {"type": "http", "headers": [(b"accept-encoding", b"gzip, deflate")]}
    ]
    sends = [DummySend() for _ in scopes]
    receives = [DummyReceive() for _ in scopes]
    middleware = GZipMiddleware(dummy_app)
    coros = [
        middleware.__call__(scope, receive, send)
        for scope, receive, send in zip(scopes, receives, sends)
    ]
    await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test___call___invalid_scope_type():
    # Test with invalid scope type (should call app directly)
    scope = {
        "type": "invalid",
        "headers": [(b"accept-encoding", b"gzip")]
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    found = any(
        msg.get("type") == "http.response.start"
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___empty_accept_encoding_value():
    # Test with empty Accept-Encoding value (should use IdentityResponder)
    scope = {
        "type": "http",
        "headers": [(b"accept-encoding", b"")]
    }
    send = DummySend()
    receive = DummyReceive()
    middleware = GZipMiddleware(dummy_app)
    await middleware.__call__(scope, receive, send)
    found = any(
        msg.get("type") == "http.response.start" and
        any(h[0] == b"content-encoding" and h[1] == b"identity" for h in msg.get("headers", []))
        for msg in send.messages
    )

@pytest.mark.asyncio
async def test___call___large_scale_concurrent_requests():
    # Test large scale concurrent requests (up to 100 requests)
    scopes = []
    for i in range(100):
        if i % 2 == 0:
            scopes.append({"type": "http", "headers": [(b"accept-encoding", b"gzip")]})
        else:
            scopes.append({"type": "http", "headers": [(b"accept-encoding", b"deflate")]})
    sends = [DummySend() for _ in scopes]
    receives = [DummyReceive() for _ in scopes]
    middleware = GZipMiddleware(dummy_app)
    coros = [
        middleware.__call__(scope, receive, send)
        for scope, receive, send in zip(scopes, receives, sends)
    ]
    await asyncio.gather(*coros)
    # Check correct responder used for each request
    for i, send in enumerate(sends):
        if i % 2 == 0:
            pass
        else:
            pass

@pytest.mark.asyncio



#------------------------------------------------
import asyncio  # used to run async functions

import pytest  # used for our unit tests
from starlette.middleware.gzip import GZipMiddleware


# Dummy classes and types to support the function under test
class DummyResponder:
    """A dummy responder that records calls for testing."""
    def __init__(self, app, minimum_size, compresslevel=None):
        self.app = app
        self.minimum_size = minimum_size
        self.compresslevel = compresslevel
        self.called = False
        self.scope = None
        self.receive = None
        self.send = None

    async def __call__(self, scope, receive, send):
        self.called = True
        self.scope = scope
        self.receive = receive
        self.send = send
        # Simulate sending a response
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"dummy", "more_body": False})

class DummyApp:
    """A dummy ASGI app that records calls for testing."""
    def __init__(self):
        self.called = False
        self.scope = None
        self.receive = None
        self.send = None

    async def __call__(self, scope, receive, send):
        self.called = True
        self.scope = scope
        self.receive = receive
        self.send = send
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"dummy", "more_body": False})

# Patch the GZipResponder and IdentityResponder for testing
class GZipResponder(DummyResponder):
    pass

class IdentityResponder(DummyResponder):
    pass

# Minimal Headers implementation for testing
class Headers:
    def __init__(self, scope):
        self._headers = {}
        for k, v in scope.get("headers", []):
            self._headers[k.decode("latin1")] = v.decode("latin1")
        self.scope = scope

    def get(self, key, default=None):
        return self._headers.get(key.lower(), default)
from starlette.middleware.gzip import GZipMiddleware


# Helper async send function to record messages
class SendRecorder:
    def __init__(self):
        self.messages = []

    async def __call__(self, message):
        self.messages.append(message)

# Helper async receive function for ASGI
async def dummy_receive():
    return {"type": "http.request"}

# ------------------- UNIT TESTS -------------------

# 1. Basic Test Cases

@pytest.mark.asyncio

async def test___call___basic_identity_header():
    """Test that IdentityResponder is used when Accept-Encoding does not contain gzip."""
    app = DummyApp()
    middleware = GZipMiddleware(app)
    send_recorder = SendRecorder()
    scope = {
        "type": "http",
        "headers": [
            (b"accept-encoding", b"deflate"),
        ]
    }
    await middleware.__call__(scope, dummy_receive, send_recorder.__call__)

@pytest.mark.asyncio
async def test___call___basic_no_accept_encoding():
    """Test that IdentityResponder is used when Accept-Encoding is missing."""
    app = DummyApp()
    middleware = GZipMiddleware(app)
    send_recorder = SendRecorder()
    scope = {
        "type": "http",
        "headers": [
            (b"user-agent", b"pytest"),
        ]
    }
    await middleware.__call__(scope, dummy_receive, send_recorder.__call__)

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test___call___non_http_scope():
    """Test that non-http scope calls app directly."""
    app = DummyApp()
    middleware = GZipMiddleware(app)
    send_recorder = SendRecorder()
    scope = {
        "type": "websocket",
        "headers": [
            (b"accept-encoding", b"gzip"),
        ]
    }
    await middleware.__call__(scope, dummy_receive, send_recorder.__call__)

@pytest.mark.asyncio
async def test___call___empty_headers():
    """Test that IdentityResponder is used when headers are empty."""
    app = DummyApp()
    middleware = GZipMiddleware(app)
    send_recorder = SendRecorder()
    scope = {
        "type": "http",
        "headers": [],
    }
    await middleware.__call__(scope, dummy_receive, send_recorder.__call__)

@pytest.mark.asyncio
async def test___call___accept_encoding_case_insensitive():
    """Test Accept-Encoding header is case-insensitive."""
    app = DummyApp()
    middleware = GZipMiddleware(app)
    send_recorder = SendRecorder()
    scope = {
        "type": "http",
        "headers": [
            (b"Accept-Encoding", b"gzip"),
        ]
    }
    await middleware.__call__(scope, dummy_receive, send_recorder.__call__)

@pytest.mark.asyncio

async def test___call___invalid_headers_format():
    """Test that invalid header format does not break the middleware."""
    app = DummyApp()
    middleware = GZipMiddleware(app)
    send_recorder = SendRecorder()
    # headers are not bytes
    scope = {
        "type": "http",
        "headers": [
            ("accept-encoding", "gzip"),
        ],
    }
    # Should not raise, but headers will be missing
    await middleware.__call__(scope, dummy_receive, send_recorder.__call__)

# 3. Large Scale Test Cases

@pytest.mark.asyncio





#------------------------------------------------
from starlette.middleware.gzip import GZipMiddleware

To edit these changes git checkout codeflash/optimize-GZipMiddleware.__call__-mhc9anyn and push.

Codeflash

The optimization eliminates the overhead of creating a `Headers` object for every HTTP request by directly accessing the raw header data from the ASGI scope.

**Key Performance Changes:**

1. **Removed Headers Object Creation:** The original code creates `Headers(scope=scope)` which involves parsing and processing all headers into a dictionary-like structure. The optimized version directly accesses `scope.get("headers")` which is already a list of `(bytes, bytes)` tuples.

2. **Direct Byte String Operations:** Instead of converting headers to strings and using `headers.get("Accept-Encoding", "")`, the optimization searches for `b"accept-encoding"` directly in the byte tuples and checks for `b"gzip"` substring, avoiding string conversions entirely.

3. **Early Loop Termination:** The header search stops immediately after finding the `accept-encoding` header with a `break` statement, rather than processing all headers through the `Headers` wrapper.

**Why This is Faster:**
- Eliminates per-request object allocation overhead (~5.6% time reduction in header creation)
- Avoids string encoding/decoding operations 
- Reduces memory allocations and garbage collection pressure
- Uses more efficient byte string operations instead of string manipulation

**Performance Context:**
The line profiler shows the `Headers(scope=scope)` line took 5.6% of total execution time in the original version, which is completely eliminated. The optimization is particularly effective for high-throughput web applications where this middleware processes many requests per second, as evidenced by the 2.1% throughput improvement (142,506 → 145,551 operations/second) and 9% runtime reduction.

This optimization works well across all test scenarios - whether headers contain gzip encoding, use different encodings, or are missing entirely.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 October 29, 2025 17:15
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash labels Oct 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant