Skip to content

Commit 2c59ded

Browse files
committed
✨ feat: RabbitListener add stop_listener
1 parent 1550164 commit 2c59ded

File tree

2 files changed

+65
-24
lines changed

2 files changed

+65
-24
lines changed

example/example_consume_decorator.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
2+
import time
23

3-
from use_rabbitmq import useRabbitMQ
4+
from use_rabbitmq import useRabbitMQ, useRabbitListener
45

56
logging.basicConfig(level=logging.INFO)
67

@@ -12,9 +13,15 @@
1213
)
1314

1415

15-
# @useRabbitListener(mq, queue_name="test_queue")
16-
# or
17-
@mq.listener(queue_name="test_queue")
16+
def stop_listener_when_timeout(client):
17+
if time.time() - client.last_message_time > 10:
18+
print("停止监听")
19+
return True
20+
21+
22+
@useRabbitListener(
23+
mq, queue_name="test_queue", stop_listener=stop_listener_when_timeout
24+
)
1825
def do_something(message):
1926
print(message.body)
2027
message.ack()

src/use_rabbitmq/__init__.py

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,18 @@ class RabbitMQStore:
2121

2222
MAX_SEND_ATTEMPTS: int = 6 # 最大发送重试次数
2323
MAX_CONNECTION_ATTEMPTS: float = float("inf") # 最大连接重试次数
24-
MAX_CONNECTION_DELAY: int = 2 ** 5 # 最大延迟时间
24+
MAX_CONNECTION_DELAY: int = 2**5 # 最大延迟时间
2525
RECONNECTION_DELAY: int = 1
2626

2727
def __init__(
28-
self,
29-
*,
30-
confirm_delivery: bool = True,
31-
host: Optional[str] = None,
32-
port: Optional[int] = None,
33-
username: Optional[str] = None,
34-
password: Optional[str] = None,
35-
**kwargs,
28+
self,
29+
*,
30+
confirm_delivery: bool = True,
31+
host: Optional[str] = None,
32+
port: Optional[int] = None,
33+
username: Optional[str] = None,
34+
password: Optional[str] = None,
35+
**kwargs,
3636
):
3737
"""
3838
:param confirm_delivery: 是否开启消息确认
@@ -99,7 +99,7 @@ def connection(self) -> None:
9999
@property
100100
def channel(self) -> amqpstorm.Channel:
101101
if all([self._connection, self._channel]) and all(
102-
[self._connection.is_open, self._channel.is_open]
102+
[self._connection.is_open, self._channel.is_open]
103103
):
104104
return self._channel
105105
self._channel = self.connection.channel()
@@ -131,11 +131,11 @@ def declare_queue(self, queue_name: str, durable: bool = True, **kwargs):
131131
return self.channel.queue.declare(queue_name, durable=durable, **kwargs)
132132

133133
def send(
134-
self,
135-
queue_name: str,
136-
message: Union[str, bytes],
137-
priority: Optional[dict] = None,
138-
**kwargs,
134+
self,
135+
queue_name: str,
136+
message: Union[str, bytes],
137+
priority: Optional[dict] = None,
138+
**kwargs,
139139
):
140140
"""发送消息"""
141141
attempts = 1
@@ -163,7 +163,7 @@ def get_message_counts(self, queue_name: str) -> int:
163163
return queue_response.get("message_count", 0)
164164

165165
def start_consuming(
166-
self, queue_name: str, callback: Callable, prefetch=1, **kwargs
166+
self, queue_name: str, callback: Callable, prefetch=1, **kwargs
167167
):
168168
"""开始消费"""
169169
self.__shutdown = False
@@ -226,18 +226,52 @@ def stop_listener(self, queue_name: str):
226226

227227

228228
class RabbitListener:
229-
def __init__(self, instance: RabbitMQStore, *, queue_name: str, no_ack: bool = False, **kwargs):
229+
def __init__(
230+
self,
231+
instance: RabbitMQStore,
232+
*,
233+
queue_name: str,
234+
no_ack: bool = False,
235+
stop_listener: Callable = None,
236+
**kwargs,
237+
):
230238
self.instance = instance
231239
self.queue_name = queue_name
232240
self.no_ack = no_ack
233241
self.kwargs = kwargs
242+
self.stop_listener = stop_listener
243+
self.last_message_time = time.time()
244+
self.should_stop = False
234245

235246
def __call__(self, callback: Callable[[amqpstorm.Message], None]):
236-
listener = self.instance.listener(self.queue_name, self.no_ack, **self.kwargs)
237-
return listener(callback)
238247

248+
def wrapped_callback(message):
249+
self.last_message_time = time.time()
250+
callback(message)
251+
252+
def monitor_thread():
253+
while not self.should_stop:
254+
time.sleep(1)
255+
if self.stop_listener and self.stop_listener(self):
256+
logger.info("停止监听器")
257+
self.should_stop = True
258+
self.instance.shutdown()
259+
break
239260

240-
# alias
261+
def consume_thread():
262+
listener = self.instance.listener(
263+
self.queue_name, self.no_ack, **self.kwargs
264+
)
265+
listener(wrapped_callback)
241266

267+
consume_thread = threading.Thread(target=consume_thread)
268+
monitor_thread = threading.Thread(target=monitor_thread)
269+
270+
consume_thread.start()
271+
monitor_thread.start()
272+
return consume_thread, monitor_thread
273+
274+
275+
# alias
242276
useRabbitMQ = RabbitMQStore
243277
useRabbitListener = RabbitListener

0 commit comments

Comments
 (0)