11
11
12
12
13
13
class NatsBroker (AsyncBroker ):
14
+ """
15
+ NATS broker for taskiq.
16
+
17
+ By default this broker works
18
+ broadcasting message to all connected workers.
19
+
20
+ If you want to make it work as queue,
21
+ you need to supply name of the queue in
22
+ queue argument.
23
+
24
+ Docs about queue:
25
+ https://docs.nats.io/nats-concepts/core-nats/queue
26
+ """
27
+
14
28
def __init__ ( # noqa: WPS211 (too many args)
15
29
self ,
16
30
servers : Union [str , list [str ]],
@@ -28,23 +42,40 @@ def __init__( # noqa: WPS211 (too many args)
28
42
self .subject = subject
29
43
30
44
async def startup (self ) -> None :
45
+ """
46
+ Startup event handler.
47
+
48
+ It simply connects to NATS cluster.
49
+ """
31
50
await super ().startup ()
32
51
await self .client .connect (self .servers , ** self .connection_kwargs )
33
52
34
53
async def kick (self , message : BrokerMessage ) -> None :
54
+ """
55
+ Send a message using NATS.
56
+
57
+ :param message: message to send.
58
+ """
35
59
await self .client .publish (
36
60
self .subject ,
37
61
payload = message .json ().encode (),
38
62
headers = message .labels ,
39
63
)
40
64
41
65
async def listen (self ) -> AsyncGenerator [BrokerMessage , None ]:
66
+ """
67
+ Start listen to new messages.
68
+
69
+ :yield: incoming messages.
70
+ """
42
71
subscribe = await self .client .subscribe (self .subject , queue = self .queue or "" )
43
72
async for message in subscribe .messages :
44
73
try :
45
74
yield BrokerMessage .parse_raw (message .data )
46
75
except ValueError :
47
- logger .warning (f"Cannot parse message: { message .data .decode ('utf-8' )} " )
76
+ data = message .data .decode ("utf-8" )
77
+ logger .warning (f"Cannot parse message: { data } " )
48
78
49
79
async def shutdown (self ) -> None :
80
+ """Close connections to NATS."""
50
81
await self .client .close ()
0 commit comments