@@ -760,18 +760,18 @@ async def aput_writes(
760
760
exists = await self ._redis .exists (key )
761
761
if exists :
762
762
# Update existing key
763
- await self . _redis .json ().set (key , "$.channel" , write_obj ["channel" ]) # type: ignore[misc, arg-type]
764
- await self . _redis .json ().set (key , "$.type" , write_obj ["type" ]) # type: ignore[misc, arg-type]
765
- await self . _redis .json ().set (key , "$.blob" , write_obj ["blob" ]) # type: ignore[misc, arg-type]
763
+ pipeline .json ().set (key , "$.channel" , write_obj ["channel" ]) # type: ignore[misc, arg-type]
764
+ pipeline .json ().set (key , "$.type" , write_obj ["type" ]) # type: ignore[misc, arg-type]
765
+ pipeline .json ().set (key , "$.blob" , write_obj ["blob" ]) # type: ignore[misc, arg-type]
766
766
else :
767
767
# Create new key
768
- await self . _redis .json ().set (key , "$" , write_obj ) # type: ignore[misc]
768
+ pipeline .json ().set (key , "$" , write_obj ) # type: ignore[misc]
769
769
created_keys .append (key )
770
770
else :
771
771
# For non-upsert case, only set if key doesn't exist
772
772
exists = await self ._redis .exists (key )
773
773
if not exists :
774
- await self . _redis .json ().set (key , "$" , write_obj ) # type: ignore[misc]
774
+ pipeline .json ().set (key , "$" , write_obj ) # type: ignore[misc]
775
775
created_keys .append (key )
776
776
777
777
# Execute all operations atomically
0 commit comments