Skip to content

Commit

Permalink
feat: add rocketmq subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Mar 8, 2020
1 parent 85f3bc1 commit 69849a6
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 19 deletions.
22 changes: 12 additions & 10 deletions index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@
<description>Recent content on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Sun, 08 Mar 2020 12:00:05 +0800</lastBuildDate>
<lastBuildDate>Sun, 08 Mar 2020 15:35:05 +0800</lastBuildDate>

<atom:link href="https://xujianhai.fun/index.xml" rel="self" type="application/rss+xml" />


<item>
<title>Rocketmq_subsconfig</title>
<link>https://xujianhai.fun/posts/rocketmq_subs/</link>
<pubDate>Sun, 08 Mar 2020 15:35:05 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_subs/</guid>
<description>最近咨询订阅配置的人比较多, 这里进行分析下.
配置信息 订阅配置信息是consumer向broker消费消息的凭证, 如果broker开启了 autoCreateSubscriptionGroup=false , 那么consumer client在消费之前, 必须通过命令行或者控制台上创建订阅配置, 然后consumer client使用配置订阅的名字. 通过命令行创建的订阅如下:
-&amp;gt; % mqadmin updateSubGroup usage: mqadmin updateSubGroup [-a &amp;lt;arg&amp;gt;] [-b &amp;lt;arg&amp;gt;] [-c &amp;lt;arg&amp;gt;] [-d &amp;lt;arg&amp;gt;] -g &amp;lt;arg&amp;gt; [-h] [-i &amp;lt;arg&amp;gt;] [-m &amp;lt;arg&amp;gt;] [-n &amp;lt;arg&amp;gt;] [-q &amp;lt;arg&amp;gt;] [-r &amp;lt;arg&amp;gt;] [-s &amp;lt;arg&amp;gt;] [-w &amp;lt;arg&amp;gt;] -a,--notifyConsumerIdsChanged &amp;lt;arg&amp;gt; notify consumerId changed -b,--brokerAddr &amp;lt;arg&amp;gt; create subscription group to which broker -c,--clusterName &amp;lt;arg&amp;gt; create subscription group to which cluster -d,--consumeBroadcastEnable &amp;lt;arg&amp;gt; broadcast -g,--groupName &amp;lt;arg&amp;gt; consumer group name -h,--help Print help -i,--brokerId &amp;lt;arg&amp;gt; consumer from which broker id -m,--consumeFromMinEnable &amp;lt;arg&amp;gt; from min offset -n,--namesrvAddr &amp;lt;arg&amp;gt; Name server address list, eg: 192.</description>
</item>

<item>
<title>Rocketmq_article</title>
<link>https://xujianhai.fun/posts/rocketmq_article/</link>
Expand Down Expand Up @@ -129,14 +140,5 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configu
为了避免rebalance导致 有状态的应用程序的数据迁移. 目前的状态 broker group status: Running -&amp;gt; member JoinGroupRequest -&amp;gt; broker group status: PREPARE_REBALANCE -&amp;gt; broker group status: COMPLETING_REBALANCE -&amp;gt; sync group request (group members) -&amp;gt; SyncGroupResponse (broker send to memebrs) 其中, 第一个加入的 member 就是 group leader.</description>
</item>

<item>
<title>Kafka Mirror Review</title>
<link>https://xujianhai.fun/posts/kafka-mirror-review/</link>
<pubDate>Sat, 02 Nov 2019 23:33:49 +0800</pubDate>

<guid>https://xujianhai.fun/posts/kafka-mirror-review/</guid>
<description></description>
</item>

</channel>
</rss>
6 changes: 6 additions & 0 deletions posts/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ <h1>Posts</h1>
<div class="post-year">2020</div>

<ul class="posts-list">
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_subs/">
<span class="post-title">Rocketmq_subsconfig</span>
<span class="post-day">Mar 8</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_article/">
<span class="post-title">Rocketmq_article</span>
Expand Down
81 changes: 80 additions & 1 deletion posts/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,88 @@
<description>Recent content in Posts on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Sun, 08 Mar 2020 12:00:05 +0800</lastBuildDate>
<lastBuildDate>Sun, 08 Mar 2020 15:35:05 +0800</lastBuildDate>
<atom:link href="https://xujianhai.fun/posts/index.xml" rel="self" type="application/rss+xml" />

<item>
<title>Rocketmq_subsconfig</title>
<link>https://xujianhai.fun/posts/rocketmq_subs/</link>
<pubDate>Sun, 08 Mar 2020 15:35:05 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_subs/</guid>
<description>最近咨询订阅配置的人比较多, 这里进行分析下.
配置信息 订阅配置信息是consumer向broker消费消息的凭证, 如果broker开启了 autoCreateSubscriptionGroup=false , 那么consumer client在消费之前, 必须通过命令行或者控制台上创建订阅配置, 然后consumer client使用配置订阅的名字. 通过命令行创建的订阅如下:
-&amp;gt; % mqadmin updateSubGroup usage: mqadmin updateSubGroup [-a &amp;lt;arg&amp;gt;] [-b &amp;lt;arg&amp;gt;] [-c &amp;lt;arg&amp;gt;] [-d &amp;lt;arg&amp;gt;] -g &amp;lt;arg&amp;gt; [-h] [-i &amp;lt;arg&amp;gt;] [-m &amp;lt;arg&amp;gt;] [-n &amp;lt;arg&amp;gt;] [-q &amp;lt;arg&amp;gt;] [-r &amp;lt;arg&amp;gt;] [-s &amp;lt;arg&amp;gt;] [-w &amp;lt;arg&amp;gt;] -a,--notifyConsumerIdsChanged &amp;lt;arg&amp;gt; notify consumerId changed -b,--brokerAddr &amp;lt;arg&amp;gt; create subscription group to which broker -c,--clusterName &amp;lt;arg&amp;gt; create subscription group to which cluster -d,--consumeBroadcastEnable &amp;lt;arg&amp;gt; broadcast -g,--groupName &amp;lt;arg&amp;gt; consumer group name -h,--help Print help -i,--brokerId &amp;lt;arg&amp;gt; consumer from which broker id -m,--consumeFromMinEnable &amp;lt;arg&amp;gt; from min offset -n,--namesrvAddr &amp;lt;arg&amp;gt; Name server address list, eg: 192.</description>
<content type="html"><![CDATA[<blockquote>
<p>最近咨询订阅配置的人比较多, 这里进行分析下.</p>
</blockquote>
<h2 id="配置信息">配置信息</h2>
<p>订阅配置信息是consumer向broker消费消息的凭证, 如果broker开启了 <code>autoCreateSubscriptionGroup=false</code> , 那么consumer client在消费之前, 必须通过命令行或者控制台上创建订阅配置, 然后consumer client使用配置订阅的名字. 通过命令行创建的订阅如下:</p>
<pre><code>-&gt; % mqadmin updateSubGroup
usage: mqadmin updateSubGroup [-a &lt;arg&gt;] [-b &lt;arg&gt;] [-c &lt;arg&gt;] [-d &lt;arg&gt;] -g &lt;arg&gt; [-h] [-i &lt;arg&gt;] [-m &lt;arg&gt;]
[-n &lt;arg&gt;] [-q &lt;arg&gt;] [-r &lt;arg&gt;] [-s &lt;arg&gt;] [-w &lt;arg&gt;]
-a,--notifyConsumerIdsChanged &lt;arg&gt; notify consumerId changed
-b,--brokerAddr &lt;arg&gt; create subscription group to which broker
-c,--clusterName &lt;arg&gt; create subscription group to which cluster
-d,--consumeBroadcastEnable &lt;arg&gt; broadcast
-g,--groupName &lt;arg&gt; consumer group name
-h,--help Print help
-i,--brokerId &lt;arg&gt; consumer from which broker id
-m,--consumeFromMinEnable &lt;arg&gt; from min offset
-n,--namesrvAddr &lt;arg&gt; Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-q,--retryQueueNums &lt;arg&gt; retry queue nums
-r,--retryMaxTimes &lt;arg&gt; retry max times
-s,--consumeEnable &lt;arg&gt; consume enable
-w,--whichBrokerWhenConsumeSlowly &lt;arg&gt; which broker id when consume slowly
</code></pre><p>其中, <code>-b</code> 和 <code>-c</code> 两个参数是对立的, <code>-b</code> 只会请求对应的broker, 而<code>-c</code> 则会先获取指定集群下的所有broker地址, 然后遍历执行创建<code>SubscriptionGroupConfig</code>. <code>-m</code> 目前是多余的配置, 暂时不起任何作用. 其他的配置中, 比较重要的如下:</p>
<ul>
<li>
<p>notifyConsumerIdsChanged: 当有新的consumer连接到broker的时候, 是否允许broker遍历已经注册的consumer进行通知请求, cosumer接收到通知请求后, 会触发rebalance. 这个参数主要的作用是什么呢? 如果是有序消费, 并没有太大的影响, 只是添加的consumer需要在下一轮rebalance之后才能消费, 并且是 获取到broker的队列锁之后才能消费; 如果是并发消费, 关闭这个选项的话, 就会导致严重的重复消费. 因为和有序消费不同, 并发消费没有队列锁, 那么, 如果关闭选项的话, 每个consumer不能及时感知到其他consumer的存在, 每个consumer rebalance的实际不一样, 导致一段时间内, 有的consumer消费的是加入前分配的结果, 有的consumer消费的则是分配后的结果, consumer主动触发rebalance是 20s.</p>
</li>
<li>
<p>consumeBroadcastEnable: 这个只有在需要广播消息的时候才需要打开, 一般用不到</p>
</li>
<li>
<p>consumeEnable: 正常使用直接设置成 true 就可以了. 如果希望所有的consumer都不消费, 比如 敏感秘密级别的原因, 设置成 false</p>
</li>
<li>
<p>whichBrokerWhenConsumeSlowly: 这个只有在 consumer group lag 非常大的时候才会触发. 只有在 brokerConfig#isSlaveReadEnable 打开的情况下才会奏效. 当master lag非常大的时候, rocketmq 是有策略的: 重定向consumer 到 slave 消费, 众所周知, master-salve 同步配置中, 每个broker是有 brokerId 的, brokerId=0 是master, brokerId大于0 的是slave, 一般建议设置为1, 一些master-slave的配置中, 为了保证数据不丢, 配置了两个slave, 一般是 slaveId=1 和 slaveId=2, 所以, 因为 whichBrokerWhenConsumeSlowly 只能设置一个值, 因为 存在一个slave空闲的场景</p>
</li>
</ul>
<h2 id="存储">存储</h2>
<p>SubscriptionGroupConfig 是存储在 broker 上的, 并且以 json 格式存储在 storePath 路径下. 上面只讲述了创建的功能, 其实 mqadmin 还提供了删除了功能.</p>
<h2 id="使用">使用</h2>
<ol>
<li>心跳</li>
</ol>
<ul>
<li>
<p>consumer group定期心跳的时候, 心跳数据包含了group name 和 订阅的topic. 这里存在一个判断, 如果brokerConfig中 <code>autoCreateSubscriptionGroup=true</code> 的话, 即使 group 没有注册过订阅信息, 这里就会创建一个默认的 SubscriptionGroupConfig. 为后面拉取消息 提供凭证. 如果说 broker 关闭自动创建 SubscriptionGroupConfig: <code>autoCreateSubscriptionGroup=false</code>, 那么 没有注册过的group 无法消费消息</p>
</li>
<li>
<p>在处理心跳的时候, 还会根据 <code>SubscriptionGroupConfig#notifyConsumerIdsChanged</code> 决定是否向已经注册的consumer client 发送 <code>consumerIdChange</code> 事件, 来触发 consumer client 的 rebalance</p>
</li>
</ul>
<ol start="2">
<li>拉消息</li>
</ol>
<ul>
<li>
<p>拉取消息之前, 必须有相应的 SubscriptionGroupConfig 信息, 如果没有的话, 就不能消费. 所以, 如果group没有注册过订阅信息, 那么 group 必须心跳成功后, 才有可能正常消费.</p>
</li>
<li>
<p>拉取消息的时候, 如果lag很大, 在 <code>brokerConfig#isSlaveReadEnable=true</code> 的配置下, 则会根据 <code>SubscriptionGroupConfig#whichBrokerWhenConsumeSlowly</code> 的slaveId 来重定向consumer client 向指定的slave 拉取消息</p>
</li>
</ul>
<ol start="3">
<li>监控</li>
</ol>
<ul>
<li>比较特殊的情况, 线上经常使用group测试消费数据一段时间后, 就不在使用了. 但是 group 的lag监控却在一直增长, 并可能引起报警影响用户的生活. 如果用的是开源的rocketmq 监控, 即使使用 <code>mqadmin#deleteSubGroup</code> 也不能消除lag</li>
</ul>
]]></content>
</item>

<item>
<title>Rocketmq_article</title>
<link>https://xujianhai.fun/posts/rocketmq_article/</link>
Expand Down
7 changes: 7 additions & 0 deletions posts/rocketmq_article/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ <h2 id="源码分析">源码分析</h2>

<div class="pagination__buttons">

<span class="button previous">
<a href="https://xujianhai.fun/posts/rocketmq_subs/">
<span class="button__icon"></span>
<span class="button__text">Rocketmq_subsconfig</span>
</a>
</span>



<span class="button next">
Expand Down
Loading

0 comments on commit 69849a6

Please sign in to comment.