Skip to content

Commit e895917

Browse files
authored
Merge branch 'master' into ck-releasedrafter
2 parents 5432d6c + a10452a commit e895917

File tree

6 files changed

+254
-47
lines changed

6 files changed

+254
-47
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ tox-poetry = "^0.3.0"
4141
bandit = "^1.7.0"
4242
pylint = "^2.8.2"
4343
vulture = "^2.3"
44+
scikit-image = "==0.16.2"
4445

4546
[tool.poetry.urls]
4647
url = "https://redisai.io"

redisai/client.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ def pipeline(self, transaction: bool = True, shard_hint: bool = None) -> "Pipeli
7070
)
7171

7272
def dag(
73-
self, load: Sequence = None, persist: Sequence = None, readonly: bool = False
73+
self,
74+
load: Sequence = None,
75+
persist: Sequence = None,
76+
routing: AnyStr = None,
77+
timeout: int = None,
78+
readonly: bool = False
7479
) -> "Dag":
7580
"""
7681
It returns a DAG object on which other DAG-allowed operations can be called. For
@@ -81,7 +86,16 @@ def dag(
8186
load : Union[AnyStr, List[AnyStr]]
8287
Load the list of given values from the keyspace to DAG scope
8388
persist : Union[AnyStr, List[AnyStr]]
84-
Write the list of given key, values to the keyspace from DAG scope
89+
For each tensor key in the given list, write its values to the keyspace from
90+
DAG scope after the DAG execution is finished.
91+
routing : AnyStr
92+
Denotes a key to be used in the DAG or a tag that will assist in routing the dag
93+
execution command to the right shard. Redis will verify that all potential key
94+
accesses are done to within the target shard.
95+
timeout : int
96+
The max number on milisecinds that may pass before the request is prossced
97+
(meaning that the result will not be computed after that time and TIMEDOUT
98+
is returned in that case)
8599
readonly : bool
86100
If True, it triggers AI.DAGRUN_RO, the read only DAG which cannot write (PERSIST) to
87101
the keyspace. But since it can't write, it can execute on replicas
@@ -105,9 +119,7 @@ def dag(
105119
>>> # You can even chain the operations
106120
>>> result = dag.tensorset(**akwargs).modelrun(**bkwargs).tensorget(**ckwargs).run()
107121
"""
108-
return Dag(
109-
load, persist, self.execute_command, readonly, self.enable_postprocess
110-
)
122+
return Dag(load, persist, routing, timeout, self.execute_command, readonly)
111123

112124
def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str:
113125
"""

redisai/dag.py

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,57 @@
55

66
from redisai import command_builder as builder
77
from redisai.postprocessor import Processor
8+
from deprecated import deprecated
9+
import warnings
810

911
processor = Processor()
1012

1113

1214
class Dag:
13-
def __init__(self, load, persist, executor, readonly=False, postprocess=True):
15+
def __init__(self, load, persist, routing, timeout, executor, readonly=False):
1416
self.result_processors = []
15-
self.enable_postprocess = postprocess
16-
if readonly:
17-
if persist:
18-
raise RuntimeError(
19-
"READONLY requests cannot write (duh!) and should not "
20-
"have PERSISTing values"
21-
)
22-
self.commands = ["AI.DAGRUN_RO"]
17+
self.enable_postprocess = True
18+
self.deprecatedDagrunMode = load is None and persist is None and routing is None
19+
self.readonly = readonly
20+
self.executor = executor
21+
22+
if readonly and persist:
23+
raise RuntimeError(
24+
"READONLY requests cannot write (duh!) and should not "
25+
"have PERSISTing values"
26+
)
27+
28+
if self.deprecatedDagrunMode:
29+
# Throw warning about using deprecated dagrun
30+
warnings.warn("Creating Dag without any of LOAD, PERSIST and ROUTING arguments"
31+
"is allowed only in deprecated AI.DAGRUN or AI.DAGRUN_RO commands", DeprecationWarning)
32+
# Use dagrun
33+
if readonly:
34+
self.commands = ["AI.DAGRUN_RO"]
35+
else:
36+
self.commands = ["AI.DAGRUN"]
2337
else:
24-
self.commands = ["AI.DAGRUN"]
25-
if load:
38+
# Use dagexecute
39+
if readonly:
40+
self.commands = ["AI.DAGEXECUTE_RO"]
41+
else:
42+
self.commands = ["AI.DAGEXECUTE"]
43+
if load is not None:
2644
if not isinstance(load, (list, tuple)):
2745
self.commands += ["LOAD", 1, load]
2846
else:
2947
self.commands += ["LOAD", len(load), *load]
30-
if persist:
48+
if persist is not None:
3149
if not isinstance(persist, (list, tuple)):
32-
self.commands += ["PERSIST", 1, persist, "|>"]
50+
self.commands += ["PERSIST", 1, persist]
3351
else:
34-
self.commands += ["PERSIST", len(persist), *persist, "|>"]
35-
else:
36-
self.commands.append("|>")
37-
self.executor = executor
52+
self.commands += ["PERSIST", len(persist), *persist]
53+
if routing is not None:
54+
self.commands += ["ROUTING", routing]
55+
if timeout is not None:
56+
self.commands += ["TIMEOUT", timeout]
57+
58+
self.commands.append("|>")
3859

3960
def tensorset(
4061
self,
@@ -69,20 +90,71 @@ def tensorget(
6990
)
7091
return self
7192

93+
@deprecated(version="1.2.0", reason="Use modelexecute instead")
7294
def modelrun(
95+
self,
96+
key: AnyStr,
97+
inputs: Union[AnyStr, List[AnyStr]],
98+
outputs: Union[AnyStr, List[AnyStr]],
99+
) -> Any:
100+
if self.deprecatedDagrunMode:
101+
args = builder.modelrun(key, inputs, outputs)
102+
self.commands.extend(args)
103+
self.commands.append("|>")
104+
self.result_processors.append(bytes.decode)
105+
return self
106+
else:
107+
return self.modelexecute(key, inputs, outputs)
108+
109+
def modelexecute(
73110
self,
74111
key: AnyStr,
75112
inputs: Union[AnyStr, List[AnyStr]],
76113
outputs: Union[AnyStr, List[AnyStr]],
77114
) -> Any:
78-
args = builder.modelrun(key, inputs, outputs)
115+
if self.deprecatedDagrunMode:
116+
raise RuntimeError(
117+
"You are using deprecated version of DAG, that does not supports MODELEXECUTE."
118+
"The new version requires giving at least one of LOAD, PERSIST and ROUTING"
119+
"arguments when constructing the Dag"
120+
)
121+
args = builder.modelexecute(key, inputs, outputs, None)
79122
self.commands.extend(args)
80123
self.commands.append("|>")
81124
self.result_processors.append(bytes.decode)
82125
return self
83126

127+
def scriptexecute(
128+
self,
129+
key: AnyStr,
130+
function: str,
131+
keys: Union[AnyStr, Sequence[AnyStr]] = None,
132+
inputs: Union[AnyStr, Sequence[AnyStr]] = None,
133+
args: Union[AnyStr, Sequence[AnyStr]] = None,
134+
outputs: Union[AnyStr, List[AnyStr]] = None,
135+
) -> Any:
136+
if self.readonly:
137+
raise RuntimeError(
138+
"AI.SCRIPTEXECUTE cannot be used in readonly mode"
139+
)
140+
if self.deprecatedDagrunMode:
141+
raise RuntimeError(
142+
"You are using deprecated version of DAG, that does not supports SCRIPTEXECUTE."
143+
"The new version requires giving at least one of LOAD, PERSIST and ROUTING"
144+
"arguments when constructing the Dag"
145+
)
146+
args = builder.scriptexecute(key, function, keys, inputs, args, outputs, None)
147+
self.commands.extend(args)
148+
self.commands.append("|>")
149+
self.result_processors.append(bytes.decode)
150+
return self
151+
152+
@deprecated(version="1.2.0", reason="Use execute instead")
84153
def run(self):
85-
commands = self.commands[:-1] # removing the last "|>
154+
return self.execute()
155+
156+
def execute(self):
157+
commands = self.commands[:-1] # removing the last "|>"
86158
results = self.executor(*commands)
87159
if self.enable_postprocess:
88160
out = []

0 commit comments

Comments
 (0)