Skip to content

Commit 4bfc623

Browse files
committed
refactor utils tests to use embedded twisted server; minor polishing
1 parent 6db748c commit 4bfc623

File tree

8 files changed

+135
-106
lines changed

8 files changed

+135
-106
lines changed

paradedb/localstack_paradedb/extension.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import socket
33

44
from localstack_extensions.utils.docker import ProxiedDockerContainerExtension
5-
from werkzeug.datastructures import Headers
65
from localstack import config
76

87
# Environment variables for configuration
@@ -83,13 +82,6 @@ def tcp_connection_matcher(self, data: bytes) -> bool:
8382

8483
return False
8584

86-
def http2_request_matcher(self, headers: Headers) -> bool:
87-
"""
88-
Define whether an HTTP2 request should be proxied based on request headers.
89-
Not used for TCP connections - see tcp_connection_matcher instead.
90-
"""
91-
return False
92-
9385
def _check_tcp_port(self, host: str, port: int, timeout: float = 2.0) -> None:
9486
"""Check if a TCP port is accepting connections."""
9587
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

utils/localstack_extensions/utils/docker.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import re
22
import logging
3-
from abc import abstractmethod
43
from functools import cache
54
from typing import Callable
65
import requests
@@ -188,9 +187,14 @@ def _setup_tcp_protocol_routing(self):
188187
f"Registered TCP extension {self.name} -> {self.container_host}:{target_port} on gateway"
189188
)
190189

191-
@abstractmethod
192190
def http2_request_matcher(self, headers: Headers) -> bool:
193-
"""Define whether an HTTP2 request should be proxied, based on request headers."""
191+
"""
192+
Define whether an HTTP2 request should be proxied, based on request headers.
193+
194+
Default implementation returns False (no HTTP2 proxying).
195+
Override this method in subclasses that need HTTP2 proxying.
196+
"""
197+
return False
194198

195199
def on_platform_shutdown(self):
196200
self._remove_container()

utils/tests/integration/conftest.py

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,18 @@
99
"""
1010

1111
import socket
12-
import pytest
12+
import threading
13+
import time
1314

15+
import pytest
1416
from hyperframe.frame import Frame
15-
from werkzeug.datastructures import Headers
16-
from localstack_extensions.utils.docker import ProxiedDockerContainerExtension
17+
from localstack.utils.net import get_free_tcp_port
18+
from rolo import Router
19+
from rolo.gateway import Gateway
20+
from twisted.internet import reactor
21+
from twisted.web import server as twisted_server
1722

23+
from localstack_extensions.utils.docker import ProxiedDockerContainerExtension
1824

1925
GRPCBIN_IMAGE = "moul/grpcbin"
2026
GRPCBIN_INSECURE_PORT = 9000 # HTTP/2 without TLS
@@ -53,51 +59,75 @@ def _tcp_health_check():
5359
image_name=GRPCBIN_IMAGE,
5460
container_ports=[GRPCBIN_INSECURE_PORT, GRPCBIN_SECURE_PORT],
5561
health_check_fn=_tcp_health_check,
62+
tcp_ports=[GRPCBIN_INSECURE_PORT], # Enable raw TCP proxying for gRPC/HTTP2
5663
)
5764

58-
def http2_request_matcher(self, headers: Headers) -> bool:
59-
"""
60-
gRPC services use direct TCP connections, not HTTP gateway routing.
61-
This method is not used in these tests but is required by the base class.
62-
"""
63-
return False
65+
def tcp_connection_matcher(self, data: bytes) -> bool:
66+
"""Detect HTTP/2 connection preface to route gRPC/HTTP2 traffic."""
67+
# HTTP/2 connections start with the connection preface
68+
if len(data) >= len(HTTP2_PREFACE):
69+
return data.startswith(HTTP2_PREFACE)
70+
# Also match if we have partial preface data (for early detection)
71+
return len(data) > 0 and HTTP2_PREFACE.startswith(data)
6472

6573

6674
@pytest.fixture(scope="session")
67-
def grpcbin_extension():
75+
def grpcbin_extension_server():
6876
"""
69-
Start grpcbin using ProxiedDockerContainerExtension.
77+
Start grpcbin using ProxiedDockerContainerExtension with a test gateway server.
7078
71-
This tests the Docker container management capabilities while providing
72-
a realistic gRPC/HTTP2 test service for integration tests.
79+
This tests the Docker container management and proxy capabilities by:
80+
1. Starting the grpcbin container via the extension
81+
2. Setting up a Gateway with the extension's routes and TCP patches
82+
3. Serving the Gateway on a test port via Twisted
83+
4. Returning server info for end-to-end testing
7384
"""
7485
extension = GrpcbinExtension()
7586

76-
# Start the container using the extension infrastructure
77-
extension.start_container()
87+
# Create router and update with extension routes
88+
# This will start the grpcbin container and apply TCP protocol patches
89+
router = Router()
90+
extension.update_gateway_routes(router)
7891

79-
yield extension
92+
# Create a Gateway with proper TCP support
93+
# The TCP patches are applied by update_gateway_routes above
94+
gateway = Gateway(router)
8095

81-
# Cleanup
82-
extension.on_platform_shutdown()
96+
# Start gateway on a test port using Twisted
97+
test_port = get_free_tcp_port()
98+
site = twisted_server.Site(gateway)
99+
listener = reactor.listenTCP(test_port, site)
83100

101+
# Run reactor in background thread
102+
def run_reactor():
103+
reactor.run(installSignalHandlers=False)
84104

85-
@pytest.fixture
86-
def grpcbin_host(grpcbin_extension):
87-
"""Return the host address for the grpcbin container."""
88-
return grpcbin_extension.container_host
105+
reactor_thread = threading.Thread(target=run_reactor, daemon=True)
106+
reactor_thread.start()
89107

108+
# Wait for reactor to start - not ideal, but should work as a simple solution
109+
time.sleep(0.5)
90110

91-
@pytest.fixture
92-
def grpcbin_insecure_port(grpcbin_extension):
93-
"""Return the insecure (HTTP/2 without TLS) port for grpcbin."""
94-
return GRPCBIN_INSECURE_PORT
111+
# Return server information for tests
112+
server_info = {
113+
"port": test_port,
114+
"url": f"http://localhost:{test_port}",
115+
"extension": extension,
116+
"listener": listener,
117+
}
95118

119+
yield server_info
96120

97-
@pytest.fixture
98-
def grpcbin_secure_port(grpcbin_extension):
99-
"""Return the secure (HTTP/2 with TLS) port for grpcbin."""
100-
return GRPCBIN_SECURE_PORT
121+
# Cleanup
122+
reactor.callFromThread(reactor.stop)
123+
time.sleep(0.5)
124+
extension.on_platform_shutdown()
125+
126+
127+
@pytest.fixture(scope="session")
128+
def grpcbin_extension(grpcbin_extension_server):
129+
"""Return the extension instance from the server fixture."""
130+
return grpcbin_extension_server["extension"]
101131

102132

103133
def parse_server_frames(data: bytes) -> list:

utils/tests/integration/test_extension_integration.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import socket
99

10+
from werkzeug.datastructures import Headers
11+
1012

1113
class TestProxiedDockerContainerExtension:
1214
"""Tests for ProxiedDockerContainerExtension using the GrpcbinExtension."""
@@ -27,21 +29,20 @@ def test_extension_container_host_is_accessible(self, grpcbin_extension):
2729
"localhost.localstack.cloud",
2830
) or grpcbin_extension.container_host.startswith("172.")
2931

30-
def test_extension_ports_are_reachable(self, grpcbin_host, grpcbin_insecure_port):
31-
"""Test that the extension's ports are reachable via TCP."""
32+
def test_extension_ports_are_reachable(self, grpcbin_extension_server):
33+
"""Test that the gateway port is reachable via TCP."""
34+
gateway_port = grpcbin_extension_server["port"]
3235
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3336
sock.settimeout(2.0)
3437
try:
35-
sock.connect((grpcbin_host, grpcbin_insecure_port))
38+
sock.connect(("localhost", gateway_port))
3639
sock.close()
3740
# Connection successful
3841
except (socket.timeout, socket.error) as e:
39-
raise AssertionError(f"Could not connect to grpcbin port: {e}")
42+
raise AssertionError(f"Could not connect to gateway port: {e}")
4043

4144
def test_extension_implements_required_methods(self, grpcbin_extension):
4245
"""Test that the extension properly implements the required abstract methods."""
43-
from werkzeug.datastructures import Headers
44-
4546
# http2_request_matcher should be callable
4647
result = grpcbin_extension.http2_request_matcher(Headers())
4748
assert result is False, "gRPC services should not proxy through HTTP gateway"

utils/tests/integration/test_grpc_e2e.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
class TestGrpcEndToEnd:
1515
"""End-to-end tests making actual gRPC calls to grpcbin."""
1616

17-
def test_grpc_empty_call(self, grpcbin_host, grpcbin_insecure_port):
18-
"""Test making a gRPC call to grpcbin's Empty service."""
19-
# Create a channel to grpcbin
20-
channel = grpc.insecure_channel(f"{grpcbin_host}:{grpcbin_insecure_port}")
17+
def test_grpc_empty_call(self, grpcbin_extension_server):
18+
"""Test making a gRPC call to grpcbin's Empty service via the gateway."""
19+
# Create a channel to grpcbin through the gateway
20+
gateway_port = grpcbin_extension_server["port"]
21+
channel = grpc.insecure_channel(f"localhost:{gateway_port}")
2122

2223
try:
2324
# Use grpc.channel_ready_future to verify connection
@@ -43,9 +44,10 @@ def test_grpc_empty_call(self, grpcbin_host, grpcbin_insecure_port):
4344
finally:
4445
channel.close()
4546

46-
def test_grpc_index_call(self, grpcbin_host, grpcbin_insecure_port):
47+
def test_grpc_index_call(self, grpcbin_extension_server):
4748
"""Test calling grpcbin's Index service which returns server info."""
48-
channel = grpc.insecure_channel(f"{grpcbin_host}:{grpcbin_insecure_port}")
49+
gateway_port = grpcbin_extension_server["port"]
50+
channel = grpc.insecure_channel(f"localhost:{gateway_port}")
4951

5052
try:
5153
# Verify channel is ready
@@ -68,9 +70,10 @@ def test_grpc_index_call(self, grpcbin_host, grpcbin_insecure_port):
6870
finally:
6971
channel.close()
7072

71-
def test_grpc_concurrent_calls(self, grpcbin_host, grpcbin_insecure_port):
73+
def test_grpc_concurrent_calls(self, grpcbin_extension_server):
7274
"""Test making multiple concurrent gRPC calls."""
73-
channel = grpc.insecure_channel(f"{grpcbin_host}:{grpcbin_insecure_port}")
75+
gateway_port = grpcbin_extension_server["port"]
76+
channel = grpc.insecure_channel(f"localhost:{gateway_port}")
7477

7578
try:
7679
# Verify channel is ready
@@ -97,9 +100,10 @@ def test_grpc_concurrent_calls(self, grpcbin_host, grpcbin_insecure_port):
97100
finally:
98101
channel.close()
99102

100-
def test_grpc_connection_reuse(self, grpcbin_host, grpcbin_insecure_port):
103+
def test_grpc_connection_reuse(self, grpcbin_extension_server):
101104
"""Test that a single gRPC channel can handle multiple sequential calls."""
102-
channel = grpc.insecure_channel(f"{grpcbin_host}:{grpcbin_insecure_port}")
105+
gateway_port = grpcbin_extension_server["port"]
106+
channel = grpc.insecure_channel(f"localhost:{gateway_port}")
103107

104108
try:
105109
# Verify channel is ready

0 commit comments

Comments
 (0)