Skip to content

Commit 5a55c50

Browse files
committed
Various fixes and supporting logic:
- When a socket was closed, the blocked senders would not be notified by having the appropriate exception raised. Now they should be. - When a error happens in the wrapped write-related calls, the local exception catching related logic errored and prevented proper handling of the error. This was because of a reference to the non-existent 'socket.error'. Also the caught error should have been raised into the blocked channel. - Added a method to create a normal socket (make_blocking_socket). This is often useful if you want to do socket operations before your application has started up and is pumping asyncore. - Added a method to convert all existing blocking sockets over to non-blocking stackless compatible ones (ready_to_schedule). However, it remains to be seen if this is even possible. Socket state may be such the convertion logic cannot hope to convert it. Still passes the Python 2.7 unit tests as before. This work was done on CCP time, and is being contributed back because CCP are good like that. Thanks CCP!
1 parent ad29044 commit 5a55c50

File tree

1 file changed

+98
-10
lines changed

1 file changed

+98
-10
lines changed

examples/networking/stacklesssocket.py

Lines changed: 98 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88
#
99
# Remaining work:
1010
#
11+
# = Test suite that verifies that emulated behaviour is correct.
12+
# = When closing the socket, pending senders are sent ECONNRESET.
13+
# This was obtained by opening a server socket, connecting a
14+
# client and then closing the server. Then the client did a
15+
# send and got ECONNRESET.
1116
# = Asyncore does not add that much to this module. In fact, its
1217
# limitations and differences between implementations in different Python
1318
# versions just complicate things.
@@ -33,7 +38,7 @@
3338
#
3439

3540
import stackless
36-
import asyncore, weakref, time, select, types
41+
import asyncore, weakref, time, select, types, sys, gc
3742
from collections import deque
3843

3944
# If you pump the scheduler and wish to prevent the scheduler from staying
@@ -57,7 +62,8 @@
5762
import socket as stdsocket # We need the "socket" name for the function we export.
5863
try:
5964
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
60-
ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED
65+
ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, \
66+
ECONNREFUSED
6167
except Exception:
6268
# Fallback on hard-coded PS3 constants.
6369
EALREADY = 37
@@ -70,6 +76,7 @@
7076
EISCONN = 56
7177
EBADF = 9
7278
ECONNABORTED = 53
79+
ECONNREFUSED = 61
7380

7481
# If we are to masquerade as the socket module, we need to provide the constants.
7582
if "__all__" in stdsocket.__dict__:
@@ -170,6 +177,17 @@ def accept(self):
170177

171178
accept.__doc__ = _socketobject_old.accept.__doc__
172179

180+
def make_blocking_socket(family=AF_INET, type=SOCK_STREAM, proto=0):
181+
"""
182+
Sometimes you may want to create a normal Python socket, even when
183+
monkey-patching is in effect. One use case might be when you are trying to
184+
do socket operations on the last runnable tasklet, if these socket
185+
operations are on small writes on a non-connected UDP socket then you
186+
might as well just use a blocking socket, as the effect of blocking
187+
is negligible.
188+
"""
189+
_sock = _realsocket_old(family, type, proto)
190+
return _socketobject_old(_sock=_sock)
173191

174192

175193
def install(pi=None):
@@ -185,6 +203,65 @@ def uninstall():
185203
stdsocket._realsocket = _realsocket_old
186204
stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject = _socketobject_old
187205

206+
READY_TO_SCHEDULE_TAG = "_SET_ASIDE"
207+
208+
def ready_to_schedule(flag):
209+
"""
210+
There may be cases where it is desirable to have socket operations happen before
211+
an application starts up its framework, which would then poll asyncore. This
212+
function is intended to allow all sockets to be switched between working
213+
"stacklessly" or working directly on their underlying socket objects in a
214+
blocking manner.
215+
216+
Note that sockets created while this is in effect lack attribute values that
217+
asyncore or this module may have set if the sockets were created in a full
218+
monkey patched manner.
219+
"""
220+
221+
def reroute_wrapper(funcName):
222+
def reroute_call(self, *args, **kwargs):
223+
if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
224+
return
225+
return getattr(self.socket, funcName)(*args, **kwargs)
226+
return reroute_call
227+
228+
def update_method_referrers(methodName, oldClassMethod, newClassMethod):
229+
"""
230+
The instance methods we need to update are stored in slots on instances of
231+
socket._socketobject (actually our replacement subclass _socketobject_new).
232+
"""
233+
for referrer1 in gc.get_referrers(oldClassMethod):
234+
if isinstance(referrer1, types.MethodType):
235+
for referrer2 in gc.get_referrers(referrer1):
236+
if isinstance(referrer2, _socketobject_new):
237+
setattr(referrer2, methodName, types.MethodType(newClassMethod, referrer1.im_self, referrer1.im_class))
238+
239+
# Guard against removal if not in place.
240+
if flag:
241+
if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
242+
return
243+
del _fakesocket.__dict__[READY_TO_SCHEDULE_TAG]
244+
else:
245+
_fakesocket.__dict__[READY_TO_SCHEDULE_TAG] = None
246+
# sys.__stdout__.write("READY_TO_SCHEDULE %s\n" % flag)
247+
248+
# Play switcheroo with the attributes to get direct socket usage, or normal socket usage.
249+
for attributeName in dir(_realsocket_old):
250+
if not attributeName.startswith("_"):
251+
storageAttributeName = attributeName +"_SET_ASIDE"
252+
if flag:
253+
storedValue = _fakesocket.__dict__.pop(storageAttributeName, None)
254+
if storedValue is not None:
255+
rerouteValue = _fakesocket.__dict__[attributeName]
256+
# sys.__stdout__.write("___ RESTORING %s (AS %s) (WAS %s)\n" % (attributeName, storedValue, rerouteValue))
257+
_fakesocket.__dict__[attributeName] = storedValue
258+
update_method_referrers(attributeName, rerouteValue, storedValue)
259+
else:
260+
if attributeName in _fakesocket.__dict__:
261+
# sys.__stdout__.write("___ STORING %s = %s\n" % (attributeName, _fakesocket.__dict__[attributeName]))
262+
_fakesocket.__dict__[storageAttributeName] = _fakesocket.__dict__[attributeName]
263+
_fakesocket.__dict__[attributeName] = reroute_wrapper(attributeName)
264+
188265

189266
class _fakesocket(asyncore.dispatcher):
190267
connectChannel = None
@@ -434,23 +511,23 @@ def close(self):
434511

435512
self.connected = False
436513
self.accepting = False
437-
self.writeQueue = []
438514

439515
# Clear out all the channels with relevant errors.
440516
while self.acceptChannel and self.acceptChannel.balance < 0:
441-
self.acceptChannel.send_exception(error, 9, 'Bad file descriptor')
517+
self.acceptChannel.send_exception(stdsocket.error, EBADF, 'Bad file descriptor')
442518
while self.connectChannel and self.connectChannel.balance < 0:
443-
self.connectChannel.send_exception(error, 10061, 'Connection refused')
444-
self._clear_read_queue()
519+
self.connectChannel.send_exception(stdsocket.error, ECONNREFUSED, 'Connection refused')
520+
self._clear_queue(self.writeQueue, stdsocket.error, ECONNRESET)
521+
self._clear_queue(self.readQueue)
445522

446-
def _clear_read_queue(self, *args):
447-
for t in self.readQueue:
523+
def _clear_queue(self, queue, *args):
524+
for t in queue:
448525
if t[0].balance < 0:
449526
if len(args):
450527
t[0].send_exception(*args)
451528
else:
452529
t[0].send("")
453-
self.readQueue = []
530+
queue.clear()
454531

455532
# asyncore doesn't support this. Why not?
456533
def fileno(self):
@@ -503,6 +580,11 @@ def handle_connect(self):
503580
# Asyncore says its done but self.readBuffer may be non-empty
504581
# so can't close yet. Do nothing and let 'recv' trigger the close.
505582
def handle_close(self):
583+
# These do not interfere with ongoing reads, but should prevent
584+
# sends and the like from going through.
585+
self.connected = False
586+
self.accepting = False
587+
506588
# This also gets called in the case that a non-blocking connect gets
507589
# back to us with a no. If we don't reject the connect, then all
508590
# connect calls that do not connect will block indefinitely.
@@ -595,10 +677,12 @@ def asyncore_send(self, data, flags=0):
595677
try:
596678
result = self.socket.send(data, flags)
597679
return result
598-
except socket.error, why:
680+
except stdsocket.error, why:
599681
if why.args[0] == EWOULDBLOCK:
600682
return 0
601683
elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
684+
# Ensure the sender appears to have directly received this exception.
685+
channel.send_exception(why.__class__, *why.args)
602686
self.handle_close()
603687
return 0
604688
else:
@@ -767,6 +851,10 @@ def UDPClient(address):
767851
finally:
768852
uninstall()
769853

854+
if "notready" in sys.argv:
855+
sys.argv.remove("notready")
856+
ready_to_schedule(False)
857+
770858
if len(sys.argv) == 2:
771859
if sys.argv[1] == "client":
772860
print "client started"

0 commit comments

Comments
 (0)