Skip to content

Commit 9fac7f5

Browse files
fix: use coordinator (#12)
* fix: use coordinator ensure it is using the coordinator broker to send group messages * fix: consumers should update brokers too just like producers, consumers should update the broker list with clients so we can mapper nodeIds to broker clients correctly instead of relying on indexes
1 parent b3b59ce commit 9fac7f5

File tree

4 files changed

+58
-26
lines changed

4 files changed

+58
-26
lines changed

src/Broker.php

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,15 @@ public function close()
5252
public function updateBrokers()
5353
{
5454
$config = $this->config;
55-
$url = parse_url($config->getBootstrapServer());
55+
56+
if ($config instanceof ProducerConfig) {
57+
$url = parse_url($config->getBootstrapServer());
58+
} elseif ($config instanceof ConsumerConfig) {
59+
$url = parse_url(explode(',', $config->getBroker())[0]);
60+
} else {
61+
throw new InvalidArgumentException('Unknown config, it should be either a ProducerConfig or a ConsumerConfig');
62+
}
63+
5664
if (!$url) {
5765
throw new InvalidArgumentException(sprintf('Invalid bootstrapServer %s', $config->getBootstrapServer()));
5866
}
@@ -93,38 +101,31 @@ public function updateMetadata(array $topics = [], ?ClientInterface $client = nu
93101

94102
public function getClient(?int $brokerId = null): ClientInterface
95103
{
96-
if (null === $brokerId) {
97-
return $this->getRandomClient();
98-
} elseif (isset($this->brokers[$brokerId])) {
99-
return $this->getClientByIndex($brokerId);
100-
} else {
101-
throw new InvalidArgumentException(sprintf('Not found brokerId %s', $brokerId));
102-
}
104+
return $this->getClientByBrokerId($brokerId ?? array_rand($this->brokers, 1));
103105
}
104106

105-
public function getClientByIndex(int $index): ClientInterface
107+
public function getClientByBrokerId(int $brokerId): ClientInterface
106108
{
107-
$brokers = $this->getBrokers();
108-
$url = parse_url($brokers[$index]);
109+
if (!isset($this->brokers[$brokerId])) {
110+
throw new InvalidArgumentException(sprintf('Not found brokerId %s', $brokerId));
111+
}
112+
113+
$url = parse_url($this->brokers[$brokerId]);
109114
if (!$url) {
110-
throw new InvalidArgumentException(sprintf('Invalid bootstrapServer %s', $brokers[$index]));
115+
throw new InvalidArgumentException(sprintf('Invalid bootstrapServer %s', $this->brokers[$brokerId]));
111116
}
117+
112118
$config = $this->config;
113-
if (!isset($this->clients[$index])) {
119+
if (!isset($this->clients[$brokerId])) {
114120
$clientClass = KafkaUtil::getClientClass($config->getClient());
115121

116122
/** @var ClientInterface $client */
117123
$client = new $clientClass($url['host'], $url['port'] ?? 9092, $config, KafkaUtil::getSocketClass($config->getSocket()));
118124
$client->connect();
119-
$this->clients[$index] = $client;
125+
$this->clients[$brokerId] = $client;
120126
}
121127

122-
return $this->clients[$index];
123-
}
124-
125-
public function getRandomClient(): ClientInterface
126-
{
127-
return $this->getClientByIndex(array_rand($this->getBrokers(), 1));
128+
return $this->clients[$brokerId];
128129
}
129130

130131
/**

src/Consumer/Consumer.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,14 @@ public function __construct(ConsumerConfig $config, ?callable $consumeCallback =
104104
{
105105
$this->config = $config;
106106
$this->consumeCallback = $consumeCallback;
107+
107108
$this->broker = $broker = new Broker($config);
108-
$broker->setBrokers($config->getBroker());
109+
if ($config->getUpdateBrokers()) {
110+
$broker->updateBrokers();
111+
} else {
112+
$broker->setBrokers($config->getBroker());
113+
}
114+
109115
$this->client = $broker->getClient();
110116
$this->groupManager = $groupManager = new GroupManager($broker);
111117
$groupId = $config->getGroupId();

src/Consumer/ConsumerConfig.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ class ConsumerConfig extends CommonConfig
107107
*/
108108
protected $partitionAssignmentStrategy = \longlang\phpkafka\Consumer\Assignor\RangeAssignor::class;
109109

110+
/**
111+
* @var bool
112+
*/
113+
private $updateBrokers = true;
114+
110115
public function getClient(): ?string
111116
{
112117
return $this->client;
@@ -340,4 +345,16 @@ public function setPartitionAssignmentStrategy(string $partitionAssignmentStrate
340345

341346
return $this;
342347
}
348+
349+
public function getUpdateBrokers(): bool
350+
{
351+
return $this->updateBrokers;
352+
}
353+
354+
public function setUpdateBrokers(bool $updateBrokers): self
355+
{
356+
$this->updateBrokers = $updateBrokers;
357+
358+
return $this;
359+
}
343360
}

src/Group/GroupManager.php

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ class GroupManager
3131
*/
3232
protected $isLeader = false;
3333

34+
/**
35+
* @var FindCoordinatorResponse
36+
*/
37+
protected $findCoordinatorResponse;
38+
3439
/**
3540
* @var JoinGroupResponse
3641
*/
@@ -47,7 +52,10 @@ public function findCoordinator(string $key, int $keyType = CoordinatorType::GRO
4752
$request->setKey($key);
4853
$request->setKeyType($keyType);
4954

50-
return KafkaUtil::retry($this->broker->getClient(), $request, $retry, $sleep);
55+
/** @var FindCoordinatorResponse $response */
56+
$this->findCoordinatorResponse = KafkaUtil::retry($this->broker->getClient(), $request, $retry, $sleep);
57+
58+
return $this->findCoordinatorResponse;
5159
}
5260

5361
public function joinGroup(string $groupId, string $memberId, string $protocolType, ?string $groupInstanceId = null, array $protocols = [], int $sessionTimeoutMs = 60000, int $rebalanceTimeoutMs = -1, int $retry = 0, float $sleep = 0.01): JoinGroupResponse
@@ -62,7 +70,7 @@ public function joinGroup(string $groupId, string $memberId, string $protocolTyp
6270
$request->setRebalanceTimeoutMs($rebalanceTimeoutMs);
6371

6472
/** @var JoinGroupResponse $response */
65-
$response = $this->joinGroupResponse = KafkaUtil::retry($this->broker->getClient(), $request, $retry, $sleep);
73+
$response = $this->joinGroupResponse = KafkaUtil::retry($this->broker->getClient($this->findCoordinatorResponse->getNodeId()), $request, $retry, $sleep);
6674

6775
$this->isLeader = $response->getLeader() === $response->getMemberId();
6876

@@ -78,7 +86,7 @@ public function leaveGroup(string $groupId, string $memberId, ?string $groupInst
7886
(new MemberIdentity())->setMemberId($memberId)->setGroupInstanceId($groupInstanceId),
7987
]);
8088

81-
return KafkaUtil::retry($this->broker->getClient(), $request, $retry, $sleep);
89+
return KafkaUtil::retry($this->broker->getClient($this->findCoordinatorResponse->getNodeId()), $request, $retry, $sleep);
8290
}
8391

8492
public function syncGroup(string $groupId, string $groupInstanceId, string $memberId, int $generationId, string $protocolName, string $protocolType, array $assignments, int $retry = 0, float $sleep = 0.01): SyncGroupResponse
@@ -92,7 +100,7 @@ public function syncGroup(string $groupId, string $groupInstanceId, string $memb
92100
$request->setProtocolType($protocolType);
93101
$request->setAssignments($assignments);
94102

95-
return KafkaUtil::retry($this->broker->getClient(), $request, $retry, $sleep);
103+
return KafkaUtil::retry($this->broker->getClient($this->findCoordinatorResponse->getNodeId()), $request, $retry, $sleep);
96104
}
97105

98106
public function heartbeat(string $groupId, string $groupInstanceId, string $memberId, int $generationId, int $retry = 0, float $sleep = 0.01): HeartbeatResponse
@@ -103,7 +111,7 @@ public function heartbeat(string $groupId, string $groupInstanceId, string $memb
103111
$request->setGenerationId($generationId);
104112
$request->setMemberId($memberId);
105113

106-
return KafkaUtil::retry($this->broker->getClient(), $request, $retry, $sleep);
114+
return KafkaUtil::retry($this->broker->getClient($this->findCoordinatorResponse->getNodeId()), $request, $retry, $sleep);
107115
}
108116

109117
public function getClient(): ClientInterface

0 commit comments

Comments
 (0)