diff --git a/book/en-us/toc.md b/book/en-us/toc.md index 1f4cf85..af32914 100644 --- a/book/en-us/toc.md +++ b/book/en-us/toc.md @@ -82,7 +82,7 @@ + 7.2 Mutex and Critical Section + 7.3 Futures + 7.4 Condition Variable - + 7.5 Atomic Operation and Memory Order + + 7.5 Atomic Operation and Memory Model + 7.6 Transactional Memory + 7.7 Coroutine - [**Chapter 08 File System**](./08-filesystem.md) diff --git a/book/zh-cn/07-thread.md b/book/zh-cn/07-thread.md index bc738ca..ab9b4d3 100644 --- a/book/zh-cn/07-thread.md +++ b/book/zh-cn/07-thread.md @@ -228,7 +228,7 @@ int main() { 争夺此锁,从而更好的利用多个消费者之间的并发。话虽如此,但实际上因为 `std::mutex` 的排他性, 我们根本无法期待多个消费者能真正意义上的并行消费队列的中生产的内容,我们仍需要粒度更细的手段。 -## 7.5 原子操作与内存一致性 +## 7.5 原子操作与内存模型 细心的读者可能会对前一小节中生产者消费者模型的例子可能存在编译器优化导致程序出错的情况产生疑惑。 例如,布尔值 `notified` 没有被 `volatile` 修饰,编译器可能对此变量存在优化,例如将其作为一个寄存器的值, @@ -241,7 +241,7 @@ int main() { int main() { int a = 0; - volatile int flag = 0; + int flag = 0; std::thread t1([&]() { while (flag != 1); @@ -261,17 +261,177 @@ int main() { } ``` +从直观上看,`t2` 中 `a = 5;` 这一条语句似乎总在 `flag = 1;` 之前得到执行,而 `t1` 中 `while (flag != 1)` +似乎保证了 `std::cout << "b = " << b << std::endl;` 不会再标记被改变前执行。从逻辑上看,似乎 `b` 的值应该等于 5。 +但实际情况远比此复杂得多,或者说这段代码本身属于未定义的行为,因为对于 `a` 和 `flag` 而言,他们在两个并行的线程中被读写, +出现了竞争。除此之外,即便我们忽略竞争读写,仍然可能收 CPU 的乱序执行,编译器对指令的重排的影响, +导致 `a = 5` 发生在 `flag = 1` 之后。从而 `b` 可能输出 0。 + +### 原子操作 + +`std::mutex` 可以解决上面出现的并发读写的问题,但互斥锁是操作系统级的功能, +这是因为一个互斥锁的实现通常包含两条基本原理: + +1. 提供线程间自动的状态转换,即『锁住』这个状态 +2. 保障在互斥锁操作期间,所操作变量的内存与临界区外进行隔离 + +这是一组非常强的同步条件,换句话说当最终编译为 CPU 指令时会表现为非常多的指令(我们之后再来看如何实现一个简单的互斥锁)。 +这对于一个仅需原子级操作(没有中间态)的变量,似乎太苛刻了。 + +关于同步条件的研究有着非常久远的历史,我们在这里不进行赘述。读者应该明白,在现代 CPU 体系结构下提供了 CPU 指令级的原子操作, +因此在 C++11 中多线程下共享变量的读写这一问题上,还引入了 `std::atomic` 模板,使得我们实例化一个原子类型,将一个 +原子类型读写操作从一组指令,最小化到单个 CPU 指令。例如: + +```cpp +std::atomic counter; +``` + +并为整数或浮点数的原子类型提供了基本的数值成员函数,举例来说, +包括 `fetch_add`, `fetch_sub` 等,同时通过重载方便的提供了对应的 `+`,`-` 版本。 +比如下面的例子: + +```cpp +#include +#include +#include + +std::atomic count = {0}; + +int main() { + std::thread t1([](){ + count.fetch_add(1); + }); + std::thread t2([](){ + count++; // 等价于 fetch_add + count += 1; // 等价于 fetch_add + }); + t1.join(); + t2.join(); + std::cout << count << std::endl; + return 0; +} +``` + +当然,并非所有的类型都能提供原子操作,这是因为原子操作的可行性取决于 CPU 的架构以及所实例化的类型结构是否满足该架构对内存对齐 +条件的要求,因而我们总是可以通过 `std::atomic::is_lock_free` 来检查该原子类型是否需支持原子操作,例如: + +```cpp +#include +#include + +struct A { + float x; + int y; + long long z; +}; + std::atomic a; + std::cout << std::boolalpha << a.is_lock_free() << std::endl; + return 0; +} +``` + +### 一致性模型 + +TODO: 加强这部分叙述的前后逻辑 + +并行执行的多个线程,从某种宏观层面上讨论,可以粗略的视为一种分布式系统。 +其中每个线程可以对应为一个集群节点,而线程间的通信也几乎等价于集群节点间的通信。 +理论上来说,一致性包含四种不同的类型: + +1. 线性一致性:又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据,并且所有线程的操作顺序与全局时钟下的顺序是一致的。 + + ``` + x.write(1) x.read() + T1 ---------+----------------+------> + + + T2 -------------------+-------------> + x.write(2) + ``` + + 在这种情况下线程 `T1`, `T2` 对 `x` 的两次写操作是原子的,且 `x.write(1)` 是严格的发生在 `x.write(2)` 之前,`x.write(2)` 严格的发生在 `x.read()` 之前。 + 值得一提的是,线性一致性对全局时钟的要求是难以实现的,这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。 + +2. 顺序一致性:同样要求任何一次读操作都能读到数据最近一次写入的数据,但未要求与全局时钟的顺序一致。 + + ``` + x.write(1) x.write(3) x.read() + T1 ---------+-----------+----------+-----> + + + T2 ---------------+----------------------> + x.write(2) + + 或者 + + x.write(1) x.write(3) x.read() + T1 ---------+-----------+----------+-----> + + + T2 ------+-------------------------------> + x.write(2) + ``` + + 在顺序一致性的要求下,`x.read()` 必须读到最近一次写入的数据,因此 `x.write(2)` 与 `x.write(1)` 并无任何先后保障,即 只要 `T2` 的 `x.write(2)` 发生在 `x.write(3)` 之前即可。 + +3. 因果一致性:它的要求进一步降低,只需要有因果关系的操作顺序得到保障,而非因果关系的操作顺序则不做要求。 + +4. 最终一致性:TODO: + +### 内存顺序 + +TODO: + +C++11 为原子操作定义了六种不同的内存顺序 `std::memory_order` 的选项, +表达了四种多线程间的同步模型: + +- 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 `std::memory_order_relaxed` 指定。我们来看一个例子: + + ```cpp + #include + #include + #include + #include + + std::atomic counter = {0}; + + int main() { + std::vector vt; + for (int i = 0; i < 100; ++i) { + vt.emplace_back([](){ + counter.fetch_add(1, std::memory_order_relaxed); + }); + } + + for (auto& t : vt) { + t.join(); + } + std::cout << "current counter:" << counter << std::endl; + return 0; + } + ``` + +- 释放/消费模型:在此模型中,我们开始限制进程间的操作顺序,如果某个线程依赖某个值,但另一个线程同时会对该值进行修改,即前者依赖后者。 + +- 释放/获取模型:在此模型下,我们可以进一步限制不同线程间原子操作的顺序,在释放(release)和获取(acquire)之间规定时序,即发生在释放操作之前的写操作,对其他线程的任何获取操作都是可见的,即发生顺序(happens-before)。 + + `std::memory_order_consume`、`std::memory_order_acquire`、`std::memory_order_release`、`std::memory_order_acq_rel` 这四种选项均为这两个模型服务的 TODO: 未写完 + +- 顺序一致模型:在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 `std::memory_order_seq_cst` 进行指定。 + ## 7.6 事务内存 - +TODO: C++20 放到第十章? ## 总结 C++11 语言层提供了并发编程的相关支持,本节简单的介绍了 `std::thread`/`std::mutex`/`std::future` 这些并发编程中不可回避的重要工具。 +除此之外,我们还介绍了 C++11 最重要的几个特性之一的『内存模型』, +它们为 C++ 在标准化高性能计算中提供了重要的基础。 ## 习题 -1. 请编写一个线程池,提供如下功能: +1. 请编写一个简单的线程池,提供如下功能: ```cpp ThreadPool p(4); // 指定四个工作线程 @@ -285,12 +445,15 @@ auto f = pool.enqueue([](int life) { std::cout << f.get() << std::endl; ``` +2. 请实现一个无锁版本的 FIFO 队列,提供 `enqueue` 和 `dequeue` 两个方法。 + [返回目录](./toc.md) | [上一章](./06-regex.md) | [下一章 文件系统](./08-filesystem.md) ## 进一步阅读的参考资料 -1. [C++ 并发编程\(中文版\)](https://www.gitbook.com/book/chenxiaowei/cpp_concurrency_in_action/details) -2. [线程支持库文档](http://en.cppreference.com/w/cpp/thread) +- [C++ 并发编程\(中文版\)](https://www.gitbook.com/book/chenxiaowei/cpp_concurrency_in_action/details) +- [线程支持库文档](http://en.cppreference.com/w/cpp/thread) +- Herlihy, M. P., & Wing, J. M. (1990). Linearizability: a correctness condition for concurrent objects. ACM Transactions on Programming Languages and Systems, 12(3), 463–492. https://doi.org/10.1145/78969.78972 ## 许可 diff --git a/book/zh-cn/toc.md b/book/zh-cn/toc.md index e6cd439..e2d3239 100644 --- a/book/zh-cn/toc.md +++ b/book/zh-cn/toc.md @@ -82,7 +82,7 @@ + 7.2 互斥量与临界区 + 7.3 期物 + 7.4 条件变量 - + 7.5 原子操作与内存一致性 + + 7.5 原子操作与内存模型 + 7.6 事务内存 - [**第 8 章 文件系统**](./08-filesystem.md) + 8.1 文档与链接 diff --git a/code/7/7.6.atomic.cpp b/code/7/7.6.atomic.cpp new file mode 100644 index 0000000..3d59425 --- /dev/null +++ b/code/7/7.6.atomic.cpp @@ -0,0 +1,19 @@ +#include +#include +#include + +std::atomic count = {0}; + +int main() { + std::thread t1([](){ + count.fetch_add(1); + }); + std::thread t2([](){ + count++; // identical to fetch_add + count += 1; // identical to fetch_add + }); + t1.join(); + t2.join(); + std::cout << count << std::endl; + return 0; +} \ No newline at end of file diff --git a/code/7/7.7.is.lock.free.cpp b/code/7/7.7.is.lock.free.cpp new file mode 100644 index 0000000..c936648 --- /dev/null +++ b/code/7/7.7.is.lock.free.cpp @@ -0,0 +1,12 @@ +#include +#include + +struct A { + float x; + int y; + long long z; +}; + std::atomic a; + std::cout << std::boolalpha << a.is_lock_free() << std::endl; + return 0; +} \ No newline at end of file diff --git a/code/7/7.8.relaxed.cpp b/code/7/7.8.relaxed.cpp new file mode 100644 index 0000000..eb93395 --- /dev/null +++ b/code/7/7.8.relaxed.cpp @@ -0,0 +1,21 @@ +#include +#include +#include +#include + +std::atomic counter = {0}; + +int main() { + std::vector vt; + for (int i = 0; i < 100; ++i) { + vt.emplace_back([](){ + counter.fetch_add(1, std::memory_order_relaxed); + }); + } + + for (auto& t : vt) { + t.join(); + } + std::cout << "current counter:" << counter << std::endl; + return 0; +} \ No newline at end of file