Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker rediscovery with config.brokers parameter taking callback function #854

Merged
merged 19 commits into from
Sep 21, 2020

Conversation

pimpelsang
Copy link
Contributor

@pimpelsang pimpelsang commented Sep 2, 2020

Hi,

Please look at this proposal to add support for kafka servers full rediscovery using async callback. fixes #589

I'm reusing the brokers configuration parameter, so when value is a function it will be called (with async support) every time a new connection is built.

Most changes are because connectionBuilder.build() is now async method and updated all places using it.

Our custom service discovery code also provides SASL credentials, so the brokers callback can optionally also define username/password besides the brokers list.

Examples:

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: async() => {
		const service = await myCustomServiceDiscovery();
		return [`${service.host}:${service.port}`];
	}
})

Todo:
[x] tests
[] docs

@pimpelsang pimpelsang changed the title Broker rediscovery with async getConnection method Broker rediscovery with async getConnection method, fixes #589 Sep 2, 2020
@pimpelsang pimpelsang changed the title Broker rediscovery with async getConnection method, fixes #589 Broker rediscovery with async getConnection method Sep 2, 2020
@pimpelsang pimpelsang changed the title Broker rediscovery with async getConnection method Broker rediscovery with config.brokers parameter taking callback function Sep 8, 2020
@Nevon
Copy link
Collaborator

Nevon commented Sep 11, 2020

I think having this functionality makes sense, but I'm not sure about the implementation yet.

Generally, I think having brokers be either a static list or an async function returning such a list makes sense. Having it also return sasl credentials though, sounds curious to me, and the API chafes a little bit. It seems to indicate to me that the credentials are per-host, which I have never encountered before. Is this the case, or is it just being used as a way to periodically refresh the credentials? If it's the latter, I think that should rather be implemented in some similar way to how OauthBearer is done, where the sasl object can provide an async function for refreshing the token used, but I see this as completely separate from broker discovery. Would you say that broker discovery and credential refreshing are the same thing, or could we break the two issues apart?

So, the first thing we need to agree on is what problem this is trying to solve, and then we can move on to the actual implementation itself. As a general note, I would prefer that even if we have multiple possible input types, we try to normalize this internally. So for example, if brokers can be either a list of strings or an async function resolving to a list of strings, I would at the outset wrap it into an async function that just returns the static list of strings, so that we don't need to deal with both possibilities further inside.

@pimpelsang
Copy link
Contributor Author

pimpelsang commented Sep 14, 2020

Fair criticism! I agree with the brokers parameter value to be either list or async method to get that list and nothing weird like I proposed initially. I've already updated code and it does look better.

For the other part of the question - why the need to have sasl parameter dynamic? It's not about having different credentials per kafka host, but more about initally getting credentials asynchronously from another service and maybe the possibility that those credentials could get changed during runtime.

@pimpelsang
Copy link
Contributor Author

Fixed tests by reverting to original broker rotation instead of taking random


const validateBrokers = brokers => {
if (!brokers || brokers.length === 0) {
throw new KafkaJSNonRetriableError(`Failed to connect: expected brokers array and got nothing`)
throw new KafkaJSConnectionError(`Failed to connect: expected brokers array and got nothing`)
Copy link
Collaborator

@Nevon Nevon Sep 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be a KafkaJSConnectionError, since it's actually a validation error that's not retriable unless the data it's validating changes. KafkaJSConnectionError is retriable. I see three options:

  1. Keep it retriable and live with the fact that it's weird to validate the same static list N times.
  2. Make validateBrokers a bit smarter and have it throw a retriable error if brokers is a function and a non-retriable error if it's not.
  3. Wrap brokers in an async function that does retrying internally instead. Something along the lines of
const retrier = createRetry({ ...retry })
const getBrokers = () => retrier(async bail => {
  const static = Array.isArray(brokers)
  const list = static ? brokers : await brokers()
  try {
    validateBrokers(list) // Throws retriable error
    return list
  } catch (err) {
    // if it's a static list, don't bother retrying
    if (static) {
      // Important to throw non-retriable error, since it will bubble up outside this retrier
      bail(new KafkaJSNonRetriableError(err))
    } else {
      throw err
    }
  }
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest second option

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

types/index.d.ts Outdated Show resolved Hide resolved
const getBrokers = async brokers => {
const dynamic = typeof brokers === 'function'

const list = dynamic ? await brokers() : brokers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to catch this so that you can wrap the error in a new KafkaJSConnectionError(error) if it rejects.

Copy link
Contributor Author

@pimpelsang pimpelsang Sep 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, updated getBrokers method to be more explicit about failures

@Nevon
Copy link
Collaborator

Nevon commented Sep 18, 2020

Overall this looks good to me. But note that ##878 adds a new removeBroker method to the BrokerPool that will need to be updated now that brokers can be a function rather than an array.

@pimpelsang
Copy link
Contributor Author

I think it's fine - this.brokers inside BrokerPool is always array. Only inside connectionBuilder it can be function.

@Nevon
Copy link
Collaborator

Nevon commented Sep 21, 2020

Ah yes, you're right. My mistake. 👍

@Nevon
Copy link
Collaborator

Nevon commented Sep 21, 2020

Thanks for the contribution! I'll go about updating the documentation separately.

@Nevon Nevon merged commit 6c553c8 into tulios:master Sep 21, 2020
Nevon added a commit that referenced this pull request Sep 21, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

allow "brokers" parameter in client configuration to be function
3 participants