1
1
import logging
2
+ import threading
2
3
3
4
import redis
4
5
import time
@@ -65,8 +66,19 @@ def connection(self):
65
66
del self .state .connection
66
67
67
68
68
- class RedisStreamMixin :
69
- connection : redis .Redis
69
+ class RedisStreamStore (RedisStore ):
70
+
71
+ def __init__ (self , * args , ** kwargs ):
72
+ super ().__init__ (* args , ** kwargs )
73
+ self .__shutdown = False
74
+ self .__shutdown_event = threading .Event ()
75
+
76
+ def __del__ (self ):
77
+ self .shutdown ()
78
+
79
+ def shutdown (self ):
80
+ self .__shutdown = True
81
+ self .__shutdown_event .set ()
70
82
71
83
def _create_group (self , stream , group ):
72
84
try :
@@ -90,8 +102,12 @@ def send(self, stream, message, **kwargs):
90
102
91
103
def start_consuming (self , stream , group , consumer , callback , prefetch = 1 , ** kwargs ):
92
104
"""开始消费"""
105
+ self .__shutdown = False
106
+ self .__shutdown_event .clear ()
93
107
self ._create_group (stream , group )
94
- while True :
108
+ while not self .__shutdown :
109
+ if self .__shutdown :
110
+ break
95
111
try :
96
112
messages = self .connection .xreadgroup (group , consumer , {stream : '>' }, count = prefetch , ** kwargs )
97
113
for message in messages :
@@ -107,9 +123,5 @@ def start_consuming(self, stream, group, consumer, callback, prefetch=1, **kwarg
107
123
time .sleep (1 )
108
124
109
125
110
- class RedisStreamStore (RedisStore , RedisStreamMixin ):
111
- pass
112
-
113
-
114
126
useRedis = RedisStore
115
127
useRedisStreamStore = RedisStreamStore
0 commit comments