Skip to content

Commit

Permalink
feat(rocketmq): add ha
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed May 6, 2020
1 parent d4403e6 commit 5a20d11
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 42 deletions.
24 changes: 11 additions & 13 deletions index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,21 @@
<description>Recent content on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Tue, 28 Apr 2020 15:40:34 +0800</lastBuildDate>
<lastBuildDate>Mon, 04 May 2020 16:02:11 +0800</lastBuildDate>

<atom:link href="https://xujianhai.fun/index.xml" rel="self" type="application/rss+xml" />


<item>
<title>Rmq_ha</title>
<link>https://xujianhai.fun/posts/rmq_ha/</link>
<pubDate>Mon, 04 May 2020 16:02:11 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rmq_ha/</guid>
<description>Preface 之前分析了 基于raft的DLedger实现, 这里分析下老版本的master-slave 主从复制 以及 刷盘机制
抽象 GroupCommitRequest: 如果是 半同步, 即MASTER_SYNC 模式, 会对写入请求封装成 GroupCommitRequest 触发同步 HAService$GroupTransferService: 用来检查 GroupCommitRequest 是否同步成功/超时. 同步操作依赖底层统一的数据同步实现 HAService$HAClient: slave用来创建和master的连接, 上报 offset 和 获取写入数据到 commitlog HAService$AcceptSocketService: 用来接收slave创建的连接, linux平台使用epoll, 只监听accept事件 HAConnection: master上表示slave创建的一个连接 HAConnection$ReadSocketService: 专门负责读取 slave 提交的ackOffset 的线程, 只监听read事件 HAConnection$WriteSocketService: 负责发送同步数据, 也会在数据包中包含心跳的头(间歇), 只监听write事件 replicaRequest 流程 master 因为后面的版本支持了 future 模式, 因此 入口有两个, 分别对应 CommitLog#putMessage 和 CommitLog#asyncPutMessage, 最终都会进入 CommitLog#submitReplicaRequest. replicaRequest 提交并不会触发任何同步行为, 因为同步本身是异步线程进行的, 提交的 replicaRequest 仅仅用来 检测 slave同步是否成功/超时, 相当于在另一个线程排队 (消息是顺序写入commitlog的, 因为replicaRequest 自带了顺序特性).</description>
</item>

<item>
<title>Tcp_user_timeout</title>
<link>https://xujianhai.fun/posts/tcp_user_timeout/</link>
Expand Down Expand Up @@ -142,17 +152,5 @@ rocketmq 开源的版本并没有ack的概念
当broker返回 ResponseCode.PULL_NOT_FOUND 的时候, 客户端会转义成 PullStatus.NO_NEW_MSG, 会执行如下操作:</description>
</item>

<item>
<title>Rocketmq_dledger</title>
<link>https://xujianhai.fun/posts/rocketmq_dledger/</link>
<pubDate>Thu, 26 Mar 2020 17:49:31 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rocketmq_dledger/</guid>
<description>Preface RocketMQ 去年实现了基于raft协议的 commitlog 存储库, 解决master-slave架构下 人工选主、切主 的故障转移的运维负担, 以及故障转移过程中数据丢失的问题.
raft 写入 raft 协议的写入如下:
1. client -&amp;gt; leader: 客户端请求leader写入kv 2. leader append 本地日志(commitlog) 3. leader 并行发送日志给 follower 4. follower收到日志, 写入本地 commit log, 并 apply 本地的 FSM, 返回成功给 leader 5. leader收到follower超过半数以上的成功响应, 本地apply 日志到 FSM. 读取 raft 协议的读取如下:
1. client -&amp;gt; leader: 客户端请求leader写入kv 2. leader 通过 lease 检查自己是否是 leader, 3. 检查是leader的情况下, 检查本地 apply index 和 客户端的 index, apply index大的话, 读取本地的状态机的数据 + apply index返回 4.</description>
</item>

</channel>
</rss>
6 changes: 6 additions & 0 deletions posts/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ <h1>Posts</h1>
<div class="post-year">2020</div>

<ul class="posts-list">
<li class="post-item">
<a href="https://xujianhai.fun/posts/rmq_ha/">
<span class="post-title">Rmq_ha</span>
<span class="post-day">May 4</span>
</a>
</li>
<li class="post-item">
<a href="https://xujianhai.fun/posts/tcp_user_timeout/">
<span class="post-title">Tcp_user_timeout</span>
Expand Down
66 changes: 65 additions & 1 deletion posts/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,73 @@
<description>Recent content in Posts on zero.xu blog</description>
<generator>Hugo -- gohugo.io</generator>
<language>en</language>
<lastBuildDate>Tue, 28 Apr 2020 15:40:34 +0800</lastBuildDate>
<lastBuildDate>Mon, 04 May 2020 16:02:11 +0800</lastBuildDate>
<atom:link href="https://xujianhai.fun/posts/index.xml" rel="self" type="application/rss+xml" />

<item>
<title>Rmq_ha</title>
<link>https://xujianhai.fun/posts/rmq_ha/</link>
<pubDate>Mon, 04 May 2020 16:02:11 +0800</pubDate>

<guid>https://xujianhai.fun/posts/rmq_ha/</guid>
<description>Preface 之前分析了 基于raft的DLedger实现, 这里分析下老版本的master-slave 主从复制 以及 刷盘机制
抽象 GroupCommitRequest: 如果是 半同步, 即MASTER_SYNC 模式, 会对写入请求封装成 GroupCommitRequest 触发同步 HAService$GroupTransferService: 用来检查 GroupCommitRequest 是否同步成功/超时. 同步操作依赖底层统一的数据同步实现 HAService$HAClient: slave用来创建和master的连接, 上报 offset 和 获取写入数据到 commitlog HAService$AcceptSocketService: 用来接收slave创建的连接, linux平台使用epoll, 只监听accept事件 HAConnection: master上表示slave创建的一个连接 HAConnection$ReadSocketService: 专门负责读取 slave 提交的ackOffset 的线程, 只监听read事件 HAConnection$WriteSocketService: 负责发送同步数据, 也会在数据包中包含心跳的头(间歇), 只监听write事件 replicaRequest 流程 master 因为后面的版本支持了 future 模式, 因此 入口有两个, 分别对应 CommitLog#putMessage 和 CommitLog#asyncPutMessage, 最终都会进入 CommitLog#submitReplicaRequest. replicaRequest 提交并不会触发任何同步行为, 因为同步本身是异步线程进行的, 提交的 replicaRequest 仅仅用来 检测 slave同步是否成功/超时, 相当于在另一个线程排队 (消息是顺序写入commitlog的, 因为replicaRequest 自带了顺序特性).</description>
<content type="html"><![CDATA[<h2 id="preface">Preface</h2>
<p>之前分析了 基于raft的DLedger实现, 这里分析下老版本的master-slave 主从复制 以及 刷盘机制</p>
<h3 id="抽象">抽象</h3>
<ul>
<li>GroupCommitRequest: 如果是 半同步, 即MASTER_SYNC 模式, 会对写入请求封装成 GroupCommitRequest 触发同步</li>
<li>HAService$GroupTransferService: 用来检查 GroupCommitRequest 是否同步成功/超时. 同步操作依赖底层统一的数据同步实现</li>
<li>HAService$HAClient: slave用来创建和master的连接, 上报 offset 和 获取写入数据到 commitlog</li>
<li>HAService$AcceptSocketService: 用来接收slave创建的连接, linux平台使用epoll, 只监听accept事件</li>
<li>HAConnection: master上表示slave创建的一个连接</li>
<li>HAConnection$ReadSocketService: 专门负责读取 slave 提交的ackOffset 的线程, 只监听read事件</li>
<li>HAConnection$WriteSocketService: 负责发送同步数据, 也会在数据包中包含心跳的头(间歇), 只监听write事件</li>
</ul>
<h3 id="replicarequest-流程">replicaRequest 流程</h3>
<p>master 因为后面的版本支持了 future 模式, 因此 入口有两个, 分别对应 CommitLog#putMessage 和 CommitLog#asyncPutMessage, 最终都会进入 <code>CommitLog#submitReplicaRequest</code>. replicaRequest 提交并不会触发任何同步行为, 因为同步本身是异步线程进行的, 提交的 replicaRequest 仅仅用来 检测 slave同步是否成功/超时, 相当于在另一个线程排队 (消息是顺序写入commitlog的, 因为replicaRequest 自带了顺序特性).</p>
<p>replicaRequest提交处理流程:</p>
<pre><code>CommitLog#submitReplicaRequest -&gt; HAService#putRequest -&gt; HAService$GroupTransferService#run -&gt; #doWaitTransfer -&gt; GroupCommitRequest#wakeupCustomer 用于通知commitlog 是否已经同步完成
</code></pre><p>判断是否已经同步给slave, 只是简单判断了下 <code>HAService.this.push2SlaveMaxOffset.get() &gt;= req.getNextOffset()</code>, 也就是说 只要有一个slave 同步了消息, 同步就成功了.</p>
<h3 id="数据同步流程">数据同步流程</h3>
<p>master 写入流程, master 会启动一个 AcceptSocketService 接受客户端的连接, 每一个客户端slave连接用 HAConnection 对象表示, 对应读写分别启动 WriteSocketService 和 ReadSocketService 两个对象.</p>
<h4 id="master-readsocketservice">master ReadSocketService</h4>
<p>ReadSocketService 用来读取 slave 提交的 ackOffset, 用来同步的时候读取数据的offset.</p>
<p>流程如下:</p>
<pre><code>AcceptSocketService#run -&gt; HAConnection#run -&gt; HAConnection$ReadSocketService#run -&gt; #processReadEvent -&gt; HAService#notifyTransferSome -&gt; HAService$GroupTransferService#notifyTransferSome
</code></pre><p>可以发现, 读线程的最终效果除了修改 同步的offset, 还会通知堵塞的 GroupTransferService 有数据同步完成, 及时的返回结果给 写入请求.</p>
<h4 id="master-writesocketservice">master WriteSocketService</h4>
<p>master 用来提交发送心跳以及 数据 给slave. 比较简单, 不赘述</p>
<p>但是需要注意的是, master 是主动推送给slave的, 和 kafka 设计中 slave 主动拉取不一样. master 推送之前, slave会周期性的上报自己的offset, 这样master 就知道从哪个位置开始推送.</p>
<h4 id="slave">slave</h4>
<p>slave 会启动一个HAClient 连接到 master, 上报自己的ackOffset, 并不断读取 master 推送的数据.</p>
<p>slave 读取流程如下</p>
<pre><code>HAClient#run -&gt; #processReadEvent -&gt; #dispatchReadRequest -&gt; defaultMessageStore#appendToCommitLog
</code></pre><h3 id="切主">切主</h3>
<p>rocketmq 不支持自动切主, 因此需要人工参与. 可以参考 <a href="https://github.com/didi/DDMQ/wiki/%E8%87%AA%E5%8A%A8%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2">https://github.com/didi/DDMQ/wiki/%E8%87%AA%E5%8A%A8%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2</a></p>
<h3 id="flush-机制">flush 机制</h3>
<p>rocketmq 支持 同步刷盘和异步刷盘两种模式, 同步刷盘一定程度上降低了 rocketmq 的处理能力.</p>
<h3 id="同步异步-刷盘">同步/异步 刷盘</h3>
<h4 id="抽象-1">抽象</h4>
<ul>
<li>GroupCommitService: 同步刷盘策略, 每次写入数据都会触发磁盘刷盘</li>
<li>FlushRealTimeService: 近实时刷盘策略, 数据页数量或者刷新时间到, 会触发刷新</li>
</ul>
<h4 id="同步实现">同步实现</h4>
<p>因为上层抽象了 future模式, 导致写入有两个入口, 对应的 刷盘也有两个入口: <code>CommitLog#submitFlushRequest</code> 和 <code>CommitLog#handleDiskFlush</code>.</p>
<p>无论同步异步还是同步, 进入同步刷盘的逻辑流程如下:</p>
<pre><code>GroupCommitService#putRequest -&gt; MappedFileQueue#flush( -&gt; MappedFile#flush) + GroupCommitRequest#wakeupCustomer (发回响应)
</code></pre><p>文件的第一次刷新需要刷新元数据. 使用双buffer进行读写, 每次读完buffer, 会刷新一次checkpoint.</p>
<h4 id="异步刷盘">异步刷盘</h4>
<p>FlushRealTimeService 触发刷新的条件有两个:</p>
<ul>
<li>刷新间隔超过配置的 <code>flushCommitLogThoroughInterval</code>, 默认 10s, 会触发无脑刷新</li>
<li>刷新的page数量超过配置的 <code>flushCommitLogLeastPages</code>, 默认是 4</li>
</ul>
<p>近实时刷新每次刷新的时候都会更新checkpoint. 刷新的时候支持一个参数配置: <code>flushCommitLogTimed</code>, 按时间刷新, 就是等待指定的 <code>flushIntervalCommitLog</code> 才进行刷新, 默认false, 就是有写入请求触发才进行刷新. 注意, 这个只是触发条件. 真正刷新需要传递 <code>flushLeastPages</code> 参数的, 0是无脑刷新, 有值 值需要满足 待刷新page数 符合需求才会刷新. 因此 在满足 <code>flushCommitLogThoroughInterval</code> 的情况下, flushLeastPages 就是传0</p>
]]></content>
</item>

<item>
<title>Tcp_user_timeout</title>
<link>https://xujianhai.fun/posts/tcp_user_timeout/</link>
Expand Down
Loading

0 comments on commit 5a20d11

Please sign in to comment.