1
1
from __future__ import unicode_literals
2
+
2
3
import mmap
3
4
import msgpack
4
- import os
5
- import pkg_resources
6
- import posix_ipc
7
5
import random
8
6
import six
9
7
import string
12
10
import time
13
11
from asgiref .base_layer import BaseChannelLayer
14
12
13
+ import pkg_resources
14
+ import posix_ipc
15
15
16
16
__version__ = pkg_resources .require ('asgi_ipc' )[0 ].version
17
17
MB = 1024 * 1024
@@ -31,7 +31,9 @@ class IPCChannelLayer(BaseChannelLayer):
31
31
little... heavier than that.
32
32
"""
33
33
34
- def __init__ (self , prefix = "asgi" , expiry = 60 , group_expiry = 86400 , capacity = 10 , channel_capacity = None , channel_memory = 100 * MB , group_memory = 20 * MB ):
34
+ def __init__ (self , prefix = "asgi" , expiry = 60 , group_expiry = 86400 ,
35
+ capacity = 10 , channel_capacity = None ,
36
+ channel_memory = 100 * MB , group_memory = 20 * MB ):
35
37
super (IPCChannelLayer , self ).__init__ (
36
38
expiry = expiry ,
37
39
group_expiry = group_expiry ,
@@ -40,16 +42,24 @@ def __init__(self, prefix="asgi", expiry=60, group_expiry=86400, capacity=10, ch
40
42
)
41
43
self .thread_lock = threading .Lock ()
42
44
self .prefix = prefix
43
- self .channel_store = MemoryDict ("/%s-chan" % self .prefix , size = channel_memory )
45
+ self .channel_store = MemoryDict (
46
+ "/%s-chan" % self .prefix ,
47
+ size = channel_memory
48
+ )
44
49
# Set containing all groups to flush
45
- self .group_store = MemoryDict ("/%s-group" % self .prefix , size = group_memory )
50
+ self .group_store = MemoryDict (
51
+ "/%s-group" % self .prefix ,
52
+ size = group_memory
53
+ )
46
54
47
- ### ASGI API ###
55
+ # --------
56
+ # ASGI API
57
+ # --------
48
58
49
59
extensions = ["flush" , "groups" ]
50
60
51
61
def send (self , channel , message ):
52
- # Typecheck
62
+ # Type check
53
63
assert isinstance (message , dict ), "message is not a dict"
54
64
assert self .valid_channel_name (channel ), "channel name not valid"
55
65
# Write message into the correct message queue
@@ -61,17 +71,20 @@ def send(self, channel, message):
61
71
channel_list .append ([message , time .time () + self .expiry ])
62
72
self .channel_store [channel ] = channel_list
63
73
64
- def receive (self , channels , block = False ):
74
+ def receive (self , channels ):
65
75
if not channels :
66
76
return None , None
67
77
channels = list (channels )
68
- assert all (self .valid_channel_name (channel ) for channel in channels ), "one or more channel names invalid"
78
+ assert all (
79
+ self .valid_channel_name (channel ) for channel in channels
80
+ ), "one or more channel names invalid"
69
81
random .shuffle (channels )
70
82
# Try to pop off all of the named channels
71
83
with self .thread_lock :
72
84
for channel in channels :
73
85
channel_list = self .channel_store .get (channel , [])
74
- # Keep looping on the channel until we hit no messages or an unexpired one
86
+ # Keep looping on the channel until
87
+ # we hit no messages or an unexpired one
75
88
while True :
76
89
try :
77
90
# Popleft equivalent
@@ -83,16 +96,16 @@ def receive(self, channels, block=False):
83
96
return channel , message
84
97
except IndexError :
85
98
break
86
- # If the channel is now empty, delete its key
87
- if not channel_list and channel in self .channel_store :
88
- del self .channel_store [channel ]
99
+ # If the channel is now empty, delete its key
100
+ if not channel_list and channel in self .channel_store :
101
+ del self .channel_store [channel ]
89
102
return None , None
90
103
91
104
def new_channel (self , pattern ):
92
105
assert isinstance (pattern , six .text_type )
93
106
# Keep making channel names till one isn't present.
94
107
while True :
95
- random_string = "" .join (random .choice (string .ascii_letters ) for i in range ( 12 ))
108
+ random_string = "" .join (random .sample (string .ascii_letters , 12 ))
96
109
assert pattern .endswith ("!" ) or pattern .endswith ("?" )
97
110
new_name = pattern + random_string
98
111
# To see if it's present we open the queue without O_CREAT
@@ -102,7 +115,9 @@ def new_channel(self, pattern):
102
115
else :
103
116
continue
104
117
105
- ### Groups extension ###
118
+ # ----------------
119
+ # Groups extension
120
+ # ----------------
106
121
107
122
def group_add (self , group , channel ):
108
123
"""
@@ -154,7 +169,9 @@ def send_group(self, group, message):
154
169
except self .ChannelFull :
155
170
pass
156
171
157
- ### Flush extension ###
172
+ # ---------------
173
+ # Flush extension
174
+ # ---------------
158
175
159
176
def flush (self ):
160
177
"""
@@ -165,7 +182,7 @@ def flush(self):
165
182
self .group_store .flush ()
166
183
167
184
def __str__ (self ):
168
- return "%s(hosts =%s)" % (self .__class__ .__name__ , self .hosts )
185
+ return "%s(prefix =%s)" % (self .__class__ .__name__ , self .prefix )
169
186
170
187
171
188
class MemoryDatastructure (object ):
@@ -209,7 +226,8 @@ def __init__(self, path, size=None):
209
226
)
210
227
except ValueError as e :
211
228
raise ValueError (
212
- "Unable to allocate shared memory segment (potentially out of memory).\n " +
229
+ "Unable to allocate shared memory segment"
230
+ "(potentially out of memory).\n "
213
231
"Error was: %s" % e
214
232
)
215
233
self .mmap = mmap .mmap (self .shm .fd , self .size )
0 commit comments