Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncore
import errno
import io
import logging
import os
import select
Expand Down Expand Up @@ -352,6 +353,7 @@ def connection_factory(self, connection_manager, connection_id, address, network
class AsyncoreConnection(Connection, asyncore.dispatcher):
sent_protocol_bytes = False
receive_buffer_size = _BUFFER_SIZE
send_buffer_size = _BUFFER_SIZE

def __init__(self, reactor, connection_manager, connection_id, address,
config, message_callback):
Expand All @@ -361,6 +363,7 @@ def __init__(self, reactor, connection_manager, connection_id, address,
self._reactor = reactor
self.connected_address = address
self._write_queue = deque()
self._write_buf = io.BytesIO()
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

timeout = config.connection_timeout
Expand All @@ -378,6 +381,8 @@ def __init__(self, reactor, connection_manager, connection_id, address,
for level, option_name, value in config.socket_options:
if option_name is socket.SO_RCVBUF:
self.receive_buffer_size = value
elif option_name is socket.SO_SNDBUF:
self.send_buffer_size = value

self.socket.setsockopt(level, option_name, value)

Expand Down Expand Up @@ -445,20 +450,38 @@ def handle_read(self):
reader.process()

def handle_write(self):
while True:
try:
data = self._write_queue.popleft()
except IndexError:
return
write_queue = self._write_queue
send_buffer_size = self.send_buffer_size
write_batch = []
total_length = 0

sent = self.send(data)
self.last_write_time = time.time()
self.sent_protocol_bytes = True
if sent < len(data):
self._write_queue.appendleft(data[sent:])
while write_queue:
message_bytes = write_queue.popleft()
write_batch.append(message_bytes)
total_length += len(message_bytes)

if sent == 0:
return
if total_length >= send_buffer_size:
break

# We enter this only if len(write_queue) > 0.
# So, len(write_batch) cannot be 0.
if len(write_batch) == 1:
bytes_ = write_batch[0]
else:
buf = self._write_buf
buf.seek(0)
for message_bytes in write_batch:
buf.write(message_bytes)

bytes_ = buf.getvalue()
buf.truncate(0)

sent = self.send(bytes_)
self.last_write_time = time.time()
self.sent_protocol_bytes = True

if sent < len(bytes_):
write_queue.appendleft(bytes_[sent:])

def handle_close(self):
_logger.warning("Connection closed by server")
Expand All @@ -485,6 +508,7 @@ def writable(self):

def _inner_close(self):
asyncore.dispatcher.close(self)
self._write_buf.close()

def __repr__(self):
return "Connection(id=%s, live=%s, remote_address=%s)" % (self._id, self.live, self.remote_address)
Expand Down
16 changes: 16 additions & 0 deletions tests/reactor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,19 @@ def test_receive_buffer_size(self):
self.assertEqual(size, conn.receive_buffer_size)
finally:
conn._inner_close()

def test_send_buffer_size(self):
# When the SO_SNDBUF option is set, we should try
# to use that value while trying to write something.
config = _Config()
size = 64 * 1024
config.socket_options = [
(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
]
conn = AsyncoreConnection(MagicMock(map=dict()), None, None, self.member.address, config, None)

try:
# By default this is set to 128000
self.assertEqual(size, conn.send_buffer_size)
finally:
conn._inner_close()