Skip to content

Commit b1254d7

Browse files
committed
add synchronous tests
1 parent cdc4acf commit b1254d7

File tree

1 file changed

+128
-0
lines changed

1 file changed

+128
-0
lines changed

tests/test_cluster.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import binascii
22
import datetime
3+
import select
4+
import socket
5+
import socketserver
6+
import threading
37
import warnings
48
from queue import LifoQueue, Queue
59
from time import sleep
610
from unittest.mock import DEFAULT, Mock, call, patch
11+
from urllib.parse import urlparse
712

813
import pytest
914

@@ -53,6 +58,85 @@
5358
]
5459

5560

61+
class ProxyRequestHandler(socketserver.BaseRequestHandler):
62+
def recv(self, sock):
63+
"""A recv with a timeout"""
64+
r = select.select([sock], [], [], 0.01)
65+
if not r[0]:
66+
return None
67+
return sock.recv(1000)
68+
69+
def handle(self):
70+
self.server.proxy.n_connections += 1
71+
conn = socket.create_connection(self.server.proxy.redis_addr)
72+
stop = False
73+
74+
def from_server():
75+
# read from server and pass to client
76+
while not stop:
77+
data = self.recv(conn)
78+
if data is None:
79+
continue
80+
if not data:
81+
self.request.shutdown(socket.SHUT_WR)
82+
return
83+
self.request.sendall(data)
84+
85+
thread = threading.Thread(target=from_server)
86+
thread.start()
87+
try:
88+
while True:
89+
# read from client and send to server
90+
data = self.request.recv(1000)
91+
if not data:
92+
return
93+
conn.sendall(data)
94+
finally:
95+
conn.shutdown(socket.SHUT_WR)
96+
stop = True # for safety
97+
thread.join()
98+
conn.close()
99+
100+
101+
class NodeProxy:
102+
"""A class to proxy a node connection to a different port"""
103+
104+
def __init__(self, addr, redis_addr):
105+
self.addr = addr
106+
self.redis_addr = redis_addr
107+
self.server = socketserver.ThreadingTCPServer(self.addr, ProxyRequestHandler)
108+
self.server.proxy = self
109+
self.server.socket_reuse_address = True
110+
self.thread = None
111+
self.n_connections = 0
112+
113+
def start(self):
114+
# test that we can connect to redis
115+
s = socket.create_connection(self.redis_addr, timeout=2)
116+
s.close()
117+
# Start a thread with the server -- that thread will then start one
118+
# more thread for each request
119+
self.thread = threading.Thread(target=self.server.serve_forever)
120+
# Exit the server thread when the main thread terminates
121+
self.thread.daemon = True
122+
self.thread.start()
123+
124+
def close(self):
125+
self.server.shutdown()
126+
127+
128+
@pytest.fixture
129+
def redis_addr(request):
130+
redis_url = request.config.getoption("--redis-url")
131+
scheme, netloc = urlparse(redis_url)[:2]
132+
assert scheme == "redis"
133+
if ":" in netloc:
134+
host, port = netloc.split(":")
135+
return host, int(port)
136+
else:
137+
return netloc, 6379
138+
139+
56140
@pytest.fixture()
57141
def slowlog(request, r):
58142
"""
@@ -823,6 +907,50 @@ def raise_connection_error():
823907
assert "myself" not in nodes.get(curr_default_node.name).get("flags")
824908
assert r.get_default_node() != curr_default_node
825909

910+
def test_host_port_remap(self, request, redis_addr):
911+
"""Test that we can create a rediscluster object with
912+
a host-port remapper and map connections through proxy objects
913+
"""
914+
915+
# we remap the first n nodes
916+
offset = 1000
917+
n = 6
918+
ports = [redis_addr[1] + i for i in range(n)]
919+
920+
def host_port_remap(host, port):
921+
# remap first three nodes to our local proxy
922+
# old = host, port
923+
if int(port) in ports:
924+
host, port = "127.0.0.1", int(port) + offset
925+
# print(f"{old} {host, port}")
926+
return host, port
927+
928+
# create the proxies
929+
proxies = [
930+
NodeProxy(("127.0.0.1", port + offset), (redis_addr[0], port))
931+
for port in ports
932+
]
933+
for p in proxies:
934+
p.start()
935+
try:
936+
# create cluster:
937+
r = _get_client(
938+
RedisCluster, request, flushdb=False, host_port_remap=host_port_remap
939+
)
940+
try:
941+
assert r.ping() is True
942+
assert r.set("byte_string", b"giraffe")
943+
assert r.get("byte_string") == b"giraffe"
944+
finally:
945+
r.close()
946+
finally:
947+
for p in proxies:
948+
p.close()
949+
950+
# verify that the proxies were indeed used
951+
n_used = sum((1 if p.n_connections else 0) for p in proxies)
952+
assert n_used > 1
953+
826954

827955
@pytest.mark.onlycluster
828956
class TestClusterRedisCommands:

0 commit comments

Comments
 (0)