24
24
no_type_check ,
25
25
)
26
26
27
- import aioredis
28
- from aioredis .client import Pipeline
27
+ from more_itertools import ichunked
29
28
from pydantic import BaseModel , validator
30
29
from pydantic .fields import FieldInfo as PydanticFieldInfo
31
30
from pydantic .fields import ModelField , Undefined , UndefinedType
35
34
from typing_extensions import Protocol , get_args , get_origin
36
35
from ulid import ULID
37
36
37
+ from .. import redis
38
38
from ..checks import has_redis_json , has_redisearch
39
39
from ..connections import get_redis_connection
40
- from ..unasync_util import ASYNC_MODE
40
+ from ..util import ASYNC_MODE
41
41
from .encoders import jsonable_encoder
42
42
from .render_tree import render_tree
43
43
from .token_escaper import TokenEscaper
@@ -760,6 +760,9 @@ async def all(self, batch_size=DEFAULT_PAGE_SIZE):
760
760
return await query .execute ()
761
761
return await self .execute ()
762
762
763
+ async def page (self , offset = 0 , limit = 10 ):
764
+ return await self .copy (offset = offset , limit = limit ).execute ()
765
+
763
766
def sort_by (self , * fields : str ):
764
767
if not fields :
765
768
return self
@@ -975,7 +978,7 @@ class BaseMeta(Protocol):
975
978
global_key_prefix : str
976
979
model_key_prefix : str
977
980
primary_key_pattern : str
978
- database : aioredis .Redis
981
+ database : redis .Redis
979
982
primary_key : PrimaryKey
980
983
primary_key_creator_cls : Type [PrimaryKeyCreator ]
981
984
index_name : str
@@ -994,7 +997,7 @@ class DefaultMeta:
994
997
global_key_prefix : Optional [str ] = None
995
998
model_key_prefix : Optional [str ] = None
996
999
primary_key_pattern : Optional [str ] = None
997
- database : Optional [aioredis .Redis ] = None
1000
+ database : Optional [redis .Redis ] = None
998
1001
primary_key : Optional [PrimaryKey ] = None
999
1002
primary_key_creator_cls : Optional [Type [PrimaryKeyCreator ]] = None
1000
1003
index_name : Optional [str ] = None
@@ -1102,6 +1105,7 @@ class Config:
1102
1105
extra = "allow"
1103
1106
1104
1107
def __init__ (__pydantic_self__ , ** data : Any ) -> None :
1108
+ data = {key : val for key , val in data .items () if val }
1105
1109
super ().__init__ (** data )
1106
1110
__pydantic_self__ .validate_primary_key ()
1107
1111
@@ -1115,9 +1119,17 @@ def key(self):
1115
1119
return self .make_primary_key (pk )
1116
1120
1117
1121
@classmethod
1118
- async def delete (cls , pk : Any ) -> int :
1122
+ async def _delete (cls , db , * pks ):
1123
+ return await db .delete (* pks )
1124
+
1125
+ @classmethod
1126
+ async def delete (
1127
+ cls , pk : Any , pipeline : Optional [redis .client .Pipeline ] = None
1128
+ ) -> int :
1119
1129
"""Delete data at this key."""
1120
- return await cls .db ().delete (cls .make_primary_key (pk ))
1130
+ db = cls ._get_db (pipeline )
1131
+
1132
+ return await cls ._delete (db , cls .make_primary_key (pk ))
1121
1133
1122
1134
@classmethod
1123
1135
async def get (cls , pk : Any ) -> "RedisModel" :
@@ -1127,14 +1139,15 @@ async def update(self, **field_values):
1127
1139
"""Update this model instance with the specified key-value pairs."""
1128
1140
raise NotImplementedError
1129
1141
1130
- async def save (self , pipeline : Optional [Pipeline ] = None ) -> "RedisModel" :
1142
+ async def save (
1143
+ self , pipeline : Optional [redis .client .Pipeline ] = None
1144
+ ) -> "RedisModel" :
1131
1145
raise NotImplementedError
1132
1146
1133
- async def expire (self , num_seconds : int , pipeline : Optional [Pipeline ] = None ):
1134
- if pipeline is None :
1135
- db = self .db ()
1136
- else :
1137
- db = pipeline
1147
+ async def expire (
1148
+ self , num_seconds : int , pipeline : Optional [redis .client .Pipeline ] = None
1149
+ ):
1150
+ db = self ._get_db (pipeline )
1138
1151
1139
1152
# TODO: Wrap any Redis response errors in a custom exception?
1140
1153
await db .expire (self .make_primary_key (self .pk ), num_seconds )
@@ -1223,19 +1236,10 @@ def get_annotations(cls):
1223
1236
async def add (
1224
1237
cls ,
1225
1238
models : Sequence ["RedisModel" ],
1226
- pipeline : Optional [Pipeline ] = None ,
1239
+ pipeline : Optional [redis . client . Pipeline ] = None ,
1227
1240
pipeline_verifier : Callable [..., Any ] = verify_pipeline_response ,
1228
1241
) -> Sequence ["RedisModel" ]:
1229
- if pipeline is None :
1230
- # By default, send commands in a pipeline. Saving each model will
1231
- # be atomic, but Redis may process other commands in between
1232
- # these saves.
1233
- db = cls .db ().pipeline (transaction = False )
1234
- else :
1235
- # If the user gave us a pipeline, add our commands to that. The user
1236
- # will be responsible for executing the pipeline after they've accumulated
1237
- # the commands they want to send.
1238
- db = pipeline
1242
+ db = cls ._get_db (pipeline , bulk = True )
1239
1243
1240
1244
for model in models :
1241
1245
# save() just returns the model, we don't need that here.
@@ -1249,6 +1253,31 @@ async def add(
1249
1253
1250
1254
return models
1251
1255
1256
+ @classmethod
1257
+ def _get_db (
1258
+ self , pipeline : Optional [redis .client .Pipeline ] = None , bulk : bool = False
1259
+ ):
1260
+ if pipeline is not None :
1261
+ return pipeline
1262
+ elif bulk :
1263
+ return self .db ().pipeline (transaction = False )
1264
+ else :
1265
+ return self .db ()
1266
+
1267
+ @classmethod
1268
+ async def delete_many (
1269
+ cls ,
1270
+ models : Sequence ["RedisModel" ],
1271
+ pipeline : Optional [redis .client .Pipeline ] = None ,
1272
+ ) -> int :
1273
+ db = cls ._get_db (pipeline )
1274
+
1275
+ for chunk in ichunked (models , 100 ):
1276
+ pks = [cls .make_primary_key (model .pk ) for model in chunk ]
1277
+ await cls ._delete (db , * pks )
1278
+
1279
+ return len (models )
1280
+
1252
1281
@classmethod
1253
1282
def redisearch_schema (cls ):
1254
1283
raise NotImplementedError
@@ -1283,17 +1312,13 @@ def __init_subclass__(cls, **kwargs):
1283
1312
f"HashModels cannot index dataclass fields. Field: { name } "
1284
1313
)
1285
1314
1286
- def dict (self ) -> Dict [str , Any ]:
1287
- # restore none values
1288
- return dict (self )
1289
-
1290
- async def save (self , pipeline : Optional [Pipeline ] = None ) -> "HashModel" :
1315
+ async def save (self , pipeline : Optional [redis .client .Pipeline ] = None ) -> "HashModel" :
1291
1316
self .check ()
1292
1317
if pipeline is None :
1293
1318
db = self .db ()
1294
1319
else :
1295
1320
db = pipeline
1296
- document = jsonable_encoder ({ key : val if val else "0" for key , val in self .dict (). items ()} )
1321
+ document = jsonable_encoder (self .dict ())
1297
1322
# TODO: Wrap any Redis response errors in a custom exception?
1298
1323
await db .hset (self .key (), mapping = document )
1299
1324
return self
@@ -1461,12 +1486,12 @@ def __init__(self, *args, **kwargs):
1461
1486
)
1462
1487
super ().__init__ (* args , ** kwargs )
1463
1488
1464
- async def save (self , pipeline : Optional [Pipeline ] = None ) -> "JsonModel" :
1489
+ async def save (
1490
+ self , pipeline : Optional [redis .client .Pipeline ] = None
1491
+ ) -> "JsonModel" :
1465
1492
self .check ()
1466
- if pipeline is None :
1467
- db = self .db ()
1468
- else :
1469
- db = pipeline
1493
+ db = self ._get_db (pipeline )
1494
+
1470
1495
# TODO: Wrap response errors in a custom exception?
1471
1496
await db .execute_command ("JSON.SET" , self .key (), "." , self .json ())
1472
1497
return self
0 commit comments