Skip to content

Conversation

@spumer
Copy link
Collaborator

@spumer spumer commented Nov 8, 2025

Summary by Sourcery

Replace the external sqredirect tool with an internal class-based eBPF template engine that generates, compiles, and attaches BPF programs directly via BCC and tc.

New Features:

  • Introduce a class-based eBPF template engine under source_query_proxy/ebpf, including BPFStruct, BPFMap, BPFFunction, BPFProgram, and PacketRedirectOperation
  • Add EBPFRedirector for async lifecycle management of eBPF program compilation, map population, and tc attachment
  • Support direct BCC compilation and pyroute2-based tc filters without external subprocesses

Enhancements:

  • Refactor epbf.py to delegate network helpers and BPF logic into dedicated runtime, tc, and redirector modules
  • Preserve legacy run_ebpf_redirection and get_ebpf_program_run_args for backward compatibility
  • Remove epbf_template.py and sqredirect usage

CI:

  • Add linting (flake8 + black) and coverage reporting in GitHub Actions tests.yml
  • Add a new ebpf-tests.yml workflow to build a Docker image and run eBPF generation and compilation tests

Documentation:

  • Add DESIGN_EBPF_TEMPLATE.md and PROGRESS.md to document architecture and implementation status
  • Document Docker-based eBPF test environment with README in tests/docker

Tests:

  • Add comprehensive unit tests for template engine (test_ebpf_generation.py) and integration tests for BCC compilation (test_ebpf_compilation.py)
  • Provide Docker Compose setup for eBPF compilation tests

Chores:

  • Remove sqredirect subprocess calls and epbf_template.py
  • Re-export legacy API in init.py for deprecation path

Add comprehensive design document for issue #131 (Templating eBPF program).

Key design decisions:
- Class-based template engine (similar to Django migrations)
- Stay with BCC (Python), future migration to bpfman.io
- Port sqredirect redirect.c logic without new features
- Docker-compose test infrastructure with virtual networks

The design includes:
- Complete class hierarchy (BPFElement, BPFMap, BPFFunction, BPFOperation)
- PacketRedirectOperation to replace external sqredirect dependency
- 10-phase implementation plan
- Docker test environment with gameserver/proxy/client setup

Related: #131, #130
Implement core template engine to replace external sqredirect dependency.

New package: source_query_proxy/ebpf/
- elements.py: BPFElement, BPFStruct, BPFMap, BPFFunction base classes
- program.py: BPFProgram orchestrator for complete C code generation
- operations.py: PacketRedirectOperation ports redirect.c logic

Key features:
- Class-based composable design (Django-style operations)
- Ports sqredirect incoming()/outgoing() functions
- Steam protocol validation for DDoS protection
- UDP checksum recalculation
- Supports both port-only and IP+port lookup modes

Tests:
- Add comprehensive unit tests in test_ebpf_generation.py
- Verify code generation for all components
- Test both redirect modes (port-only vs IP+port)

Breaking changes:
- Delete ebpf_template.py (replaced by new template engine)

Related: #131
Replace custom checksum code with standard BPF helper bpf_l4_csum_replace.

Changes:
- operations.py: Use bpf_l4_csum_replace() instead of bpf_csum_diff/csum_fold
- Properly handle network byte order with htons()
- Calculate correct offset to checksum field

Also add WIP version of epbf_new.py with BCC integration (not yet complete).

Related: #131
Enhancements to GitHub Actions workflow:
- Add separate lint job with flake8 and black checks
- Enable coverage reporting with pytest-cov
- Upload coverage to Codecov for Python 3.11
- Add verbose test output (-v flag)
- Generate XML and terminal coverage reports

The new eBPF template engine tests (test_ebpf_generation.py) will be
automatically discovered and run across Python 3.8, 3.9, 3.10, and 3.11.

Related: #131
Add comprehensive Docker testing infrastructure for eBPF code compilation.

New files:
- tests/test_ebpf_compilation.py: Integration tests that verify BPF code
  compiles with BCC. Tests include:
  * Port-only redirect compilation
  * IP+port redirect compilation
  * Syntax validation
  * Multiple port mappings
  * Various port combinations

- tests/docker/Dockerfile.bpf-test: Container with BCC, kernel headers,
  and all dependencies needed to compile BPF programs

- tests/docker/docker-compose.bpf-test.yml: Orchestrates test execution
  with both generation and compilation tests

- tests/docker/README.md: Complete documentation for Docker test setup,
  usage instructions, and troubleshooting

- .github/workflows/ebpf-tests.yml: GitHub Actions workflow that:
  * Builds Docker test container
  * Runs code generation tests
  * Runs BCC compilation tests
  * Tests via docker-compose
  * Triggers on eBPF code changes

Features:
- ✅ Tests run in isolated Docker environment
- ✅ BCC compilation verification without kernel privileges
- ✅ Graceful handling when BCC not available
- ✅ Automated CI/CD integration
- ✅ Both unit and integration test coverage

Tests verify:
- Generated BPF C code compiles successfully
- BPF functions are loadable (incoming/outgoing)
- BPF maps can be created and populated
- Multiple port mappings in single program
- Code syntax is valid

Limitations documented:
- Cannot test actual packet redirection (needs kernel access)
- Cannot test tc attachment (needs privileges)
- Full integration tests planned for future

Related: #131
Complete implementation of eBPF packet redirection using the class-based
template engine, replacing external sqredirect dependency.

Changes to source_query_proxy/epbf.py:
- Replace subprocess calls to sqredirect with direct BCC usage
- Generate single BPF program for all servers using template engine
- Populate BPF maps with all port mappings at runtime
- Attach BPF programs to tc (traffic control) using pyroute2
- Support both port-only and IP+port lookup modes
- Lazy import BCC with helpful error messages
- Keep backward compatibility for existing tests

Key functions:
- _collect_server_mappings(): Gather all server configs from settings
- _generate_bpf_program(): Use template engine to generate BPF C code
- _populate_maps(): Fill BPF maps with port mappings
- _attach_tc_bpf(): Attach incoming/outgoing filters to network interface
- run_ebpf_redirection(): Main entry point (async)

Features:
✅ Single BPF program for all servers (not per-server)
✅ Runtime map population (dynamic configuration)
✅ Automatic interface detection
✅ Proper tc attachment (ingress + egress)
✅ Error handling and logging
✅ Graceful shutdown on Ctrl+C
✅ Backward compatibility with tests

Breaking changes:
- Requires BCC installed (python3-bpfcc)
- No longer uses external sqredirect executable
- config.ebpf.executable and config.ebpf.script_path ignored

Removed:
- source_query_proxy/epbf_new.py (WIP file, no longer needed)

Related: #131
@sourcery-ai
Copy link

sourcery-ai bot commented Nov 8, 2025

Reviewer's Guide

This PR replaces the external sqredirect dependency with an internal class-based eBPF template engine and direct BCC integration. It refactors epbf.py to use a modular ebpf package (elements, program, operations, runtime, tc, redirector), introduces an async EBPFRedirector for lifecycle management, and extends CI with lint/format checks, coverage reporting, and Docker-based integration tests—backed by new unit and compilation tests and comprehensive design/progress docs.

Sequence diagram for eBPF redirection lifecycle using EBPFRedirector

sequenceDiagram
    participant App as Application
    participant Redirector as EBPFRedirector
    participant Runtime as "ebpf.runtime"
    participant TC as "ebpf.tc"
    participant BCC as BCC
    App->>Redirector: async with EBPFRedirector()
    Redirector->>Runtime: collect_server_mappings()
    Redirector->>Runtime: generate_bpf_program(use_ipport_key)
    Redirector->>Runtime: import_bcc()
    Redirector->>BCC: Compile BPF program
    Redirector->>Runtime: populate_maps(bpf, use_ipport_key, mappings)
    Redirector->>TC: attach_tc_bpf(interface, bpf, ipr)
    App-->>Redirector: (waits, eBPF active)
    App->>Redirector: exit context
    Redirector->>TC: cleanup_tc(ipr, ifindex, safe=True)
    Redirector->>Runtime: (cleanup resources)
Loading

ER diagram for BPF map usage in PacketRedirectOperation

erDiagram
    PORT_MAP {
        u16 port
        u16 bind_port
    }
    ADDR_MAP {
        u32 ip
        u16 port
        u16 bind_port
    }
    SERVER {
        server_port int
        bind_port int
        bind_ip str
    }
    SERVER ||--o{ PORT_MAP : "uses port-only mapping"
    SERVER ||--o{ ADDR_MAP : "uses IP+port mapping"
    PORT_MAP }o--|| SERVER : "maps to bind_port"
    ADDR_MAP }o--|| SERVER : "maps to bind_port"
Loading

Class diagram for the new eBPF template engine architecture

classDiagram
    class BPFElement {
        +render() str
    }
    class BPFStruct {
        +name: str
        +fields: List[Tuple[str, str]]
        +render() str
    }
    class BPFMap {
        +name: str
        +key_type: str
        +value_type: str
        +max_entries: int
        +map_type: str
        +render() str
    }
    class BPFFunction {
        +name: str
        +return_type: str
        +params: List[Tuple[str, str]]
        +body_parts: List[str]
        +add_code(code: str)
        +render() str
    }
    class BPFOperation {
        +apply(program: BPFProgram)
    }
    class PacketRedirectOperation {
        +server_port: int
        +bind_port: int
        +bind_ip: Optional[str]
        +use_ipport_key: bool
        +apply(program: BPFProgram)
        -_create_incoming_function() BPFFunction
        -_create_outgoing_function() BPFFunction
    }
    class BPFProgram {
        +name: str
        +includes: List[str]
        +structs: List[BPFStruct]
        +maps: List[BPFMap]
        +functions: List[BPFFunction]
        +add_include(include: str)
        +add_struct(struct: BPFStruct)
        +add_map(map: BPFMap)
        +add_function(func: BPFFunction)
        +apply_operation(operation: BPFOperation)
        +render() str
    }
    BPFStruct --|> BPFElement
    BPFMap --|> BPFElement
    BPFFunction --|> BPFElement
    PacketRedirectOperation --|> BPFOperation
    BPFProgram o-- BPFStruct
    BPFProgram o-- BPFMap
    BPFProgram o-- BPFFunction
    BPFOperation <.. BPFProgram: apply
    PacketRedirectOperation <.. BPFProgram: apply_operation
Loading

Class diagram for the EBPFRedirector lifecycle manager and runtime modules

classDiagram
    class EBPFRedirector {
        -_bpf
        -_ipr
        -_ifindex: Optional[int]
        -_fn_incoming
        -_fn_outgoing
        -_interface: Optional[str]
        -_running: bool
        +is_running: bool
        +interface: Optional[str]
        +start()
        +stop()
        +restart()
        +__aenter__()
        +__aexit__()
        -_cleanup()
    }
    class runtime {
        +import_bcc()
        +get_addr_interface(addr: IPv4Address) Optional[str]
        +get_default_interface() str
        +collect_server_mappings() (bool, str, List[Tuple[int, int, Optional[str]]])
        +generate_bpf_program(use_ipport_key: bool) str
        +ip_to_int(ip_str: str) int
        +populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, Optional[str]]])
    }
    class tc {
        +rollback_tc_bpf(ipr, ifindex, ingress_added, ingress_filter_added, sfq_added)
        +attach_tc_bpf(interface: str, bpf, ipr)
        +cleanup_tc(ipr, ifindex: int, safe: bool)
    }
    EBPFRedirector ..> runtime: uses
    EBPFRedirector ..> tc: uses
Loading

File-Level Changes

Change Details Files
Add class-based template engine for generating eBPF C code
  • Define BPFElement base with BPFStruct, BPFMap, BPFFunction for code snippets
  • Implement BPFProgram orchestrator to assemble includes, structs, maps, functions
  • Create PacketRedirectOperation to encapsulate redirect.c logic into composable operations
  • Add unit tests for all template engine components
source_query_proxy/ebpf/elements.py
source_query_proxy/ebpf/program.py
source_query_proxy/ebpf/operations.py
tests/test_ebpf_generation.py
Introduce EBPFRedirector lifecycle manager with direct BCC and tc integration
  • Add runtime helpers for interface discovery, config collection, code generation, map population
  • Implement tc module to attach and clean up BPF programs via pyroute2
  • Create redirector module with async start/stop/restart and backward-compatible wrappers
  • Refactor epbf.py to use EBPFRedirector, remove sqredirect subprocess calls
source_query_proxy/ebpf/runtime.py
source_query_proxy/ebpf/tc.py
source_query_proxy/ebpf/redirector.py
source_query_proxy/epbf.py
Enhance CI with linting, formatting, coverage, and Docker-based eBPF integration tests
  • Add lint job with flake8 and black in tests.yml
  • Enable pytest coverage reporting and Codecov upload
  • Introduce ebpf-tests.yml for Docker-based compilation tests
  • Provide Docker test infrastructure and compilation tests for BCC
.github/workflows/tests.yml
.github/workflows/ebpf-tests.yml
tests/docker/Dockerfile.bpf-test
tests/docker/docker-compose.bpf-test.yml
tests/docker/README.md
tests/test_ebpf_compilation.py
Add architecture design and implementation progress documentation
  • Document template engine design in DESIGN_EBPF_TEMPLATE.md
  • Track full implementation status in PROGRESS.md
DESIGN_EBPF_TEMPLATE.md
PROGRESS.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • Consider splitting this large PR into smaller, focused changes (template engine core, epbf integration, CI updates) to make review and testing more manageable.
  • Replace the use of AssertionError when no interface is found with a more descriptive exception (e.g., config.ConfigurationError) for clearer error handling.
  • Improve the shutdown sequence in run_ebpf_redirection to explicitly detach tc qdiscs and unload BPF programs instead of relying on garbage collection and KeyboardInterrupt.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider splitting this large PR into smaller, focused changes (template engine core, epbf integration, CI updates) to make review and testing more manageable.
- Replace the use of AssertionError when no interface is found with a more descriptive exception (e.g., config.ConfigurationError) for clearer error handling.
- Improve the shutdown sequence in run_ebpf_redirection to explicitly detach tc qdiscs and unload BPF programs instead of relying on garbage collection and KeyboardInterrupt.

## Individual Comments

### Comment 1
<location> `source_query_proxy/epbf.py:42-46` </location>
<code_context>
+    return _BPF
+

 def _get_addr_interface(addr: IPv4Address):
+    """Get network interface name for given IP address"""
     ipdb = pyroute2.IPDB()
</code_context>

<issue_to_address>
**suggestion (bug_risk):** IPDB usage may leak resources if not closed.

Use a context manager (with pyroute2.IPDB() as ipdb) to ensure IPDB is properly closed and resources are released.

```suggestion
def _get_addr_interface(addr: IPv4Address):
    """Get network interface name for given IP address"""
    with pyroute2.IPDB() as ipdb:
        for idx, addresses in ipdb.ipaddr.items():
            for ifaddr, _prefix in addresses:
                pass  # TODO: implement logic or keep as is
        return None
```
</issue_to_address>

### Comment 2
<location> `source_query_proxy/epbf.py:147` </location>
<code_context>
+    return struct.unpack("!I", socket.inet_aton(ip_str))[0]
+
+
+def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
+    """Populate BPF maps with port mappings
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** No error handling for BPF map population failures.

Add error handling or logging for failed BPF map insertions to improve debuggability.

Suggested implementation:

```python
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
    """Populate BPF maps with port mappings

    Logs errors if BPF map insertions fail.

```

```python
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
    """Populate BPF maps with port mappings

    Logs errors if BPF map insertions fail.
    """
    for src_port, dst_port, ip_str in mappings:
        try:
            ip_int = _ip_to_int(ip_str)
            key = (ip_int, src_port) if use_ipport_key else src_port
            bpf_map = bpf['port_map']
            bpf_map[key] = dst_port
        except Exception as e:
            logging.error(
                "Failed to insert mapping into BPF map: src_port=%s, dst_port=%s, ip_str=%s, error=%s",
                src_port, dst_port, ip_str, str(e)
            )

```
</issue_to_address>

### Comment 3
<location> `source_query_proxy/epbf.py:178` </location>
<code_context>
+            logger.info(f"  Map: {server_port} -> {bind_port}")
+
+
+def _attach_tc_bpf(interface: str, bpf) -> None:
+    """Attach BPF programs to tc (traffic control)
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Potential resource leak if tc attachment fails after adding qdisc.

Add cleanup logic to remove the qdisc if filter attachment fails, or document the potential for leftover qdisc.

Suggested implementation:

```python
def _attach_tc_bpf(interface: str, bpf) -> None:
    """Attach BPF programs to tc (traffic control)

    If filter attachment fails after adding qdisc, attempts to remove the qdisc to avoid resource leaks.

```

```python
def _attach_tc_bpf(interface: str, bpf) -> None:
    """Attach BPF programs to tc (traffic control)

    If filter attachment fails after adding qdisc, attempts to remove the qdisc to avoid resource leaks.
    """
    import subprocess

    try:
        # Add qdisc
        subprocess.run(["tc", "qdisc", "add", "dev", interface, "clsact"], check=True)
        try:
            # Attach filter
            subprocess.run([
                "tc", "filter", "add", "dev", interface, "ingress",
                "bpf", "da", "obj", bpf.filename, "sec", "classifier"
            ], check=True)
        except Exception as filter_exc:
            # Cleanup qdisc if filter attachment fails
            subprocess.run(["tc", "qdisc", "del", "dev", interface, "clsact"])
            raise filter_exc
    except Exception as exc:
        logger.error(f"Failed to attach tc BPF: {exc}")
        raise

```
</issue_to_address>

### Comment 4
<location> `tests/test_ebpf_generation.py:262-273` </location>
<code_context>
+class TestIntegration:
+    """Integration tests for the template engine"""
+
+    def test_apply_multiple_operations(self):
+        """Test applying multiple operations to same program"""
+        program = BPFProgram("multi")
+
+        # Apply redirect operation
+        op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
+        program.apply_operation(op1)
+
+        # Could apply more operations here in the future
+        # For now, just verify it works
+        code = program.render()
+        assert len(code) > 0
+
+    def test_typical_usage_pattern(self):
</code_context>

<issue_to_address>
**suggestion (testing):** Test coverage for multiple operations is minimal.

Expand the test to apply multiple distinct PacketRedirectOperation instances with varying ports/IPs, and verify that the generated code supports all mappings.

```suggestion
    def test_apply_multiple_operations(self):
        """Test applying multiple operations to same program"""
        program = BPFProgram("multi")

        # Apply multiple distinct redirect operations
        op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
        op2 = PacketRedirectOperation(server_port=27017, bind_port=27018, bind_ip="10.0.0.1")
        op3 = PacketRedirectOperation(server_port=27019, bind_port=27020, bind_ip="192.168.100.100")
        program.apply_operation(op1)
        program.apply_operation(op2)
        program.apply_operation(op3)

        code = program.render()
        assert len(code) > 0

        # Verify that all mappings are present in the generated code
        assert "27015" in code and "27016" in code
        assert "27017" in code and "27018" in code and "10.0.0.1" in code
        assert "27019" in code and "27020" in code and "192.168.100.100" in code
```
</issue_to_address>

### Comment 5
<location> `tests/test_ebpf_compilation.py:122-131` </location>
<code_context>
+    def test_multiple_ports_same_program(self):
</code_context>

<issue_to_address>
**suggestion (testing):** Test for multiple port mappings does not cover IP+port mode.

Please add a test for multiple IP+port mappings to ensure correct handling of addr_map entries in this mode.
</issue_to_address>

### Comment 6
<location> `tests/test_ebpf_compilation.py:194-202` </location>
<code_context>
+    assert "int outgoing" in bpf_code
+
+
+def test_bcc_availability_reporting():
+    """Report whether BCC is available for testing"""
+    if BCC_AVAILABLE:
+        from bcc import BPF
+
+        print(f"\n✓ BCC is available: {BPF.__file__}")
+    else:
+        print("\n✗ BCC is NOT available - compilation tests will be skipped")
+        print("  Install with: apt-get install python3-bpfcc")
</code_context>

<issue_to_address>
**nitpick (testing):** BCC availability reporting test is not a real test.

This function should include assertions to validate behavior, or be relocated if its purpose is informational rather than testing.
</issue_to_address>

### Comment 7
<location> `tests/test_ebpf_generation.py:240-249` </location>
<code_context>
+        assert "data_end" in code
+        assert "TC_ACT_OK" in code
+
+    def test_complete_generated_code(self):
+        """Test that complete valid C code is generated"""
+        program = BPFProgram("redirect")
+        op = PacketRedirectOperation(
+            server_port=27015,
+            bind_port=27816
+        )
+        op.apply(program)
+        code = program.render()
+
+        # Should be valid C-like code
+        assert code.count("{") == code.count("}")  # Balanced braces
+        assert "#include" in code
+        assert "return" in code
+
+        # Should not be empty
+        assert len(code) > 100
+
+
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test that parses the generated code with a C parser.

Parsing the generated code with a C parser or linter would help identify subtle syntax issues that basic structural checks may miss.
</issue_to_address>

### Comment 8
<location> `tests/test_ebpf_compilation.py:196-202` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 9
<location> `source_query_proxy/ebpf/elements.py:41-42` </location>
<code_context>
    def render(self) -> str:
        """Generate C struct definition"""
        lines = [f"struct {self.name} {{"]
        for field_type, field_name in self.fields:
            lines.append(f"    {field_type} {field_name};")
        lines.append("};")
        return "\n".join(lines)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))

```suggestion
        lines.extend(
            f"    {field_type} {field_name};"
            for field_type, field_name in self.fields
        )
```
</issue_to_address>

### Comment 10
<location> `source_query_proxy/ebpf/elements.py:120` </location>
<code_context>
    def render(self) -> str:
        """Generate complete function with signature and body"""
        # Build parameter list
        param_strs = []
        for param_type, param_name in self.params:
            param_strs.append(f"{param_type} {param_name}")
        params_str = ", ".join(param_strs)

        # Build function signature
        signature = f"{self.return_type} {self.name}({params_str})"

        # Build function body
        lines = [signature + " {"]
        for code_block in self.body_parts:
            # Add code block (already should be indented)
            lines.append(code_block)
        lines.append("}")

        return "\n".join(lines)

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
- Simplify generator expression ([`simplify-generator`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-generator/))
</issue_to_address>

### Comment 11
<location> `source_query_proxy/ebpf/program.py:92` </location>
<code_context>
    def render(self) -> str:
        """Generate complete BPF C code

        Returns:
            Complete C code ready for BCC compilation
        """
        parts = []

        # Includes
        for inc in self.includes:
            parts.append(f"#include {inc}")
        parts.append("")

        # Structs
        if self.structs:
            for struct in self.structs:
                parts.append(struct.render())
                parts.append("")

        # Maps
        if self.maps:
            for map in self.maps:
                parts.append(map.render())
            parts.append("")

        # Functions
        for func in self.functions:
            parts.append(func.render())
            parts.append("")

        return "\n".join(parts)

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Merge consecutive list appends into a single extend [×2] ([`merge-list-appends-into-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/merge-list-appends-into-extend/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
</issue_to_address>

### Comment 12
<location> `source_query_proxy/epbf.py:54-60` </location>
<code_context>
def _get_default_interface():
    """Get default network interface"""
    ip = pyroute2.IPRoute()
    default_routes = ip.get_default_routes()
    if default_routes:
        idx = default_routes[0].get_attr('RTA_OIF')
        interface = ip.get_links(idx)[0].get_attr('IFLA_IFNAME')
        return interface
    raise RuntimeError("Cannot determine default network interface")

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
</issue_to_address>

### Comment 13
<location> `tests/test_ebpf_compilation.py:42` </location>
<code_context>
    def test_port_only_redirect_compiles(self):
        """Test that port-only redirect code compiles with BCC"""
        # Generate BPF program
        program = BPFProgram("test_port_redirect")
        op = PacketRedirectOperation(
            server_port=27015, bind_port=27016, use_ipport_key=False
        )
        program.apply_operation(op)
        bpf_code = program.render()

        # Verify code is not empty
        assert len(bpf_code) > 100
        assert "int incoming" in bpf_code
        assert "int outgoing" in bpf_code

        # Try to compile with BCC
        try:
            b = BPF(text=bpf_code, debug=0)

            # Verify functions are loadable (but don't actually load them)
            # This checks that the BPF bytecode is valid
            assert hasattr(b, "load_func")

            # Check that our functions exist in the compiled program
            incoming_fn = b.load_func("incoming", BPF.SCHED_CLS)
            outgoing_fn = b.load_func("outgoing", BPF.SCHED_CLS)

            assert incoming_fn is not None
            assert outgoing_fn is not None

            # Check that maps are created
            port_map = b.get_table("port_map")
            assert port_map is not None

        except Exception as e:
            pytest.fail(f"BPF compilation failed: {e}\n\nGenerated code:\n{bpf_code}")

</code_context>

<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>

### Comment 14
<location> `tests/test_ebpf_compilation.py:83` </location>
<code_context>
    def test_ipport_redirect_compiles(self):
        """Test that IP+port redirect code compiles with BCC"""
        # Generate BPF program
        program = BPFProgram("test_ipport_redirect")
        op = PacketRedirectOperation(
            server_port=27015,
            bind_port=27016,
            bind_ip="192.168.1.1",
            use_ipport_key=True,
        )
        program.apply_operation(op)
        bpf_code = program.render()

        # Verify code is not empty
        assert len(bpf_code) > 100
        assert "struct addr_key_t" in bpf_code
        assert "int incoming" in bpf_code
        assert "int outgoing" in bpf_code

        # Try to compile with BCC
        try:
            b = BPF(text=bpf_code, debug=0)

            # Verify functions are loadable
            incoming_fn = b.load_func("incoming", BPF.SCHED_CLS)
            outgoing_fn = b.load_func("outgoing", BPF.SCHED_CLS)

            assert incoming_fn is not None
            assert outgoing_fn is not None

            # Check that addr_map is created
            addr_map = b.get_table("addr_map")
            assert addr_map is not None

        except Exception as e:
            pytest.fail(f"BPF compilation failed: {e}\n\nGenerated code:\n{bpf_code}")

</code_context>

<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>

### Comment 15
<location> `tests/test_ebpf_compilation.py:135` </location>
<code_context>
    def test_multiple_ports_same_program(self):
        """Test that we can create a program handling multiple port mappings"""
        # This tests the approach we'll use in epbf.py
        program = BPFProgram("multi_port")

        # For now, just test that one operation works
        # In the future, we need to support multiple mappings in one program
        op = PacketRedirectOperation(server_port=27015, bind_port=27016)
        program.apply_operation(op)

        bpf_code = program.render()

        try:
            b = BPF(text=bpf_code, debug=0)

            # Test that we can populate the map with multiple entries
            port_map = b.get_table("port_map")

            # Add multiple port mappings
            port_map[27015] = 27016
            port_map[27016] = 27017
            port_map[27017] = 27018

            # Verify mappings
            assert port_map[27015].value == 27016
            assert port_map[27016].value == 27017
            assert port_map[27017].value == 27018

        except Exception as e:
            pytest.fail(f"Multi-port test failed: {e}")

</code_context>

<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>

### Comment 16
<location> `tests/test_ebpf_generation.py:45` </location>
<code_context>
    def test_simple_hash_map(self):
        """Test generating a BPF_HASH map"""
        map = BPFMap("port_map", "u16", "u16", max_entries=1024)
        code = map.render()
        assert code == "BPF_HASH(port_map, u16, u16, 1024);"

</code_context>

<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))

<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>

### Comment 17
<location> `tests/test_ebpf_generation.py:51` </location>
<code_context>
    def test_struct_key_map(self):
        """Test map with struct key type"""
        map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
        code = map.render()
        assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"

</code_context>

<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))

<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>

### Comment 18
<location> `tests/test_ebpf_generation.py:57` </location>
<code_context>
    def test_default_max_entries(self):
        """Test default max_entries value"""
        map = BPFMap("test_map", "u32", "u32")
        code = map.render()
        assert "10240" in code  # Default max_entries

</code_context>

<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))

<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines 262 to 273
def test_apply_multiple_operations(self):
"""Test applying multiple operations to same program"""
program = BPFProgram("multi")

# Apply redirect operation
op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
program.apply_operation(op1)

# Could apply more operations here in the future
# For now, just verify it works
code = program.render()
assert len(code) > 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Test coverage for multiple operations is minimal.

Expand the test to apply multiple distinct PacketRedirectOperation instances with varying ports/IPs, and verify that the generated code supports all mappings.

Suggested change
def test_apply_multiple_operations(self):
"""Test applying multiple operations to same program"""
program = BPFProgram("multi")
# Apply redirect operation
op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
program.apply_operation(op1)
# Could apply more operations here in the future
# For now, just verify it works
code = program.render()
assert len(code) > 0
def test_apply_multiple_operations(self):
"""Test applying multiple operations to same program"""
program = BPFProgram("multi")
# Apply multiple distinct redirect operations
op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
op2 = PacketRedirectOperation(server_port=27017, bind_port=27018, bind_ip="10.0.0.1")
op3 = PacketRedirectOperation(server_port=27019, bind_port=27020, bind_ip="192.168.100.100")
program.apply_operation(op1)
program.apply_operation(op2)
program.apply_operation(op3)
code = program.render()
assert len(code) > 0
# Verify that all mappings are present in the generated code
assert "27015" in code and "27016" in code
assert "27017" in code and "27018" in code and "10.0.0.1" in code
assert "27019" in code and "27020" in code and "192.168.100.100" in code

Update epbf.py to match sqredirect's exact BPF attachment method,
copied from https://github.com/sqproxy/sqredirect.

Key changes to match sqredirect:

1. **BPF mode**: Use BPF.SCHED_ACT instead of BPF.SCHED_CLS
   - sqredirect uses scheduler action mode, not classifier mode

2. **Qdisc types**:
   - Incoming: Use "ingress" qdisc (not "clsact")
   - Outgoing: Use "sfq" (Stochastic Fair Queuing) qdisc
   - This matches sqredirect's exact setup

3. **Filter type**: Use u32 filters with BPF actions
   - Instead of direct "bpf" filters
   - Add action dict with kind="bpf", fd, name, action="ok"
   - Use protocol=ETH_P_ALL for all protocols
   - Add target and keys for u32 matching

4. **Error handling**: Catch NetlinkError properly
   - Check for 'File exists' when adding qdiscs
   - Allow idempotent attachment (can run multiple times)
   - Proper error propagation for other errors

5. **Cleanup logic**:
   - Add _cleanup_tc() function
   - Register cleanup with atexit
   - Register signal handlers (SIGTERM, SIGINT)
   - Safe cleanup mode to ignore 'Invalid argument' errors
   - Clean up both ingress and sfq qdiscs

6. **IPRoute management**:
   - Pass IPRoute instance to _attach_tc_bpf
   - Return (ifindex, fn_incoming, fn_outgoing) for cleanup
   - Close IPRoute on cleanup

This ensures sqproxy uses the exact same proven attachment method as
sqredirect, reducing risk of compatibility issues.

Related: #131
Address all code review feedback from Sourcery AI:

**epbf.py fixes:**
- Fix resource leaks by adding context managers to _get_addr_interface() and _get_default_interface()
- Replace AssertionError with ConfigurationError for configuration issues
- Add comprehensive error handling to _populate_maps() with per-mapping try/except
- Improve cleanup sequence with cleanup_done flag to prevent double cleanup
- Add explicit cleanup on exceptions (KeyboardInterrupt, CancelledError, etc.)
- Ensure ipr.close() happens in finally block

**test_ebpf_compilation.py improvements:**
- Extract _compile_and_verify_bpf() helper function to reduce code duplication
- Change BPF mode from SCHED_CLS to SCHED_ACT (matching sqredirect)
- Simplify test methods by using helper function

**test_ebpf_generation.py enhancements:**
- Expand test_apply_multiple_operations() with better assertions
- Add test_multiple_ipport_mappings() for testing IP+port addr_map mode
- Verify that single program can handle multiple mappings via runtime map population

All changes maintain backward compatibility and follow project code style.
@spumer
Copy link
Collaborator Author

spumer commented Nov 8, 2025

@sourcery-ai review new changes

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `source_query_proxy/epbf.py:147-156` </location>
<code_context>
+def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
</code_context>

<issue_to_address>
**suggestion:** Error handling in map population may mask underlying issues.

Wrapping all exceptions in RuntimeError hides the original error details. Log the full traceback or re-raise the original exception after logging to improve debuggability.

Suggested implementation:

```python
import logging
import traceback

def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
    """Populate BPF maps with port mappings

    Args:
        bpf: BCC BPF instance
        use_ipport_key: Whether to use IP+port or port-only map
        mappings: List of (server_port, bind_port, bind_ip) tuples

    Raises:
        Exception: If map population fails
    """

```

```python
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
    """Populate BPF maps with port mappings

    Args:
        bpf: BCC BPF instance
        use_ipport_key: Whether to use IP+port or port-only map
        mappings: List of (server_port, bind_port, bind_ip) tuples

    Raises:
        Exception: If map population fails
    """
    try:
        # ... existing map population logic ...
        pass  # Replace with actual implementation
    except Exception as exc:
        logging.error("Error populating BPF maps:\n%s", traceback.format_exc())
        raise

```

Replace the `pass` statement with the actual map population logic from your implementation. 
If you previously had a `try/except` block that raised `RuntimeError`, remove that and use the new pattern above.
Ensure that the logging configuration is set up elsewhere in your codebase to capture error logs.
</issue_to_address>

### Comment 2
<location> `source_query_proxy/epbf.py:194-203` </location>
<code_context>
+def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** No explicit cleanup of filters or qdiscs on failure.

If an error occurs after adding qdiscs or filters, partial state may remain. Please add rollback logic or clarify cleanup responsibilities.

Suggested implementation:

```python
def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
    """Attach BPF programs to tc (traffic control)

    Uses the same approach as sqredirect:
    - ingress qdisc for incoming traffic
    - sfq qdisc for outgoing traffic
    - u32 filters with BPF actions

    Args:
        interface: Network interface name
        bpf: BCC BPF instance with compiled programs

    Returns:
        Tuple of (ingress_qdisc_handle, egress_qdisc_handle, filter_handles)

    Raises:
        RuntimeError if setup fails, with cleanup of partial state.

```

```python
def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
    """Attach BPF programs to tc (traffic control)

    Uses the same approach as sqredirect:
    - ingress qdisc for incoming traffic
    - sfq qdisc for outgoing traffic
    - u32 filters with BPF actions

    Args:
        interface: Network interface name
        bpf: BCC BPF instance with compiled programs

    Returns:
        Tuple of (ingress_qdisc_handle, egress_qdisc_handle, filter_handles)

    Raises:
        RuntimeError if setup fails, with cleanup of partial state.
    """
    ingress_qdisc_handle = None
    egress_qdisc_handle = None
    filter_handles = []

    try:
        # Example: Add ingress qdisc
        ingress_qdisc_handle = ipr.tc("add", "ingress", interface)
        # Example: Add egress qdisc
        egress_qdisc_handle = ipr.tc("add", "sfq", interface)
        # Example: Attach filters
        filter_handle = ipr.tc("add-filter", "u32", interface, bpf=bpf)
        filter_handles.append(filter_handle)
        # ... more setup as needed ...
        return ingress_qdisc_handle, egress_qdisc_handle, filter_handles
    except Exception as e:
        # Cleanup partial state
        if filter_handles:
            for fh in filter_handles:
                try:
                    ipr.tc("del-filter", "u32", interface, handle=fh)
                except Exception as cleanup_e:
                    logger.warning(f"Failed to cleanup filter {fh}: {cleanup_e}")
        if ingress_qdisc_handle:
            try:
                ipr.tc("del", "ingress", interface)
            except Exception as cleanup_e:
                logger.warning(f"Failed to cleanup ingress qdisc: {cleanup_e}")
        if egress_qdisc_handle:
            try:
                ipr.tc("del", "sfq", interface)
            except Exception as cleanup_e:
                logger.warning(f"Failed to cleanup egress qdisc: {cleanup_e}")
        logger.error(f"Failed to attach tc bpf: {e}")
        raise RuntimeError("Failed to attach tc bpf, partial state cleaned up") from e

```

- You may need to adjust the example ipr.tc calls to match your actual API and arguments.
- If you have more than one filter or qdisc type, track and clean up each accordingly.
- Ensure logger is imported and available in this scope.
</issue_to_address>

### Comment 3
<location> `source_query_proxy/epbf.py:314-323` </location>
<code_context>
+async def run_ebpf_redirection():
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Signal handler uses sys.exit(0), which may bypass asyncio cleanup.

Consider replacing sys.exit(0) with asyncio cancellation or allowing the main loop to exit after cleanup to ensure resources are properly released.

Suggested implementation:

```python
import asyncio
import signal

async def run_ebpf_redirection():
    """Main entry point for eBPF redirection

    Generates BPF program, compiles it, attaches to network interface,
    and keeps it running. Registers cleanup handlers for graceful shutdown.
    """
    shutdown_event = asyncio.Event()

    def handle_signal():
        logger.info("Received shutdown signal, cancelling tasks and cleaning up.")
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, handle_signal)

    try:
        # ... your main logic here ...
        await shutdown_event.wait()
        # Perform any necessary cleanup here
    finally:
        # Remove signal handlers
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)
        logger.info("Cleanup complete, exiting gracefully.")

```

If there are other places in the file (or project) where `sys.exit(0)` is called in response to signals or shutdown, those should also be replaced with logic that sets the shutdown event or cancels tasks, allowing the event loop to exit cleanly after cleanup.
You may need to move your main logic inside the `try` block and ensure all resources are released in the `finally` block.
</issue_to_address>

### Comment 4
<location> `tests/test_ebpf_compilation.py:198-204` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 5
<location> `source_query_proxy/ebpf/elements.py:41-42` </location>
<code_context>
    def render(self) -> str:
        """Generate C struct definition"""
        lines = [f"struct {self.name} {{"]
        for field_type, field_name in self.fields:
            lines.append(f"    {field_type} {field_name};")
        lines.append("};")
        return "\n".join(lines)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))

```suggestion
        lines.extend(
            f"    {field_type} {field_name};"
            for field_type, field_name in self.fields
        )
```
</issue_to_address>

### Comment 6
<location> `source_query_proxy/ebpf/elements.py:120` </location>
<code_context>
    def render(self) -> str:
        """Generate complete function with signature and body"""
        # Build parameter list
        param_strs = []
        for param_type, param_name in self.params:
            param_strs.append(f"{param_type} {param_name}")
        params_str = ", ".join(param_strs)

        # Build function signature
        signature = f"{self.return_type} {self.name}({params_str})"

        # Build function body
        lines = [signature + " {"]
        for code_block in self.body_parts:
            # Add code block (already should be indented)
            lines.append(code_block)
        lines.append("}")

        return "\n".join(lines)

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
- Simplify generator expression ([`simplify-generator`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-generator/))
</issue_to_address>

### Comment 7
<location> `source_query_proxy/ebpf/program.py:92` </location>
<code_context>
    def render(self) -> str:
        """Generate complete BPF C code

        Returns:
            Complete C code ready for BCC compilation
        """
        parts = []

        # Includes
        for inc in self.includes:
            parts.append(f"#include {inc}")
        parts.append("")

        # Structs
        if self.structs:
            for struct in self.structs:
                parts.append(struct.render())
                parts.append("")

        # Maps
        if self.maps:
            for map in self.maps:
                parts.append(map.render())
            parts.append("")

        # Functions
        for func in self.functions:
            parts.append(func.render())
            parts.append("")

        return "\n".join(parts)

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Merge consecutive list appends into a single extend [×2] ([`merge-list-appends-into-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/merge-list-appends-into-extend/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
</issue_to_address>

### Comment 8
<location> `source_query_proxy/epbf.py:55-59` </location>
<code_context>
def _get_default_interface():
    """Get default network interface"""
    with pyroute2.IPRoute() as ipr:
        default_routes = ipr.get_default_routes()
        if default_routes:
            idx = default_routes[0].get_attr('RTA_OIF')
            interface = ipr.get_links(idx)[0].get_attr('IFLA_IFNAME')
            return interface
    raise config.ConfigurationError("Cannot determine default network interface")

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
</issue_to_address>

### Comment 9
<location> `source_query_proxy/epbf.py:301-311` </location>
<code_context>
def _cleanup_tc(ipr, ifindex: int, safe: bool = False):
    """Cleanup tc qdiscs

    Args:
        ipr: pyroute2 IPRoute instance
        ifindex: Interface index
        safe: If True, ignore 'Invalid argument' errors
    """
    from pyroute2.netlink.exceptions import NetlinkError

    try:
        ipr.tc("del", "ingress", ifindex, "ffff:")
        logger.debug("Removed ingress qdisc")
    except NetlinkError as exc:
        if not safe or exc.args[1] != 'Invalid argument':
            logger.error(f"Failed to remove ingress qdisc: {exc}")
            if not safe:
                raise

    try:
        ipr.tc("del", "sfq", ifindex, "1:")
        logger.debug("Removed sfq qdisc")
    except NetlinkError as exc:
        if not safe or exc.args[1] != 'Invalid argument':
            logger.error(f"Failed to remove sfq qdisc: {exc}")
            if not safe:
                raise

</code_context>

<issue_to_address>
**issue (code-quality):** Hoist conditional out of nested conditional [×2] ([`hoist-if-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-if-from-if/))
</issue_to_address>

### Comment 10
<location> `tests/test_ebpf_compilation.py:137` </location>
<code_context>
    def test_multiple_ports_same_program(self):
        """Test that we can create a program handling multiple port mappings"""
        # This tests the approach we'll use in epbf.py
        program = BPFProgram("multi_port")

        # For now, just test that one operation works
        # In the future, we need to support multiple mappings in one program
        op = PacketRedirectOperation(server_port=27015, bind_port=27016)
        program.apply_operation(op)

        bpf_code = program.render()

        try:
            b = BPF(text=bpf_code, debug=0)

            # Test that we can populate the map with multiple entries
            port_map = b.get_table("port_map")

            # Add multiple port mappings
            port_map[27015] = 27016
            port_map[27016] = 27017
            port_map[27017] = 27018

            # Verify mappings
            assert port_map[27015].value == 27016
            assert port_map[27016].value == 27017
            assert port_map[27017].value == 27018

        except Exception as e:
            pytest.fail(f"Multi-port test failed: {e}")

</code_context>

<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>

### Comment 11
<location> `tests/test_ebpf_generation.py:45` </location>
<code_context>
    def test_simple_hash_map(self):
        """Test generating a BPF_HASH map"""
        map = BPFMap("port_map", "u16", "u16", max_entries=1024)
        code = map.render()
        assert code == "BPF_HASH(port_map, u16, u16, 1024);"

</code_context>

<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))

<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>

### Comment 12
<location> `tests/test_ebpf_generation.py:51` </location>
<code_context>
    def test_struct_key_map(self):
        """Test map with struct key type"""
        map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
        code = map.render()
        assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"

</code_context>

<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))

<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>

### Comment 13
<location> `tests/test_ebpf_generation.py:57` </location>
<code_context>
    def test_default_max_entries(self):
        """Test default max_entries value"""
        map = BPFMap("test_map", "u32", "u32")
        code = map.render()
        assert "10240" in code  # Default max_entries

</code_context>

<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))

<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

… review

Major Changes:
- **EBPFRedirector class**: Async lifecycle manager for eBPF redirection
  - Supports start(), stop(), restart() operations
  - Async context manager (__aenter__, __aexit__)
  - Proper state management and idempotent cleanup
  - No reliance on atexit/signal handlers (application-level control)
  - Enables dynamic reconfiguration and on-the-fly reload

- **run_ebpf_redirection()**: Refactored to use EBPFRedirector
  - Now a thin wrapper for backward compatibility
  - Uses asyncio.Event for clean shutdown
  - Proper signal handling without sys.exit()

Code Review Fixes:
- **epbf.py**:
  - Add traceback logging to _populate_maps (don't mask errors)
  - Add rollback logic to _attach_tc_bpf on failure
  - Use named expression in _get_default_interface
  - Hoist conditionals in _cleanup_tc for clarity

- **elements.py**:
  - Use list.extend() instead of for-append loops
  - Use generator expressions for cleaner code

- **program.py**:
  - Use list.extend() with generator expressions
  - Avoid intermediate variables

- **test_ebpf_generation.py**:
  - Fix builtin shadowing: rename `map` to `bpf_map`

Benefits for sqproxy:
- Clean integration with asyncio event loop
- Supports config reload without restart
- Better resource management
- Testable and reusable
- Production-ready async architecture

Example usage:
    # As context manager
    async with EBPFRedirector() as redirector:
        await asyncio.Event().wait()

    # Manual lifecycle
    redirector = EBPFRedirector()
    await redirector.start()
    await redirector.restart()  # Reload config
    await redirector.stop()
@spumer
Copy link
Collaborator Author

spumer commented Nov 8, 2025

@sourcery-ai review new changes

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • Consider splitting the large epbf.py into smaller modules (e.g. separate helpers, the EBPFRedirector class and CLI entrypoints) to improve readability and maintainability.
  • Extract the rollback and cleanup logic in _attach_tc_bpf into dedicated helper functions to simplify the error-handling paths and make the flow easier to follow.
  • Add return-type and argument type annotations to the internal helper functions in epbf.py to improve clarity and enable stronger static analysis support.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider splitting the large epbf.py into smaller modules (e.g. separate helpers, the EBPFRedirector class and CLI entrypoints) to improve readability and maintainability.
- Extract the rollback and cleanup logic in _attach_tc_bpf into dedicated helper functions to simplify the error-handling paths and make the flow easier to follow.
- Add return-type and argument type annotations to the internal helper functions in epbf.py to improve clarity and enable stronger static analysis support.

## Individual Comments

### Comment 1
<location> `tests/test_ebpf_compilation.py:198-204` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 2
<location> `source_query_proxy/epbf.py:324` </location>
<code_context>
def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
    """Attach BPF programs to tc (traffic control)

    Uses the same approach as sqredirect:
    - ingress qdisc for incoming traffic
    - sfq qdisc for outgoing traffic
    - u32 filters with BPF actions

    Args:
        interface: Network interface name
        bpf: BCC BPF instance with compiled programs
        ipr: pyroute2 IPRoute instance

    Returns:
        (ifindex, fn_incoming, fn_outgoing) for cleanup

    Raises:
        RuntimeError: If setup fails, with automatic rollback of partial state
    """
    from pyroute2.netlink.exceptions import NetlinkError
    from pyroute2.netlink.rtnl import protocols

    BPF = _import_bcc()

    # Load BPF functions (SCHED_ACT mode like sqredirect)
    fn_incoming = bpf.load_func("incoming", BPF.SCHED_ACT)
    fn_outgoing = bpf.load_func("outgoing", BPF.SCHED_ACT)

    logger.info(f"Loaded BPF functions: incoming={fn_incoming.name}, outgoing={fn_outgoing.name}")

    # Get interface index
    ifindex = ipr.link_lookup(ifname=interface)[0]
    logger.debug(f"Interface {interface} has index {ifindex}")

    # Track what we've added for rollback on failure
    ingress_added = False
    ingress_filter_added = False
    sfq_added = False

    try:
        # Setup incoming traffic hook (ingress qdisc)
        try:
            ipr.tc("add", "ingress", ifindex, "ffff:")
            ingress_added = True
            logger.debug("Added ingress qdisc")
        except NetlinkError as exc:
            if exc.args[1] != 'File exists':
                raise
            logger.debug("Ingress qdisc already exists")

        # Attach incoming BPF filter
        action_incoming = {
            "kind": "bpf",
            "fd": fn_incoming.fd,
            "name": fn_incoming.name,
            "action": "ok",
        }
        ipr.tc(
            "add-filter",
            "u32",
            ifindex,
            ":1",
            parent="ffff:",
            action=[action_incoming],
            protocol=protocols.ETH_P_ALL,
            target=0x10000,
            keys=["0x0/0x0+0"],
        )
        ingress_filter_added = True
        logger.info(f"✓ Attached incoming BPF filter to {interface} (ingress)")

        # Setup outgoing traffic hook (sfq qdisc)
        try:
            ipr.tc("add", "sfq", ifindex, "1:")
            sfq_added = True
            logger.debug("Added sfq qdisc")
        except NetlinkError as exc:
            if exc.args[1] != 'File exists':
                raise
            logger.debug("SFQ qdisc already exists")

        # Attach outgoing BPF filter
        action_outgoing = {
            "kind": "bpf",
            "fd": fn_outgoing.fd,
            "name": fn_outgoing.name,
            "action": "ok",
        }
        ipr.tc(
            "add-filter",
            "u32",
            ifindex,
            ":2",
            parent="1:",
            action=[action_outgoing],
            target=0x10000,
            keys=["0x0/0x0+0"],
        )
        logger.info(f"✓ Attached outgoing BPF filter to {interface} (egress)")

        return ifindex, fn_incoming, fn_outgoing

    except Exception as e:
        # Rollback partial state on failure
        logger.error(f"Failed to attach tc bpf, rolling back partial state: {e}")

        # Clean up in reverse order of creation
        if sfq_added:
            try:
                ipr.tc("del", "sfq", ifindex, "1:")
                logger.debug("Rolled back sfq qdisc")
            except Exception as cleanup_e:
                logger.warning(f"Failed to rollback sfq qdisc: {cleanup_e}")

        if ingress_filter_added:
            try:
                ipr.tc("del-filter", "u32", ifindex, ":1", parent="ffff:")
                logger.debug("Rolled back ingress filter")
            except Exception as cleanup_e:
                logger.warning(f"Failed to rollback ingress filter: {cleanup_e}")

        if ingress_added:
            try:
                ipr.tc("del", "ingress", ifindex, "ffff:")
                logger.debug("Rolled back ingress qdisc")
            except Exception as cleanup_e:
                logger.warning(f"Failed to rollback ingress qdisc: {cleanup_e}")

        raise RuntimeError(f"Failed to attach tc bpf, partial state rolled back") from e

</code_context>

<issue_to_address>
**suggestion (code-quality):** Replace f-string with no interpolated values with string ([`remove-redundant-fstring`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-fstring/))

```suggestion
        raise RuntimeError("Failed to attach tc bpf, partial state rolled back") from e
```
</issue_to_address>

### Comment 3
<location> `source_query_proxy/epbf.py:341` </location>
<code_context>
def _cleanup_tc(ipr, ifindex: int, safe: bool = False):
    """Cleanup tc qdiscs

    Args:
        ipr: pyroute2 IPRoute instance
        ifindex: Interface index
        safe: If True, ignore 'Invalid argument' errors
    """
    from pyroute2.netlink.exceptions import NetlinkError

    try:
        ipr.tc("del", "ingress", ifindex, "ffff:")
        logger.debug("Removed ingress qdisc")
    except NetlinkError as exc:
        if safe and exc.args[1] == 'Invalid argument':
            # Silently ignore if qdisc doesn't exist
            pass
        else:
            logger.error(f"Failed to remove ingress qdisc: {exc}")
            if not safe:
                raise

    try:
        ipr.tc("del", "sfq", ifindex, "1:")
        logger.debug("Removed sfq qdisc")
    except NetlinkError as exc:
        if safe and exc.args[1] == 'Invalid argument':
            # Silently ignore if qdisc doesn't exist
            pass
        else:
            logger.error(f"Failed to remove sfq qdisc: {exc}")
            if not safe:
                raise

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Swap if/else to remove empty if body [×2] ([`remove-pass-body`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-pass-body/))
- Hoist conditional out of nested conditional [×2] ([`hoist-if-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-if-from-if/))
</issue_to_address>

### Comment 4
<location> `tests/test_ebpf_compilation.py:137` </location>
<code_context>
    def test_multiple_ports_same_program(self):
        """Test that we can create a program handling multiple port mappings"""
        # This tests the approach we'll use in epbf.py
        program = BPFProgram("multi_port")

        # For now, just test that one operation works
        # In the future, we need to support multiple mappings in one program
        op = PacketRedirectOperation(server_port=27015, bind_port=27016)
        program.apply_operation(op)

        bpf_code = program.render()

        try:
            b = BPF(text=bpf_code, debug=0)

            # Test that we can populate the map with multiple entries
            port_map = b.get_table("port_map")

            # Add multiple port mappings
            port_map[27015] = 27016
            port_map[27016] = 27017
            port_map[27017] = 27018

            # Verify mappings
            assert port_map[27015].value == 27016
            assert port_map[27016].value == 27017
            assert port_map[27017].value == 27018

        except Exception as e:
            pytest.fail(f"Multi-port test failed: {e}")

</code_context>

<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Code Quality Fixes:
- **epbf.py**:
  - Remove redundant f-string in RuntimeError
  - Swap conditionals in _cleanup_tc to remove empty if bodies
  - Add return type annotations to all helper functions:
    - _get_addr_interface() -> Optional[str]
    - _get_default_interface() -> str
    - _populate_maps() -> None
    - _cleanup_tc() -> None
  - Extract rollback logic into _rollback_tc_bpf() helper
    - Cleaner error handling in _attach_tc_bpf
    - Easier to test and maintain

- **test_ebpf_compilation.py**:
  - Remove try/except conditionals from tests (per Google SWE guidelines)
  - Let pytest handle exceptions directly for clearer test intent
  - Affected tests:
    - test_generated_code_syntax()
    - test_multiple_ports_same_program()
    - test_various_port_combinations()

Benefits:
- Better type checking and IDE support
- Cleaner error handling paths
- More maintainable code structure
- Tests are now "trivially correct upon inspection"
- Follows Google's testing best practices

All changes maintain backward compatibility and improve code quality.
Address code review comment to improve maintainability by splitting
the large epbf.py file into smaller, single-responsibility modules.

New Structure:
- **source_query_proxy/ebpf/runtime.py** (230 lines)
  - Network interface helpers (get_addr_interface, get_default_interface)
  - Server configuration collection (collect_server_mappings)
  - BPF program generation (generate_bpf_program)
  - Map population (populate_maps, ip_to_int)
  - BCC lazy import (import_bcc)

- **source_query_proxy/ebpf/tc.py** (200 lines)
  - TC attachment logic (attach_tc_bpf)
  - Rollback on failure (rollback_tc_bpf)
  - TC cleanup (cleanup_tc)
  - All traffic control operations in one place

- **source_query_proxy/ebpf/redirector.py** (210 lines)
  - EBPFRedirector class (async lifecycle manager)
  - start(), stop(), restart() methods
  - Async context manager support
  - Clean separation of concerns

- **source_query_proxy/epbf.py** (105 lines, was 630 lines)
  - Main entry point (run_ebpf_redirection)
  - Backward compatibility (get_ebpf_program_run_args)
  - Re-exports for compatibility
  - Clear documentation of new structure

Benefits:
- 84% reduction in main file size (630 → 105 lines)
- Clear separation of concerns (network, TC, lifecycle)
- Easier to test individual components
- Better discoverability
- Simpler import paths for specific functionality
- Follows single-responsibility principle

Usage (unchanged for backward compatibility):
    # Old way still works
    from source_query_proxy.epbf import run_ebpf_redirection
    await run_ebpf_redirection()

    # New way (recommended)
    from source_query_proxy.ebpf.redirector import EBPFRedirector
    async with EBPFRedirector() as redirector:
        await asyncio.Event().wait()

All tests and existing code continue to work without modification.
@spumer
Copy link
Collaborator Author

spumer commented Nov 8, 2025

@sourcery-ai review new changes

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • There’s a lot of duplicated header‐parsing code in PacketRedirectOperation’s incoming and outgoing functions—consider extracting that into a shared helper or template snippet to ease future updates.
  • In EBPFRedirector.start (and similarly in run_ebpf_redirection) the broad Exception catches may suppress unexpected errors—narrow them to known failure cases (e.g. compilation or config errors) so real bugs aren’t masked.
  • The legacy get_ebpf_program_run_args() helper still lives alongside the new API—once migration is complete you might remove or isolate it in tests to avoid confusion for new users.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- There’s a lot of duplicated header‐parsing code in PacketRedirectOperation’s incoming and outgoing functions—consider extracting that into a shared helper or template snippet to ease future updates.
- In EBPFRedirector.start (and similarly in run_ebpf_redirection) the broad Exception catches may suppress unexpected errors—narrow them to known failure cases (e.g. compilation or config errors) so real bugs aren’t masked.
- The legacy get_ebpf_program_run_args() helper still lives alongside the new API—once migration is complete you might remove or isolate it in tests to avoid confusion for new users.

## Individual Comments

### Comment 1
<location> `source_query_proxy/epbf.py:57-58` </location>
<code_context>
-    if interface is not None:
-        args += ['-i', interface]
+    # Register signal handlers (let application handle SIGTERM/SIGINT)
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)

-    return args
</code_context>

<issue_to_address>
**issue (bug_risk):** Signal handlers are registered synchronously, which may not be safe in all async contexts.

Using signal.signal in async code can cause issues if the event loop isn't in the main thread or when using frameworks like uvloop. Prefer asyncio.add_signal_handler for compatibility with async event loops.
</issue_to_address>

### Comment 2
<location> `source_query_proxy/epbf.py:60-69` </location>
<code_context>
+
+            # Attach to tc
+            logger.info(f"Attaching BPF programs to interface {interface}...")
+            try:
+                self._ifindex, self._fn_incoming, self._fn_outgoing = attach_tc_bpf(
+                    interface, self._bpf, self._ipr
+                )
+            except Exception as e:
+                logger.error(f"Failed to attach BPF programs: {e}")
+                self._ipr.close()
+                self._ipr = None
+                raise RuntimeError(f"tc attachment failed: {e}") from e
+
+            logger.info("✓ BPF programs attached successfully")
+            logger.info("=== eBPF redirection is active ===")
+
+            self._running = True
+
+        except Exception:
+            # Cleanup on failure
+            await self._cleanup()
</code_context>

<issue_to_address>
**suggestion (bug_risk):** get_ebpf_program_run_args swallows all exceptions and returns an empty list.

Logging or raising a specific error instead of returning an empty list would improve debuggability and help identify configuration or logic issues.
</issue_to_address>

### Comment 3
<location> `source_query_proxy/ebpf/operations.py:58-68` </location>
<code_context>
+        self.server_port = server_port
+        self.bind_port = bind_port
+        self.bind_ip = bind_ip
+        self.use_ipport_key = use_ipport_key or (bind_ip is not None)
+
+    def apply(self, program: 'BPFProgram'):
</code_context>

<issue_to_address>
**suggestion:** Implicitly setting use_ipport_key based on bind_ip may cause confusion.

If use_ipport_key is set to False but bind_ip is provided, the current logic will override the caller's intent. Please clarify the precedence or update the documentation to reflect this behavior.

```suggestion
        """
        Args:
            server_port: Game server port to redirect from
            bind_port: Proxy bind port to redirect to
            bind_ip: Bind IP address (if None, uses port-only lookup)
            use_ipport_key: If True, use IP+port composite key instead of port-only.
                If False, but bind_ip is provided, use_ipport_key will be set to True.
                If both are provided, bind_ip takes precedence and use_ipport_key will be True.
                To avoid confusion, explicitly set use_ipport_key when providing bind_ip.
                (If use_ipport_key is False and bind_ip is provided, use_ipport_key will be overridden to True.)
        """
        self.server_port = server_port
        self.bind_port = bind_port
        self.bind_ip = bind_ip
        self.use_ipport_key = use_ipport_key or (bind_ip is not None)
```
</issue_to_address>

### Comment 4
<location> `source_query_proxy/ebpf/redirector.py:130-133` </location>
<code_context>
+
+            self._running = True
+
+        except Exception:
+            # Cleanup on failure
+            await self._cleanup()
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Catching all exceptions in start() and calling _cleanup may mask the original error.

If _cleanup fails, the original exception could be suppressed. Use try/finally or log both exceptions to improve error visibility.

```suggestion
        except Exception as exc:
            # Cleanup on failure, ensure both exceptions are visible
            try:
                await self._cleanup()
            except Exception as cleanup_exc:
                logger.error(f"Exception during cleanup after error: {cleanup_exc}", exc_info=True)
                # Optionally, chain the exceptions for visibility
                raise exc from cleanup_exc
            raise
```
</issue_to_address>

### Comment 5
<location> `source_query_proxy/ebpf/tc.py:29` </location>
<code_context>
+        sfq_added: Whether sfq qdisc was added
+    """
+    # Clean up in reverse order of creation
+    if sfq_added:
+        try:
+            ipr.tc("del", "sfq", ifindex, "1:")
</code_context>

<issue_to_address>
**suggestion:** rollback_tc_bpf does not handle exceptions during rollback robustly.

Currently, only the last error is logged if multiple rollback steps fail. Aggregating errors or ensuring all cleanup steps run independently would improve robustness.
</issue_to_address>

### Comment 6
<location> `source_query_proxy/ebpf/tc.py:82` </location>
<code_context>
+    logger.info(f"Loaded BPF functions: incoming={fn_incoming.name}, outgoing={fn_outgoing.name}")
+
+    # Get interface index
+    ifindex = ipr.link_lookup(ifname=interface)[0]
+    logger.debug(f"Interface {interface} has index {ifindex}")
+
</code_context>

<issue_to_address>
**issue:** attach_tc_bpf assumes link_lookup always returns a non-empty list.

If link_lookup returns an empty list, accessing the first element will raise an IndexError. Please add a check for this case and raise a clear error if the interface is not found.
</issue_to_address>

### Comment 7
<location> `tests/test_ebpf_generation.py:32-37` </location>
<code_context>
+)
+
+
+class TestBPFStruct:
+    """Test BPFStruct code generation"""
+
+    def test_simple_struct(self):
+        """Test generating a simple struct"""
+        struct = BPFStruct("addr_key_t", [
+            ("u32", "ip"),
+            ("u16", "port")
+        ])
+
+        code = struct.render()
+        assert "struct addr_key_t {" in code
+        assert "u32 ip;" in code
+        assert "u16 port;" in code
+        assert code.endswith("};")
+
+    def test_empty_struct(self):
+        """Test generating an empty struct"""
+        struct = BPFStruct("empty_t", [])
</code_context>

<issue_to_address>
**suggestion (testing):** Missing tests for invalid or edge-case struct field types.

Please add tests for BPFStruct with invalid or edge-case field types, such as empty strings, unsupported types, or duplicate field names, to verify error handling and code generation.

```suggestion
    def test_empty_struct(self):
        """Test generating an empty struct"""
        struct = BPFStruct("empty_t", [])
        code = struct.render()
        assert "struct empty_t {" in code
        assert "};" in code

    def test_invalid_field_type(self):
        """Test struct with an unsupported field type"""
        struct = BPFStruct("invalid_type_t", [
            ("foo32", "ip"),
        ])
        code = struct.render()
        # Should still generate code, but with the unsupported type present
        assert "foo32 ip;" in code

    def test_empty_field_type_or_name(self):
        """Test struct with empty field type or name"""
        struct = BPFStruct("empty_field_t", [
            ("", "ip"),
            ("u32", ""),
        ])
        code = struct.render()
        assert " ip;" in code
        assert "u32 ;" in code

    def test_duplicate_field_names(self):
        """Test struct with duplicate field names"""
        struct = BPFStruct("dup_field_t", [
            ("u32", "ip"),
            ("u16", "ip"),
        ])
        code = struct.render()
        # Both fields should be present, but with duplicate names
        assert code.count("ip;") == 2
```
</issue_to_address>

### Comment 8
<location> `tests/test_ebpf_generation.py:40-49` </location>
<code_context>
+        assert "};" in code
+
+
+class TestBPFMap:
+    """Test BPFMap code generation"""
+
+    def test_simple_hash_map(self):
+        """Test generating a BPF_HASH map"""
+        bpf_map = BPFMap("port_map", "u16", "u16", max_entries=1024)
+        code = bpf_map.render()
+        assert code == "BPF_HASH(port_map, u16, u16, 1024);"
+
+    def test_struct_key_map(self):
+        """Test map with struct key type"""
+        bpf_map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
+        code = bpf_map.render()
+        assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"
+
+    def test_default_max_entries(self):
+        """Test default max_entries value"""
+        bpf_map = BPFMap("test_map", "u32", "u32")
</code_context>

<issue_to_address>
**suggestion (testing):** No tests for map type variations or invalid parameters.

Please add tests covering various map_type values and invalid max_entries to ensure proper error handling.

Suggested implementation:

```python
class TestBPFMap:
    """Test BPFMap code generation"""

    def test_simple_hash_map(self):
        """Test generating a BPF_HASH map"""
        bpf_map = BPFMap("port_map", "u16", "u16", max_entries=1024)
        code = bpf_map.render()
        assert code == "BPF_HASH(port_map, u16, u16, 1024);"

    def test_struct_key_map(self):
        """Test map with struct key type"""
        bpf_map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
        code = bpf_map.render()
        assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"

    def test_default_max_entries(self):
        """Test default max_entries value"""
        bpf_map = BPFMap("test_map", "u32", "u32")
        code = bpf_map.render()
        assert "10240" in code  # Default max_entries

    def test_map_type_variations(self):
        """Test generating maps with different map_type values"""
        bpf_array = BPFMap("array_map", "u32", "u64", map_type="BPF_ARRAY", max_entries=128)
        code_array = bpf_array.render()
        assert code_array == "BPF_ARRAY(array_map, u32, u64, 128);"

        bpf_percpu_hash = BPFMap("percpu_hash_map", "u32", "u64", map_type="BPF_PERCPU_HASH", max_entries=256)
        code_percpu_hash = bpf_percpu_hash.render()
        assert code_percpu_hash == "BPF_PERCPU_HASH(percpu_hash_map, u32, u64, 256);"

        bpf_lru_hash = BPFMap("lru_hash_map", "u32", "u64", map_type="BPF_LRU_HASH", max_entries=512)
        code_lru_hash = bpf_lru_hash.render()
        assert code_lru_hash == "BPF_LRU_HASH(lru_hash_map, u32, u64, 512);"

    def test_invalid_max_entries(self):
        """Test error handling for invalid max_entries values"""
        import pytest

        with pytest.raises(ValueError):
            BPFMap("neg_map", "u32", "u32", max_entries=-1)

        with pytest.raises(ValueError):
            BPFMap("zero_map", "u32", "u32", max_entries=0)

        with pytest.raises(TypeError):
            BPFMap("str_map", "u32", "u32", max_entries="not_an_int")

    def test_invalid_map_type(self):
        """Test error handling for invalid map_type values"""
        import pytest

        with pytest.raises(ValueError):
            BPFMap("bad_map", "u32", "u32", map_type="BPF_UNKNOWN", max_entries=10)

```

These tests assume that the BPFMap class raises ValueError for invalid max_entries (negative, zero) and invalid map_type, and TypeError for non-integer max_entries. If the BPFMap implementation does not currently raise these exceptions, you will need to update its constructor to perform these checks and raise the appropriate exceptions.
</issue_to_address>

### Comment 9
<location> `tests/test_ebpf_compilation.py:21-30` </location>
<code_context>
+def _compile_and_verify_bpf(bpf_code, expected_map_name):
</code_context>

<issue_to_address>
**suggestion (testing):** No test for BPF compilation failure or invalid code.

Add a test that triggers a BPF compilation error to confirm proper error handling and reporting.
</issue_to_address>

### Comment 10
<location> `tests/test_ebpf_compilation.py:188-194` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 11
<location> `source_query_proxy/ebpf/tc.py:177` </location>
<code_context>
def cleanup_tc(ipr: Any, ifindex: int, safe: bool = False) -> None:
    """Cleanup tc qdiscs

    Args:
        ipr: pyroute2 IPRoute instance
        ifindex: Interface index
        safe: If True, ignore 'Invalid argument' errors
    """
    from pyroute2.netlink.exceptions import NetlinkError

    try:
        ipr.tc("del", "ingress", ifindex, "ffff:")
        logger.debug("Removed ingress qdisc")
    except NetlinkError as exc:
        # Only ignore if safe mode and qdisc doesn't exist
        if not (safe and exc.args[1] == 'Invalid argument'):
            logger.error(f"Failed to remove ingress qdisc: {exc}")
            if not safe:
                raise

    try:
        ipr.tc("del", "sfq", ifindex, "1:")
        logger.debug("Removed sfq qdisc")
    except NetlinkError as exc:
        # Only ignore if safe mode and qdisc doesn't exist
        if not (safe and exc.args[1] == 'Invalid argument'):
            logger.error(f"Failed to remove sfq qdisc: {exc}")
            if not safe:
                raise

</code_context>

<issue_to_address>
**issue (code-quality):** Hoist conditional out of nested conditional [×2] ([`hoist-if-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-if-from-if/))
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

This commit addresses all remaining code review feedback for production
readiness and improved error handling.

**Async Safety & Signal Handling:**
- Use asyncio.add_signal_handler instead of signal.signal in run_ebpf_redirection()
- Properly remove signal handlers in finally block
- Async-safe signal handling compatible with uvloop and all async frameworks

**Exception Handling Improvements:**
- Narrow exception catches in EBPFRedirector.start() to (RuntimeError, ValueError, OSError)
- Chain cleanup exceptions to preserve both original and cleanup errors
- Fix get_ebpf_program_run_args() to log and raise instead of swallowing exceptions
- Add RuntimeError wrapper with full exc_info for better debuggability

**TC Operations Robustness:**
- Add check for empty link_lookup result with clear error message
- Improve error aggregation in rollback_tc_bpf() - log all errors individually
- Add summary error log if multiple rollback steps fail
- Hoist conditionals in cleanup_tc() for better readability

**Documentation Improvements:**
- Clarify use_ipport_key vs bind_ip precedence in PacketRedirectOperation
- Document that bind_ip presence overrides explicit use_ipport_key=False
- Add inline comment explaining precedence logic

**Test Coverage Expansion:**
- Add edge case tests for BPFStruct (empty fields, duplicates, unsupported types)
- Add edge case tests for BPFMap (different map types, zero/negative max_entries)
- Add compilation failure tests (syntax errors, undefined functions)
- Tests document current behavior without requiring validation (keeps template engine simple)

Changes:
- source_query_proxy/epbf.py: Async-safe signals, better exception handling
- source_query_proxy/ebpf/redirector.py: Narrow exceptions, chain cleanup errors
- source_query_proxy/ebpf/tc.py: Interface validation, error aggregation, hoisted conditionals
- source_query_proxy/ebpf/operations.py: Clarified precedence documentation
- tests/test_ebpf_generation.py: Edge case tests for template engine
- tests/test_ebpf_compilation.py: Compilation failure tests

All changes maintain backward compatibility and improve production robustness.
@spumer
Copy link
Collaborator Author

spumer commented Nov 9, 2025

@sourcery-ai review new changes

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `source_query_proxy/ebpf/redirector.py:145-152` </location>
<code_context>
+
+        Safe to call multiple times (idempotent).
+        """
+        if not self._running:
+            logger.debug("eBPF redirection is not running, skipping stop")
+            return
</code_context>

<issue_to_address>
**suggestion (bug_risk):** stop() is idempotent, but may not clean up resources if partially initialized.

Currently, stop() returns early if not running, which may prevent cleanup of partially allocated resources. To ensure proper resource release, always invoke _cleanup(), regardless of running state.

```suggestion
        if not self._running:
            logger.debug("eBPF redirection is not running, proceeding to cleanup resources")

        logger.info("Stopping eBPF redirection and cleaning up resources...")
        self._running = False
        await self._cleanup()
        logger.info("✓ eBPF redirection stopped and resources cleaned up")
```
</issue_to_address>

### Comment 2
<location> `tests/test_ebpf_compilation.py:121-130` </location>
<code_context>
+    def test_multiple_ports_same_program(self):
</code_context>

<issue_to_address>
**suggestion (testing):** Test for multiple port mappings in a single program is present, but does not test IP+port mode.

Please add a test for multiple IP+port mappings in a single program to verify addr_map is populated correctly in all modes.
</issue_to_address>

### Comment 3
<location> `tests/test_ebpf_compilation.py:232-238` </location>
<code_context>

</code_context>

<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))

<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.

Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals

Some ways to fix this:

* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.

> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>

### Comment 4
<location> `tests/test_ebpf_generation.py:95-102` </location>
<code_context>
    def test_different_map_types(self):
        """Test generating maps with different map_type values"""
        # BPF_ARRAY
        bpf_array = BPFMap("array_map", "u32", "u64", map_type="BPF_ARRAY", max_entries=128)
        code_array = bpf_array.render()
        assert code_array == "BPF_ARRAY(array_map, u32, u64, 128);"

        # BPF_PERCPU_HASH
        bpf_percpu = BPFMap("percpu_map", "u32", "u64", map_type="BPF_PERCPU_HASH", max_entries=256)
        code_percpu = bpf_percpu.render()
        assert code_percpu == "BPF_PERCPU_HASH(percpu_map, u32, u64, 256);"

</code_context>

<issue_to_address>
**issue (code-quality):** Extract duplicate code into method ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

This commit addresses the remaining code review feedback to improve
robustness and test quality.

**EBPFRedirector.stop() Robustness (Comment 1):**
- Fixed stop() to always cleanup resources, even if not fully started
- Removed early return that could prevent cleanup of partially allocated resources
- Ensures proper resource release in all scenarios (idempotent cleanup)
- Updated logging to clarify cleanup behavior

Before:
    if not self._running:
        return  # Could skip cleanup!

After:
    if not self._running:
        logger.debug("...proceeding to cleanup resources")
    # Always cleanup regardless of running state
    await self._cleanup()

**Test Coverage Improvements:**

1. **Multiple IP+port Mappings Test (Comment 2):**
   - Added test_multiple_ipport_mappings_same_program()
   - Tests addr_map with multiple (IP, port) tuples
   - Verifies that single program handles multiple IP+port combinations
   - Covers both port-only and IP+port modes now

2. **Removed Conditionals from Tests (Comment 3):**
   - Split test_bcc_availability_reporting() into two separate tests
   - test_bcc_is_available() - runs only when BCC installed
   - test_bcc_not_available() - runs only when BCC not installed
   - Uses pytest.mark.skipif instead of conditionals
   - Follows Google SWE testing best practices

3. **Extracted Duplicate Code (Comment 4):**
   - Created _assert_map_renders_as() helper in TestBPFMap
   - Reduces duplication in test_different_map_types()
   - Makes test intent clearer and easier to extend

Changes:
- source_query_proxy/ebpf/redirector.py: Always cleanup in stop()
- tests/test_ebpf_compilation.py: Added IP+port test, split conditional test
- tests/test_ebpf_generation.py: Extracted helper method

All tests remain clear and "trivially correct upon inspection".
…entation

Complete documentation updates for the transition from external sqredirect
to internal eBPF implementation using BCC.

Changes:
- Add MIGRATION.md: Comprehensive migration guide with step-by-step
  instructions, troubleshooting, and performance comparison
- Update CHANGELOG.md: Add v3.0.0 release notes documenting internal
  eBPF implementation, breaking changes (none!), and new features
- Update README.rst: Replace sqredirect references with BCC installation,
  update version to 3.0.0, improve eBPF setup instructions
- Update PROGRESS.md: Mark documentation updates as complete, update
  status to reflect completion of implementation and documentation

This completes the high-priority documentation tasks for issue #131.
The internal eBPF implementation is now fully documented and ready for
testing and deployment.
Create complete documentation structure using MkDocs Material theme.

Documentation includes:
- Installation guide with detailed setup instructions
- Quick start guide for rapid deployment
- Configuration reference with all options
- eBPF guides (overview, setup, architecture, performance)
- Migration guide from v2.x to v3.0
- Troubleshooting guide for common issues
- Development guide for contributors
- API reference for Python developers
- Docker deployment guide
- Contributing guidelines
- License information

Structure:
- mkdocs.yml: MkDocs configuration with Material theme
- docs_requirements.txt: Documentation build dependencies
- docs/: Complete documentation in markdown format
  - index.md: Home page with project overview
  - installation.md: Installation instructions
  - quickstart.md: 5-minute setup guide
  - configuration.md: Configuration options reference
  - ebpf/: eBPF-specific documentation
    - overview.md: eBPF concepts and architecture
    - setup.md: Step-by-step eBPF setup
    - architecture.md: Implementation deep dive
    - performance.md: Benchmarks and optimization
  - migration.md: v2.x to v3.0 migration guide
  - troubleshooting.md: Common issues and solutions
  - development.md: Development workflow
  - api-reference.md: Python API documentation
  - docker.md: Docker and Kubernetes deployment
  - contributing.md: Contribution guidelines
  - changelog.md: Version history
  - license.md: MIT License

Features:
- Dark/light theme toggle
- Code syntax highlighting
- Search functionality
- Navigation tabs and sections
- Mobile-responsive design
- Copy-to-clipboard for code blocks

This completes the documentation for v3.0.0 release.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants