-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add broker compatibility check #499
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KSQL 0.2 and up need Kafka 0.11 since we need a bugfix in admin client that was merged in 0.11.
However, KSQL 0.1 will work with Kafka 0.10.
* until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}. | ||
* | ||
* @throws KsqlException if brokers have version 0.10.0.x | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KSQL 0.2 and up need Kafka 0.11 since we need a bugfix in admin client that was merged in 0.11.
However, KSQL 0.1 will work with Kafka 0.10.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not it won't. Streams only works with kafka 0.10.1.x and higher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hjafarpour the AdminClient
bug fix, was that client only? i.e., we should be able to run with kafka 0.10.1.x and higher brokers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's right @dguy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dguy yes, you are right. I was thinking about the KSQL dependency, but as you said it's client side and we should be fine with 0.10.1.x in broker side.
* @throws TimeoutException if there was no response within {@code request.timeout.ms} | ||
* @throws StreamsException any other fatal error | ||
*/ | ||
private ClientResponse sendRequest(final ClientRequest clientRequest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. this and the other utility methods here seem to duplicate the existing methods in NetworkClientUtils
: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
That class already has methods for waiting for a node to be ready, doing a blocking send and receive, etc.
Maybe we can just use that class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good spot, can replace this with NetworkClientUtils#sendAndReceive
however i don't see any value in replacing ensureOneNodeIsReady
As discussed offline, this is relying on internal Kafka code and it would be good to consider other options, if at all possible. Could we create a |
Thanks @ijuma, i verified that that will work. I'll update the PR later to use that instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@bluemonk3y @hjafarpour please review again as i changed it to use the consumer to do the check |
@dguy Much shorter code! :) LGTM. |
@dguy - also lgtm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, apart from one minor comment.
*/ | ||
void checkCompatibility() throws StreamsException { | ||
try { | ||
consumer.offsetsForTimes(Collections.singletonMap(topicPartition, 0L)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have a mixed version cluster (like during an upgrade), this will validate the cluster even though some brokers are incompatible, right? Particularly, it only checks the version for the leader of partition 0.
We should file an issue to keep track of this limitation.. it may come back to bite us later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would fix itself if it happens in the middle of an upgrade right? Not sure if a JIRA is needed, maybe just a comment in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would fix itself for an upgrade. But not if the cluster was really mixed.
Admittedly, the latter shouldn't happen, and actually being able to detect it reliably with this approach is probably impossible since you would have to know all the brokers, then create a topic with enough partitions, and then hope that the leaders are evenly distributed, etc.
So perhaps a comment denoting the limitation is enough.
added a comment to |
Add a
BrokerCompatibilityCheck
that will run when the app starts and throw an exception if the brokers aren't of a compatible version