Skip to content
This repository was archived by the owner on Jul 15, 2023. It is now read-only.

Fix for RecreateSyncProducerPoolForMetadata #34

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions src/KafkaNET.Library/Helper/KafkaSimpleManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Kafka.Client.Helper
using Kafka.Client.Producers.Sync;
using Kafka.Client.Requests;
using Kafka.Client.Utils;
using Microsoft.KafkaNET.Library.Util;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand Down Expand Up @@ -177,35 +178,37 @@ public TopicMetadata RefreshMetadata(short versionId, string clientId, int corre
Logger.InfoFormat("RefreshMetadata enter: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force);
if (!force && this.TopicMetadatas.ContainsKey(topic))
return this.TopicMetadatas[topic];

int retry = 0;
while (retry < 2)
Dictionary<string, TopicMetadata> tempTopicMetadatas = new Dictionary<string, TopicMetadata>();
Dictionary<string, DateTime> tempTopicMetadatasLastUpdateTime = new Dictionary<string, DateTime>();
Dictionary<int, Tuple<Broker, BrokerConfiguration>> partitionLeaders = new Dictionary<int, Tuple<Broker, BrokerConfiguration>>();
try
{
Dictionary<string, TopicMetadata> tempTopicMetadatas = new Dictionary<string, TopicMetadata>();
Dictionary<string, DateTime> tempTopicMetadatasLastUpdateTime = new Dictionary<string, DateTime>();
Dictionary<int, Tuple<Broker, BrokerConfiguration>> partitionLeaders = new Dictionary<int, Tuple<Broker, BrokerConfiguration>>();
RefreshMetadataInternal(versionId, clientId, correlationId, topic, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, partitionLeaders);

if (tempTopicMetadatas.ContainsKey(topic))
{
this.TopicMetadatas[topic] = tempTopicMetadatas[topic];
this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic];
this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders;
int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count;
if (partitionCountInZK != partitionLeaders.Count)
Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK);
else
Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK);
return this.TopicMetadatas[topic];
}
}
catch (Exception ex)
{
Logger.WarnFormat("Got exception while refreshing metadata of topic {0}, {1} ",topic, ExceptionUtil.GetExceptionDetailInfo(ex));
// Calling RecreateSyncProducerPoolForMetadata to get a fresh list of broker used for metadata call
RecreateSyncProducerPoolForMetadata ();
throw;
}
if (tempTopicMetadatas.ContainsKey(topic))
{
this.TopicMetadatas[topic] = tempTopicMetadatas[topic];
this.TopicMetadatasLastUpdateTime[topic] = tempTopicMetadatasLastUpdateTime[topic];
this.TopicMetadataPartitionsLeaders[topic] = partitionLeaders;
int partitionCountInZK = GetTopicPartitionsFromZK(topic).Count;
if (partitionCountInZK != partitionLeaders.Count)
Logger.WarnFormat("RefreshMetadata exit return. Some partitions has no leader. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} != partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK);
else
{
Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic);
RecreateSyncProducerPoolForMetadata();
}
retry++;
Logger.InfoFormat("RefreshMetadata exit return. Topic:{0} PartitionMetadata:{1} partitionLeaders:{2} partitionCountInZK:{3}", topic, tempTopicMetadatas[topic].PartitionsMetadata.Count(), partitionLeaders.Count, partitionCountInZK);
return this.TopicMetadatas[topic];
}
else
{
Logger.WarnFormat("Got null for metadata of topic {0}, will RecreateSyncProducerPoolForMetadata and retry . ", topic);
RecreateSyncProducerPoolForMetadata();
}

Logger.WarnFormat("RefreshMetadata exit return NULL: {0} {1} {2} Topic:{3} Force:{4}", versionId, clientId, correlationId, topic, force);
return null;
}
Expand Down Expand Up @@ -241,7 +244,6 @@ internal BrokerConfiguration GetLeaderBrokerOfPartition(string topic, int partit
private void RefreshMetadataInternal(short versionId, string clientId, int correlationId, string topic, Dictionary<string, TopicMetadata> tempTopicMetadatas, Dictionary<string, DateTime> tempTopicMetadatasLastUpdateTime, Dictionary<int, Tuple<Broker, BrokerConfiguration>> partitionLeaders)
{
Logger.InfoFormat("RefreshMetadataInternal enter: {0} {1} {2} Topic:{3} ", versionId, clientId, correlationId, topic);

lock (syncProducerPoolForMetadataLock)
{
BrokerPartitionInfo brokerPartitionInfo = new BrokerPartitionInfo(this.syncProducerPoolForMetaData, tempTopicMetadatas, tempTopicMetadatasLastUpdateTime, ProducerConfiguration.DefaultTopicMetaDataRefreshIntervalMS, this.syncProducerPoolForMetaData.zkClient);
Expand Down