fetchTopicMetadata generates Cannot read property 'topicMetadata' of null #549

Closed
lafeuil opened this issue Nov 7, 2019 · 2 comments · Fixed by #556

@lafeuil
Contributor

lafeuil commented Nov 7, 2019

When I call the admin client's fetchTopicMetadata function twice, I get the following error:

TypeError: Cannot read property 'topicMetadata' of null
    at Object.fetchTopicMetadata (/home/user/kafkajs-test/node_modules/kafkajs/src/admin/index.js:591:24)
    at processTicksAndRejections (internal/process/task_queues.js:85:5)
    at async /home/user/kafkajs-test/admin.js:22:5

To trigger this error, the first call to fetchTopicMetadata must target an existing topic and the second call a non-existent topic.

To reproduce:

const { Kafka } = require("kafkajs");
const kafka = new Kafka({
  clientId: "kafkajs-test.admin",
  brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
});
const admin = kafka.admin();
(async () => {
  await admin.connect();
  try {
    await admin.fetchTopicMetadata({
      topics: ["topic1"] // topic1 exists => no error
    });
  } catch (e) {
    console.error("topic1 error", e);
  }

  try {
    await admin.fetchTopicMetadata({
      topics: ["topic2"] // topic2 does not exist => Cannot read property 'topicMetadata' of null 
    });
  } catch (e) {
    console.error("topic2 error", e);
  }
  await admin.disconnect();
})();

Normally, fetchTopicMetadata throws a KafkaJSProtocolError with type UNKNOWN_TOPIC_OR_PARTITION if the topic does not exist.
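
To illustrate why the difference matters to calling code, here is a minimal sketch of a check that a caller might write. It assumes the error exposes the `type` field mentioned above; `isUnknownTopicError` is a hypothetical helper, and both error objects are simulated rather than produced by kafkajs.

```javascript
// Hypothetical helper: true when an error carries the type named above.
const isUnknownTopicError = e =>
  e != null && e.type === "UNKNOWN_TOPIC_OR_PARTITION";

// Simulated errors; real ones would come from admin.fetchTopicMetadata.
const expectedError = Object.assign(new Error("unknown topic"), {
  name: "KafkaJSProtocolError",
  type: "UNKNOWN_TOPIC_OR_PARTITION",
});
const actualError = new TypeError(
  "Cannot read property 'topicMetadata' of null"
);

console.log(isUnknownTopicError(expectedError)); // true
console.log(isUnknownTopicError(actualError)); // false
```

With the TypeError, a check like this cannot tell a missing topic apart from any other programming error.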

@lafeuil
Contributor Author

lafeuil commented Nov 7, 2019

The issue is caused by this silent try/catch in the withBroker function:
https://github.com/tulios/kafkajs/blob/v1.11.0/src/cluster/brokerPool.js#L229

    for (const nodeId of brokers) {
      const broker = await this.findBroker({ nodeId })
      try {
        return await callback({ nodeId, broker })
      } catch (e) {} // UNKNOWN_TOPIC_OR_PARTITION is caught and discarded here
    }

    return null // so withBroker returns null
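
The effect of that loop can be reproduced without kafkajs. The following is a self-contained sketch, not the library's code: `withBrokerSilent`, `fakeFetchMetadata`, and the broker ids are hypothetical stand-ins, and only the loop shape is taken from the snippet above.

```javascript
// Hypothetical stand-in for a metadata request that fails on every broker.
const fakeFetchMetadata = async ({ nodeId }) => {
  const error = new Error(`broker ${nodeId}: unknown topic`);
  error.type = "UNKNOWN_TOPIC_OR_PARTITION";
  throw error;
};

// Same loop shape as above: every error is swallowed, then null is returned.
const withBrokerSilent = async (brokers, callback) => {
  for (const nodeId of brokers) {
    try {
      return await callback({ nodeId });
    } catch (e) {} // the original error is lost here
  }
  return null;
};

// A caller that assumes a metadata object always comes back.
const demo = (async () => {
  const metadata = await withBrokerSilent([1, 2, 3], fakeFetchMetadata);
  try {
    return metadata.topicMetadata; // metadata is null, so this throws
  } catch (e) {
    return e; // a TypeError without the UNKNOWN_TOPIC_OR_PARTITION type
  }
})();

demo.then(result => console.log(result instanceof TypeError, result.type));
```

The caller ends up with a TypeError whose `type` is undefined, which matches the symptom reported in this issue.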

@lafeuil
Contributor Author

lafeuil commented Nov 7, 2019

I think the problem is in the refreshMetadataIfNecessary function. If I force a refresh whenever one of the requested topics is missing from the cached metadata, as in the patch below, it works:

diff --git a/src/cluster/brokerPool.js b/src/cluster/brokerPool.js
index 2264ea6..ef9eb42 100644
--- a/src/cluster/brokerPool.js
+++ b/src/cluster/brokerPool.js
@@ -188,7 +188,12 @@ module.exports = class BrokerPool {
    */
   async refreshMetadataIfNecessary(topics) {
     const shouldRefresh =
-      this.metadata == null || this.metadataExpireAt == null || Date.now() > this.metadataExpireAt
+      this.metadata == null ||
+      this.metadataExpireAt == null ||
+      Date.now() > this.metadataExpireAt ||
+      !topics.every(topic =>
+        this.metadata.topicMetadata.some(topicMetadata => topicMetadata.topic === topic)
+      )
 
     if (shouldRefresh) {
       return this.refreshMetadata(topics)
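
The condition in the patch can be checked in isolation. This is a sketch under assumptions: `shouldRefreshMetadata` is a hypothetical pure-function version of the expression in the diff, `state` only mirrors the two fields used there, and `now` is passed in to keep the example deterministic.

```javascript
// Hypothetical pure-function version of the check in the diff above.
// `state` mirrors the fields used there: metadata and metadataExpireAt.
const shouldRefreshMetadata = (state, topics, now = Date.now()) =>
  state.metadata == null ||
  state.metadataExpireAt == null ||
  now > state.metadataExpireAt ||
  !topics.every(topic =>
    state.metadata.topicMetadata.some(cached => cached.topic === topic)
  );

// Cache that knows only topic1 and expires at t = 2000.
const state = {
  metadata: { topicMetadata: [{ topic: "topic1" }] },
  metadataExpireAt: 2000,
};

console.log(shouldRefreshMetadata(state, ["topic1"], 1000)); // false: cached and fresh
console.log(shouldRefreshMetadata(state, ["topic2"], 1000)); // true: topic2 not cached
console.log(shouldRefreshMetadata(state, ["topic1"], 3000)); // true: cache expired
```

The second case is the one from the reproduction: the cache is still fresh after the topic1 call, so without the extra clause no refresh is triggered for topic2.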
