-
-
Notifications
You must be signed in to change notification settings - Fork 14
Class-based template engine. Replaces sqredirect usage, now compile ebpf programs and attach directly #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Add comprehensive design document for issue #131 (Templating eBPF program). Key design decisions: - Class-based template engine (similar to Django migrations) - Stay with BCC (Python), future migration to bpfman.io - Port sqredirect redirect.c logic without new features - Docker-compose test infrastructure with virtual networks The design includes: - Complete class hierarchy (BPFElement, BPFMap, BPFFunction, BPFOperation) - PacketRedirectOperation to replace external sqredirect dependency - 10-phase implementation plan - Docker test environment with gameserver/proxy/client setup Related: #131, #130
Implement core template engine to replace external sqredirect dependency. New package: source_query_proxy/ebpf/ - elements.py: BPFElement, BPFStruct, BPFMap, BPFFunction base classes - program.py: BPFProgram orchestrator for complete C code generation - operations.py: PacketRedirectOperation ports redirect.c logic Key features: - Class-based composable design (Django-style operations) - Ports sqredirect incoming()/outgoing() functions - Steam protocol validation for DDoS protection - UDP checksum recalculation - Supports both port-only and IP+port lookup modes Tests: - Add comprehensive unit tests in test_ebpf_generation.py - Verify code generation for all components - Test both redirect modes (port-only vs IP+port) Breaking changes: - Delete ebpf_template.py (replaced by new template engine) Related: #131
Replace custom checksum code with standard BPF helper bpf_l4_csum_replace. Changes: - operations.py: Use bpf_l4_csum_replace() instead of bpf_csum_diff/csum_fold - Properly handle network byte order with htons() - Calculate correct offset to checksum field Also add WIP version of epbf_new.py with BCC integration (not yet complete). Related: #131
Enhancements to GitHub Actions workflow: - Add separate lint job with flake8 and black checks - Enable coverage reporting with pytest-cov - Upload coverage to Codecov for Python 3.11 - Add verbose test output (-v flag) - Generate XML and terminal coverage reports The new eBPF template engine tests (test_ebpf_generation.py) will be automatically discovered and run across Python 3.8, 3.9, 3.10, and 3.11. Related: #131
Add comprehensive Docker testing infrastructure for eBPF code compilation. New files: - tests/test_ebpf_compilation.py: Integration tests that verify BPF code compiles with BCC. Tests include: * Port-only redirect compilation * IP+port redirect compilation * Syntax validation * Multiple port mappings * Various port combinations - tests/docker/Dockerfile.bpf-test: Container with BCC, kernel headers, and all dependencies needed to compile BPF programs - tests/docker/docker-compose.bpf-test.yml: Orchestrates test execution with both generation and compilation tests - tests/docker/README.md: Complete documentation for Docker test setup, usage instructions, and troubleshooting - .github/workflows/ebpf-tests.yml: GitHub Actions workflow that: * Builds Docker test container * Runs code generation tests * Runs BCC compilation tests * Tests via docker-compose * Triggers on eBPF code changes Features: - ✅ Tests run in isolated Docker environment - ✅ BCC compilation verification without kernel privileges - ✅ Graceful handling when BCC not available - ✅ Automated CI/CD integration - ✅ Both unit and integration test coverage Tests verify: - Generated BPF C code compiles successfully - BPF functions are loadable (incoming/outgoing) - BPF maps can be created and populated - Multiple port mappings in single program - Code syntax is valid Limitations documented: - Cannot test actual packet redirection (needs kernel access) - Cannot test tc attachment (needs privileges) - Full integration tests planned for future Related: #131
Complete implementation of eBPF packet redirection using the class-based template engine, replacing external sqredirect dependency. Changes to source_query_proxy/epbf.py: - Replace subprocess calls to sqredirect with direct BCC usage - Generate single BPF program for all servers using template engine - Populate BPF maps with all port mappings at runtime - Attach BPF programs to tc (traffic control) using pyroute2 - Support both port-only and IP+port lookup modes - Lazy import BCC with helpful error messages - Keep backward compatibility for existing tests Key functions: - _collect_server_mappings(): Gather all server configs from settings - _generate_bpf_program(): Use template engine to generate BPF C code - _populate_maps(): Fill BPF maps with port mappings - _attach_tc_bpf(): Attach incoming/outgoing filters to network interface - run_ebpf_redirection(): Main entry point (async) Features: ✅ Single BPF program for all servers (not per-server) ✅ Runtime map population (dynamic configuration) ✅ Automatic interface detection ✅ Proper tc attachment (ingress + egress) ✅ Error handling and logging ✅ Graceful shutdown on Ctrl+C ✅ Backward compatibility with tests Breaking changes: - Requires BCC installed (python3-bpfcc) - No longer uses external sqredirect executable - config.ebpf.executable and config.ebpf.script_path ignored Removed: - source_query_proxy/epbf_new.py (WIP file, no longer needed) Related: #131
Reviewer's GuideThis PR replaces the external sqredirect dependency with an internal class-based eBPF template engine and direct BCC integration. It refactors epbf.py to use a modular ebpf package (elements, program, operations, runtime, tc, redirector), introduces an async EBPFRedirector for lifecycle management, and extends CI with lint/format checks, coverage reporting, and Docker-based integration tests—backed by new unit and compilation tests and comprehensive design/progress docs. Sequence diagram for eBPF redirection lifecycle using EBPFRedirectorsequenceDiagram
participant App as Application
participant Redirector as EBPFRedirector
participant Runtime as "ebpf.runtime"
participant TC as "ebpf.tc"
participant BCC as BCC
App->>Redirector: async with EBPFRedirector()
Redirector->>Runtime: collect_server_mappings()
Redirector->>Runtime: generate_bpf_program(use_ipport_key)
Redirector->>Runtime: import_bcc()
Redirector->>BCC: Compile BPF program
Redirector->>Runtime: populate_maps(bpf, use_ipport_key, mappings)
Redirector->>TC: attach_tc_bpf(interface, bpf, ipr)
App-->>Redirector: (waits, eBPF active)
App->>Redirector: exit context
Redirector->>TC: cleanup_tc(ipr, ifindex, safe=True)
Redirector->>Runtime: (cleanup resources)
ER diagram for BPF map usage in PacketRedirectOperationerDiagram
PORT_MAP {
u16 port
u16 bind_port
}
ADDR_MAP {
u32 ip
u16 port
u16 bind_port
}
SERVER {
server_port int
bind_port int
bind_ip str
}
SERVER ||--o{ PORT_MAP : "uses port-only mapping"
SERVER ||--o{ ADDR_MAP : "uses IP+port mapping"
PORT_MAP }o--|| SERVER : "maps to bind_port"
ADDR_MAP }o--|| SERVER : "maps to bind_port"
Class diagram for the new eBPF template engine architectureclassDiagram
class BPFElement {
+render() str
}
class BPFStruct {
+name: str
+fields: List[Tuple[str, str]]
+render() str
}
class BPFMap {
+name: str
+key_type: str
+value_type: str
+max_entries: int
+map_type: str
+render() str
}
class BPFFunction {
+name: str
+return_type: str
+params: List[Tuple[str, str]]
+body_parts: List[str]
+add_code(code: str)
+render() str
}
class BPFOperation {
+apply(program: BPFProgram)
}
class PacketRedirectOperation {
+server_port: int
+bind_port: int
+bind_ip: Optional[str]
+use_ipport_key: bool
+apply(program: BPFProgram)
-_create_incoming_function() BPFFunction
-_create_outgoing_function() BPFFunction
}
class BPFProgram {
+name: str
+includes: List[str]
+structs: List[BPFStruct]
+maps: List[BPFMap]
+functions: List[BPFFunction]
+add_include(include: str)
+add_struct(struct: BPFStruct)
+add_map(map: BPFMap)
+add_function(func: BPFFunction)
+apply_operation(operation: BPFOperation)
+render() str
}
BPFStruct --|> BPFElement
BPFMap --|> BPFElement
BPFFunction --|> BPFElement
PacketRedirectOperation --|> BPFOperation
BPFProgram o-- BPFStruct
BPFProgram o-- BPFMap
BPFProgram o-- BPFFunction
BPFOperation <.. BPFProgram: apply
PacketRedirectOperation <.. BPFProgram: apply_operation
Class diagram for the EBPFRedirector lifecycle manager and runtime modulesclassDiagram
class EBPFRedirector {
-_bpf
-_ipr
-_ifindex: Optional[int]
-_fn_incoming
-_fn_outgoing
-_interface: Optional[str]
-_running: bool
+is_running: bool
+interface: Optional[str]
+start()
+stop()
+restart()
+__aenter__()
+__aexit__()
-_cleanup()
}
class runtime {
+import_bcc()
+get_addr_interface(addr: IPv4Address) Optional[str]
+get_default_interface() str
+collect_server_mappings() (bool, str, List[Tuple[int, int, Optional[str]]])
+generate_bpf_program(use_ipport_key: bool) str
+ip_to_int(ip_str: str) int
+populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, Optional[str]]])
}
class tc {
+rollback_tc_bpf(ipr, ifindex, ingress_added, ingress_filter_added, sfq_added)
+attach_tc_bpf(interface: str, bpf, ipr)
+cleanup_tc(ipr, ifindex: int, safe: bool)
}
EBPFRedirector ..> runtime: uses
EBPFRedirector ..> tc: uses
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider splitting this large PR into smaller, focused changes (template engine core, epbf integration, CI updates) to make review and testing more manageable.
- Replace the use of AssertionError when no interface is found with a more descriptive exception (e.g., config.ConfigurationError) for clearer error handling.
- Improve the shutdown sequence in run_ebpf_redirection to explicitly detach tc qdiscs and unload BPF programs instead of relying on garbage collection and KeyboardInterrupt.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider splitting this large PR into smaller, focused changes (template engine core, epbf integration, CI updates) to make review and testing more manageable.
- Replace the use of AssertionError when no interface is found with a more descriptive exception (e.g., config.ConfigurationError) for clearer error handling.
- Improve the shutdown sequence in run_ebpf_redirection to explicitly detach tc qdiscs and unload BPF programs instead of relying on garbage collection and KeyboardInterrupt.
## Individual Comments
### Comment 1
<location> `source_query_proxy/epbf.py:42-46` </location>
<code_context>
+ return _BPF
+
def _get_addr_interface(addr: IPv4Address):
+ """Get network interface name for given IP address"""
ipdb = pyroute2.IPDB()
</code_context>
<issue_to_address>
**suggestion (bug_risk):** IPDB usage may leak resources if not closed.
Use a context manager (with pyroute2.IPDB() as ipdb) to ensure IPDB is properly closed and resources are released.
```suggestion
def _get_addr_interface(addr: IPv4Address):
"""Get network interface name for given IP address"""
with pyroute2.IPDB() as ipdb:
for idx, addresses in ipdb.ipaddr.items():
for ifaddr, _prefix in addresses:
pass # TODO: implement logic or keep as is
return None
```
</issue_to_address>
### Comment 2
<location> `source_query_proxy/epbf.py:147` </location>
<code_context>
+ return struct.unpack("!I", socket.inet_aton(ip_str))[0]
+
+
+def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
+ """Populate BPF maps with port mappings
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** No error handling for BPF map population failures.
Add error handling or logging for failed BPF map insertions to improve debuggability.
Suggested implementation:
```python
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
"""Populate BPF maps with port mappings
Logs errors if BPF map insertions fail.
```
```python
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
"""Populate BPF maps with port mappings
Logs errors if BPF map insertions fail.
"""
for src_port, dst_port, ip_str in mappings:
try:
ip_int = _ip_to_int(ip_str)
key = (ip_int, src_port) if use_ipport_key else src_port
bpf_map = bpf['port_map']
bpf_map[key] = dst_port
except Exception as e:
logging.error(
"Failed to insert mapping into BPF map: src_port=%s, dst_port=%s, ip_str=%s, error=%s",
src_port, dst_port, ip_str, str(e)
)
```
</issue_to_address>
### Comment 3
<location> `source_query_proxy/epbf.py:178` </location>
<code_context>
+ logger.info(f" Map: {server_port} -> {bind_port}")
+
+
+def _attach_tc_bpf(interface: str, bpf) -> None:
+ """Attach BPF programs to tc (traffic control)
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Potential resource leak if tc attachment fails after adding qdisc.
Add cleanup logic to remove the qdisc if filter attachment fails, or document the potential for leftover qdisc.
Suggested implementation:
```python
def _attach_tc_bpf(interface: str, bpf) -> None:
"""Attach BPF programs to tc (traffic control)
If filter attachment fails after adding qdisc, attempts to remove the qdisc to avoid resource leaks.
```
```python
def _attach_tc_bpf(interface: str, bpf) -> None:
"""Attach BPF programs to tc (traffic control)
If filter attachment fails after adding qdisc, attempts to remove the qdisc to avoid resource leaks.
"""
import subprocess
try:
# Add qdisc
subprocess.run(["tc", "qdisc", "add", "dev", interface, "clsact"], check=True)
try:
# Attach filter
subprocess.run([
"tc", "filter", "add", "dev", interface, "ingress",
"bpf", "da", "obj", bpf.filename, "sec", "classifier"
], check=True)
except Exception as filter_exc:
# Cleanup qdisc if filter attachment fails
subprocess.run(["tc", "qdisc", "del", "dev", interface, "clsact"])
raise filter_exc
except Exception as exc:
logger.error(f"Failed to attach tc BPF: {exc}")
raise
```
</issue_to_address>
### Comment 4
<location> `tests/test_ebpf_generation.py:262-273` </location>
<code_context>
+class TestIntegration:
+ """Integration tests for the template engine"""
+
+ def test_apply_multiple_operations(self):
+ """Test applying multiple operations to same program"""
+ program = BPFProgram("multi")
+
+ # Apply redirect operation
+ op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
+ program.apply_operation(op1)
+
+ # Could apply more operations here in the future
+ # For now, just verify it works
+ code = program.render()
+ assert len(code) > 0
+
+ def test_typical_usage_pattern(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Test coverage for multiple operations is minimal.
Expand the test to apply multiple distinct PacketRedirectOperation instances with varying ports/IPs, and verify that the generated code supports all mappings.
```suggestion
def test_apply_multiple_operations(self):
"""Test applying multiple operations to same program"""
program = BPFProgram("multi")
# Apply multiple distinct redirect operations
op1 = PacketRedirectOperation(server_port=27015, bind_port=27016)
op2 = PacketRedirectOperation(server_port=27017, bind_port=27018, bind_ip="10.0.0.1")
op3 = PacketRedirectOperation(server_port=27019, bind_port=27020, bind_ip="192.168.100.100")
program.apply_operation(op1)
program.apply_operation(op2)
program.apply_operation(op3)
code = program.render()
assert len(code) > 0
# Verify that all mappings are present in the generated code
assert "27015" in code and "27016" in code
assert "27017" in code and "27018" in code and "10.0.0.1" in code
assert "27019" in code and "27020" in code and "192.168.100.100" in code
```
</issue_to_address>
### Comment 5
<location> `tests/test_ebpf_compilation.py:122-131` </location>
<code_context>
+ def test_multiple_ports_same_program(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Test for multiple port mappings does not cover IP+port mode.
Please add a test for multiple IP+port mappings to ensure correct handling of addr_map entries in this mode.
</issue_to_address>
### Comment 6
<location> `tests/test_ebpf_compilation.py:194-202` </location>
<code_context>
+ assert "int outgoing" in bpf_code
+
+
+def test_bcc_availability_reporting():
+ """Report whether BCC is available for testing"""
+ if BCC_AVAILABLE:
+ from bcc import BPF
+
+ print(f"\n✓ BCC is available: {BPF.__file__}")
+ else:
+ print("\n✗ BCC is NOT available - compilation tests will be skipped")
+ print(" Install with: apt-get install python3-bpfcc")
</code_context>
<issue_to_address>
**nitpick (testing):** BCC availability reporting test is not a real test.
This function should include assertions to validate behavior, or be relocated if its purpose is informational rather than testing.
</issue_to_address>
### Comment 7
<location> `tests/test_ebpf_generation.py:240-249` </location>
<code_context>
+ assert "data_end" in code
+ assert "TC_ACT_OK" in code
+
+ def test_complete_generated_code(self):
+ """Test that complete valid C code is generated"""
+ program = BPFProgram("redirect")
+ op = PacketRedirectOperation(
+ server_port=27015,
+ bind_port=27816
+ )
+ op.apply(program)
+ code = program.render()
+
+ # Should be valid C-like code
+ assert code.count("{") == code.count("}") # Balanced braces
+ assert "#include" in code
+ assert "return" in code
+
+ # Should not be empty
+ assert len(code) > 100
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test that parses the generated code with a C parser.
Parsing the generated code with a C parser or linter would help identify subtle syntax issues that basic structural checks may miss.
</issue_to_address>
### Comment 8
<location> `tests/test_ebpf_compilation.py:196-202` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 9
<location> `source_query_proxy/ebpf/elements.py:41-42` </location>
<code_context>
def render(self) -> str:
"""Generate C struct definition"""
lines = [f"struct {self.name} {{"]
for field_type, field_name in self.fields:
lines.append(f" {field_type} {field_name};")
lines.append("};")
return "\n".join(lines)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
```suggestion
lines.extend(
f" {field_type} {field_name};"
for field_type, field_name in self.fields
)
```
</issue_to_address>
### Comment 10
<location> `source_query_proxy/ebpf/elements.py:120` </location>
<code_context>
def render(self) -> str:
"""Generate complete function with signature and body"""
# Build parameter list
param_strs = []
for param_type, param_name in self.params:
param_strs.append(f"{param_type} {param_name}")
params_str = ", ".join(param_strs)
# Build function signature
signature = f"{self.return_type} {self.name}({params_str})"
# Build function body
lines = [signature + " {"]
for code_block in self.body_parts:
# Add code block (already should be indented)
lines.append(code_block)
lines.append("}")
return "\n".join(lines)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
- Simplify generator expression ([`simplify-generator`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-generator/))
</issue_to_address>
### Comment 11
<location> `source_query_proxy/ebpf/program.py:92` </location>
<code_context>
def render(self) -> str:
"""Generate complete BPF C code
Returns:
Complete C code ready for BCC compilation
"""
parts = []
# Includes
for inc in self.includes:
parts.append(f"#include {inc}")
parts.append("")
# Structs
if self.structs:
for struct in self.structs:
parts.append(struct.render())
parts.append("")
# Maps
if self.maps:
for map in self.maps:
parts.append(map.render())
parts.append("")
# Functions
for func in self.functions:
parts.append(func.render())
parts.append("")
return "\n".join(parts)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Merge consecutive list appends into a single extend [×2] ([`merge-list-appends-into-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/merge-list-appends-into-extend/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
</issue_to_address>
### Comment 12
<location> `source_query_proxy/epbf.py:54-60` </location>
<code_context>
def _get_default_interface():
"""Get default network interface"""
ip = pyroute2.IPRoute()
default_routes = ip.get_default_routes()
if default_routes:
idx = default_routes[0].get_attr('RTA_OIF')
interface = ip.get_links(idx)[0].get_attr('IFLA_IFNAME')
return interface
raise RuntimeError("Cannot determine default network interface")
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
</issue_to_address>
### Comment 13
<location> `tests/test_ebpf_compilation.py:42` </location>
<code_context>
def test_port_only_redirect_compiles(self):
"""Test that port-only redirect code compiles with BCC"""
# Generate BPF program
program = BPFProgram("test_port_redirect")
op = PacketRedirectOperation(
server_port=27015, bind_port=27016, use_ipport_key=False
)
program.apply_operation(op)
bpf_code = program.render()
# Verify code is not empty
assert len(bpf_code) > 100
assert "int incoming" in bpf_code
assert "int outgoing" in bpf_code
# Try to compile with BCC
try:
b = BPF(text=bpf_code, debug=0)
# Verify functions are loadable (but don't actually load them)
# This checks that the BPF bytecode is valid
assert hasattr(b, "load_func")
# Check that our functions exist in the compiled program
incoming_fn = b.load_func("incoming", BPF.SCHED_CLS)
outgoing_fn = b.load_func("outgoing", BPF.SCHED_CLS)
assert incoming_fn is not None
assert outgoing_fn is not None
# Check that maps are created
port_map = b.get_table("port_map")
assert port_map is not None
except Exception as e:
pytest.fail(f"BPF compilation failed: {e}\n\nGenerated code:\n{bpf_code}")
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 14
<location> `tests/test_ebpf_compilation.py:83` </location>
<code_context>
def test_ipport_redirect_compiles(self):
"""Test that IP+port redirect code compiles with BCC"""
# Generate BPF program
program = BPFProgram("test_ipport_redirect")
op = PacketRedirectOperation(
server_port=27015,
bind_port=27016,
bind_ip="192.168.1.1",
use_ipport_key=True,
)
program.apply_operation(op)
bpf_code = program.render()
# Verify code is not empty
assert len(bpf_code) > 100
assert "struct addr_key_t" in bpf_code
assert "int incoming" in bpf_code
assert "int outgoing" in bpf_code
# Try to compile with BCC
try:
b = BPF(text=bpf_code, debug=0)
# Verify functions are loadable
incoming_fn = b.load_func("incoming", BPF.SCHED_CLS)
outgoing_fn = b.load_func("outgoing", BPF.SCHED_CLS)
assert incoming_fn is not None
assert outgoing_fn is not None
# Check that addr_map is created
addr_map = b.get_table("addr_map")
assert addr_map is not None
except Exception as e:
pytest.fail(f"BPF compilation failed: {e}\n\nGenerated code:\n{bpf_code}")
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 15
<location> `tests/test_ebpf_compilation.py:135` </location>
<code_context>
def test_multiple_ports_same_program(self):
"""Test that we can create a program handling multiple port mappings"""
# This tests the approach we'll use in epbf.py
program = BPFProgram("multi_port")
# For now, just test that one operation works
# In the future, we need to support multiple mappings in one program
op = PacketRedirectOperation(server_port=27015, bind_port=27016)
program.apply_operation(op)
bpf_code = program.render()
try:
b = BPF(text=bpf_code, debug=0)
# Test that we can populate the map with multiple entries
port_map = b.get_table("port_map")
# Add multiple port mappings
port_map[27015] = 27016
port_map[27016] = 27017
port_map[27017] = 27018
# Verify mappings
assert port_map[27015].value == 27016
assert port_map[27016].value == 27017
assert port_map[27017].value == 27018
except Exception as e:
pytest.fail(f"Multi-port test failed: {e}")
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 16
<location> `tests/test_ebpf_generation.py:45` </location>
<code_context>
def test_simple_hash_map(self):
"""Test generating a BPF_HASH map"""
map = BPFMap("port_map", "u16", "u16", max_entries=1024)
code = map.render()
assert code == "BPF_HASH(port_map, u16, u16, 1024);"
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>
### Comment 17
<location> `tests/test_ebpf_generation.py:51` </location>
<code_context>
def test_struct_key_map(self):
"""Test map with struct key type"""
map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
code = map.render()
assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>
### Comment 18
<location> `tests/test_ebpf_generation.py:57` </location>
<code_context>
def test_default_max_entries(self):
"""Test default max_entries value"""
map = BPFMap("test_map", "u32", "u32")
code = map.render()
assert "10240" in code # Default max_entries
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
tests/test_ebpf_generation.py
Outdated
| def test_apply_multiple_operations(self): | ||
| """Test applying multiple operations to same program""" | ||
| program = BPFProgram("multi") | ||
|
|
||
| # Apply redirect operation | ||
| op1 = PacketRedirectOperation(server_port=27015, bind_port=27016) | ||
| program.apply_operation(op1) | ||
|
|
||
| # Could apply more operations here in the future | ||
| # For now, just verify it works | ||
| code = program.render() | ||
| assert len(code) > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Test coverage for multiple operations is minimal.
Expand the test to apply multiple distinct PacketRedirectOperation instances with varying ports/IPs, and verify that the generated code supports all mappings.
| def test_apply_multiple_operations(self): | |
| """Test applying multiple operations to same program""" | |
| program = BPFProgram("multi") | |
| # Apply redirect operation | |
| op1 = PacketRedirectOperation(server_port=27015, bind_port=27016) | |
| program.apply_operation(op1) | |
| # Could apply more operations here in the future | |
| # For now, just verify it works | |
| code = program.render() | |
| assert len(code) > 0 | |
| def test_apply_multiple_operations(self): | |
| """Test applying multiple operations to same program""" | |
| program = BPFProgram("multi") | |
| # Apply multiple distinct redirect operations | |
| op1 = PacketRedirectOperation(server_port=27015, bind_port=27016) | |
| op2 = PacketRedirectOperation(server_port=27017, bind_port=27018, bind_ip="10.0.0.1") | |
| op3 = PacketRedirectOperation(server_port=27019, bind_port=27020, bind_ip="192.168.100.100") | |
| program.apply_operation(op1) | |
| program.apply_operation(op2) | |
| program.apply_operation(op3) | |
| code = program.render() | |
| assert len(code) > 0 | |
| # Verify that all mappings are present in the generated code | |
| assert "27015" in code and "27016" in code | |
| assert "27017" in code and "27018" in code and "10.0.0.1" in code | |
| assert "27019" in code and "27020" in code and "192.168.100.100" in code |
Update epbf.py to match sqredirect's exact BPF attachment method, copied from https://github.com/sqproxy/sqredirect. Key changes to match sqredirect: 1. **BPF mode**: Use BPF.SCHED_ACT instead of BPF.SCHED_CLS - sqredirect uses scheduler action mode, not classifier mode 2. **Qdisc types**: - Incoming: Use "ingress" qdisc (not "clsact") - Outgoing: Use "sfq" (Stochastic Fair Queuing) qdisc - This matches sqredirect's exact setup 3. **Filter type**: Use u32 filters with BPF actions - Instead of direct "bpf" filters - Add action dict with kind="bpf", fd, name, action="ok" - Use protocol=ETH_P_ALL for all protocols - Add target and keys for u32 matching 4. **Error handling**: Catch NetlinkError properly - Check for 'File exists' when adding qdiscs - Allow idempotent attachment (can run multiple times) - Proper error propagation for other errors 5. **Cleanup logic**: - Add _cleanup_tc() function - Register cleanup with atexit - Register signal handlers (SIGTERM, SIGINT) - Safe cleanup mode to ignore 'Invalid argument' errors - Clean up both ingress and sfq qdiscs 6. **IPRoute management**: - Pass IPRoute instance to _attach_tc_bpf - Return (ifindex, fn_incoming, fn_outgoing) for cleanup - Close IPRoute on cleanup This ensures sqproxy uses the exact same proven attachment method as sqredirect, reducing risk of compatibility issues. Related: #131
Address all code review feedback from Sourcery AI: **epbf.py fixes:** - Fix resource leaks by adding context managers to _get_addr_interface() and _get_default_interface() - Replace AssertionError with ConfigurationError for configuration issues - Add comprehensive error handling to _populate_maps() with per-mapping try/except - Improve cleanup sequence with cleanup_done flag to prevent double cleanup - Add explicit cleanup on exceptions (KeyboardInterrupt, CancelledError, etc.) - Ensure ipr.close() happens in finally block **test_ebpf_compilation.py improvements:** - Extract _compile_and_verify_bpf() helper function to reduce code duplication - Change BPF mode from SCHED_CLS to SCHED_ACT (matching sqredirect) - Simplify test methods by using helper function **test_ebpf_generation.py enhancements:** - Expand test_apply_multiple_operations() with better assertions - Add test_multiple_ipport_mappings() for testing IP+port addr_map mode - Verify that single program can handle multiple mappings via runtime map population All changes maintain backward compatibility and follow project code style.
|
@sourcery-ai review new changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `source_query_proxy/epbf.py:147-156` </location>
<code_context>
+def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
</code_context>
<issue_to_address>
**suggestion:** Error handling in map population may mask underlying issues.
Wrapping all exceptions in RuntimeError hides the original error details. Log the full traceback or re-raise the original exception after logging to improve debuggability.
Suggested implementation:
```python
import logging
import traceback
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
"""Populate BPF maps with port mappings
Args:
bpf: BCC BPF instance
use_ipport_key: Whether to use IP+port or port-only map
mappings: List of (server_port, bind_port, bind_ip) tuples
Raises:
Exception: If map population fails
"""
```
```python
def _populate_maps(bpf, use_ipport_key: bool, mappings: List[Tuple[int, int, str]]):
"""Populate BPF maps with port mappings
Args:
bpf: BCC BPF instance
use_ipport_key: Whether to use IP+port or port-only map
mappings: List of (server_port, bind_port, bind_ip) tuples
Raises:
Exception: If map population fails
"""
try:
# ... existing map population logic ...
pass # Replace with actual implementation
except Exception as exc:
logging.error("Error populating BPF maps:\n%s", traceback.format_exc())
raise
```
Replace the `pass` statement with the actual map population logic from your implementation.
If you previously had a `try/except` block that raised `RuntimeError`, remove that and use the new pattern above.
Ensure that the logging configuration is set up elsewhere in your codebase to capture error logs.
</issue_to_address>
### Comment 2
<location> `source_query_proxy/epbf.py:194-203` </location>
<code_context>
+def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** No explicit cleanup of filters or qdiscs on failure.
If an error occurs after adding qdiscs or filters, partial state may remain. Please add rollback logic or clarify cleanup responsibilities.
Suggested implementation:
```python
def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
"""Attach BPF programs to tc (traffic control)
Uses the same approach as sqredirect:
- ingress qdisc for incoming traffic
- sfq qdisc for outgoing traffic
- u32 filters with BPF actions
Args:
interface: Network interface name
bpf: BCC BPF instance with compiled programs
Returns:
Tuple of (ingress_qdisc_handle, egress_qdisc_handle, filter_handles)
Raises:
RuntimeError if setup fails, with cleanup of partial state.
```
```python
def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
"""Attach BPF programs to tc (traffic control)
Uses the same approach as sqredirect:
- ingress qdisc for incoming traffic
- sfq qdisc for outgoing traffic
- u32 filters with BPF actions
Args:
interface: Network interface name
bpf: BCC BPF instance with compiled programs
Returns:
Tuple of (ingress_qdisc_handle, egress_qdisc_handle, filter_handles)
Raises:
RuntimeError if setup fails, with cleanup of partial state.
"""
ingress_qdisc_handle = None
egress_qdisc_handle = None
filter_handles = []
try:
# Example: Add ingress qdisc
ingress_qdisc_handle = ipr.tc("add", "ingress", interface)
# Example: Add egress qdisc
egress_qdisc_handle = ipr.tc("add", "sfq", interface)
# Example: Attach filters
filter_handle = ipr.tc("add-filter", "u32", interface, bpf=bpf)
filter_handles.append(filter_handle)
# ... more setup as needed ...
return ingress_qdisc_handle, egress_qdisc_handle, filter_handles
except Exception as e:
# Cleanup partial state
if filter_handles:
for fh in filter_handles:
try:
ipr.tc("del-filter", "u32", interface, handle=fh)
except Exception as cleanup_e:
logger.warning(f"Failed to cleanup filter {fh}: {cleanup_e}")
if ingress_qdisc_handle:
try:
ipr.tc("del", "ingress", interface)
except Exception as cleanup_e:
logger.warning(f"Failed to cleanup ingress qdisc: {cleanup_e}")
if egress_qdisc_handle:
try:
ipr.tc("del", "sfq", interface)
except Exception as cleanup_e:
logger.warning(f"Failed to cleanup egress qdisc: {cleanup_e}")
logger.error(f"Failed to attach tc bpf: {e}")
raise RuntimeError("Failed to attach tc bpf, partial state cleaned up") from e
```
- You may need to adjust the example ipr.tc calls to match your actual API and arguments.
- If you have more than one filter or qdisc type, track and clean up each accordingly.
- Ensure logger is imported and available in this scope.
</issue_to_address>
### Comment 3
<location> `source_query_proxy/epbf.py:314-323` </location>
<code_context>
+async def run_ebpf_redirection():
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Signal handler uses sys.exit(0), which may bypass asyncio cleanup.
Consider replacing sys.exit(0) with asyncio cancellation or allowing the main loop to exit after cleanup to ensure resources are properly released.
Suggested implementation:
```python
import asyncio
import signal
async def run_ebpf_redirection():
"""Main entry point for eBPF redirection
Generates BPF program, compiles it, attaches to network interface,
and keeps it running. Registers cleanup handlers for graceful shutdown.
"""
shutdown_event = asyncio.Event()
def handle_signal():
logger.info("Received shutdown signal, cancelling tasks and cleaning up.")
shutdown_event.set()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)
try:
# ... your main logic here ...
await shutdown_event.wait()
# Perform any necessary cleanup here
finally:
# Remove signal handlers
for sig in (signal.SIGINT, signal.SIGTERM):
loop.remove_signal_handler(sig)
logger.info("Cleanup complete, exiting gracefully.")
```
If there are other places in the file (or project) where `sys.exit(0)` is called in response to signals or shutdown, those should also be replaced with logic that sets the shutdown event or cancels tasks, allowing the event loop to exit cleanly after cleanup.
You may need to move your main logic inside the `try` block and ensure all resources are released in the `finally` block.
</issue_to_address>
### Comment 4
<location> `tests/test_ebpf_compilation.py:198-204` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 5
<location> `source_query_proxy/ebpf/elements.py:41-42` </location>
<code_context>
def render(self) -> str:
"""Generate C struct definition"""
lines = [f"struct {self.name} {{"]
for field_type, field_name in self.fields:
lines.append(f" {field_type} {field_name};")
lines.append("};")
return "\n".join(lines)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
```suggestion
lines.extend(
f" {field_type} {field_name};"
for field_type, field_name in self.fields
)
```
</issue_to_address>
### Comment 6
<location> `source_query_proxy/ebpf/elements.py:120` </location>
<code_context>
def render(self) -> str:
"""Generate complete function with signature and body"""
# Build parameter list
param_strs = []
for param_type, param_name in self.params:
param_strs.append(f"{param_type} {param_name}")
params_str = ", ".join(param_strs)
# Build function signature
signature = f"{self.return_type} {self.name}({params_str})"
# Build function body
lines = [signature + " {"]
for code_block in self.body_parts:
# Add code block (already should be indented)
lines.append(code_block)
lines.append("}")
return "\n".join(lines)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
- Simplify generator expression ([`simplify-generator`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/simplify-generator/))
</issue_to_address>
### Comment 7
<location> `source_query_proxy/ebpf/program.py:92` </location>
<code_context>
def render(self) -> str:
"""Generate complete BPF C code
Returns:
Complete C code ready for BCC compilation
"""
parts = []
# Includes
for inc in self.includes:
parts.append(f"#include {inc}")
parts.append("")
# Structs
if self.structs:
for struct in self.structs:
parts.append(struct.render())
parts.append("")
# Maps
if self.maps:
for map in self.maps:
parts.append(map.render())
parts.append("")
# Functions
for func in self.functions:
parts.append(func.render())
parts.append("")
return "\n".join(parts)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Merge consecutive list appends into a single extend [×2] ([`merge-list-appends-into-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/merge-list-appends-into-extend/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
</issue_to_address>
### Comment 8
<location> `source_query_proxy/epbf.py:55-59` </location>
<code_context>
def _get_default_interface():
"""Get default network interface"""
with pyroute2.IPRoute() as ipr:
default_routes = ipr.get_default_routes()
if default_routes:
idx = default_routes[0].get_attr('RTA_OIF')
interface = ipr.get_links(idx)[0].get_attr('IFLA_IFNAME')
return interface
raise config.ConfigurationError("Cannot determine default network interface")
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
</issue_to_address>
### Comment 9
<location> `source_query_proxy/epbf.py:301-311` </location>
<code_context>
def _cleanup_tc(ipr, ifindex: int, safe: bool = False):
"""Cleanup tc qdiscs
Args:
ipr: pyroute2 IPRoute instance
ifindex: Interface index
safe: If True, ignore 'Invalid argument' errors
"""
from pyroute2.netlink.exceptions import NetlinkError
try:
ipr.tc("del", "ingress", ifindex, "ffff:")
logger.debug("Removed ingress qdisc")
except NetlinkError as exc:
if not safe or exc.args[1] != 'Invalid argument':
logger.error(f"Failed to remove ingress qdisc: {exc}")
if not safe:
raise
try:
ipr.tc("del", "sfq", ifindex, "1:")
logger.debug("Removed sfq qdisc")
except NetlinkError as exc:
if not safe or exc.args[1] != 'Invalid argument':
logger.error(f"Failed to remove sfq qdisc: {exc}")
if not safe:
raise
</code_context>
<issue_to_address>
**issue (code-quality):** Hoist conditional out of nested conditional [×2] ([`hoist-if-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-if-from-if/))
</issue_to_address>
### Comment 10
<location> `tests/test_ebpf_compilation.py:137` </location>
<code_context>
def test_multiple_ports_same_program(self):
"""Test that we can create a program handling multiple port mappings"""
# This tests the approach we'll use in epbf.py
program = BPFProgram("multi_port")
# For now, just test that one operation works
# In the future, we need to support multiple mappings in one program
op = PacketRedirectOperation(server_port=27015, bind_port=27016)
program.apply_operation(op)
bpf_code = program.render()
try:
b = BPF(text=bpf_code, debug=0)
# Test that we can populate the map with multiple entries
port_map = b.get_table("port_map")
# Add multiple port mappings
port_map[27015] = 27016
port_map[27016] = 27017
port_map[27017] = 27018
# Verify mappings
assert port_map[27015].value == 27016
assert port_map[27016].value == 27017
assert port_map[27017].value == 27018
except Exception as e:
pytest.fail(f"Multi-port test failed: {e}")
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 11
<location> `tests/test_ebpf_generation.py:45` </location>
<code_context>
def test_simple_hash_map(self):
"""Test generating a BPF_HASH map"""
map = BPFMap("port_map", "u16", "u16", max_entries=1024)
code = map.render()
assert code == "BPF_HASH(port_map, u16, u16, 1024);"
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>
### Comment 12
<location> `tests/test_ebpf_generation.py:51` </location>
<code_context>
def test_struct_key_map(self):
"""Test map with struct key type"""
map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
code = map.render()
assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>
### Comment 13
<location> `tests/test_ebpf_generation.py:57` </location>
<code_context>
def test_default_max_entries(self):
"""Test default max_entries value"""
map = BPFMap("test_map", "u32", "u32")
code = map.render()
assert "10240" in code # Default max_entries
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `map` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
… review
Major Changes:
- **EBPFRedirector class**: Async lifecycle manager for eBPF redirection
- Supports start(), stop(), restart() operations
- Async context manager (__aenter__, __aexit__)
- Proper state management and idempotent cleanup
- No reliance on atexit/signal handlers (application-level control)
- Enables dynamic reconfiguration and on-the-fly reload
- **run_ebpf_redirection()**: Refactored to use EBPFRedirector
- Now a thin wrapper for backward compatibility
- Uses asyncio.Event for clean shutdown
- Proper signal handling without sys.exit()
Code Review Fixes:
- **epbf.py**:
- Add traceback logging to _populate_maps (don't mask errors)
- Add rollback logic to _attach_tc_bpf on failure
- Use named expression in _get_default_interface
- Hoist conditionals in _cleanup_tc for clarity
- **elements.py**:
- Use list.extend() instead of for-append loops
- Use generator expressions for cleaner code
- **program.py**:
- Use list.extend() with generator expressions
- Avoid intermediate variables
- **test_ebpf_generation.py**:
- Fix builtin shadowing: rename `map` to `bpf_map`
Benefits for sqproxy:
- Clean integration with asyncio event loop
- Supports config reload without restart
- Better resource management
- Testable and reusable
- Production-ready async architecture
Example usage:
# As context manager
async with EBPFRedirector() as redirector:
await asyncio.Event().wait()
# Manual lifecycle
redirector = EBPFRedirector()
await redirector.start()
await redirector.restart() # Reload config
await redirector.stop()
|
@sourcery-ai review new changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- Consider splitting the large epbf.py into smaller modules (e.g. separate helpers, the EBPFRedirector class and CLI entrypoints) to improve readability and maintainability.
- Extract the rollback and cleanup logic in _attach_tc_bpf into dedicated helper functions to simplify the error-handling paths and make the flow easier to follow.
- Add return-type and argument type annotations to the internal helper functions in epbf.py to improve clarity and enable stronger static analysis support.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider splitting the large epbf.py into smaller modules (e.g. separate helpers, the EBPFRedirector class and CLI entrypoints) to improve readability and maintainability.
- Extract the rollback and cleanup logic in _attach_tc_bpf into dedicated helper functions to simplify the error-handling paths and make the flow easier to follow.
- Add return-type and argument type annotations to the internal helper functions in epbf.py to improve clarity and enable stronger static analysis support.
## Individual Comments
### Comment 1
<location> `tests/test_ebpf_compilation.py:198-204` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 2
<location> `source_query_proxy/epbf.py:324` </location>
<code_context>
def _attach_tc_bpf(interface: str, bpf, ipr) -> Tuple[int, any, any]:
"""Attach BPF programs to tc (traffic control)
Uses the same approach as sqredirect:
- ingress qdisc for incoming traffic
- sfq qdisc for outgoing traffic
- u32 filters with BPF actions
Args:
interface: Network interface name
bpf: BCC BPF instance with compiled programs
ipr: pyroute2 IPRoute instance
Returns:
(ifindex, fn_incoming, fn_outgoing) for cleanup
Raises:
RuntimeError: If setup fails, with automatic rollback of partial state
"""
from pyroute2.netlink.exceptions import NetlinkError
from pyroute2.netlink.rtnl import protocols
BPF = _import_bcc()
# Load BPF functions (SCHED_ACT mode like sqredirect)
fn_incoming = bpf.load_func("incoming", BPF.SCHED_ACT)
fn_outgoing = bpf.load_func("outgoing", BPF.SCHED_ACT)
logger.info(f"Loaded BPF functions: incoming={fn_incoming.name}, outgoing={fn_outgoing.name}")
# Get interface index
ifindex = ipr.link_lookup(ifname=interface)[0]
logger.debug(f"Interface {interface} has index {ifindex}")
# Track what we've added for rollback on failure
ingress_added = False
ingress_filter_added = False
sfq_added = False
try:
# Setup incoming traffic hook (ingress qdisc)
try:
ipr.tc("add", "ingress", ifindex, "ffff:")
ingress_added = True
logger.debug("Added ingress qdisc")
except NetlinkError as exc:
if exc.args[1] != 'File exists':
raise
logger.debug("Ingress qdisc already exists")
# Attach incoming BPF filter
action_incoming = {
"kind": "bpf",
"fd": fn_incoming.fd,
"name": fn_incoming.name,
"action": "ok",
}
ipr.tc(
"add-filter",
"u32",
ifindex,
":1",
parent="ffff:",
action=[action_incoming],
protocol=protocols.ETH_P_ALL,
target=0x10000,
keys=["0x0/0x0+0"],
)
ingress_filter_added = True
logger.info(f"✓ Attached incoming BPF filter to {interface} (ingress)")
# Setup outgoing traffic hook (sfq qdisc)
try:
ipr.tc("add", "sfq", ifindex, "1:")
sfq_added = True
logger.debug("Added sfq qdisc")
except NetlinkError as exc:
if exc.args[1] != 'File exists':
raise
logger.debug("SFQ qdisc already exists")
# Attach outgoing BPF filter
action_outgoing = {
"kind": "bpf",
"fd": fn_outgoing.fd,
"name": fn_outgoing.name,
"action": "ok",
}
ipr.tc(
"add-filter",
"u32",
ifindex,
":2",
parent="1:",
action=[action_outgoing],
target=0x10000,
keys=["0x0/0x0+0"],
)
logger.info(f"✓ Attached outgoing BPF filter to {interface} (egress)")
return ifindex, fn_incoming, fn_outgoing
except Exception as e:
# Rollback partial state on failure
logger.error(f"Failed to attach tc bpf, rolling back partial state: {e}")
# Clean up in reverse order of creation
if sfq_added:
try:
ipr.tc("del", "sfq", ifindex, "1:")
logger.debug("Rolled back sfq qdisc")
except Exception as cleanup_e:
logger.warning(f"Failed to rollback sfq qdisc: {cleanup_e}")
if ingress_filter_added:
try:
ipr.tc("del-filter", "u32", ifindex, ":1", parent="ffff:")
logger.debug("Rolled back ingress filter")
except Exception as cleanup_e:
logger.warning(f"Failed to rollback ingress filter: {cleanup_e}")
if ingress_added:
try:
ipr.tc("del", "ingress", ifindex, "ffff:")
logger.debug("Rolled back ingress qdisc")
except Exception as cleanup_e:
logger.warning(f"Failed to rollback ingress qdisc: {cleanup_e}")
raise RuntimeError(f"Failed to attach tc bpf, partial state rolled back") from e
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace f-string with no interpolated values with string ([`remove-redundant-fstring`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-fstring/))
```suggestion
raise RuntimeError("Failed to attach tc bpf, partial state rolled back") from e
```
</issue_to_address>
### Comment 3
<location> `source_query_proxy/epbf.py:341` </location>
<code_context>
def _cleanup_tc(ipr, ifindex: int, safe: bool = False):
"""Cleanup tc qdiscs
Args:
ipr: pyroute2 IPRoute instance
ifindex: Interface index
safe: If True, ignore 'Invalid argument' errors
"""
from pyroute2.netlink.exceptions import NetlinkError
try:
ipr.tc("del", "ingress", ifindex, "ffff:")
logger.debug("Removed ingress qdisc")
except NetlinkError as exc:
if safe and exc.args[1] == 'Invalid argument':
# Silently ignore if qdisc doesn't exist
pass
else:
logger.error(f"Failed to remove ingress qdisc: {exc}")
if not safe:
raise
try:
ipr.tc("del", "sfq", ifindex, "1:")
logger.debug("Removed sfq qdisc")
except NetlinkError as exc:
if safe and exc.args[1] == 'Invalid argument':
# Silently ignore if qdisc doesn't exist
pass
else:
logger.error(f"Failed to remove sfq qdisc: {exc}")
if not safe:
raise
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Swap if/else to remove empty if body [×2] ([`remove-pass-body`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-pass-body/))
- Hoist conditional out of nested conditional [×2] ([`hoist-if-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-if-from-if/))
</issue_to_address>
### Comment 4
<location> `tests/test_ebpf_compilation.py:137` </location>
<code_context>
def test_multiple_ports_same_program(self):
"""Test that we can create a program handling multiple port mappings"""
# This tests the approach we'll use in epbf.py
program = BPFProgram("multi_port")
# For now, just test that one operation works
# In the future, we need to support multiple mappings in one program
op = PacketRedirectOperation(server_port=27015, bind_port=27016)
program.apply_operation(op)
bpf_code = program.render()
try:
b = BPF(text=bpf_code, debug=0)
# Test that we can populate the map with multiple entries
port_map = b.get_table("port_map")
# Add multiple port mappings
port_map[27015] = 27016
port_map[27016] = 27017
port_map[27017] = 27018
# Verify mappings
assert port_map[27015].value == 27016
assert port_map[27016].value == 27017
assert port_map[27017].value == 27018
except Exception as e:
pytest.fail(f"Multi-port test failed: {e}")
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Code Quality Fixes:
- **epbf.py**:
- Remove redundant f-string in RuntimeError
- Swap conditionals in _cleanup_tc to remove empty if bodies
- Add return type annotations to all helper functions:
- _get_addr_interface() -> Optional[str]
- _get_default_interface() -> str
- _populate_maps() -> None
- _cleanup_tc() -> None
- Extract rollback logic into _rollback_tc_bpf() helper
- Cleaner error handling in _attach_tc_bpf
- Easier to test and maintain
- **test_ebpf_compilation.py**:
- Remove try/except conditionals from tests (per Google SWE guidelines)
- Let pytest handle exceptions directly for clearer test intent
- Affected tests:
- test_generated_code_syntax()
- test_multiple_ports_same_program()
- test_various_port_combinations()
Benefits:
- Better type checking and IDE support
- Cleaner error handling paths
- More maintainable code structure
- Tests are now "trivially correct upon inspection"
- Follows Google's testing best practices
All changes maintain backward compatibility and improve code quality.
Address code review comment to improve maintainability by splitting
the large epbf.py file into smaller, single-responsibility modules.
New Structure:
- **source_query_proxy/ebpf/runtime.py** (230 lines)
- Network interface helpers (get_addr_interface, get_default_interface)
- Server configuration collection (collect_server_mappings)
- BPF program generation (generate_bpf_program)
- Map population (populate_maps, ip_to_int)
- BCC lazy import (import_bcc)
- **source_query_proxy/ebpf/tc.py** (200 lines)
- TC attachment logic (attach_tc_bpf)
- Rollback on failure (rollback_tc_bpf)
- TC cleanup (cleanup_tc)
- All traffic control operations in one place
- **source_query_proxy/ebpf/redirector.py** (210 lines)
- EBPFRedirector class (async lifecycle manager)
- start(), stop(), restart() methods
- Async context manager support
- Clean separation of concerns
- **source_query_proxy/epbf.py** (105 lines, was 630 lines)
- Main entry point (run_ebpf_redirection)
- Backward compatibility (get_ebpf_program_run_args)
- Re-exports for compatibility
- Clear documentation of new structure
Benefits:
- 84% reduction in main file size (630 → 105 lines)
- Clear separation of concerns (network, TC, lifecycle)
- Easier to test individual components
- Better discoverability
- Simpler import paths for specific functionality
- Follows single-responsibility principle
Usage (unchanged for backward compatibility):
# Old way still works
from source_query_proxy.epbf import run_ebpf_redirection
await run_ebpf_redirection()
# New way (recommended)
from source_query_proxy.ebpf.redirector import EBPFRedirector
async with EBPFRedirector() as redirector:
await asyncio.Event().wait()
All tests and existing code continue to work without modification.
|
@sourcery-ai review new changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- There’s a lot of duplicated header‐parsing code in PacketRedirectOperation’s incoming and outgoing functions—consider extracting that into a shared helper or template snippet to ease future updates.
- In EBPFRedirector.start (and similarly in run_ebpf_redirection) the broad Exception catches may suppress unexpected errors—narrow them to known failure cases (e.g. compilation or config errors) so real bugs aren’t masked.
- The legacy get_ebpf_program_run_args() helper still lives alongside the new API—once migration is complete you might remove or isolate it in tests to avoid confusion for new users.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- There’s a lot of duplicated header‐parsing code in PacketRedirectOperation’s incoming and outgoing functions—consider extracting that into a shared helper or template snippet to ease future updates.
- In EBPFRedirector.start (and similarly in run_ebpf_redirection) the broad Exception catches may suppress unexpected errors—narrow them to known failure cases (e.g. compilation or config errors) so real bugs aren’t masked.
- The legacy get_ebpf_program_run_args() helper still lives alongside the new API—once migration is complete you might remove or isolate it in tests to avoid confusion for new users.
## Individual Comments
### Comment 1
<location> `source_query_proxy/epbf.py:57-58` </location>
<code_context>
- if interface is not None:
- args += ['-i', interface]
+ # Register signal handlers (let application handle SIGTERM/SIGINT)
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
- return args
</code_context>
<issue_to_address>
**issue (bug_risk):** Signal handlers are registered synchronously, which may not be safe in all async contexts.
Using signal.signal in async code can cause issues if the event loop isn't in the main thread or when using frameworks like uvloop. Prefer asyncio.add_signal_handler for compatibility with async event loops.
</issue_to_address>
### Comment 2
<location> `source_query_proxy/epbf.py:60-69` </location>
<code_context>
+
+ # Attach to tc
+ logger.info(f"Attaching BPF programs to interface {interface}...")
+ try:
+ self._ifindex, self._fn_incoming, self._fn_outgoing = attach_tc_bpf(
+ interface, self._bpf, self._ipr
+ )
+ except Exception as e:
+ logger.error(f"Failed to attach BPF programs: {e}")
+ self._ipr.close()
+ self._ipr = None
+ raise RuntimeError(f"tc attachment failed: {e}") from e
+
+ logger.info("✓ BPF programs attached successfully")
+ logger.info("=== eBPF redirection is active ===")
+
+ self._running = True
+
+ except Exception:
+ # Cleanup on failure
+ await self._cleanup()
</code_context>
<issue_to_address>
**suggestion (bug_risk):** get_ebpf_program_run_args swallows all exceptions and returns an empty list.
Logging or raising a specific error instead of returning an empty list would improve debuggability and help identify configuration or logic issues.
</issue_to_address>
### Comment 3
<location> `source_query_proxy/ebpf/operations.py:58-68` </location>
<code_context>
+ self.server_port = server_port
+ self.bind_port = bind_port
+ self.bind_ip = bind_ip
+ self.use_ipport_key = use_ipport_key or (bind_ip is not None)
+
+ def apply(self, program: 'BPFProgram'):
</code_context>
<issue_to_address>
**suggestion:** Implicitly setting use_ipport_key based on bind_ip may cause confusion.
If use_ipport_key is set to False but bind_ip is provided, the current logic will override the caller's intent. Please clarify the precedence or update the documentation to reflect this behavior.
```suggestion
"""
Args:
server_port: Game server port to redirect from
bind_port: Proxy bind port to redirect to
bind_ip: Bind IP address (if None, uses port-only lookup)
use_ipport_key: If True, use IP+port composite key instead of port-only.
If False, but bind_ip is provided, use_ipport_key will be set to True.
If both are provided, bind_ip takes precedence and use_ipport_key will be True.
To avoid confusion, explicitly set use_ipport_key when providing bind_ip.
(If use_ipport_key is False and bind_ip is provided, use_ipport_key will be overridden to True.)
"""
self.server_port = server_port
self.bind_port = bind_port
self.bind_ip = bind_ip
self.use_ipport_key = use_ipport_key or (bind_ip is not None)
```
</issue_to_address>
### Comment 4
<location> `source_query_proxy/ebpf/redirector.py:130-133` </location>
<code_context>
+
+ self._running = True
+
+ except Exception:
+ # Cleanup on failure
+ await self._cleanup()
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Catching all exceptions in start() and calling _cleanup may mask the original error.
If _cleanup fails, the original exception could be suppressed. Use try/finally or log both exceptions to improve error visibility.
```suggestion
except Exception as exc:
# Cleanup on failure, ensure both exceptions are visible
try:
await self._cleanup()
except Exception as cleanup_exc:
logger.error(f"Exception during cleanup after error: {cleanup_exc}", exc_info=True)
# Optionally, chain the exceptions for visibility
raise exc from cleanup_exc
raise
```
</issue_to_address>
### Comment 5
<location> `source_query_proxy/ebpf/tc.py:29` </location>
<code_context>
+ sfq_added: Whether sfq qdisc was added
+ """
+ # Clean up in reverse order of creation
+ if sfq_added:
+ try:
+ ipr.tc("del", "sfq", ifindex, "1:")
</code_context>
<issue_to_address>
**suggestion:** rollback_tc_bpf does not handle exceptions during rollback robustly.
Currently, only the last error is logged if multiple rollback steps fail. Aggregating errors or ensuring all cleanup steps run independently would improve robustness.
</issue_to_address>
### Comment 6
<location> `source_query_proxy/ebpf/tc.py:82` </location>
<code_context>
+ logger.info(f"Loaded BPF functions: incoming={fn_incoming.name}, outgoing={fn_outgoing.name}")
+
+ # Get interface index
+ ifindex = ipr.link_lookup(ifname=interface)[0]
+ logger.debug(f"Interface {interface} has index {ifindex}")
+
</code_context>
<issue_to_address>
**issue:** attach_tc_bpf assumes link_lookup always returns a non-empty list.
If link_lookup returns an empty list, accessing the first element will raise an IndexError. Please add a check for this case and raise a clear error if the interface is not found.
</issue_to_address>
### Comment 7
<location> `tests/test_ebpf_generation.py:32-37` </location>
<code_context>
+)
+
+
+class TestBPFStruct:
+ """Test BPFStruct code generation"""
+
+ def test_simple_struct(self):
+ """Test generating a simple struct"""
+ struct = BPFStruct("addr_key_t", [
+ ("u32", "ip"),
+ ("u16", "port")
+ ])
+
+ code = struct.render()
+ assert "struct addr_key_t {" in code
+ assert "u32 ip;" in code
+ assert "u16 port;" in code
+ assert code.endswith("};")
+
+ def test_empty_struct(self):
+ """Test generating an empty struct"""
+ struct = BPFStruct("empty_t", [])
</code_context>
<issue_to_address>
**suggestion (testing):** Missing tests for invalid or edge-case struct field types.
Please add tests for BPFStruct with invalid or edge-case field types, such as empty strings, unsupported types, or duplicate field names, to verify error handling and code generation.
```suggestion
def test_empty_struct(self):
"""Test generating an empty struct"""
struct = BPFStruct("empty_t", [])
code = struct.render()
assert "struct empty_t {" in code
assert "};" in code
def test_invalid_field_type(self):
"""Test struct with an unsupported field type"""
struct = BPFStruct("invalid_type_t", [
("foo32", "ip"),
])
code = struct.render()
# Should still generate code, but with the unsupported type present
assert "foo32 ip;" in code
def test_empty_field_type_or_name(self):
"""Test struct with empty field type or name"""
struct = BPFStruct("empty_field_t", [
("", "ip"),
("u32", ""),
])
code = struct.render()
assert " ip;" in code
assert "u32 ;" in code
def test_duplicate_field_names(self):
"""Test struct with duplicate field names"""
struct = BPFStruct("dup_field_t", [
("u32", "ip"),
("u16", "ip"),
])
code = struct.render()
# Both fields should be present, but with duplicate names
assert code.count("ip;") == 2
```
</issue_to_address>
### Comment 8
<location> `tests/test_ebpf_generation.py:40-49` </location>
<code_context>
+ assert "};" in code
+
+
+class TestBPFMap:
+ """Test BPFMap code generation"""
+
+ def test_simple_hash_map(self):
+ """Test generating a BPF_HASH map"""
+ bpf_map = BPFMap("port_map", "u16", "u16", max_entries=1024)
+ code = bpf_map.render()
+ assert code == "BPF_HASH(port_map, u16, u16, 1024);"
+
+ def test_struct_key_map(self):
+ """Test map with struct key type"""
+ bpf_map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
+ code = bpf_map.render()
+ assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"
+
+ def test_default_max_entries(self):
+ """Test default max_entries value"""
+ bpf_map = BPFMap("test_map", "u32", "u32")
</code_context>
<issue_to_address>
**suggestion (testing):** No tests for map type variations or invalid parameters.
Please add tests covering various map_type values and invalid max_entries to ensure proper error handling.
Suggested implementation:
```python
class TestBPFMap:
"""Test BPFMap code generation"""
def test_simple_hash_map(self):
"""Test generating a BPF_HASH map"""
bpf_map = BPFMap("port_map", "u16", "u16", max_entries=1024)
code = bpf_map.render()
assert code == "BPF_HASH(port_map, u16, u16, 1024);"
def test_struct_key_map(self):
"""Test map with struct key type"""
bpf_map = BPFMap("addr_map", "struct addr_key_t", "u16", max_entries=2048)
code = bpf_map.render()
assert code == "BPF_HASH(addr_map, struct addr_key_t, u16, 2048);"
def test_default_max_entries(self):
"""Test default max_entries value"""
bpf_map = BPFMap("test_map", "u32", "u32")
code = bpf_map.render()
assert "10240" in code # Default max_entries
def test_map_type_variations(self):
"""Test generating maps with different map_type values"""
bpf_array = BPFMap("array_map", "u32", "u64", map_type="BPF_ARRAY", max_entries=128)
code_array = bpf_array.render()
assert code_array == "BPF_ARRAY(array_map, u32, u64, 128);"
bpf_percpu_hash = BPFMap("percpu_hash_map", "u32", "u64", map_type="BPF_PERCPU_HASH", max_entries=256)
code_percpu_hash = bpf_percpu_hash.render()
assert code_percpu_hash == "BPF_PERCPU_HASH(percpu_hash_map, u32, u64, 256);"
bpf_lru_hash = BPFMap("lru_hash_map", "u32", "u64", map_type="BPF_LRU_HASH", max_entries=512)
code_lru_hash = bpf_lru_hash.render()
assert code_lru_hash == "BPF_LRU_HASH(lru_hash_map, u32, u64, 512);"
def test_invalid_max_entries(self):
"""Test error handling for invalid max_entries values"""
import pytest
with pytest.raises(ValueError):
BPFMap("neg_map", "u32", "u32", max_entries=-1)
with pytest.raises(ValueError):
BPFMap("zero_map", "u32", "u32", max_entries=0)
with pytest.raises(TypeError):
BPFMap("str_map", "u32", "u32", max_entries="not_an_int")
def test_invalid_map_type(self):
"""Test error handling for invalid map_type values"""
import pytest
with pytest.raises(ValueError):
BPFMap("bad_map", "u32", "u32", map_type="BPF_UNKNOWN", max_entries=10)
```
These tests assume that the BPFMap class raises ValueError for invalid max_entries (negative, zero) and invalid map_type, and TypeError for non-integer max_entries. If the BPFMap implementation does not currently raise these exceptions, you will need to update its constructor to perform these checks and raise the appropriate exceptions.
</issue_to_address>
### Comment 9
<location> `tests/test_ebpf_compilation.py:21-30` </location>
<code_context>
+def _compile_and_verify_bpf(bpf_code, expected_map_name):
</code_context>
<issue_to_address>
**suggestion (testing):** No test for BPF compilation failure or invalid code.
Add a test that triggers a BPF compilation error to confirm proper error handling and reporting.
</issue_to_address>
### Comment 10
<location> `tests/test_ebpf_compilation.py:188-194` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 11
<location> `source_query_proxy/ebpf/tc.py:177` </location>
<code_context>
def cleanup_tc(ipr: Any, ifindex: int, safe: bool = False) -> None:
"""Cleanup tc qdiscs
Args:
ipr: pyroute2 IPRoute instance
ifindex: Interface index
safe: If True, ignore 'Invalid argument' errors
"""
from pyroute2.netlink.exceptions import NetlinkError
try:
ipr.tc("del", "ingress", ifindex, "ffff:")
logger.debug("Removed ingress qdisc")
except NetlinkError as exc:
# Only ignore if safe mode and qdisc doesn't exist
if not (safe and exc.args[1] == 'Invalid argument'):
logger.error(f"Failed to remove ingress qdisc: {exc}")
if not safe:
raise
try:
ipr.tc("del", "sfq", ifindex, "1:")
logger.debug("Removed sfq qdisc")
except NetlinkError as exc:
# Only ignore if safe mode and qdisc doesn't exist
if not (safe and exc.args[1] == 'Invalid argument'):
logger.error(f"Failed to remove sfq qdisc: {exc}")
if not safe:
raise
</code_context>
<issue_to_address>
**issue (code-quality):** Hoist conditional out of nested conditional [×2] ([`hoist-if-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-if-from-if/))
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
This commit addresses all remaining code review feedback for production readiness and improved error handling. **Async Safety & Signal Handling:** - Use asyncio.add_signal_handler instead of signal.signal in run_ebpf_redirection() - Properly remove signal handlers in finally block - Async-safe signal handling compatible with uvloop and all async frameworks **Exception Handling Improvements:** - Narrow exception catches in EBPFRedirector.start() to (RuntimeError, ValueError, OSError) - Chain cleanup exceptions to preserve both original and cleanup errors - Fix get_ebpf_program_run_args() to log and raise instead of swallowing exceptions - Add RuntimeError wrapper with full exc_info for better debuggability **TC Operations Robustness:** - Add check for empty link_lookup result with clear error message - Improve error aggregation in rollback_tc_bpf() - log all errors individually - Add summary error log if multiple rollback steps fail - Hoist conditionals in cleanup_tc() for better readability **Documentation Improvements:** - Clarify use_ipport_key vs bind_ip precedence in PacketRedirectOperation - Document that bind_ip presence overrides explicit use_ipport_key=False - Add inline comment explaining precedence logic **Test Coverage Expansion:** - Add edge case tests for BPFStruct (empty fields, duplicates, unsupported types) - Add edge case tests for BPFMap (different map types, zero/negative max_entries) - Add compilation failure tests (syntax errors, undefined functions) - Tests document current behavior without requiring validation (keeps template engine simple) Changes: - source_query_proxy/epbf.py: Async-safe signals, better exception handling - source_query_proxy/ebpf/redirector.py: Narrow exceptions, chain cleanup errors - source_query_proxy/ebpf/tc.py: Interface validation, error aggregation, hoisted conditionals - source_query_proxy/ebpf/operations.py: Clarified precedence documentation - tests/test_ebpf_generation.py: Edge case tests for template engine - tests/test_ebpf_compilation.py: Compilation failure tests All changes maintain backward compatibility and improve production robustness.
|
@sourcery-ai review new changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `source_query_proxy/ebpf/redirector.py:145-152` </location>
<code_context>
+
+ Safe to call multiple times (idempotent).
+ """
+ if not self._running:
+ logger.debug("eBPF redirection is not running, skipping stop")
+ return
</code_context>
<issue_to_address>
**suggestion (bug_risk):** stop() is idempotent, but may not clean up resources if partially initialized.
Currently, stop() returns early if not running, which may prevent cleanup of partially allocated resources. To ensure proper resource release, always invoke _cleanup(), regardless of running state.
```suggestion
if not self._running:
logger.debug("eBPF redirection is not running, proceeding to cleanup resources")
logger.info("Stopping eBPF redirection and cleaning up resources...")
self._running = False
await self._cleanup()
logger.info("✓ eBPF redirection stopped and resources cleaned up")
```
</issue_to_address>
### Comment 2
<location> `tests/test_ebpf_compilation.py:121-130` </location>
<code_context>
+ def test_multiple_ports_same_program(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Test for multiple port mappings in a single program is present, but does not test IP+port mode.
Please add a test for multiple IP+port mappings in a single program to verify addr_map is populated correctly in all modes.
</issue_to_address>
### Comment 3
<location> `tests/test_ebpf_compilation.py:232-238` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines says:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 4
<location> `tests/test_ebpf_generation.py:95-102` </location>
<code_context>
def test_different_map_types(self):
"""Test generating maps with different map_type values"""
# BPF_ARRAY
bpf_array = BPFMap("array_map", "u32", "u64", map_type="BPF_ARRAY", max_entries=128)
code_array = bpf_array.render()
assert code_array == "BPF_ARRAY(array_map, u32, u64, 128);"
# BPF_PERCPU_HASH
bpf_percpu = BPFMap("percpu_map", "u32", "u64", map_type="BPF_PERCPU_HASH", max_entries=256)
code_percpu = bpf_percpu.render()
assert code_percpu == "BPF_PERCPU_HASH(percpu_map, u32, u64, 256);"
</code_context>
<issue_to_address>
**issue (code-quality):** Extract duplicate code into method ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
This commit addresses the remaining code review feedback to improve
robustness and test quality.
**EBPFRedirector.stop() Robustness (Comment 1):**
- Fixed stop() to always cleanup resources, even if not fully started
- Removed early return that could prevent cleanup of partially allocated resources
- Ensures proper resource release in all scenarios (idempotent cleanup)
- Updated logging to clarify cleanup behavior
Before:
if not self._running:
return # Could skip cleanup!
After:
if not self._running:
logger.debug("...proceeding to cleanup resources")
# Always cleanup regardless of running state
await self._cleanup()
**Test Coverage Improvements:**
1. **Multiple IP+port Mappings Test (Comment 2):**
- Added test_multiple_ipport_mappings_same_program()
- Tests addr_map with multiple (IP, port) tuples
- Verifies that single program handles multiple IP+port combinations
- Covers both port-only and IP+port modes now
2. **Removed Conditionals from Tests (Comment 3):**
- Split test_bcc_availability_reporting() into two separate tests
- test_bcc_is_available() - runs only when BCC installed
- test_bcc_not_available() - runs only when BCC not installed
- Uses pytest.mark.skipif instead of conditionals
- Follows Google SWE testing best practices
3. **Extracted Duplicate Code (Comment 4):**
- Created _assert_map_renders_as() helper in TestBPFMap
- Reduces duplication in test_different_map_types()
- Makes test intent clearer and easier to extend
Changes:
- source_query_proxy/ebpf/redirector.py: Always cleanup in stop()
- tests/test_ebpf_compilation.py: Added IP+port test, split conditional test
- tests/test_ebpf_generation.py: Extracted helper method
All tests remain clear and "trivially correct upon inspection".
…entation Complete documentation updates for the transition from external sqredirect to internal eBPF implementation using BCC. Changes: - Add MIGRATION.md: Comprehensive migration guide with step-by-step instructions, troubleshooting, and performance comparison - Update CHANGELOG.md: Add v3.0.0 release notes documenting internal eBPF implementation, breaking changes (none!), and new features - Update README.rst: Replace sqredirect references with BCC installation, update version to 3.0.0, improve eBPF setup instructions - Update PROGRESS.md: Mark documentation updates as complete, update status to reflect completion of implementation and documentation This completes the high-priority documentation tasks for issue #131. The internal eBPF implementation is now fully documented and ready for testing and deployment.
Create complete documentation structure using MkDocs Material theme.
Documentation includes:
- Installation guide with detailed setup instructions
- Quick start guide for rapid deployment
- Configuration reference with all options
- eBPF guides (overview, setup, architecture, performance)
- Migration guide from v2.x to v3.0
- Troubleshooting guide for common issues
- Development guide for contributors
- API reference for Python developers
- Docker deployment guide
- Contributing guidelines
- License information
Structure:
- mkdocs.yml: MkDocs configuration with Material theme
- docs_requirements.txt: Documentation build dependencies
- docs/: Complete documentation in markdown format
- index.md: Home page with project overview
- installation.md: Installation instructions
- quickstart.md: 5-minute setup guide
- configuration.md: Configuration options reference
- ebpf/: eBPF-specific documentation
- overview.md: eBPF concepts and architecture
- setup.md: Step-by-step eBPF setup
- architecture.md: Implementation deep dive
- performance.md: Benchmarks and optimization
- migration.md: v2.x to v3.0 migration guide
- troubleshooting.md: Common issues and solutions
- development.md: Development workflow
- api-reference.md: Python API documentation
- docker.md: Docker and Kubernetes deployment
- contributing.md: Contribution guidelines
- changelog.md: Version history
- license.md: MIT License
Features:
- Dark/light theme toggle
- Code syntax highlighting
- Search functionality
- Navigation tabs and sections
- Mobile-responsive design
- Copy-to-clipboard for code blocks
This completes the documentation for v3.0.0 release.
Summary by Sourcery
Replace the external sqredirect tool with an internal class-based eBPF template engine that generates, compiles, and attaches BPF programs directly via BCC and tc.
New Features:
Enhancements:
CI:
Documentation:
Tests:
Chores: