Skip to content

Commit

Permalink
feat(rocketmq): add no message
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Mar 26, 2020
1 parent cce583e commit e247563
Show file tree
Hide file tree
Showing 13 changed files with 811 additions and 46 deletions.
61 changes: 33 additions & 28 deletions index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,28 @@
<description>Recent content on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Wed, 25 Mar 2020 18:22:56 +0800</lastBuildDate>
<lastBuildDate>Thu, 26 Mar 2020 23:33:04 +0800</lastBuildDate>

<atom:link href="https://xujianhai.fun/index.xml" rel="self" type="application/rss+xml" />


<item>
<title>Rocketmq_nomessage</title>
<link>https://xujianhai.fun/posts/rocketmq_nomessage/</link>
<pubDate>Thu, 26 Mar 2020 23:33:04 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_nomessage/</guid>
<description>背景 最近在修rocketmq-golang-client的问题的时候, 发现在处理 PullNoNewMsg 的时候会导致 offset 被自动提交, 但是用户并没有设置自动ack, 并且也没有手动ack
注:
rocketmq 开源的版本并没有ack的概念
排查 于是, 通过日志打印调试, 发现是在 rocketmq-client-go 拉取消息处理 primitive.PullNoNewMsg 的状态的时候, 直接将 result.NextBeginOffset 替换为 request.nextOffset, 并且还 更新了本地offsetStore的offset 信息, 因为 rocektmq-client-go 是 周期性提交offset, 所以导致了 offset被ack 了
解决 在rocketmq-client-go的内部开发版本中, 直接将 offset 的本地存储更新给注释掉就可以了, 因为内部开发中, 是异步处理处理消息的, 并且offset的提交不需要满足递增的特性 (考虑到很多场景中可能存在 offset被移动到 更小的情况)
在开源的版本中, 对齐java的实现, 判断 processQueue是否有消息, 如果没有消息, 在更新本地offsetStore, 避免提交了 正在消费的消息
更多的理解 乘这次机会, 重新梳理了 rocketmq 在 pullMessage 的响应逻辑的处理. 根据客户端处理的逻辑, 区分如下 (不涉及到transaction)
1.NO_NEW_MSG
当broker返回 ResponseCode.PULL_NOT_FOUND 的时候, 客户端会转义成 PullStatus.NO_NEW_MSG, 会执行如下操作:</description>
</item>

<item>
<title>Rocketmq_search</title>
<link>https://xujianhai.fun/posts/rocketmq_search/</link>
Expand All @@ -26,6 +43,21 @@ msgkey
在消息发送的时候, 发送的消息是可以指定消息的key的, 需要注意的是, msgKey可以设置多个.</description>
</item>

<item>
<title>Rocketmq_recover</title>
<link>https://xujianhai.fun/posts/rocketmq_recover/</link>
<pubDate>Tue, 24 Mar 2020 23:43:23 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_recover/</guid>
<description>背景 最近要开发延迟消息, 这里记录下 recover相关的逻辑实现
原理 之前知道, rocketmq是所有的消息统一投递到 commitlog, 然后异步构建 consumer queue, 那么, 如果机器正常重启/异常宕机的情况下, 又是怎么恢复的呢?
前菜 rocketmq 使用了 checkpoint 文件记录了 physicMsgTimestamp logicsMsgTimestamp indexMsgTimestamp 三个字段, 分别表示 commitlog 的flush的时间点、comsumer queue的flush的时间点、index file 刷新的时间点. 也就是 已经落地磁盘的时间点. (通过fileChannel#force)
那么 这些时间点什么场景下会被更新, 什么时候checkpoint会flush呢?
physicMsgTimestamp 首先, CommitLog 本身既有一个定时flush的任务, 根据flush方式的不同, 有两种实现: GroupCommitService 和 FlushRealTimeService(后面单独分析), 无论是同步还是异步, 每次flush之后都会设置 physicMsgTimestamp.
除此之外, 在 dledger模式中, slave构建 consumer queue的时候 也会设置 physicMsgTimestamp
logicsMsgTimestamp 在定时flush consumer queue 以及 追加consumer queue消息的时候, 都会更新. (因此, logicsMsgTimestamp 并不是 consumer queue flush的时间)</description>
</item>

<item>
<title>Rocketmq_flow_control</title>
<link>https://xujianhai.fun/posts/rocketmq_flow_control/</link>
Expand Down Expand Up @@ -108,32 +140,5 @@ proposal</description>
这个需要讲到 GOMAXPROCS 的参数了, 这个参数规定了 P 的最大数量, 默认取值是 cpu数量, 通过设置 最大并行度(GOMAXPROCS) 为 cpu 数量, 可以充分利用每个cpu, 避免线程切换间的代价. 如果说将 GOMAXPROCS 设置成了128, 首先并行执行go代码的线程数膨胀, 但是由于 k8s 容器对于cpu的约束,导致只有 16个cpu 运行 128个线程 (至少128个, 因为系统调用的线程是不受 GOMAXPROCS 约束的)</description>
</item>

<item>
<title>Rocketmq Heartbeat Timeout</title>
<link>https://xujianhai.fun/posts/rocketmq-heartbeat-timeout/</link>
<pubDate>Sat, 07 Mar 2020 09:35:43 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq-heartbeat-timeout/</guid>
<description>背景 最近线上发现了一些报警: &amp;ldquo;send heart beat to broker add: xxx error: request timeout&amp;rdquo;, 同时伴随着服务重启, 会出现consumer 流量短时间降低, 同时 consumer的连接创建也很缓慢
排查 通过关键字匹配, 发现这个是 rocektmq-golang-sdk 的一处错误打印, 是心跳命令请求broker超时的场景下打印的
既然是请求rocketmq超时了, 直接登录到线上rocketmq broker查看负载, 但是通过top执行发现cpu和内存占比都比较正常, 同时 netstat -anp | grep pid 扫描的socket的数量也只有几千个,没有异常点.
没有线索的情况下, 我们继续排查日志内容, 通过 tailf broker.log 一段时间后, 发现有一些类似 &amp;ldquo;event queue size 10000 enough, so drop this event CLOSE&amp;rdquo; 和 &amp;ldquo;event queue size 10000 enough, so drop this event CONNECT&amp;rdquo; 的日志, 同样进行关键字匹配, 发现这段逻辑是 rocketmq 对event的抽象处理, event比如: CONNECT/CLOSE/IDLE/EXCEPTION, 以 CLOSE 为例, 当rocketmq netty server 监听到 主动关闭或者被动关闭 连接的时候, 会实例化一个 CLOSE 类型的 event信息 投递到了 eventQueue, 这个 eventQueue 大小是 1w, 当大小大于 1w 的时候, 就会投递失败 (这里有个坑), eventQueue 投递后的消息是由一个单线程异步处理的, 线程会回调根据注册的listener进行回调, 这一块逻辑参考 NettyRemotingServer#channelInactive、NettyRemotingAbstract#putNettyEvent 和 NettyRemotingAbstract#run, 注册的回调逻辑实现在 ClientHousekeepingService#onChannelClose, 回调都做了什么事情呢?</description>
</item>

<item>
<title>Protobuf</title>
<link>https://xujianhai.fun/posts/protobuf/</link>
<pubDate>Sat, 11 Jan 2020 15:16:40 +0800</pubDate>

<guid>https://xujianhai.fun/posts/protobuf/</guid>
<description>gogo-protobuf 扩展了 protobuf 的使用姿势, 不仅添加了丰富的插件: string/euqla/marshal/unmarshal, 性能上还进行了优化. gogo-protobuf 的插件体系相对于原生的protobuf-go的插件实现(虽然只有一个grpc), 不仅丰富, 而且支持开关. 开关是借助于 描述符中extension 实现的.
gogo-protobuf 因为支持的插件体系比较多, 为此, 将插件分成了几种启用级别, 对外是不同的使用入口. 比如 protoc-gen-gogofast、protoc-gen-gogofaster、protoc-gen-gogoslick. 除了通过不同的入口, 还可以通过不同proto文件的参数定制, 比如 option (gogoproto.gostring_all) = true; 实现给每个message添加string的方法.
补充: extension extension 是 proto2 中支持的语法, 在新的pb文件中, 使用了 Any 进行了替代. 更多关于extension可以参考: https://developers.google.com/protocol-buffers/docs/proto#extensions, Any可以参照 https://developers.google.com/protocol-buffers/docs/proto3#any . 但是在 gogo的使用实现中, 还是用 extend 机制, proto2 用extend, proto3 使用本地登记的方式
2.validator
validator插件 https://github.com/mwitkow/go-proto-validators 提供了字段检查的功能, 会根据proto文件生成goalng validator代码文件.
protoc插件 protoc是支持插件的, 比如gogo-out其实就是去找gogo的插件, govalidators_out就是找 govalidators插件
其他深入的点 union group 类型.</description>
</item>

</channel>
</rss>
12 changes: 12 additions & 0 deletions posts/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,24 @@ <h1>Posts</h1>
<div class="post-year">2020</div>

<ul class="posts-list">
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_nomessage/">
<span class="post-title">Rocketmq_nomessage</span>
<span class="post-day">Mar 26</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_search/">
<span class="post-title">Rocketmq_search</span>
<span class="post-day">Mar 25</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_recover/">
<span class="post-title">Rocketmq_recover</span>
<span class="post-day">Mar 24</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/rocketmq_flow_control/">
<span class="post-title">Rocketmq_flow_control</span>
Expand Down
Loading

0 comments on commit e247563

Please sign in to comment.