Skip to content

Commit

Permalink
feat(rocketmq): add recover
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Mar 27, 2020
1 parent e247563 commit d5361c0
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 21 deletions.
62 changes: 53 additions & 9 deletions posts/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,21 +238,65 @@ logicsMsgTimestamp 在定时flush consumer queue 以及 追加consumer queue消
</ul>
<p><strong>flush</strong></p>
<ul>
<li>rocketmq 正常关机</li>
<li>rocketmq 正常关闭 (这里会触发两次刷新, DefaultMessageStore#shutdown 会分别调用 storeCheckpoint#flush storeCheckpoint#shutdown)</li>
<li>在indexFile切换的时候, 不仅仅会flush 之前的 index file, 还会触发 checkpoint file flush</li>
<li>conumer queue 定时刷新的时候, 除了更新 logicsMsgTimestamp, 也会触发 checkpoint file flush</li>
</ul>
<p>刷新: shutdown
周期刷新: flushIntervalConsumeQueue = 1000;
private int flushConsumeQueueThoroughInterval = 1000 * 60; ?</p>
<h3 id="正常重启">正常重启</h3>
<p>commitlog 恢复
consumerqueue 恢复</p>
<p>rocketmq 每次启动的时候, 会在存储根目录下面新建一个 abort 文件, 如果是正常关闭, 那么在shutdown的时候会删除 abort 文件, 如果是异常宕机 (断电、进程强杀等), abort 文件就会一直存放在那里. 因此在启动的时候, DefaultMessageStore 就会检查是否存在 abort 文件, 判断是正常启动 还是 异常启动.</p>
<p>恢复过程中, 需要区分两种情况, commitlog文件可能已经完全被删除了, 这个时候需要将 consumer queue文件也全部删除
常规恢复的场景中, 主要分为三步骤:</p>
<ol>
<li>恢复 consumer queue, 获取 consumer queue 中最大的 offset</li>
<li>恢复 取commitlog, 最大的 processOffset, 对 consumer queue 的多出的数据文件进行 截断</li>
<li>根据 commitlog 恢复 consumer queue 的消息</li>
</ol>
<p>1-3参考 DefaultMessageStore#recover、 ConsumeQueue#recover、CommitLog#recoverNormally, 4 的逻辑在 DefaultMessageStore#start</p>
<p><strong>流程1</strong>
从倒数3个文件开始恢复 (不足三个, 从第一个文件开始, 倒数第三个 是一个经验数值), 找到最大的 consumer queue的 processOffset, 设置元信息: flushedWhere 和 committedWhere; 然后遍历文件进行 truncate:</p>
<pre><code>文件起始offset 大于 processOffset 的直接删除,
文件结束offset 小于 processOffset 不处理,
offset 位于 文件内部的情况, 设置文件的 元信息
</code></pre><p>需要注意的是, 因为 rocketmq 支持 tag ext 扩展文件, 因此在恢复的时候, 也会对 tag ext 恢复. 这里不赘述</p>
<p><strong>流程2</strong></p>
<p>从倒数3个文件开始恢复 (不足三个, 从第一个文件开始), 通过检查文件内容是否合法确定最后的写入位置: <code>processOffset</code>, 这里检查文件内容合法的方法比较特殊: 读取文件内容并构建一个 dispatchRequest. 根据 processOffset 设置元信息: flushedWhere 和 committedWhere, 遍历文件进行 truncate, 逻辑和 consumer queue的一样.</p>
<p>和 consumer queue 文件恢复不同的地方在于, commitlog 需要对consumer queue的文件内容进行 “纠偏”. 因为 consumer queue的数据都是从 commitlog 构建的, 因此 需要确保consumer queue的数据在 commitlog 全都要找到, 因此在恢复的时候, 需要根据 commitlog 的 processOffset 对 consumer queue 进行截断 和 元信息重置</p>
<ul>
<li>需要注意的是, 在读取文件如何区分 读取到文件末尾的情况?</li>
</ul>
<p>commitlog在写入数据的时候, 会进行判断 msgLen + 8() &gt; 剩余文件空间, 如果true, 那么, 就会放弃在这个文件写入, 会轮转到下一个文件写入, 同时在这个文件的末尾写入 totalSize(int=4 byte) + CommitLog.BLANK_MAGIC_CODE(int=4 byte)</p>
<p><strong>流程3</strong></p>
<p>完成上面两个流程, 基本上保证了 commitlog 和 consumer queue 文件的正确性. 但是这里存在一个问题, 可能consumer queue的数据 少于commitlog, 因为 构建consumer queue 速度慢于 commitlog 或者 consumer queue 文件被删除 或者新启的broker copy了别的机器的commitlog. 那么就需要一个机制 将commitlog 中的数据 重新构建到 consumer queue, 流程3 就是做了这件事情</p>
<p>通过获取 consumer queue最大的 processOffset, 然后从 commitlog 的 processOffset 点位进行构建工作.</p>
<h3 id="异常宕机">异常宕机</h3>
<p>commitlog 恢复</p>
<p>consumerqueue 恢复</p>
<p>重复信息?</p>
<p>异常宕机相比于正常关闭, 需要借助 checkpoint 文件进行恢复.</p>
<p>整体流程和正常恢复差异不大, 依旧是上面上个流程</p>
<ol>
<li>恢复 consumer queue, 获取 consumer queue 中最大的 offset</li>
<li>恢复 取commitlog, 最大的 processOffset, 对 consumer queue 的多出的数据文件进行 截断</li>
<li>根据 commitlog 恢复 consumer queue 的消息</li>
</ol>
<p>唯一的不同, 在于第二步骤, 因为是异常宕机, 所以不能从倒数第三个, 需要一个可以 check的时间点(minTime) 进行恢复, 这个时间点之前的文件是可以认为是正确的, 时间点之后的文件开始恢复. 根据配置的不同, 有两种选择:</p>
<ol>
<li>需要根据checkpoint记录的 physicMsgTimestamp 和 logicsMsgTimestamp 的最小值开始恢复.</li>
<li>
<ol>
<li>配置 MessageStoreConfig#messageIndexSafe = ture (默认false) 和 messageIndexEnable = true (默认true) 的时候, 会以 physicMsgTimestamp logicsMsgTimestamp indexMsgTimestamp 的最小开始恢复</li>
</ol>
</li>
</ol>
<p>确定了时间点之后, 倒序找到第一个文件存储时间小于 minTime的文件, 从这个文件开始, 不断读取消息 并执行 dispatcher 责任链逻辑: 构建consumer queue、构建index message. 剩下的逻辑: 重置元信息、截断 consumer queue 和正常恢复一样</p>
<p>需要注意的是, 这里提供了 index message 的minTime 机制, 对于一些依赖 index 逻辑的场景, 还是很有必要的</p>
<h2 id="总结">总结</h2>
<ol>
<li>异常恢复需要重建 consumer queue 和 index message</li>
<li>恢复过程需要确保 consumer queue 信息和 commitlog 对齐, 不能多(截断)、不能少(重新reinput)</li>
<li>依赖index message的场景, 需要开启 MessageStoreConfig#messageIndexSafe=true, 确保index 的完整性</li>
</ol>
<h2 id="思考">思考</h2>
<ol>
<li>代码冗余度很高, 需要优化下</li>
</ol>
]]></content>
</item>

Expand Down
68 changes: 56 additions & 12 deletions posts/rocketmq_recover/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
logicsMsgTimestamp 在定时flush consumer queue 以及 追加consumer queue消息的时候, 都会更新. (因此, logicsMsgTimestamp 并不是 consumer queue flush的时间)">
<meta itemprop="datePublished" content="2020-03-24T23:43:23&#43;08:00" />
<meta itemprop="dateModified" content="2020-03-24T23:43:23&#43;08:00" />
<meta itemprop="wordCount" content="119">
<meta itemprop="wordCount" content="427">
<meta itemprop="image" content="https://xujianhai.fun/"/>


Expand Down Expand Up @@ -128,7 +128,7 @@
<main class="post">

<div class="post-info">
<p><svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="feather feather-clock"><circle cx="12" cy="12" r="10"></circle><polyline points="12 6 12 12 16 14"></polyline></svg>One minute
<p><svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="feather feather-clock"><circle cx="12" cy="12" r="10"></circle><polyline points="12 6 12 12 16 14"></polyline></svg>3 minutes



Expand Down Expand Up @@ -167,21 +167,65 @@ <h3 id="前菜">前菜</h3>
</ul>
<p><strong>flush</strong></p>
<ul>
<li>rocketmq 正常关机</li>
<li>rocketmq 正常关闭 (这里会触发两次刷新, DefaultMessageStore#shutdown 会分别调用 storeCheckpoint#flush storeCheckpoint#shutdown)</li>
<li>在indexFile切换的时候, 不仅仅会flush 之前的 index file, 还会触发 checkpoint file flush</li>
<li>conumer queue 定时刷新的时候, 除了更新 logicsMsgTimestamp, 也会触发 checkpoint file flush</li>
</ul>
<p>刷新: shutdown
周期刷新: flushIntervalConsumeQueue = 1000;
private int flushConsumeQueueThoroughInterval = 1000 * 60; ?</p>
<h3 id="正常重启">正常重启</h3>
<p>commitlog 恢复
consumerqueue 恢复</p>
<p>rocketmq 每次启动的时候, 会在存储根目录下面新建一个 abort 文件, 如果是正常关闭, 那么在shutdown的时候会删除 abort 文件, 如果是异常宕机 (断电、进程强杀等), abort 文件就会一直存放在那里. 因此在启动的时候, DefaultMessageStore 就会检查是否存在 abort 文件, 判断是正常启动 还是 异常启动.</p>
<p>恢复过程中, 需要区分两种情况, commitlog文件可能已经完全被删除了, 这个时候需要将 consumer queue文件也全部删除
常规恢复的场景中, 主要分为三步骤:</p>
<ol>
<li>恢复 consumer queue, 获取 consumer queue 中最大的 offset</li>
<li>恢复 取commitlog, 最大的 processOffset, 对 consumer queue 的多出的数据文件进行 截断</li>
<li>根据 commitlog 恢复 consumer queue 的消息</li>
</ol>
<p>1-3参考 DefaultMessageStore#recover、 ConsumeQueue#recover、CommitLog#recoverNormally, 4 的逻辑在 DefaultMessageStore#start</p>
<p><strong>流程1</strong>
从倒数3个文件开始恢复 (不足三个, 从第一个文件开始, 倒数第三个 是一个经验数值), 找到最大的 consumer queue的 processOffset, 设置元信息: flushedWhere 和 committedWhere; 然后遍历文件进行 truncate:</p>
<pre><code>文件起始offset 大于 processOffset 的直接删除,
文件结束offset 小于 processOffset 不处理,
offset 位于 文件内部的情况, 设置文件的 元信息
</code></pre><p>需要注意的是, 因为 rocketmq 支持 tag ext 扩展文件, 因此在恢复的时候, 也会对 tag ext 恢复. 这里不赘述</p>
<p><strong>流程2</strong></p>
<p>从倒数3个文件开始恢复 (不足三个, 从第一个文件开始), 通过检查文件内容是否合法确定最后的写入位置: <code>processOffset</code>, 这里检查文件内容合法的方法比较特殊: 读取文件内容并构建一个 dispatchRequest. 根据 processOffset 设置元信息: flushedWhere 和 committedWhere, 遍历文件进行 truncate, 逻辑和 consumer queue的一样.</p>
<p>和 consumer queue 文件恢复不同的地方在于, commitlog 需要对consumer queue的文件内容进行 “纠偏”. 因为 consumer queue的数据都是从 commitlog 构建的, 因此 需要确保consumer queue的数据在 commitlog 全都要找到, 因此在恢复的时候, 需要根据 commitlog 的 processOffset 对 consumer queue 进行截断 和 元信息重置</p>
<ul>
<li>需要注意的是, 在读取文件如何区分 读取到文件末尾的情况?</li>
</ul>
<p>commitlog在写入数据的时候, 会进行判断 msgLen + 8() &gt; 剩余文件空间, 如果true, 那么, 就会放弃在这个文件写入, 会轮转到下一个文件写入, 同时在这个文件的末尾写入 totalSize(int=4 byte) + CommitLog.BLANK_MAGIC_CODE(int=4 byte)</p>
<p><strong>流程3</strong></p>
<p>完成上面两个流程, 基本上保证了 commitlog 和 consumer queue 文件的正确性. 但是这里存在一个问题, 可能consumer queue的数据 少于commitlog, 因为 构建consumer queue 速度慢于 commitlog 或者 consumer queue 文件被删除 或者新启的broker copy了别的机器的commitlog. 那么就需要一个机制 将commitlog 中的数据 重新构建到 consumer queue, 流程3 就是做了这件事情</p>
<p>通过获取 consumer queue最大的 processOffset, 然后从 commitlog 的 processOffset 点位进行构建工作.</p>
<h3 id="异常宕机">异常宕机</h3>
<p>commitlog 恢复</p>
<p>consumerqueue 恢复</p>
<p>重复信息?</p>
<p>异常宕机相比于正常关闭, 需要借助 checkpoint 文件进行恢复.</p>
<p>整体流程和正常恢复差异不大, 依旧是上面上个流程</p>
<ol>
<li>恢复 consumer queue, 获取 consumer queue 中最大的 offset</li>
<li>恢复 取commitlog, 最大的 processOffset, 对 consumer queue 的多出的数据文件进行 截断</li>
<li>根据 commitlog 恢复 consumer queue 的消息</li>
</ol>
<p>唯一的不同, 在于第二步骤, 因为是异常宕机, 所以不能从倒数第三个, 需要一个可以 check的时间点(minTime) 进行恢复, 这个时间点之前的文件是可以认为是正确的, 时间点之后的文件开始恢复. 根据配置的不同, 有两种选择:</p>
<ol>
<li>需要根据checkpoint记录的 physicMsgTimestamp 和 logicsMsgTimestamp 的最小值开始恢复.</li>
<li>
<ol>
<li>配置 MessageStoreConfig#messageIndexSafe = ture (默认false) 和 messageIndexEnable = true (默认true) 的时候, 会以 physicMsgTimestamp logicsMsgTimestamp indexMsgTimestamp 的最小开始恢复</li>
</ol>
</li>
</ol>
<p>确定了时间点之后, 倒序找到第一个文件存储时间小于 minTime的文件, 从这个文件开始, 不断读取消息 并执行 dispatcher 责任链逻辑: 构建consumer queue、构建index message. 剩下的逻辑: 重置元信息、截断 consumer queue 和正常恢复一样</p>
<p>需要注意的是, 这里提供了 index message 的minTime 机制, 对于一些依赖 index 逻辑的场景, 还是很有必要的</p>
<h2 id="总结">总结</h2>
<ol>
<li>异常恢复需要重建 consumer queue 和 index message</li>
<li>恢复过程需要确保 consumer queue 信息和 commitlog 对齐, 不能多(截断)、不能少(重新reinput)</li>
<li>依赖index message的场景, 需要开启 MessageStoreConfig#messageIndexSafe=true, 确保index 的完整性</li>
</ol>
<h2 id="思考">思考</h2>
<ol>
<li>代码冗余度很高, 需要优化下</li>
</ol>

</div>
</article>
Expand All @@ -193,7 +237,7 @@ <h2 id="思考">思考</h2>
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="feather feather-tag meta-icon"><path d="M20.59 13.41l-7.17 7.17a2 2 0 0 1-2.83 0L2 12V2h10l8.59 8.59a2 2 0 0 1 0 2.82z"></path><line x1="7" y1="7" x2="7" y2="7"></line></svg><span class="tag"><a href="https://xujianhai.fun/tags/rocketmq">rocketmq</a></span>
</p>

<p><svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="feather feather-file-text"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"></path><polyline points="14 2 14 8 20 8"></polyline><line x1="16" y1="13" x2="8" y2="13"></line><line x1="16" y1="17" x2="8" y2="17"></line><polyline points="10 9 9 9 8 9"></polyline></svg>119 Words</p>
<p><svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="feather feather-file-text"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"></path><polyline points="14 2 14 8 20 8"></polyline><line x1="16" y1="13" x2="8" y2="13"></line><line x1="16" y1="17" x2="8" y2="17"></line><polyline points="10 9 9 9 8 9"></polyline></svg>427 Words</p>

<p><svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="feather feather-calendar"><rect x="3" y="4" width="18" height="18" rx="2" ry="2"></rect><line x1="16" y1="2" x2="16" y2="6"></line><line x1="8" y1="2" x2="8" y2="6"></line><line x1="3" y1="10" x2="21" y2="10"></line></svg>2020-03-24 23:43 &#43;0800</p>
</div>
Expand Down

0 comments on commit d5361c0

Please sign in to comment.