Skip to content

Commit 7aa582f

Browse files
author
Arthur Gautier
committed
Issue Gandi#7: drop timeout, use a non-blocking socket.
Signed-off-by: Arthur Gautier <baloo@gandi.net>
1 parent 862a51a commit 7aa582f

File tree

3 files changed

+185
-15
lines changed

3 files changed

+185
-15
lines changed

tox.ini

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
[tox]
22
envlist =
33
{py27,py3}-{lowest,release},
4-
# No python3 yet, not supported by thrift, see https://issues.apache.org/jira/browse/THRIFT-1857
54
pep8
65
# pylint,
76
# coverage

zipkin/client.py

Lines changed: 105 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
import logging
2-
from socket import timeout
2+
import socket
3+
import errno
4+
import struct
5+
from io import BytesIO
36

4-
from thriftpy.transport import TTransportException, TFramedTransportFactory
5-
from thriftpy.rpc import make_client
7+
from thriftpy.transport import TTransportException, TFramedTransportFactory,\
8+
TSocket
9+
from thriftpy.protocol import TBinaryProtocolFactory
10+
from thriftpy.protocol.binary import write_message_begin, write_val
11+
from thriftpy.thrift import TClient, TType, TMessageType
612

713
from .util import base64_thrift_formatter
814
from .scribe import scribe_thrift
@@ -12,24 +18,109 @@
1218

1319
CONNECTION_RETRIES = [1, 10, 20, 50, 100, 200, 400, 1000]
1420

21+
# Flag passed to send()/sendall() so that writing to a socket whose peer has
# gone away fails with an error instead of delivering SIGPIPE to the process.
try:
    MSG_NOSIGNAL = socket.MSG_NOSIGNAL
except AttributeError:
    # The running interpreter's socket module does not export the constant
    # (e.g. python2): fall back to the raw value, 16384 (0x4000, the value
    # Linux uses).
    MSG_NOSIGNAL = 16384
25+
26+
27+
class TNonBlockingSocket(TSocket):
    """
    Fire-and-forget socket transport: no call is allowed to block the caller.

    The connection is started asynchronously, outgoing data is silently
    dropped while the connection is not established or the send buffer is
    full, incoming data is discarded, and reads are answered locally with a
    canned successful ``Log_result`` reply.
    """

    # connect_ex() statuses meaning "connected" (0) or "connection under way".
    _CONNECT_OK = (0, errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK)

    # errno values meaning "the operation would block": nothing to read, or
    # no room left in the send buffer.
    _WOULD_BLOCK = (errno.EAGAIN, errno.EWOULDBLOCK)

    def _init_sock(self):
        """
        Create the socket through the parent class, then enlarge its send
        buffer and switch it to non-blocking mode.
        """
        super(TNonBlockingSocket, self)._init_sock()

        # 1M sendq buffer
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024)

        self.sock.setblocking(0)

    def open(self):
        """
        Start connecting to the configured unix socket or (host, port).

        Returns without waiting for the connection to be established.

        Raises:
            IOError: connect_ex() reported a status other than success or
                "connection in progress"; ``errno`` carries that status.
        """
        self._init_sock()

        addr = self.unix_socket or (self.host, self.port)
        status = self.sock.connect_ex(addr)

        # A status of 0 means the connection completed immediately; that is
        # a success, not an error.
        if status not in self._CONNECT_OK:
            raise IOError(
                status,
                "connection attempt on a non-clean socket (%s)"
                % errno.errorcode.get(status, status))

    def write(self, buff):
        """
        Try to send ``buff``; drop it silently if the socket is not
        connected yet or its send buffer is full.

        Raises:
            socket.error: any other failure, e.g. errno.EPIPE when the peer
                has disconnected. Callers are expected to handle it.
        """
        # First ensure our incoming end is always empty
        self.read_all()

        # Then actually try to write to socket
        try:
            # We are a library, we can't just set sighandlers. But we don't
            # want SIGPIPE if peer has gone away either. We better set
            # MSG_NOSIGNAL to avoid that.
            # If peer has disconnected, then an errno.EPIPE will be raised,
            # and will be caught by the upper layer.
            #
            # NOTE(review): sendall() on a non-blocking socket may have sent
            # part of ``buff`` before failing with EWOULDBLOCK; the rest is
            # dropped here, which could leave a truncated frame on the wire.
            # Confirm whether that matters for the collector.
            self.sock.sendall(buff, MSG_NOSIGNAL)
        except socket.error as e:
            if e.errno not in (errno.EINPROGRESS,) + self._WOULD_BLOCK:
                # In all other cases, raise.
                raise

            # If not yet connected or write buffer is full, silently drop.

    def read_all(self):
        """
        Flush incoming buffer

        Raises:
            socket.error: the peer closed the connection (errno.ECONNRESET)
                or recv() failed for a reason other than "would block".
        """
        while True:
            try:
                chunk = self.sock.recv(1024)
            except socket.error as e:
                # if EAGAIN or EWOULDBLOCK, then there is nothing to read
                if e.errno in self._WOULD_BLOCK:
                    # No more data to read, or connection is not ready
                    return
                # Otherwise that's an error.
                raise

            if not chunk:
                # recv() returning no data means the peer closed the
                # connection; it would keep doing so forever, so report it
                # instead of spinning.
                raise socket.error(errno.ECONNRESET,
                                   "connection closed by peer")

    def read(self, _):
        """
        Mock response, we don't care about results. We never actually read
        them. But we don't want client to wait for server to reply.

        The requested size is ignored: a complete framed ``Log_result``
        reply with result code OK is returned on every call.
        """
        payload = BytesIO()
        seq_id = 0  # Sequence id is never compared to message.
        write_message_begin(payload, 'Log_result', TMessageType.REPLY, seq_id)

        response = scribe_thrift.Scribe.Log_result(
            success=scribe_thrift.ResultCode.OK)
        write_val(payload, TType.STRUCT, response)

        out = payload.getvalue()

        # Framed message, starts with length of message.
        return struct.pack('!i', len(out)) + out
99+
100+
101+
def make_client(service, host, port,
                proto_factory=TBinaryProtocolFactory(),
                trans_factory=TFramedTransportFactory()):
    """
    Build a thrift client for ``service`` on top of a TNonBlockingSocket.

    The socket for (host, port) is wrapped by ``trans_factory``, the
    resulting transport by ``proto_factory``, and the transport is opened
    before the client is returned.
    """
    transport = trans_factory.get_transport(TNonBlockingSocket(host, port))
    protocol = proto_factory.get_protocol(transport)

    transport.open()

    return TClient(service, protocol)
110+
15111

16112
class Client(object):
17113

18114
host = None
19115
port = 9410
20116
_client = None
21117
_connection_attempts = 0
22-
# This is used in TSocket which divides by 1000 before passing it to
23-
# python socket. This is ms, do not trust socket.settimeout documentation.
24-
timeout = 50
25118

26119
@classmethod
27120
def configure(cls, settings, prefix):
28121
cls.host = settings.get(prefix + 'collector')
29122
if prefix + 'collector.port' in settings:
30123
cls.port = int(settings[prefix + 'collector.port'])
31-
if prefix + 'collector.timeout' in settings:
32-
cls.timeout = int(settings[prefix + 'collector.timeout'])
33124

34125
@classmethod
35126
def get_connection(cls):
@@ -47,8 +138,7 @@ def get_connection(cls):
47138
try:
48139
cls._client = make_client(
49140
scribe_thrift.Scribe, host=cls.host,
50-
port=cls.port, timeout=cls.timeout,
51-
trans_factory=TFramedTransportFactory())
141+
port=cls.port)
52142

53143
cls._connection_attempts = 0
54144
except TTransportException:
@@ -79,10 +169,6 @@ def log(cls, trace):
79169
cls._client = None
80170
logger.error('EOFError while logging a trace on zipkin '
81171
'collector %s:%d' % (cls.host, cls.port))
82-
except timeout:
83-
cls._client = None
84-
logger.error('timeout when sending data or connecting to '
85-
'collector %s:%d' % (cls.host, cls.port))
86172
except Exception:
87173
cls._client = None
88174
logger.exception('Unknown Exception while logging a trace on '
@@ -91,6 +177,11 @@ def log(cls, trace):
91177
else:
92178
logger.warn("Can't log zipkin trace, not connected")
93179

180+
@classmethod
181+
def disconnect(cls):
182+
if cls._client:
183+
cls._client.close()
184+
94185

95186
def log(trace):
96187
Client.log(trace)

zipkin/tests/test_client.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import unittest
2+
import threading
3+
import socket
4+
import select
5+
6+
from ..client import Client
7+
from ..models import Trace, Annotation
8+
9+
10+
class Sinkhole(threading.Thread):
    """
    Minimal TCP server thread that accepts connections on a free loopback
    port and swallows everything it receives.

    Attributes set for the test to inspect:
      port               -- the port the server listens on
      has_received_data  -- True once any data has been received
      has_data           -- Lock held from construction and released when
                            the first data arrives
    """

    def __init__(self, *args, **kwargs):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Port 0 lets the OS pick a free port; read it back for the client.
        self.sock.bind(('127.0.0.1', 0))
        self.port = self.sock.getsockname()[1]
        self.sock.listen(5)

        self.stopped = False

        super(Sinkhole, self).__init__()

        self.has_received_data = False
        self.has_data = threading.Lock()
        self.has_data.acquire()

    def run(self):
        """Serve until stop() is called, then close every socket."""
        sockets = [self.sock]

        try:
            while not self.stopped:
                readable, _, _ = select.select(sockets, [], [])

                for s in readable:
                    if s is self.sock:
                        connection, _ = s.accept()
                        connection.setblocking(0)
                        sockets.append(connection)
                        continue

                    try:
                        data = s.recv(1024)
                    except socket.error:
                        # A failing connection (e.g. reset by the peer) is
                        # handled like a closed one, so it cannot kill the
                        # server thread.
                        data = b''

                    if not data:
                        sockets.remove(s)
                        s.close()
                    elif not self.has_received_data:
                        # Release only once: releasing an unlocked Lock
                        # raises, which used to happen on the second chunk.
                        self.has_received_data = True
                        self.has_data.release()
        finally:
            # Closes the listening socket and any connection still open.
            for s in sockets:
                s.close()

    def stop(self):
        """Ask the server thread to exit and wait for it to finish."""
        self.stopped = True
        # The thread is usually blocked in select(); a throw-away connection
        # wakes it up so it notices the stop flag.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect_ex(('127.0.0.1', self.port))
        finally:
            s.close()
        self.join()
53+
54+
55+
class TestClient(unittest.TestCase):
    """End-to-end check that Client.log() delivers bytes to a collector."""

    def setUp(self):
        self.server = Sinkhole()
        self.server.start()

    def tearDown(self):
        self.server.stop()

    def test_simple(self):
        """A logged trace reaches the sinkhole server."""
        import time

        Client.configure({
            'collector': '127.0.0.1',
            'collector.port': self.server.port,
        }, '')

        trace = Trace('test')
        trace.record(Annotation.string('foo', 'bar'))
        Client.get_connection()

        try:
            time.sleep(1)  # Just to give it time to connect

            Client.log(trace)

            # Wait for the server to release has_data, but give up after a
            # few seconds instead of hanging the whole test run if nothing
            # ever arrives.
            deadline = time.time() + 5
            while not self.server.has_data.acquire(False):
                if time.time() > deadline:
                    self.fail('sinkhole did not receive any data')
                time.sleep(0.01)
            self.server.has_data.release()

            self.assertTrue(self.server.has_received_data)
        finally:
            # Disconnect even when an assertion above fails.
            Client.disconnect()

0 commit comments

Comments
 (0)