Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do network connections and writes in KafkaClient.poll() #1729

Merged
merged 6 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,21 @@ def _conn_state_change(self, node_id, conn):
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
self._selector.register(conn._sock, selectors.EVENT_WRITE)
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)

elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)

try:
self._selector.unregister(conn._sock)
self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
except KeyError:
pass
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
self._selector.register(conn._sock, selectors.EVENT_READ, conn)

if self._sensors:
self._sensors.connection_created.record()

Expand All @@ -336,6 +339,7 @@ def _conn_state_change(self, node_id, conn):
self._selector.unregister(conn._sock)
except KeyError:
pass

if self._sensors:
self._sensors.connection_closed.record()

Expand All @@ -348,6 +352,17 @@ def _conn_state_change(self, node_id, conn):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()

def maybe_connect(self, node_id):
    """Queue a node for asynchronous connection during the next .poll().

    Arguments:
        node_id: the broker node id to connect to

    Returns:
        bool: True if the node was queued for connection; False if the
            node is not currently connectable (e.g. already connected /
            connecting, or in reconnect backoff).
    """
    if self._can_connect(node_id):
        self._connecting.add(node_id)
        # Wakeup signal is useful in case another thread is
        # blocked waiting for incoming network traffic while holding
        # the client lock in poll().
        self.wakeup()
        return True
    return False

def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
Expand Down Expand Up @@ -397,7 +412,7 @@ def ready(self, node_id, metadata_priority=True):
Returns:
bool: True if we are ready to send to the given node
"""
self._maybe_connect(node_id)
self.maybe_connect(node_id)
return self.is_ready(node_id, metadata_priority=metadata_priority)

def connected(self, node_id):
Expand Down Expand Up @@ -499,14 +514,15 @@ def is_ready(self, node_id, metadata_priority=True):
return True

def _can_send_request(self, node_id):
with self._lock:
if node_id not in self._conns:
return False
conn = self._conns[node_id]
return conn.connected() and conn.can_send_more()
conn = self._conns.get(node_id)
if not conn:
return False
return conn.connected() and conn.can_send_more()

def send(self, node_id, request):
    """Send a request to a specific node. Bytes are placed on an
    internal per-connection send-queue. Actual network I/O will be
    triggered in a subsequent call to .poll()

    Arguments:
        node_id (int): destination node
        request (Struct): request object (not-encoded)

    Returns:
        Future: resolves to Response struct or Error
    """
    if not self._can_send_request(node_id):
        # Queue the node for connection on the next poll() so a retry
        # of this request has a chance of succeeding.
        self.maybe_connect(node_id)
        return Future().failure(Errors.NodeNotReadyError(node_id))

    # conn.send will queue the request internally
    # we will need to call send_pending_requests()
    # to trigger network I/O
    future = self._conns[node_id].send(request, blocking=False)

    # Wakeup signal is useful in case another thread is
    # blocked waiting for incoming network traffic while holding
    # the client lock in poll().
    self.wakeup()

    return future

def poll(self, timeout_ms=None, future=None):
"""Try to read and write to sockets.
Expand Down Expand Up @@ -640,6 +666,8 @@ def _poll(self, timeout):
conn.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
conn.config['request_timeout_ms']))
else:
conn.send_pending_requests()

if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
Expand Down Expand Up @@ -801,9 +829,8 @@ def refresh_done(val_or_error):
# have such application level configuration, using request timeout instead.
return self.config['request_timeout_ms']

if self._can_connect(node_id):
if self.maybe_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
self._maybe_connect(node_id)
return self.config['reconnect_backoff_ms']

# connected but can't send more, OR connecting
Expand Down
49 changes: 30 additions & 19 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,47 +733,58 @@ def close(self, error=None):
future.failure(error)
self.config['state_change_callback'](self)

def send(self, request, blocking=True):
    """Queue request for async network send, return Future()

    Arguments:
        request (Struct): request object (not-encoded)
        blocking (bool): if True, also flush the encoded bytes to the
            socket before returning (replicates the pre-async behavior);
            if False, bytes are only queued for a later
            send_pending_requests() call (e.g. from poll()).

    Returns:
        Future: resolves to the response; fails immediately with
            NodeNotReadyError, KafkaConnectionError, or
            TooManyInFlightRequests when the connection cannot accept
            the request right now.
    """
    future = Future()
    if self.connecting():
        return future.failure(Errors.NodeNotReadyError(str(self)))
    elif not self.connected():
        return future.failure(Errors.KafkaConnectionError(str(self)))
    elif not self.can_send_more():
        return future.failure(Errors.TooManyInFlightRequests(str(self)))
    return self._send(request, blocking=blocking)

def _send(self, request, blocking=True):
    """Encode request onto the outgoing byte queue and register a Future.

    Assumes connection state has already been validated by send().

    Arguments:
        request (Struct): request object (not-encoded)
        blocking (bool): if True, flush queued bytes to the socket now

    Returns:
        Future: resolves to the response (or None when no response is
            expected); fails if a blocking flush hits a network error.
    """
    assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
    future = Future()
    correlation_id = self._protocol.send_request(request)

    # Attempt to replicate behavior from prior to introduction of
    # send_pending_requests() / async sends
    if blocking:
        error = self.send_pending_requests()
        if isinstance(error, Exception):
            future.failure(error)
            return future

    # Logged only after a (blocking) flush did not fail, consistent with
    # the prior implementation that logged after a successful send.
    log.debug('%s Request %d: %s', self, correlation_id, request)
    if request.expect_response():
        sent_time = time.time()
        ifr = (correlation_id, future, sent_time)
        self.in_flight_requests.append(ifr)
    else:
        future.success(None)
    return future

def send_pending_requests(self):
    """Flush queued request bytes to the socket.

    Can block on network if request is larger than send_buffer_bytes

    Returns:
        int: total bytes sent on success; otherwise an Exception
            (NodeNotReadyError when not connected/authenticating, or
            KafkaConnectionError on socket failure -- in which case the
            connection is also closed).
    """
    if self.state not in (ConnectionStates.AUTHENTICATING,
                          ConnectionStates.CONNECTED):
        return Errors.NodeNotReadyError(str(self))
    data = self._protocol.send_bytes()
    try:
        # In the future we might manage an internal write buffer
        # and send bytes asynchronously. For now, just block
        # sending each request payload
        total_bytes = self._send_bytes_blocking(data)
        if self._sensors:
            self._sensors.bytes_sent.record(total_bytes)
        return total_bytes
    except ConnectionError as e:
        # Only the encoded bytes are available here (no request object),
        # so log without the request; the request future's error handler
        # is responsible for logging details.
        log.exception("Error sending request data to %s", self)
        error = Errors.KafkaConnectionError("%s: %s" % (self, e))
        self.close(error=error)
        return error

def can_send_more(self):
"""Return True unless there are max_in_flight_requests_per_connection."""
Expand Down
13 changes: 1 addition & 12 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,16 +1070,6 @@ def _message_generator(self):
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()

# Because the consumer client poll does not sleep unless blocking on
# network IO, we need to explicitly sleep when we know we are idle
# because we haven't been assigned any partitions to fetch / consume
if self._use_consumer_group() and not self.assignment():
sleep_time = max(timeout_at - time.time(), 0)
if sleep_time > 0 and not self._client.in_flight_request_count():
log.debug('No partitions assigned; sleeping for %s', sleep_time)
time.sleep(sleep_time)
continue

# Short-circuit the fetch iterator if we are already timed out
# to avoid any unintentional interaction with fetcher setup
if time.time() > timeout_at:
Expand All @@ -1090,8 +1080,7 @@ def _message_generator(self):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
if self._client.in_flight_request_count():
self._client.poll(timeout_ms=0)
self._client.poll(timeout_ms=0)

# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
Expand Down
4 changes: 2 additions & 2 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def ensure_coordinator_ready(self):
if self.config['api_version'] < (0, 8, 2):
self.coordinator_id = self._client.least_loaded_node()
if self.coordinator_id is not None:
self._client.ready(self.coordinator_id)
self._client.maybe_connect(self.coordinator_id)
continue

future = self.lookup_coordinator()
Expand Down Expand Up @@ -686,7 +686,7 @@ def _handle_group_coordinator_response(self, future, response):
self.coordinator_id = response.coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
self._client.ready(self.coordinator_id)
self._client.maybe_connect(self.coordinator_id)
self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)

Expand Down
7 changes: 4 additions & 3 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,11 @@ def _failure(error):
retries = 10
while True:
node_id = self._client.least_loaded_node()
for ready_retry in range(40):
if self._client.ready(node_id, False):
for connect_retry in range(40):
self._client.maybe_connect(node_id)
if self._client.connected(node_id):
break
time.sleep(.1)
self._client.poll(timeout_ms=100)
else:
raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))

Expand Down
9 changes: 4 additions & 5 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn):
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
sel.unregister.assert_called_with(conn._sock)
sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn)

# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
Expand All @@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn):


def test_ready(mocker, cli, conn):
maybe_connect = mocker.patch.object(cli, '_maybe_connect')
maybe_connect = mocker.patch.object(cli, 'maybe_connect')
node_id = 1
cli.ready(node_id)
maybe_connect.assert_called_with(node_id)
Expand Down Expand Up @@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
mocker.patch.object(client, 'maybe_connect', return_value=True)

now = time.time()
t = mocker.patch('time.time')
Expand All @@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff
client._can_connect.assert_called_once_with('foobar')
client._maybe_connect.assert_called_once_with('foobar')
client.maybe_connect.assert_called_once_with('foobar')

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
Expand Down