Skip to content

Commit 3edee33

Browse files
committed
mtq finished
1 parent ba70267 commit 3edee33

File tree

1 file changed

+71
-15
lines changed

1 file changed

+71
-15
lines changed

docs/parallel/practice/multi-threading-queue.md

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
支持多线程共享的队列是一种相当常见且重要的数据结构。正如我们在[多线程编程](../multi-threading.md)中看到的那样,需要多线程的场景很多都会表现为一种生产者和消费者之间的协作关系。为了让生产者得以将数据传递给消费者,我们往往就需要一个先进先出(FIFO)的数据结构:队列。
44

5-
在传统的单线程程序里,实现一个队列相当简单。然而在多线程的环境下,我们的队列必须满足以下额外的条件
5+
在传统的单线程程序里,实现一个队列相当简单。不过在多线程的环境下,除了无死锁(deadlock-free)等基本条件之外,我们的队列还必须满足以下额外的条件
66

77
* 线程安全(thread-safe)
88

@@ -11,9 +11,9 @@
1111
* 异常安全(exception-safe)
1212

1313
当一个线程调用数据结构提供的函数中途出现异常时,数据结构内部状态不会损坏,其他线程依然可以正常访问并得到正确的结果。并且也不会出现内存泄露。
14+
1415
虽然异常安全在单线程场景下也很重要,但它值得在多线程场景中被再次强调。因为多线程下因异常安全造成的 bug 往往更加难以复现、排查,有时甚至会造成整个软件的崩溃。
1516

16-
1717
在 C++ 中实现这样的一个队列,至少存在两种方法:基于互斥量的方法和基于原子量的方法。为了简单起见,我们在本节中只介绍基于互斥量的方法。
1818

1919
而根据生产者和消费者的数量不同,可以分为四种队列:
@@ -72,7 +72,7 @@ public:
7272
7373
* `push()` 函数中调用的 `q.push()` 在底层会调用内部容器(通常是 `std::deque` 或 `std::list`)的 `push_back()` 函数。
7474
75-
这里,STL 保证 `push_back() ` 在因为内存分配出错或者元素拷贝或移动出错时,会表现得就像函数没有被调用过一样。在出现异常后,C++ 会自动进行栈展开(stack unwinding),`lock` 的析构函数被调用,我们的锁因此被解开。
75+
这里,STL 保证 `push_back()` 在因为内存分配出错或者元素拷贝或移动出错时,会表现得就像函数没有被调用过一样。在出现异常后,C++ 会自动进行栈展开(stack unwinding),`lock` 的析构函数被调用,我们的锁因此被解开。
7676
7777
因此,我们的 `push()` 函数是异常安全的。
7878
@@ -109,10 +109,6 @@ public:
109109
queue(const queue& other) = delete;
110110
queue& operator=(const queue& other) = delete;
111111
112-
bool empty() {
113-
return !head;
114-
}
115-
116112
void push(T element) {
117113
auto ptr = std::make_unique(std::move(element));
118114
auto new_tail = ptr.get();
@@ -161,18 +157,14 @@ class queue {
161157
node* tail;
162158

163159
public:
164-
queue() : head(std::make_unique()), tail(head.get()) {}
160+
queue() : head(std::make_unique<T>()), tail(head.get()) {}
165161
// 为了简便,我们暂时不考虑拷贝构造和赋值
166162
queue(const queue& other) = delete;
167163
queue& operator=(const queue& other) = delete;
168164

169-
bool empty() {
170-
return head.get() == tail;
171-
}
172-
173165
void push(T element) {
174166
auto element_ptr = std::make_shared(std::move(element));
175-
auto node_ptr = std::make_unique();
167+
auto node_ptr = std::make_unique<T>();
176168
auto new_tail = node_ptr.get();
177169
// 将当前尾节点的值设为 `element_ptr`,而新建的空节点 `node_ptr` 作为新的尾节点
178170
tail->data = element_ptr;
@@ -181,7 +173,7 @@ public:
181173
}
182174

183175
std::shared_ptr<T> pop() {
184-
if (empty())
176+
if (head.get() == tail)
185177
return std::shared_ptr<T>();
186178
auto res = head->data;
187179
auto old_head = std::move(head);
@@ -195,7 +187,71 @@ public:
195187

196188
最值得讨论的是 `push()` 的修改。我们在这里的策略是:**将用户输入的值存到当前作为尾节点的空节点,然后再新建一个空节点附加到链表尾部。**这样一来,我们发现即使队列里只有一个数据,链表里仍会有两个节点,此时并发的 `pop()``push()` 调用再也不会访问同一个节点了。
197189

198-
不过,事情还没有结束,竞态条件依然存在!注意 `pop()` 中调用的 `empty()`,它引用了 `tail` 来做比较,正是这里和 `push()` 一起产生了竞态条件。解决方法很简单,上个锁就好了。虽然我们最终还是引入了互斥量,但是对比上面的例子我们可以看到:**临界区变小了,仅限于对 `tail` 的访问,因此并发性能得到了提升。**
190+
接下来,我们在这种设计的基础上放置互斥量。和简单的实现不同,我们追求尽可能大的并发访问性。经过分析,我们可以发现这些可能造成竞态条件的情况:
191+
192+
1. 多个线程并发调用 `push()`
193+
2. 多个线程并发调用 `pop()`
194+
3. 多个线程并发调用 `push()``pop()`
195+
196+
这几种情况的竞争焦点都是 `head``tail`,我们可以分别给它们引入一个互斥量进行保护。最后的代码如下所示:
197+
198+
```cpp
199+
template<typename T>
200+
class queue {
201+
struct node {
202+
std::shared_ptr<T> element;
203+
std::unique_ptr<node> next;
204+
};
205+
206+
std::unique_ptr<node> head;
207+
node* tail;
208+
std::mutex head_mutex;
209+
std::mutex tail_mutex;
210+
211+
node* get_tail() {
212+
std::scoped_lock lock(tail_mutex);
213+
return tail;
214+
}
215+
216+
std::unique_ptr<node> pop_head() {
217+
// 给 `head` 加锁,防止并发的 `pop()` 导致竞态条件
218+
std::scoped_lock lock(head_mutex);
219+
// `get_tail()` 会给 `tail` 加锁,从而避免了并发的 `pop()` 和 `push()` 可能导致的竞态条件
220+
if (head.get() == get_tail())
221+
return std::unique_ptr<node>();
222+
auto old_head = std::move(head);
223+
head = std::move(old_head->next);
224+
return old_head;
225+
}
226+
227+
public:
228+
queue() : head(std::make_unique<T>()), tail(head.get()) {}
229+
// 为了简便,我们暂时不考虑拷贝构造和赋值
230+
queue(const queue& other) = delete;
231+
queue& operator=(const queue& other) = delete;
232+
233+
void push(T element) {
234+
auto element_ptr = std::make_shared(std::move(element));
235+
auto node_ptr = std::make_unique<T>();
236+
auto new_tail = node_ptr.get();
237+
// 仅在需要访问 tail 时才加锁,这里的锁可以避免并发的 `push()` 导致的竞态条件
238+
std::scoped_lock lock(tail_mutex);
239+
tail->data = element_ptr;
240+
tail->next = std::move(node_ptr);
241+
tail = new_tail;
242+
}
243+
244+
std::shared_ptr<T> pop() {
245+
auto old_head = pop_head();
246+
if (old_head)
247+
return old_head->data;
248+
else
249+
return std::shared_ptr<T>();
250+
}
251+
};
252+
```
253+
254+
从上面的代码来看,我们通过引入两个互斥量,成功地解决了可能出现的竞态条件。虽然我们最终还是引入了互斥量,但是对比上面的例子我们可以看到:**`push()``pop()` 只在恰当的时候才给互斥量上锁,临界区变小了,并发性能得到了提升。**上面的实现依然是异常安全的,至于具体的分析则留给读者。
199255

200256
## 小结
201257

0 commit comments

Comments
 (0)