Skip to content

Commit

Permalink
feat: add rocketmq allocate problem
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Mar 8, 2020
1 parent 69849a6 commit 8b596a7
Show file tree
Hide file tree
Showing 12 changed files with 448 additions and 29 deletions.
33 changes: 13 additions & 20 deletions index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@
<description>Recent content on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Sun, 08 Mar 2020 15:35:05 +0800</lastBuildDate>
<lastBuildDate>Sun, 08 Mar 2020 20:04:52 +0800</lastBuildDate>

<atom:link href="https://xujianhai.fun/index.xml" rel="self" type="application/rss+xml" />


<item>
<title>Rocketmq_allocate</title>
<link>https://xujianhai.fun/posts/rocketmq_allocate/</link>
<pubDate>Sun, 08 Mar 2020 20:04:52 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_allocate/</guid>
<description>最近一年中, 经常有用户不同的服务用一个group分别订阅不同的topic, 导致部分partition不消费
场景 业务反馈的时候, 通常是 监控上部分partition lag 增长, 并且queue的消费qps是0.
通过使用 mqadmin consumerProgress 查看offset 提交的时候, 发现这个group提交了多个topic, 并且每次结果不一样
-&amp;gt; % mqadmin consumerProgress -g groupA -n $addr -s true RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. #Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime %RETRY%groupA broker1 0 0 0 ip1 0 N/A topicA broker1 0 2180901 2180901 ip1 0 2020-03-08 20:10:04 topicA broker1 1 2000000 0 ip1 200000 2020-03-08 00:10:04 -&amp;gt; % mqadmin consumerProgress -g groupA -n $addr -s true RocketMQLog:WARN No appenders could be found for logger (io.</description>
</item>

<item>
<title>Rocketmq_subsconfig</title>
<link>https://xujianhai.fun/posts/rocketmq_subs/</link>
Expand Down Expand Up @@ -121,24 +133,5 @@ kafka group member 协议 主要参考 AbstractCoordinator 的实现流程 以
onJoinPrepare: 在 eager 模式下, 上次分配的内容全部 revoked; 在 COOPERATIVE 模式下, 只撤回不在定于的 topic 的 partition. metadata: sendJoinGroupRequest使用的数据信息, 用于后面的分配.</description>
</item>

<item>
<title>Kafka Group Kip</title>
<link>https://xujianhai.fun/posts/kafka-group-kip/</link>
<pubDate>Thu, 07 Nov 2019 09:54:35 +0800</pubDate>

<guid>https://xujianhai.fun/posts/kafka-group-kip/</guid>
<description>这里主要讨论 kafka group 相关的协议: rebalance, partition 等
https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
https://cwiki.apache.org/confluence/display/KAFKA/KIP-379%3A+Multiple+Consumer+Group+Management
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070828
https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol
https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit
在 group非常大的时候, rebalance 次数就会增加; rebalance 时间取决于最慢的consumer, group 越大, 慢consumer出现的概率就越大. 除此之外, group coordinator 可能多个 group 共享的, 所以彼此会影响. 这个提案中, 提出了 `consumer.group.max.size` 的概念, 对 server端进行了保护. 当有超过数量的member加入, 将会收到 异常. https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
之前都是 broker 在 收到 joinGroup request 的时候, 返回 uuid 给 client 作为 member.id, 在边缘case中(client不断重启加入), 可能导致内存膨胀. 这个 proposal 中, 就是需要用户手动提交 memebr.id https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
为了避免rebalance导致 有状态的应用程序的数据迁移. 目前的状态 broker group status: Running -&amp;gt; member JoinGroupRequest -&amp;gt; broker group status: PREPARE_REBALANCE -&amp;gt; broker group status: COMPLETING_REBALANCE -&amp;gt; sync group request (group members) -&amp;gt; SyncGroupResponse (broker send to memebrs) 其中, 第一个加入的 member 就是 group leader.</description>
</item>

</channel>
</rss>
6 changes: 6 additions & 0 deletions posts/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ <h1>Posts</h1>
<div class="post-year">2020</div>

<ul class="posts-list">
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_allocate/">
<span class="post-title">Rocketmq_allocate</span>
<span class="post-day">Mar 8</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_subs/">
<span class="post-title">Rocketmq_subsconfig</span>
Expand Down
97 changes: 96 additions & 1 deletion posts/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,104 @@
<description>Recent content in Posts on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Sun, 08 Mar 2020 15:35:05 +0800</lastBuildDate>
<lastBuildDate>Sun, 08 Mar 2020 20:04:52 +0800</lastBuildDate>
<atom:link href="https://xujianhai.fun/posts/index.xml" rel="self" type="application/rss+xml" />

<item>
<title>Rocketmq_allocate</title>
<link>https://xujianhai.fun/posts/rocketmq_allocate/</link>
<pubDate>Sun, 08 Mar 2020 20:04:52 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_allocate/</guid>
<description>最近一年中, 经常有用户不同的服务用一个group分别订阅不同的topic, 导致部分partition不消费
场景 业务反馈的时候, 通常是 监控上部分partition lag 增长, 并且queue的消费qps是0.
通过使用 mqadmin consumerProgress 查看offset 提交的时候, 发现这个group提交了多个topic, 并且每次结果不一样
-&amp;gt; % mqadmin consumerProgress -g groupA -n $addr -s true RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. #Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime %RETRY%groupA broker1 0 0 0 ip1 0 N/A topicA broker1 0 2180901 2180901 ip1 0 2020-03-08 20:10:04 topicA broker1 1 2000000 0 ip1 200000 2020-03-08 00:10:04 -&amp;gt; % mqadmin consumerProgress -g groupA -n $addr -s true RocketMQLog:WARN No appenders could be found for logger (io.</description>
<content type="html"><![CDATA[<blockquote>
<p>最近一年中, 经常有用户不同的服务用一个group分别订阅不同的topic, 导致部分partition不消费</p>
</blockquote>
<h3 id="场景">场景</h3>
<p>业务反馈的时候, 通常是 监控上部分partition lag 增长, 并且queue的消费qps是0.</p>
<p>通过使用 <code>mqadmin consumerProgress</code> 查看offset 提交的时候, 发现这个group提交了多个topic, 并且每次结果不一样</p>
<pre><code>-&gt; % mqadmin consumerProgress -g groupA -n $addr -s true
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime
%RETRY%groupA broker1 0 0 0 ip1 0 N/A
topicA broker1 0 2180901 2180901 ip1 0 2020-03-08 20:10:04
topicA broker1 1 2000000 0 ip1 200000 2020-03-08 00:10:04
</code></pre><pre><code>-&gt; % mqadmin consumerProgress -g groupA -n $addr -s true
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime
%RETRY%groupA broker1 0 0 0 ip1 0 N/A
topicB broker1 0 2172997 2172997 ip1 0 2020-03-08 20:10:05
topicB broker1 1 1000000 0 ip1 10000 2020-03-08 00:10:04
</code></pre><p>上面可以发现, groupA 消费了 topicA、topicB, 但是都只消费了一个queue, 导致了其他queue的lag. (为了说明问题, 这里很多使用了填充, 比如 ip、groupName、topic)</p>
<p>难道rocketmq不支持consumer client 同时订阅多个 topic?</p>
<p>其实正常使用rocketmq的情况下, 是允许一个consumer同时订阅多个topic的, 但是需要在consumer client启动前一次性订阅完多个topic, 比如下图, 一次性订阅TopicTestA 和 TopicTestB</p>
<p><img src="/rocketmq_sub_multi.png" alt="rocketmq_sub_multi" title="rocketmq_sub_multi"></p>
<p>但是如果业务分开来分别订阅的话, 就会存在问题, 如下使用</p>
<p><img src="/rocketmq_sub_a.png" alt="rocketmq_sub_a.png" title="rocketmq_sub_a.png"></p>
<p><img src="/rocketmq_sub_b.png" alt="rocketmq_sub_b.png" title="rocketmq_sub_b.png"></p>
<h3 id="原因">原因</h3>
<p>为什么出现这样的原因?</p>
<p>首先, 为什么每次查询的结果不一样. 这个和broker维护consumer信息的实现 以及 心跳实现有关, 关键结构如下:</p>
<pre><code>public class ConsumerManager {
private final ConcurrentMap&lt;String/* Group */, ConsumerGroupInfo&gt; consumerTable =
new ConcurrentHashMap&lt;String, ConsumerGroupInfo&gt;(1024);
......
}
public class ConsumerGroupInfo {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap&lt;String/* Topic */, SubscriptionData&gt; subscriptionTable =
new ConcurrentHashMap&lt;String, SubscriptionData&gt;()
.......
}
</code></pre><p>重点是 SubscriptionData 的维护上, 可以发现, 本质上这是个映射的实现: group -&gt; topic -&gt; SubscriptionData. 之所以可以订阅多个topic, 是因为 ConsumerGroupInfo 内部维护了一个topic的map, 这样订阅多个topic的时候, 只需要将topic和订阅数据 存放在 ConcurrentMap 中, 这个行为是在心跳的机制中实现的</p>
<p>但是在问题场景中, topicA 和 topicB 是分两种心跳投递的, 第一种心跳是 groupA 订阅 topicA 的心跳, 假设是服务A启动的进程; 第二种心跳是 groupA 订阅 topicB 的心跳, 假设是 服务B启动的. 因为是两次心跳逻辑, 在rocketmq中心跳中订阅数据是基于覆盖方式实现的, 关键实现如 ConsumerGroupInfo#updateSubscription:</p>
<pre><code>public class ConsumerGroupInfo {
public boolean updateSubscription(final Set&lt;SubscriptionData&gt; subList) {
boolean updated = false;
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
.....
} else if (sub.getSubVersion() &gt; old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info(&quot;subscription changed, group: {} OLD: {} NEW: {}&quot;,
this.groupName,
old.toString(),
sub.toString()
);
}
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
......
}
}
</code></pre><p>这里补充下, sub#subVersion 就是当前时间. 也就是说, 随着多次心跳, topic的 SubscriptionData 会经常变化. 在我们执行命令 <code>mqamdin consumerProgress</code>的逻辑, cosnumer 的topic消费信息会根据 订阅信息进行过滤, 因为心跳的原因, 导致每次过滤出的topic 不一样, 也就会导致我们看到的结果不一样</p>
<p>那么, 为什么 消费存在lag呢? 这个就和 消费的rebalance 有关了.</p>
<p>在cluster模式的消费的流程中, 需要走以下几个逻辑(不是串行的, 这里为了简单说明):</p>
<ol>
<li>遍历本地订阅的topic</li>
<li>获取这个topic下所有的mq</li>
<li>获取这个consumer group的所有consumerID</li>
<li>根据算法, 将topic下的所有mq分配给所有consumerID</li>
<li>当前consumer会尝试消费分配给自己的mq (如果是顺序消费, 会存在一次加锁行为, 这里不讨论)</li>
</ol>
<p>这里需要注意的是, 无论是 服务A 还是 服务B 的消费, 在第三步中, 获取的 consumerID 都是 两个服务总共的 consumer, 这样在执行分配的时候, topicA 和 topicB 都是基于所有的consumer进行分配的. 可是, 服务A的consumer client 并不会去消费 topicB, 这样 topicB分配给 服务A consumer client 的 mq 并不会消费, 导致了lag.</p>
<h3 id="如何快速定位">如何快速定位</h3>
<p>我们可以使用 <code>mqadmin consumerProgress</code> 查看这个consumer group 订阅topic的offset提交情况, 如果发现多个 topic 的提交情况, 并且并没有一次性订阅多个topic, 基本上是这种情况了</p>
]]></content>
</item>

<item>
<title>Rocketmq_subsconfig</title>
<link>https://xujianhai.fun/posts/rocketmq_subs/</link>
Expand Down
Loading

0 comments on commit 8b596a7

Please sign in to comment.