Skip to content

Commit e4faf3a

Browse files
authored
RESP3 modules support (#2803)
* start cleaning * clean sone callbacks * response callbacks * modules * tests * finish sync search tests * linters * async modules * linters * revert redismod-url change
1 parent 2a935eb commit e4faf3a

16 files changed

+3460
-1579
lines changed

redis/commands/bf/__init__.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,22 @@ def __init__(self, client, **kwargs):
9797
# CMS_INCRBY: spaceHolder,
9898
# CMS_QUERY: spaceHolder,
9999
CMS_MERGE: bool_ok,
100+
}
101+
102+
RESP2_MODULE_CALLBACKS = {
100103
CMS_INFO: CMSInfo,
101104
}
105+
RESP3_MODULE_CALLBACKS = {}
102106

103107
self.client = client
104108
self.commandmixin = CMSCommands
105109
self.execute_command = client.execute_command
106110

111+
if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
112+
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
113+
else:
114+
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)
115+
107116
for k, v in MODULE_CALLBACKS.items():
108117
self.client.set_response_callback(k, v)
109118

@@ -114,18 +123,27 @@ def __init__(self, client, **kwargs):
114123
# Set the module commands' callbacks
115124
MODULE_CALLBACKS = {
116125
TOPK_RESERVE: bool_ok,
117-
TOPK_ADD: parse_to_list,
118-
TOPK_INCRBY: parse_to_list,
119126
# TOPK_QUERY: spaceHolder,
120127
# TOPK_COUNT: spaceHolder,
128+
}
129+
130+
RESP2_MODULE_CALLBACKS = {
131+
TOPK_ADD: parse_to_list,
132+
TOPK_INCRBY: parse_to_list,
121133
TOPK_LIST: parse_to_list,
122134
TOPK_INFO: TopKInfo,
123135
}
136+
RESP3_MODULE_CALLBACKS = {}
124137

125138
self.client = client
126139
self.commandmixin = TOPKCommands
127140
self.execute_command = client.execute_command
128141

142+
if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
143+
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
144+
else:
145+
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)
146+
129147
for k, v in MODULE_CALLBACKS.items():
130148
self.client.set_response_callback(k, v)
131149

@@ -145,13 +163,22 @@ def __init__(self, client, **kwargs):
145163
# CF_COUNT: spaceHolder,
146164
# CF_SCANDUMP: spaceHolder,
147165
# CF_LOADCHUNK: spaceHolder,
166+
}
167+
168+
RESP2_MODULE_CALLBACKS = {
148169
CF_INFO: CFInfo,
149170
}
171+
RESP3_MODULE_CALLBACKS = {}
150172

151173
self.client = client
152174
self.commandmixin = CFCommands
153175
self.execute_command = client.execute_command
154176

177+
if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
178+
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
179+
else:
180+
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)
181+
155182
for k, v in MODULE_CALLBACKS.items():
156183
self.client.set_response_callback(k, v)
157184

@@ -165,22 +192,29 @@ def __init__(self, client, **kwargs):
165192
# TDIGEST_RESET: bool_ok,
166193
# TDIGEST_ADD: spaceHolder,
167194
# TDIGEST_MERGE: spaceHolder,
195+
}
196+
197+
RESP2_MODULE_CALLBACKS = {
198+
TDIGEST_BYRANK: parse_to_list,
199+
TDIGEST_BYREVRANK: parse_to_list,
168200
TDIGEST_CDF: parse_to_list,
169201
TDIGEST_QUANTILE: parse_to_list,
170202
TDIGEST_MIN: float,
171203
TDIGEST_MAX: float,
172204
TDIGEST_TRIMMED_MEAN: float,
173205
TDIGEST_INFO: TDigestInfo,
174-
TDIGEST_RANK: parse_to_list,
175-
TDIGEST_REVRANK: parse_to_list,
176-
TDIGEST_BYRANK: parse_to_list,
177-
TDIGEST_BYREVRANK: parse_to_list,
178206
}
207+
RESP3_MODULE_CALLBACKS = {}
179208

180209
self.client = client
181210
self.commandmixin = TDigestCommands
182211
self.execute_command = client.execute_command
183212

213+
if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
214+
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
215+
else:
216+
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)
217+
184218
for k, v in MODULE_CALLBACKS.items():
185219
self.client.set_response_callback(k, v)
186220

@@ -199,12 +233,21 @@ def __init__(self, client, **kwargs):
199233
# BF_SCANDUMP: spaceHolder,
200234
# BF_LOADCHUNK: spaceHolder,
201235
# BF_CARD: spaceHolder,
236+
}
237+
238+
RESP2_MODULE_CALLBACKS = {
202239
BF_INFO: BFInfo,
203240
}
241+
RESP3_MODULE_CALLBACKS = {}
204242

205243
self.client = client
206244
self.commandmixin = BFCommands
207245
self.execute_command = client.execute_command
208246

247+
if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
248+
MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
249+
else:
250+
MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)
251+
209252
for k, v in MODULE_CALLBACKS.items():
210253
self.client.set_response_callback(k, v)

redis/commands/bf/commands.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
class BFCommands:
6161
"""Bloom Filter commands."""
6262

63-
# region Bloom Filter Functions
6463
def create(self, key, errorRate, capacity, expansion=None, noScale=None):
6564
"""
6665
Create a new Bloom Filter `key` with desired probability of false positives
@@ -178,7 +177,6 @@ def card(self, key):
178177
class CFCommands:
179178
"""Cuckoo Filter commands."""
180179

181-
# region Cuckoo Filter Functions
182180
def create(
183181
self, key, capacity, expansion=None, bucket_size=None, max_iterations=None
184182
):
@@ -488,7 +486,6 @@ def byrevrank(self, key, rank, *ranks):
488486
class CMSCommands:
489487
"""Count-Min Sketch Commands"""
490488

491-
# region Count-Min Sketch Functions
492489
def initbydim(self, key, width, depth):
493490
"""
494491
Initialize a Count-Min Sketch `key` to dimensions (`width`, `depth`) specified by user.

redis/commands/bf/info.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ def __init__(self, args):
1616
self.insertedNum = response["Number of items inserted"]
1717
self.expansionRate = response["Expansion rate"]
1818

19+
def get(self, item):
20+
try:
21+
return self.__getitem__(item)
22+
except AttributeError:
23+
return None
24+
25+
def __getitem__(self, item):
26+
return getattr(self, item)
27+
1928

2029
class CFInfo(object):
2130
size = None
@@ -38,6 +47,15 @@ def __init__(self, args):
3847
self.expansionRate = response["Expansion rate"]
3948
self.maxIteration = response["Max iterations"]
4049

50+
def get(self, item):
51+
try:
52+
return self.__getitem__(item)
53+
except AttributeError:
54+
return None
55+
56+
def __getitem__(self, item):
57+
return getattr(self, item)
58+
4159

4260
class CMSInfo(object):
4361
width = None
@@ -50,6 +68,9 @@ def __init__(self, args):
5068
self.depth = response["depth"]
5169
self.count = response["count"]
5270

71+
def __getitem__(self, item):
72+
return getattr(self, item)
73+
5374

5475
class TopKInfo(object):
5576
k = None
@@ -64,6 +85,9 @@ def __init__(self, args):
6485
self.depth = response["depth"]
6586
self.decay = response["decay"]
6687

88+
def __getitem__(self, item):
89+
return getattr(self, item)
90+
6791

6892
class TDigestInfo(object):
6993
compression = None
@@ -85,3 +109,12 @@ def __init__(self, args):
85109
self.unmerged_weight = response["Unmerged weight"]
86110
self.total_compressions = response["Total compressions"]
87111
self.memory_usage = response["Memory usage"]
112+
113+
def get(self, item):
114+
try:
115+
return self.__getitem__(item)
116+
except AttributeError:
117+
return None
118+
119+
def __getitem__(self, item):
120+
return getattr(self, item)

redis/commands/json/__init__.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,50 @@ def __init__(
3232
"""
3333
# Set the module commands' callbacks
3434
self.MODULE_CALLBACKS = {
35-
"JSON.CLEAR": int,
36-
"JSON.DEL": int,
37-
"JSON.FORGET": int,
38-
"JSON.GET": self._decode,
35+
"JSON.ARRPOP": self._decode,
3936
"JSON.MGET": bulk_of_jsons(self._decode),
4037
"JSON.SET": lambda r: r and nativestr(r) == "OK",
41-
"JSON.NUMINCRBY": self._decode,
42-
"JSON.NUMMULTBY": self._decode,
38+
"JSON.DEBUG": self._decode,
4339
"JSON.TOGGLE": self._decode,
44-
"JSON.STRAPPEND": self._decode,
45-
"JSON.STRLEN": self._decode,
40+
"JSON.RESP": self._decode,
41+
}
42+
43+
RESP2_MODULE_CALLBACKS = {
44+
"JSON.ARRTRIM": self._decode,
45+
"JSON.OBJLEN": self._decode,
4646
"JSON.ARRAPPEND": self._decode,
4747
"JSON.ARRINDEX": self._decode,
4848
"JSON.ARRINSERT": self._decode,
49+
"JSON.TOGGLE": self._decode,
50+
"JSON.STRAPPEND": self._decode,
51+
"JSON.STRLEN": self._decode,
4952
"JSON.ARRLEN": self._decode,
50-
"JSON.ARRPOP": self._decode,
51-
"JSON.ARRTRIM": self._decode,
52-
"JSON.OBJLEN": self._decode,
53+
"JSON.CLEAR": int,
54+
"JSON.DEL": int,
55+
"JSON.FORGET": int,
56+
"JSON.NUMINCRBY": self._decode,
57+
"JSON.NUMMULTBY": self._decode,
5358
"JSON.OBJKEYS": self._decode,
54-
"JSON.RESP": self._decode,
55-
"JSON.DEBUG": self._decode,
59+
"JSON.GET": self._decode,
60+
}
61+
62+
RESP3_MODULE_CALLBACKS = {
63+
"JSON.GET": lambda response: [
64+
[self._decode(r) for r in res] for res in response
65+
]
66+
if response
67+
else response
5668
}
5769

5870
self.client = client
5971
self.execute_command = client.execute_command
6072
self.MODULE_VERSION = version
6173

74+
if self.client.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
75+
self.MODULE_CALLBACKS.update(RESP3_MODULE_CALLBACKS)
76+
else:
77+
self.MODULE_CALLBACKS.update(RESP2_MODULE_CALLBACKS)
78+
6279
for key, value in self.MODULE_CALLBACKS.items():
6380
self.client.set_response_callback(key, value)
6481

redis/commands/search/__init__.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
import redis
22

33
from ...asyncio.client import Pipeline as AsyncioPipeline
4-
from .commands import AsyncSearchCommands, SearchCommands
4+
from .commands import (
5+
AGGREGATE_CMD,
6+
CONFIG_CMD,
7+
INFO_CMD,
8+
PROFILE_CMD,
9+
SEARCH_CMD,
10+
SPELLCHECK_CMD,
11+
SYNDUMP_CMD,
12+
AsyncSearchCommands,
13+
SearchCommands,
14+
)
515

616

717
class Search(SearchCommands):
@@ -90,6 +100,15 @@ def __init__(self, client, index_name="idx"):
90100
self.index_name = index_name
91101
self.execute_command = client.execute_command
92102
self._pipeline = client.pipeline
103+
self.RESP2_MODULE_CALLBACKS = {
104+
INFO_CMD: self._parse_info,
105+
SEARCH_CMD: self._parse_search,
106+
AGGREGATE_CMD: self._parse_aggregate,
107+
PROFILE_CMD: self._parse_profile,
108+
SPELLCHECK_CMD: self._parse_spellcheck,
109+
CONFIG_CMD: self._parse_config_get,
110+
SYNDUMP_CMD: self._parse_syndump,
111+
}
93112

94113
def pipeline(self, transaction=True, shard_hint=None):
95114
"""Creates a pipeline for the SEARCH module, that can be used for executing

0 commit comments

Comments
 (0)