22
22
with_statement
23
23
24
24
import os
25
+ import time
25
26
import socket
26
27
import select
27
28
import errno
51
52
POLL_NVAL : 'POLL_NVAL' ,
52
53
}
53
54
54
-
55
- class EpollLoop (object ):
56
-
57
- def __init__ (self ):
58
- self ._epoll = select .epoll ()
59
-
60
- def poll (self , timeout ):
61
- return self ._epoll .poll (timeout )
62
-
63
- def add_fd (self , fd , mode ):
64
- self ._epoll .register (fd , mode )
65
-
66
- def remove_fd (self , fd ):
67
- self ._epoll .unregister (fd )
68
-
69
- def modify_fd (self , fd , mode ):
70
- self ._epoll .modify (fd , mode )
55
+ # we check timeouts every TIMEOUT_PRECISION seconds
56
+ TIMEOUT_PRECISION = 10
71
57
72
58
73
59
class KqueueLoop (object ):
@@ -100,17 +86,17 @@ def poll(self, timeout):
100
86
results [fd ] |= POLL_OUT
101
87
return results .items ()
102
88
103
- def add_fd (self , fd , mode ):
89
+ def register (self , fd , mode ):
104
90
self ._fds [fd ] = mode
105
91
self ._control (fd , mode , select .KQ_EV_ADD )
106
92
107
- def remove_fd (self , fd ):
93
+ def unregister (self , fd ):
108
94
self ._control (fd , self ._fds [fd ], select .KQ_EV_DELETE )
109
95
del self ._fds [fd ]
110
96
111
- def modify_fd (self , fd , mode ):
112
- self .remove_fd (fd )
113
- self .add_fd (fd , mode )
97
+ def modify (self , fd , mode ):
98
+ self .unregister (fd )
99
+ self .register (fd , mode )
114
100
115
101
116
102
class SelectLoop (object ):
@@ -129,32 +115,31 @@ def poll(self, timeout):
129
115
results [fd ] |= p [1 ]
130
116
return results .items ()
131
117
132
- def add_fd (self , fd , mode ):
118
+ def register (self , fd , mode ):
133
119
if mode & POLL_IN :
134
120
self ._r_list .add (fd )
135
121
if mode & POLL_OUT :
136
122
self ._w_list .add (fd )
137
123
if mode & POLL_ERR :
138
124
self ._x_list .add (fd )
139
125
140
- def remove_fd (self , fd ):
126
+ def unregister (self , fd ):
141
127
if fd in self ._r_list :
142
128
self ._r_list .remove (fd )
143
129
if fd in self ._w_list :
144
130
self ._w_list .remove (fd )
145
131
if fd in self ._x_list :
146
132
self ._x_list .remove (fd )
147
133
148
- def modify_fd (self , fd , mode ):
149
- self .remove_fd (fd )
150
- self .add_fd (fd , mode )
134
+ def modify (self , fd , mode ):
135
+ self .unregister (fd )
136
+ self .register (fd , mode )
151
137
152
138
153
139
class EventLoop (object ):
154
140
def __init__ (self ):
155
- self ._iterating = False
156
141
if hasattr (select , 'epoll' ):
157
- self ._impl = EpollLoop ()
142
+ self ._impl = select . epoll ()
158
143
model = 'epoll'
159
144
elif hasattr (select , 'kqueue' ):
160
145
self ._impl = KqueueLoop ()
@@ -165,72 +150,71 @@ def __init__(self):
165
150
else :
166
151
raise Exception ('can not find any available functions in select '
167
152
'package' )
168
- self ._fd_to_f = {}
169
- self ._handlers = []
170
- self ._ref_handlers = []
171
- self ._handlers_to_remove = []
153
+ self ._fdmap = {} # (f, handler)
154
+ self ._last_time = time . time ()
155
+ self ._periodic_callbacks = []
156
+ self ._stopping = False
172
157
logging .debug ('using event model: %s' , model )
173
158
174
159
def poll (self , timeout = None ):
175
160
events = self ._impl .poll (timeout )
176
- return [(self ._fd_to_f [fd ], fd , event ) for fd , event in events ]
161
+ return [(self ._fdmap [fd ][ 0 ], fd , event ) for fd , event in events ]
177
162
178
- def add (self , f , mode ):
163
+ def add (self , f , mode , handler ):
179
164
fd = f .fileno ()
180
- self ._fd_to_f [fd ] = f
181
- self ._impl .add_fd (fd , mode )
165
+ self ._fdmap [fd ] = ( f , handler )
166
+ self ._impl .register (fd , mode )
182
167
183
168
def remove (self , f ):
184
169
fd = f .fileno ()
185
- del self ._fd_to_f [fd ]
186
- self ._impl .remove_fd (fd )
170
+ del self ._fdmap [fd ]
171
+ self ._impl .unregister (fd )
172
+
173
+ def add_periodic (self , callback ):
174
+ self ._periodic_callbacks .append (callback )
175
+
176
+ def remove_periodic (self , callback ):
177
+ self ._periodic_callbacks .remove (callback )
187
178
188
179
def modify (self , f , mode ):
189
180
fd = f .fileno ()
190
- self ._impl .modify_fd (fd , mode )
191
-
192
- def add_handler (self , handler , ref = True ):
193
- self ._handlers .append (handler )
194
- if ref :
195
- # when all ref handlers are removed, loop stops
196
- self ._ref_handlers .append (handler )
197
-
198
- def remove_handler (self , handler ):
199
- if handler in self ._ref_handlers :
200
- self ._ref_handlers .remove (handler )
201
- if self ._iterating :
202
- self ._handlers_to_remove .append (handler )
203
- else :
204
- self ._handlers .remove (handler )
181
+ self ._impl .modify (fd , mode )
182
+
183
+ def stop (self ):
184
+ self ._stopping = True
205
185
206
186
def run (self ):
207
187
events = []
208
- while self ._ref_handlers :
188
+ while not self ._stopping :
189
+ asap = False
209
190
try :
210
- events = self .poll (1 )
191
+ events = self .poll (TIMEOUT_PRECISION )
211
192
except (OSError , IOError ) as e :
212
193
if errno_from_exception (e ) in (errno .EPIPE , errno .EINTR ):
213
194
# EPIPE: Happens when the client closes the connection
214
195
# EINTR: Happens when received a signal
215
196
# handles them as soon as possible
197
+ asap = True
216
198
logging .debug ('poll:%s' , e )
217
199
else :
218
200
logging .error ('poll:%s' , e )
219
201
import traceback
220
202
traceback .print_exc ()
221
203
continue
222
- self ._iterating = True
223
- for handler in self ._handlers :
224
- # TODO when there are a lot of handlers
225
- try :
226
- handler (events )
227
- except (OSError , IOError ) as e :
228
- shell .print_exception (e )
229
- if self ._handlers_to_remove :
230
- for handler in self ._handlers_to_remove :
231
- self ._handlers .remove (handler )
232
- self ._handlers_to_remove = []
233
- self ._iterating = False
204
+
205
+ for sock , fd , event in events :
206
+ handler = self ._fdmap .get (fd , None )
207
+ if handler is not None :
208
+ handler = handler [1 ]
209
+ try :
210
+ handler .handle_event (sock , fd , event )
211
+ except (OSError , IOError ) as e :
212
+ shell .print_exception (e )
213
+ now = time .time ()
214
+ if asap or now - self ._last_time >= TIMEOUT_PRECISION :
215
+ for callback in self ._periodic_callbacks :
216
+ callback ()
217
+ self ._last_time = now
234
218
235
219
236
220
# from tornado
0 commit comments