-
-
Notifications
You must be signed in to change notification settings - Fork 525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broker rediscovery with config.brokers parameter taking callback function #854
Conversation
Eero broker rediscovery
I think having this functionality makes sense, but I'm not sure about the implementation yet. Generally, I think having So, the first thing we need to agree on is what problem this is trying to solve, and then we can move on to the actual implementation itself. As a general note, I would prefer that even if we have multiple possible input types, we try to normalize this internally. So for example, if |
Fair criticism! I agree with the brokers parameter value to be either list or async method to get that list and nothing weird like I proposed initially. I've already updated code and it does look better. For the other part of the question - why the need to have sasl parameter dynamic? It's not about having different credentials per kafka host, but more about initally getting credentials asynchronously from another service and maybe the possibility that those credentials could get changed during runtime. |
Fixed tests by reverting to original broker rotation instead of taking random |
src/cluster/connectionBuilder.js
Outdated
|
||
const validateBrokers = brokers => { | ||
if (!brokers || brokers.length === 0) { | ||
throw new KafkaJSNonRetriableError(`Failed to connect: expected brokers array and got nothing`) | ||
throw new KafkaJSConnectionError(`Failed to connect: expected brokers array and got nothing`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should be a KafkaJSConnectionError
, since it's actually a validation error that's not retriable unless the data it's validating changes. KafkaJSConnectionError
is retriable. I see three options:
- Keep it retriable and live with the fact that it's weird to validate the same static list N times.
- Make
validateBrokers
a bit smarter and have it throw a retriable error ifbrokers
is a function and a non-retriable error if it's not. - Wrap
brokers
in an async function that does retrying internally instead. Something along the lines of
const retrier = createRetry({ ...retry })
const getBrokers = () => retrier(async bail => {
const static = Array.isArray(brokers)
const list = static ? brokers : await brokers()
try {
validateBrokers(list) // Throws retriable error
return list
} catch (err) {
// if it's a static list, don't bother retrying
if (static) {
// Important to throw non-retriable error, since it will bubble up outside this retrier
bail(new KafkaJSNonRetriableError(err))
} else {
throw err
}
}
})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest second option
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
src/cluster/connectionBuilder.js
Outdated
const getBrokers = async brokers => { | ||
const dynamic = typeof brokers === 'function' | ||
|
||
const list = dynamic ? await brokers() : brokers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure to catch this so that you can wrap the error in a new KafkaJSConnectionError(error)
if it rejects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, updated getBrokers method to be more explicit about failures
Overall this looks good to me. But note that ##878 adds a new |
I think it's fine - this.brokers inside BrokerPool is always array. Only inside connectionBuilder it can be function. |
Ah yes, you're right. My mistake. 👍 |
Thanks for the contribution! I'll go about updating the documentation separately. |
Hi,
Please look at this proposal to add support for kafka servers full rediscovery using async callback. fixes #589
I'm reusing the brokers configuration parameter, so when value is a function it will be called (with async support) every time a new connection is built.
Most changes are because connectionBuilder.build() is now async method and updated all places using it.
Our custom service discovery code also provides SASL credentials, so the brokers callback can optionally also define username/password besides the brokers list.
Examples:
Todo:
[x] tests
[] docs