
Do network connections and writes in KafkaClient.poll() #1729

Merged: 6 commits merged into master from async_connect_send on Mar 8, 2019

Conversation

dpkp (Owner) commented Mar 6, 2019

This PR completes #981 and attempts to address several reports of consumer lockups that appear related to KafkaClient blocking for up to the full request_timeout_ms while holding the client lock, preventing all other threads from initiating new network requests or connections until the lock is released.

There are 4 commits here. The first is a simple refactor of BrokerConnection that separates queueing a new network request (via .send / ._send) from performing the actual network IO (via .send_pending_requests).

The second updates KafkaClient to only perform network IO during .poll(). It uses the wakeup channel to signal between threads, allowing a sender to wake up a blocked poller and trigger an immediate call to .send_pending_requests().

The third commit addresses network connection management, separate from network writes. It updates KafkaClient.send() to only acquire the client lock when a new connection is needed.

And the fourth commit completes the transition by moving all connection attempts via _maybe_connect into KafkaClient.poll(), which should eliminate the thread contention between a thread that is polling and some other thread that wants to initiate network IO.
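
As a rough illustration of the queue-then-flush split described above (a minimal sketch only; the class and attribute names are illustrative, not the actual BrokerConnection internals):

    import collections


    class SketchConnection(object):
        """Toy connection illustrating the split between queueing and network IO."""

        def __init__(self, sock):
            self._sock = sock                       # assumed already-connected socket
            self._send_queue = collections.deque()  # encoded requests awaiting IO

        def send(self, request_bytes):
            # Queue only -- no socket writes happen on the calling thread.
            self._send_queue.append(request_bytes)

        def send_pending_requests(self):
            # Called from the polling thread; performs the actual writes.
            # Returns True on success, or the exception on failure (mirroring
            # the isinstance(error, Exception) check quoted in the review below).
            try:
                while self._send_queue:
                    self._sock.sendall(self._send_queue[0])
                    self._send_queue.popleft()
                return True
            except OSError as e:
                return e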



jeffwidman (Collaborator) left a comment:

Looks good.

if blocking:
    error = self.send_pending_requests()
    if isinstance(error, Exception):
        future.failure(error)
jeffwidman (Collaborator) commented:

Perhaps add a debug-level log statement of this error here?
It looks like send_pending_requests() already logs most (but not all) errors, so this may be superfluous, but my one thought is that if someone files a ticket, we have a little more visibility into the errors they're hitting...
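
Something along these lines, for example (just a sketch against the snippet quoted above; self, future, error, and log are the names from that context):

    if blocking:
        error = self.send_pending_requests()
        if isinstance(error, Exception):
            # extra visibility before the error is handed to the future
            log.debug('%s: error sending pending requests: %s', self, error)
            future.failure(error)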

        future.failure(error)
        return future

log.debug('%s Request %d: %s', self, correlation_id, request)
jeffwidman (Collaborator) commented:

Should this log line be located above the if blocking: line? The info seems useful regardless of whether we're blocking.

dpkp (Owner, Author) replied:

In the current implementation we only log this after the request is sent successfully. So I put this after the blocking section to keep it consistent.

except ConnectionError as e:
-    log.exception("Error sending %s to %s", request, self)
+    log.exception("Error sending request data to %s", self)
jeffwidman (Collaborator) commented:

Is there a reason to stop logging the request value?

dpkp (Owner, Author) replied:

With this design we only have the encoded bytes at this stage -- we no longer have the original request object, so I took it out of the log message. The error should be sent down to the future, and we can expect that the error handler for the request future will be responsible for logging the details.
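
For example, a caller that still holds the original request object could attach an errback that logs it (a minimal sketch; send_and_log_failures and the conn/request variables are hypothetical, and it assumes a kafka-python-style future with add_errback):

    import logging

    log = logging.getLogger(__name__)


    def send_and_log_failures(conn, request):
        # conn.send() now only queues the encoded request; actual IO happens in poll()
        future = conn.send(request)
        # When the send eventually fails, log the full request details here,
        # since the connection layer only has the encoded bytes by then.
        future.add_errback(
            lambda exc: log.error('Request %s failed: %s', request, exc))
        return future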

"""Send a request to a specific node.
"""Send a request to a specific node. Bytes are placed on an
internal per-connection send-queue. Actual network I/O will be
triggered in a subsequent call to .poll()
jeffwidman (Collaborator) commented Mar 8, 2019:

Does this create any new scenarios where we now need to make sure to call poll()?
In other words, will this break any behavior that used to work fine w/o ever calling poll()?

Currently I can't think of any -- it looks like metadata refresh picks this up automatically since it relies on maybe_connect, and you also updated the fetcher to always call poll(). Just wondering if there might be any others...

dpkp (Owner, Author) replied:

It is possible, yes, but they should be rare. The main culprits would be blocking loops that attempt to connect without calling poll(), or that don't call poll() unless there are in-flight requests. I think I've found and fixed those issues, but definitely keep an eye open for others.
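
A sketch of the poll-driven pattern that stays safe after this change (send_and_wait is a hypothetical helper; it assumes the KafkaClient API of ready(), send(), and poll()):

    def send_and_wait(client, node_id, request, poll_timeout_ms=100):
        # ready() only *initiates* a connection now; the actual connect is
        # performed inside poll(), so any wait loop must keep polling.
        while not client.ready(node_id):
            client.poll(timeout_ms=poll_timeout_ms)

        future = client.send(node_id, request)
        # poll(future=...) drives the pending write and read until the
        # future is resolved, rather than busy-waiting without IO progress.
        client.poll(future=future)

        if future.failed():
            raise future.exception
        return future.value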

jeffwidman (Collaborator):

I rebased to pick up the latest changes on master, in particular the locking change... I doubt there are any issues, but just in case. It was also for convenience so I could quickly pip install the tip of this branch on a production box (where my tooling is limited) without having to deal with creating my own separate fork, etc.

dpkp (Owner, Author) commented Mar 8, 2019:

I'm going to be out of town / offline for the next week or so. Tests are passing, and I'm satisfied w/ where this is at for now, so I'm going to merge to master. Feel free to post more feedback here that you collect from real-world usage.

dpkp merged commit 8c07925 into master on Mar 8, 2019
dpkp deleted the async_connect_send branch on Mar 8, 2019 at 16:01
jeffwidman (Collaborator):

Sounds good, thanks for all the hard work here.
