@@ -755,7 +755,6 @@ class ClusterNode:
755755 """
756756
757757 __slots__ = (
758- "_command_stack" ,
759758 "_connections" ,
760759 "_free" ,
761760 "connection_class" ,
@@ -796,7 +795,6 @@ def __init__(
796795
797796 self ._connections : List [Connection ] = []
798797 self ._free : Deque [Connection ] = collections .deque (maxlen = self .max_connections )
799- self ._command_stack : List ["PipelineCommand" ] = []
800798
801799 def __repr__ (self ) -> str :
802800 return (
@@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
887885 # Release connection
888886 self ._free .append (connection )
889887
890- async def execute_pipeline (self ) -> bool :
888+ async def execute_pipeline (self , commands : List [ "PipelineCommand" ] ) -> bool :
891889 # Acquire connection
892890 connection = self .acquire_connection ()
893891
894892 # Execute command
895893 await connection .send_packed_command (
896- connection .pack_commands (cmd .args for cmd in self . _command_stack ), False
894+ connection .pack_commands (cmd .args for cmd in commands ), False
897895 )
898896
899897 # Read responses
900898 ret = False
901- for cmd in self . _command_stack :
899+ for cmd in commands :
902900 try :
903901 cmd .result = await self .parse_response (
904902 connection , cmd .args [0 ], ** cmd .kwargs
@@ -1365,12 +1363,14 @@ async def _execute(
13651363
13661364 node = target_nodes [0 ]
13671365 if node .name not in nodes :
1368- nodes [node .name ] = node
1369- node ._command_stack = []
1370- node ._command_stack .append (cmd )
1366+ nodes [node .name ] = (node , [])
1367+ nodes [node .name ][1 ].append (cmd )
13711368
13721369 errors = await asyncio .gather (
1373- * (asyncio .ensure_future (node .execute_pipeline ()) for node in nodes .values ())
1370+ * (
1371+ asyncio .ensure_future (node [0 ].execute_pipeline (node [1 ]))
1372+ for node in nodes .values ()
1373+ )
13741374 )
13751375
13761376 if any (errors ):
0 commit comments