-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do network connections and writes in KafkaClient.poll() #1729
Changes from all commits
3c87379
492b7d2
2313286
957c62d
7ce6976
0a8914f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -733,47 +733,58 @@ def close(self, error=None): | |
future.failure(error) | ||
self.config['state_change_callback'](self) | ||
|
||
def send(self, request): | ||
"""send request, return Future() | ||
|
||
Can block on network if request is larger than send_buffer_bytes | ||
""" | ||
def send(self, request, blocking=True): | ||
"""Queue request for async network send, return Future()""" | ||
future = Future() | ||
if self.connecting(): | ||
return future.failure(Errors.NodeNotReadyError(str(self))) | ||
elif not self.connected(): | ||
return future.failure(Errors.KafkaConnectionError(str(self))) | ||
elif not self.can_send_more(): | ||
return future.failure(Errors.TooManyInFlightRequests(str(self))) | ||
return self._send(request) | ||
return self._send(request, blocking=blocking) | ||
|
||
def _send(self, request): | ||
def _send(self, request, blocking=True): | ||
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) | ||
future = Future() | ||
correlation_id = self._protocol.send_request(request) | ||
|
||
# Attempt to replicate behavior from prior to introduction of | ||
# send_pending_requests() / async sends | ||
if blocking: | ||
error = self.send_pending_requests() | ||
if isinstance(error, Exception): | ||
future.failure(error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps add a debug-level log statement here of this error? |
||
return future | ||
|
||
log.debug('%s Request %d: %s', self, correlation_id, request) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this log line be located above the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the current implementation we only log this after the request is sent successfully. So I put this after the blocking section to keep it consistent. |
||
if request.expect_response(): | ||
sent_time = time.time() | ||
ifr = (correlation_id, future, sent_time) | ||
self.in_flight_requests.append(ifr) | ||
else: | ||
future.success(None) | ||
return future | ||
|
||
def send_pending_requests(self): | ||
"""Can block on network if request is larger than send_buffer_bytes""" | ||
if self.state not in (ConnectionStates.AUTHENTICATING, | ||
ConnectionStates.CONNECTED): | ||
return Errors.NodeNotReadyError(str(self)) | ||
data = self._protocol.send_bytes() | ||
try: | ||
# In the future we might manage an internal write buffer | ||
# and send bytes asynchronously. For now, just block | ||
# sending each request payload | ||
sent_time = time.time() | ||
total_bytes = self._send_bytes_blocking(data) | ||
if self._sensors: | ||
self._sensors.bytes_sent.record(total_bytes) | ||
return total_bytes | ||
except ConnectionError as e: | ||
log.exception("Error sending %s to %s", request, self) | ||
log.exception("Error sending request data to %s", self) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to stop logging the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with this design we only have the encoded bytes at this stage -- we no longer have the original request object. so for that reason i took it out of the log message. This error should be sent down to the future and we can expect that the error handler for the request future will be responsible for logging the details. |
||
error = Errors.KafkaConnectionError("%s: %s" % (self, e)) | ||
self.close(error=error) | ||
return future.failure(error) | ||
log.debug('%s Request %d: %s', self, correlation_id, request) | ||
|
||
if request.expect_response(): | ||
ifr = (correlation_id, future, sent_time) | ||
self.in_flight_requests.append(ifr) | ||
else: | ||
future.success(None) | ||
|
||
return future | ||
return error | ||
|
||
def can_send_more(self): | ||
"""Return True unless there are max_in_flight_requests_per_connection.""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this create any new scenarios where we now need to make sure to call
poll()
?In other words, will this break any behavior that used to work fine w/o ever calling
poll()
?Currently, I can't think of any--looks like metadata refresh picks this up automatically since it relies on
maybe_connect
and you also updated the fetcher to always callpoll()
, just wondering if there might be any others...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is possible, yes, but they should be rare. The main culprits would be blocking loops attempting to connect without calling poll(), or not calling poll() unless there are in-flight-requests. I think I've found and fixed those issues, but definitely keep an eye open for others.