Skip to content

Commit

Permalink
fix(rocketmq): add flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Mar 23, 2020
1 parent 72bd5c4 commit 37388a6
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 25 deletions.
28 changes: 13 additions & 15 deletions index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@
<description>Recent content on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Tue, 17 Mar 2020 18:18:05 +0800</lastBuildDate>
<lastBuildDate>Mon, 23 Mar 2020 22:31:59 +0800</lastBuildDate>

<atom:link href="https://xujianhai.fun/index.xml" rel="self" type="application/rss+xml" />


<item>
<title>Rocketmq_flow_control</title>
<link>https://xujianhai.fun/posts/rocketmq_flow_control/</link>
<pubDate>Mon, 23 Mar 2020 22:31:59 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_flow_control/</guid>
<description>背景 rocketmq推广过程中, 偶尔会遇到 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 201ms, size of queue: 5389 类似的报错, 导致上游业务失败率报警以及错误日志飙升. 在相应的监控上, rocketmq 的发送qps也是非常高.
原因 其实这个行为是 rocektmq broker 的自我保护机制, 那么什么时候会触发呢? 这个主要是在 store 进行put 消息的时候会触发. 之前讲过, 在 rocketmq 的处理机制中, netty 将读取到的消息 会封装成 RequestTask 对象提交到 executorService 的队列中, 然后等待 executorService 调度执行. 那么, 这里存在两种情况:
queue已经被写满了, 无法再提交新的任务, 那么会触发 RejectedExecutionException, 这个时候, rocketmq broker 会返回 RemotingSysResponseCode.SYSTEM_BUSY, 提示信息是: [OVERLOAD]. 参考: NettyRemotingAbstract#processRequestCommand
调度延迟的问题. 我在 11:05 提交了一个写入请求, 但是因为 写入流程耗时 增加, 导致我的请求到 11:06 才被处理, 对于实时在线业务而言, 这条消息其实早就超时了, 这种情况, rocketmq 有两套机制:</description>
</item>

<item>
<title>Rocketmq_msgid</title>
<link>https://xujianhai.fun/posts/rocketmq_msgid/</link>
Expand Down Expand Up @@ -117,19 +129,5 @@ protoc插件 protoc是支持插件的, 比如gogo-out其实就是去找gogo的
<description> reassignment 限流 摘要: 对 reassignment 的 replication 进行限流, 避免全局限流的导致isr落后的partition 的无法追上。这里的限流是动态的. 以前的通用的限流不进行废弃, 因为存在无isr而且 follower 用光带宽的时候, 这个限制还是需要的。新增加了两个配置: leader.reassignment.throttled.rate 和 follower.reassignment.throttled.rate, 前者是 leader broker 统一限流, 但是因为kafka reassignment 的 partition follower 可以有很多个, leader限流的话需要计算每个followerd的比例, 所以添加了 follower限流 kip: https://cwiki.apache.org/confluence/display/KAFKA/KIP-542%3A+Partition+Reassignment+Throttling replication quota 摘要: client 的限流是通过延迟实现的, replica原来也有类似的限流 replica.fetch.max.bytes , 不过是 partition 级别的。新的方案, 添加了 LeaderQuotaRate 和 FollowerQuotaRate. kip: https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas </description>
</item>

<item>
<title>Rocketmq Admin</title>
<link>https://xujianhai.fun/posts/rocketmq-admin/</link>
<pubDate>Sun, 17 Nov 2019 21:50:30 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq-admin/</guid>
<description>查看消费进度:
sh mqadmin consumerProgress -g $groupName -n ${ip:port} 查看客户端的连接信息
sh mqadmin consumerConnection -g $group -n ${ip:port} 查看topic状态
sh mqadmin topicStatus -t $topic -n ${ip:port} 按照时间重置offset
sh mqadmin resetOffsetByTime -t $topic -n ${ip:port} -g $group -s $ms 创建topic
sh mqadmin updateTopic -t $topic -n ${ip:port} -c $cluster -r 1 -w 1 -o true </description>
</item>

</channel>
</rss>
6 changes: 6 additions & 0 deletions posts/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ <h1>Posts</h1>
<div class="post-year">2020</div>

<ul class="posts-list">
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_flow_control/">
<span class="post-title">Rocketmq_flow_control</span>
<span class="post-day">Mar 23</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_msgid/">
<span class="post-title">Rocketmq_msgid</span>
Expand Down
55 changes: 54 additions & 1 deletion posts/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,62 @@
<description>Recent content in Posts on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Tue, 17 Mar 2020 18:18:05 +0800</lastBuildDate>
<lastBuildDate>Mon, 23 Mar 2020 22:31:59 +0800</lastBuildDate>
<atom:link href="https://xujianhai.fun/posts/index.xml" rel="self" type="application/rss+xml" />

<item>
<title>Rocketmq_flow_control</title>
<link>https://xujianhai.fun/posts/rocketmq_flow_control/</link>
<pubDate>Mon, 23 Mar 2020 22:31:59 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_flow_control/</guid>
<description>背景 rocketmq推广过程中, 偶尔会遇到 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 201ms, size of queue: 5389 类似的报错, 导致上游业务失败率报警以及错误日志飙升. 在相应的监控上, rocketmq 的发送qps也是非常高.
原因 其实这个行为是 rocektmq broker 的自我保护机制, 那么什么时候会触发呢? 这个主要是在 store 进行put 消息的时候会触发. 之前讲过, 在 rocketmq 的处理机制中, netty 将读取到的消息 会封装成 RequestTask 对象提交到 executorService 的队列中, 然后等待 executorService 调度执行. 那么, 这里存在两种情况:
queue已经被写满了, 无法再提交新的任务, 那么会触发 RejectedExecutionException, 这个时候, rocketmq broker 会返回 RemotingSysResponseCode.SYSTEM_BUSY, 提示信息是: [OVERLOAD]. 参考: NettyRemotingAbstract#processRequestCommand
调度延迟的问题. 我在 11:05 提交了一个写入请求, 但是因为 写入流程耗时 增加, 导致我的请求到 11:06 才被处理, 对于实时在线业务而言, 这条消息其实早就超时了, 这种情况, rocketmq 有两套机制:</description>
<content type="html"><![CDATA[<h2 id="背景">背景</h2>
<p>rocketmq推广过程中, 偶尔会遇到 <code>[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 201ms, size of queue: 5389</code> 类似的报错, 导致上游业务失败率报警以及错误日志飙升. 在相应的监控上, rocketmq 的发送qps也是非常高.</p>
<h2 id="原因">原因</h2>
<p>其实这个行为是 rocektmq broker 的自我保护机制, 那么什么时候会触发呢? 这个主要是在 store 进行put 消息的时候会触发. 之前讲过, 在 rocketmq 的处理机制中, netty 将读取到的消息 会封装成 RequestTask 对象提交到 executorService 的队列中, 然后等待 executorService 调度执行. 那么, 这里存在两种情况:</p>
<ol>
<li>
<p>queue已经被写满了, 无法再提交新的任务, 那么会触发 <code>RejectedExecutionException</code>, 这个时候, rocketmq broker 会返回 <code>RemotingSysResponseCode.SYSTEM_BUSY</code>, 提示信息是: <code>[OVERLOAD]</code>. 参考: <code>NettyRemotingAbstract#processRequestCommand</code></p>
</li>
<li>
<p>调度延迟的问题. 我在 11:05 提交了一个写入请求, 但是因为 写入流程耗时 增加, 导致我的请求到 11:06 才被处理, 对于实时在线业务而言, 这条消息其实早就超时了, 这种情况, rocketmq 有两套机制:</p>
<ul>
<li>
<p>2.1 定期检查queue, 会检查 RequestTask 的生成时间 和 当前时间的 差值, 如果超过了配置的超时时间, 就会返回 <code>RemotingSysResponseCode.SYSTEM_BUSY</code>, 提示 <code>TIMEOUT_CLEAN_QUEUE</code>, 参考 BrokerFastFailure#cleanExpiredRequestInQueue</p>
</li>
<li>
<p>2.2 对于 send 请求, 会检查当前系统是否 <code>isOSPageCacheBusy</code>, 如果 true, 就会拿取 queue 的第一个 RequestTask 返回 <code>RemotingSysResponseCode.SYSTEM_BUSY</code>, 提示 <code>PCBUSY_CLEAN_QUEUE</code>, 直到 判断是 false. 那么, isOSPageCacheBusy 的判断逻辑是什么呢? 如下:</p>
</li>
</ul>
</li>
</ol>
<pre><code>@Override
public boolean isOSPageCacheBusy() {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;
return diff &lt; 10000000
&amp;&amp; diff &gt; this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
</code></pre><p>getOsPageCacheBusyTimeOutMills默认是1000ms, 可以发现, 这里diff 需要在 (1000, 10_000_000) 的区间, 也就是commitlog 最近的一条写入大于1s的情况 (beginTimeInLock 在写入的时候会被设置当前时间戳, 但是在写入成功后会被重置为0), diff 的 上界为了区别于 没有写入的情况(没有写入, beginTimeInLock就是0, diff=this.systemClock.now() ).</p>
<ol start="3">
<li>
<p>写入逻辑之前会进行一次 <code>isOSPageCacheBusy</code>的检查, true 则设置 PutMessageResult 状态 <code>OS_PAGECACHE_BUSY</code>, 返回 <code>ResponseCode.SYSTEM_ERROR</code>, 提示 <code>[PC_SYNCHRONIZED]</code></p>
</li>
<li>
<p>为了避免 pageCache busy 场景下请求的无效投递到queue, 在提交queue之前, 会检查条件 <code>rejectRequest</code>, 判断条件是 <code>this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient()</code>, 其中, <code>isTransientStorePoolDeficient</code> 已经被弃用, isOSPageCacheBusy 参见上文.</p>
</li>
</ol>
<p>综合上面的分析, 因为我们业务的qps很高, 导致了 <code>TIMEOUT_CLEAN_QUEUE</code> 的提示, 是因为 处理写入时间 过长</p>
<h2 id="解决">解决</h2>
<p>扩容, 降低broker的热点负载</p>
]]></content>
</item>

<item>
<title>Rocketmq_msgid</title>
<link>https://xujianhai.fun/posts/rocketmq_msgid/</link>
Expand Down
Loading

0 comments on commit 37388a6

Please sign in to comment.