Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

消费者端无法连接到kafka,连接地址被固定 #1689

Open
Dr-SummerFlower opened this issue May 26, 2024 · 11 comments
Open

消费者端无法连接到kafka,连接地址被固定 #1689

Dr-SummerFlower opened this issue May 26, 2024 · 11 comments

Comments

@Dr-SummerFlower
Copy link

我在使用kafkajs写一个demo时发现了无法连接的情况,根据我对日志的排查,发现broker固定连接一个名为“kafka:9092”的地址,而不是我在实例化kafkajs时设定的主机地址,这就导致我无法连接到正确的服务器,我通过在计算机的hosts文件中添加192.168.21.11 kafka解决了这个问题,但是我还是希望你们能修复这个问题。

我的项目依赖:

{
  "dependencies": {
    "express": "^4.19.2",
    "kafkajs": "^2.2.4"
  }
}

出现问题时的日志:

{"level":"ERROR","timestamp":"2024-05-26T09:41:29.695Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka:9092","clientId":"test"}

我的代码

import * as express from 'express';
import { Express, Request, Response } from 'express';
import { Kafka, Partitioners, Producer } from 'kafkajs';

const app: Express = express();

const kafka: Kafka = new Kafka({
    clientId: 'test',
    brokers: ['kafka:9092'],
});
const producer: Producer = kafka.producer({
    createPartitioner: Partitioners.LegacyPartitioner,
    allowAutoTopicCreation: true,
});

app.use(express.json());

app.post('/data', async (req: Request, res: Response): Promise<void> => {
    await producer.connect();
    await producer.send({
        topic: 'test-topic',
        messages: [
            {
                value: JSON.stringify(req.body),
            },
        ],
    });
    await producer.disconnect();
    res.status(200).send('请求成功');
});

app.listen(25551, (): void => {
    console.log('服务启动在:localhost:25551');
});

/**
 * @File: client.ts
 * @author: 夏花
 * @time: 2024-05-26
 */

import { Consumer, Kafka } from 'kafkajs';

const kafka: Kafka = new Kafka({
    clientId: 'test',
    brokers: ['192.168.21.11:9092'],
});
const consumer: Consumer = kafka.consumer({
    groupId: 'test-group',
});

(async (): Promise<void> => {
    await consumer.connect();
    await consumer.subscribe({
        topic: 'test-topic',
        fromBeginning: true,
    });
    await consumer.run({
        eachMessage: async ({ topic, partition, message }): Promise<void> => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                value: message.value?.toString(),
            });
        },
    });
})();
@iejixudong
Copy link

const kafka: Kafka = new Kafka({
clientId: 'test',
brokers: ['kafka:9092'],
});
这个是你自己配置的

@Dr-SummerFlower
Copy link
Author

const kafka: Kafka = new Kafka({ clientId: 'test', Brokers: ['kafka:9092'], });这是你自己配置的

问题在于我使用ip也是这样,返回的日志任然是"kafka:9092",而不是我配置的地址

@JavenLaw
Copy link

我也遇到同样的问题

新配置的地址启动时候能生效
但是当重连的时候,就会使用旧的地址

@JavenLaw
Copy link

遇到同样的问题
开始使用的测试连接地址是A
后面开始使用正式的地址是B

在启动的时候,实例化的地址确实为B
但是当这期间网络断开,kafkajs重新连接时,就会一直重连地址A
并报TIMEOUT错误

但是搜索整个项目和配置,都已经不存在地址A
就像此问题说的:连接地址被固定了

@Dr-SummerFlower
Copy link
Author

遇到同样的问题 开始使用的测试连接地址是A 后面开始使用正式的地址是B

在启动的时候,实例化的地址确实为B 但是当这期间网络断开,kafkajs重新连接时,就会一直重连地址A 并报TIMEOUT错误

但是搜索整个项目和配置,都已经不存在地址A 就像此问题说的:连接地址被固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,然后等待更新

@JavenLaw
Copy link

遇到同样的问题 开始使用的测试连接地址是A 后面开始使用正式的地址是B
在启动的时候,实例化的地址确实为B 但是当这期间网络断开,kafkajs重新连接时,就会一直重连地址A 并报TIMEOUT错误
但是搜索整个项目和配置,都已经不存在地址A 就像此问题说的:连接地址被固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,然后等待更新

不过我的配置直接是:brokers: [“127.0.0.1:9001”]的格式也可以吗?在hosts如果想把127.0.0.1改为192.168.1.1应该如何改呢

@Dr-SummerFlower
Copy link
Author

遇到同样的问题开始使用的测试连接地址是A后面开始使用的正式地址是B
在启动的时候,实例化的地址确实为B但是当这期间网络断开,kafkajs重新连接时,就会一直重连接地址A并报超时错误
但是搜索整个项目和配置,都已经不存在地址A就像此问题所说的:连接地址已固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,等待然后更新

我的配置直接是:brokers: [“127.0.0.1:9001”]的格式也可以吗?在hosts的话如果想把127.0.0.1改为192.168.1.1应该怎么改呢

我没有尝试过修改127.0.0.1,但是我想它是可以的
192.168.1.1应该是你的路由器向你的电脑分配的地址,同样也可以添加到hosts文件中
你可以在hosts文件中添加

127.0.0.1 kafka
或者
192.168.1.1 kafka

两个只需要选择其中一个添加就可以,他们的效果应该是一样的

@JavenLaw
Copy link

遇到同样的问题开始使用的测试连接地址是A后面开始使用的正式地址是B
在启动的时候,实例化的地址确实为B但是当这期间网络断开,kafkajs重新连接时,就会一直重连接地址A并报超时错误
但是搜索整个项目和配置,都已经不存在地址A就像此问题所说的:连接地址已固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,等待然后更新

我的配置直接是:brokers: [“127.0.0.1:9001”]的格式也可以吗?在hosts的话如果想把127.0.0.1改为192.168.1.1应该怎么改呢

我没有尝试过修改127.0.0.1,但是我想它是可以的 192.168.1.1应该是你的路由器向你的电脑分配的地址,同样也可以添加到hosts文件中 你可以在hosts文件中添加

127.0.0.1 kafka
或者
192.168.1.1 kafka

两个只需要选择其中一个添加就可以,他们的效果应该是一样的

我的意思是:我没有使用brokers: ['kafka:9092'],而是直接brokers: ['127.0.0.1:9092']
此时如何在hosts中配置127.0.0.1 kafka?

@JavenLaw
Copy link

实验了一下,即使hosts改了,在重新连接时还是会去连接旧地址

@Dr-SummerFlower
Copy link
Author

这就是我遇到的问题了,它似乎是将brokers: ['kafka:9092']直接写入到npm包中了,导致我们写的代码中的地址无法被npm包接受作为链接地址

@mxdmly
Copy link

mxdmly commented Oct 24, 2024

建议用英文描述这个问题

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants