66
77use InvalidArgumentException ;
88use longlang \phpkafka \Broker ;
9- use longlang \phpkafka \Client \ClientInterface ;
109use longlang \phpkafka \Consumer \Assignor \PartitionAssignorInterface ;
1110use longlang \phpkafka \Consumer \Struct \ConsumerGroupMemberMetadata ;
1211use longlang \phpkafka \Exception \KafkaErrorException ;
1918use longlang \phpkafka \Protocol \Fetch \FetchPartition ;
2019use longlang \phpkafka \Protocol \Fetch \FetchRequest ;
2120use longlang \phpkafka \Protocol \Fetch \FetchResponse ;
21+ use longlang \phpkafka \Protocol \FindCoordinator \FindCoordinatorResponse ;
2222use longlang \phpkafka \Protocol \JoinGroup \JoinGroupRequestProtocol ;
2323use longlang \phpkafka \Util \KafkaUtil ;
2424use Swoole \Timer ;
@@ -50,11 +50,6 @@ class Consumer
5050 */
5151 protected $ offsetManagers = [];
5252
53- /**
54- * @var ClientInterface
55- */
56- protected $ client ;
57-
5853 /**
5954 * @var string
6055 */
@@ -100,6 +95,13 @@ class Consumer
10095 */
10196 private $ assignor ;
10297
98+ /**
99+ * @var FindCoordinatorResponse
100+ */
101+ private $ coordinator ;
102+
103+ protected $ fetchOptions = [];
104+
103105 public function __construct (ConsumerConfig $ config , ?callable $ consumeCallback = null )
104106 {
105107 $ this ->config = $ config ;
@@ -112,14 +114,13 @@ public function __construct(ConsumerConfig $config, ?callable $consumeCallback =
112114 $ broker ->setBrokers ($ config ->getBroker ());
113115 }
114116
115- $ this ->client = $ broker ->getClient ();
116117 $ this ->groupManager = $ groupManager = new GroupManager ($ broker );
117118 $ groupId = $ config ->getGroupId ();
118119
119120 $ this ->broker ->updateMetadata ($ config ->getTopic ());
120121
121122 // findCoordinator
122- $ groupManager ->findCoordinator ($ groupId , CoordinatorType::GROUP , $ config ->getGroupRetry (), $ config ->getGroupRetrySleep ());
123+ $ this -> coordinator = $ groupManager ->findCoordinator ($ groupId , CoordinatorType::GROUP , $ config ->getGroupRetry (), $ config ->getGroupRetrySleep ());
123124
124125 $ this ->rejoin ();
125126 }
@@ -133,7 +134,6 @@ public function rejoin()
133134 $ groupManager = $ this ->groupManager ;
134135 $ groupId = $ config ->getGroupId ();
135136 $ topics = $ config ->getTopic ();
136- $ client = $ this ->broker ->getClient ();
137137
138138 $ metadata = new ConsumerGroupMemberMetadata ();
139139 $ metadata ->setTopics ($ config ->getTopic ());
@@ -165,6 +165,9 @@ public function rejoin()
165165 $ consumerGroupMemberAssignment ->unpack ($ data );
166166 }
167167
168+ $ this ->initFetchOptions ();
169+
170+ $ client = $ this ->broker ->getClient ($ this ->coordinator ->getNodeId ());
168171 foreach ($ topics as $ topic ) {
169172 $ this ->offsetManagers [$ topic ] = $ offsetManager = new OffsetManager ($ client , $ topic , $ this ->getPartitions ($ topic ), $ groupId , $ config ->getGroupInstanceId (), $ this ->memberId , $ this ->generationId );
170173 $ offsetManager ->updateOffsets ($ config ->getOffsetRetry ());
@@ -234,6 +237,37 @@ public function ack(ConsumeMessage $message)
234237 $ offsetManager ->saveOffsets ($ partition , $ this ->config ->getOffsetRetry ());
235238 }
236239
240+ protected function initFetchOptions ()
241+ {
242+ $ fetchOptions = [];
243+ $ config = $ this ->config ;
244+ $ broker = $ this ->broker ;
245+ $ topicsMeta = $ broker ->getTopicsMeta ();
246+ foreach ($ config ->getTopic () as $ topic ) {
247+ $ currentTopicMetaItem = null ;
248+ foreach ($ topicsMeta as $ topicMetaItem ) {
249+ if ($ topicMetaItem ->getName () === $ topic ) {
250+ $ currentTopicMetaItem = $ topicMetaItem ;
251+ break ;
252+ }
253+ }
254+ if (!$ currentTopicMetaItem ) {
255+ continue ;
256+ }
257+ foreach ($ this ->getFetchPartitions ($ topic ) as $ partition ) {
258+ foreach ($ currentTopicMetaItem ->getPartitions () as $ topicsMetaItemPartition ) {
259+ if ($ partition === $ topicsMetaItemPartition ->getPartitionIndex ()) {
260+ foreach ($ topicsMetaItemPartition ->getReplicaNodes () as $ nodeId ) {
261+ $ fetchOptions [$ nodeId ][$ topic ][] = $ partition ;
262+ }
263+ break ;
264+ }
265+ }
266+ }
267+ }
268+ $ this ->fetchOptions = $ fetchOptions ;
269+ }
270+
237271 protected function fetchMessages ()
238272 {
239273 if (!$ this ->swooleHeartbeat ) {
@@ -250,17 +284,23 @@ protected function fetchMessages()
250284 }
251285 $ request ->setRackId ($ config ->getRackId ());
252286 $ topics = [];
253- foreach ($ config ->getTopic () as $ topic ) {
287+ $ currentList = current ($ this ->fetchOptions );
288+ if (false === $ currentList ) {
289+ $ currentList = reset ($ this ->fetchOptions );
290+ }
291+ $ nodeId = key ($ this ->fetchOptions );
292+ next ($ this ->fetchOptions );
293+ foreach ($ currentList as $ topic => $ partitions ) {
254294 $ fetchPartitions = [];
255- foreach ($ this -> getFetchPartitions ( $ topic ) as $ partition ) {
295+ foreach ($ partitions as $ partition ) {
256296 $ fetchPartitions [] = (new FetchPartition ())->setPartitionIndex ($ partition )->setFetchOffset ($ this ->getOffsetManager ($ topic )->getFetchOffset ($ partition ));
257297 }
258298 $ topics [] = (new FetchableTopic ())->setName ($ topic )->setFetchPartitions ($ fetchPartitions );
259299 }
260300 $ request ->setTopics ($ topics );
261301
262302 /** @var FetchResponse $response */
263- $ response = $ this ->client ->sendRecv ($ request );
303+ $ response = $ this ->broker -> getClient ( $ nodeId ) ->sendRecv ($ request );
264304 $ errorCode = $ response ->getErrorCode ();
265305 switch ($ errorCode ) {
266306 case ErrorCode::REBALANCE_IN_PROGRESS :
@@ -348,6 +388,9 @@ protected function getPartitions(string $topic): array
348388 return $ partitions ;
349389 }
350390
391+ /**
392+ * @return int[]
393+ */
351394 protected function getFetchPartitions (string $ topic ): array
352395 {
353396 $ partitions = [];
0 commit comments