book: revision of ch07 & finish atomics

Update #2
This commit is contained in:
Changkun Ou
2019-07-19 10:59:01 +02:00
parent fdb34b80e5
commit 677f74b691
12 changed files with 327 additions and 81 deletions

View File

@@ -83,8 +83,6 @@
+ 7.3 Futures + 7.3 Futures
+ 7.4 Condition Variable + 7.4 Condition Variable
+ 7.5 Atomic Operation and Memory Model + 7.5 Atomic Operation and Memory Model
+ 7.6 Transactional Memory
+ 7.7 Coroutine
- [**Chapter 08 File System**](./08-filesystem.md) - [**Chapter 08 File System**](./08-filesystem.md)
+ 8.1 Documents and links + 8.1 Documents and links
+ 8.2 `std::filesystem` + 8.2 `std::filesystem`
@@ -97,10 +95,11 @@
+ Custom String Literal + Custom String Literal
+ 9.4 Math Library + 9.4 Math Library
- [**Chapter 10 Outlook: Introduction of C++20**](./10-cpp20.md) - [**Chapter 10 Outlook: Introduction of C++20**](./10-cpp20.md)
+ Concept + 10.1 Concept
+ Range + 10.2 Range
+ Module + 10.3 Module
+ Coroutine + 10.4 Coroutine
+ 10.5 Transaction Memory
- [**Appendix 1: Further Study Materials**](./appendix1.md) - [**Appendix 1: Further Study Materials**](./appendix1.md)
- [**Appendix 2: Modern C++ Best Practices**](./appendix2.md) - [**Appendix 2: Modern C++ Best Practices**](./appendix2.md)

View File

@@ -332,70 +332,112 @@ struct A {
### 一致性模型 ### 一致性模型
TODO: 加强这部分叙述的前后逻辑
并行执行的多个线程,从某种宏观层面上讨论,可以粗略的视为一种分布式系统。 并行执行的多个线程,从某种宏观层面上讨论,可以粗略的视为一种分布式系统。
其中每个线程可以对应为一个集群节点,而线程间的通信也几乎等价于集群节点间的通信。 在分布式系统中,任何通信乃至本地操作都需要消耗一定时间,甚至出现不可靠的通信。
理论上来说,一致性包含四种不同的类型:
如果我们强行将一个变量 `v` 在多个线程之间的操作设为原子操作,即任何一个线程在操作完 `v` 后,
其他线程均能**同步**感知到 `v` 的变化,则对于变量 `v` 而言,表现为顺序执行的程序,它并没有由于引入多线程
而得到任何效率上的收益。对此有什么办法能够适当的加速呢?答案便是削弱原子操作的在进程间的同步条件。
从原理上看,每个线程可以对应为一个集群节点,而线程间的通信也几乎等价于集群节点间的通信。
削弱进程间的同步条件,通常我们会考虑四种不同的一致性模型:
1. 线性一致性:又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据,并且所有线程的操作顺序与全局时钟下的顺序是一致的。 1. 线性一致性:又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据,并且所有线程的操作顺序与全局时钟下的顺序是一致的。
``` ```
x.write(1) x.read() x.store(1) x.load()
T1 ---------+----------------+------> T1 ---------+----------------+------>
T2 -------------------+-------------> T2 -------------------+------------->
x.write(2) x.store(2)
``` ```
在这种情况下线程 `T1`, `T2` 对 `x` 的两次写操作是原子的,且 `x.write(1)` 是严格的发生在 `x.write(2)` 之前,`x.write(2)` 严格的发生在 `x.read()` 之前。 在这种情况下线程 `T1`, `T2` 对 `x` 的两次写操作是原子的,且 `x.store(1)` 是严格的发生在 `x.store(2)` 之前,`x.store(2)` 严格的发生在 `x.load()` 之前。
值得一提的是,线性一致性对全局时钟的要求是难以实现的,这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。 值得一提的是,线性一致性对全局时钟的要求是难以实现的,这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。
2. 顺序一致性:同样要求任何一次读操作都能读到数据最近一次写入的数据,但未要求与全局时钟的顺序一致。 2. 顺序一致性:同样要求任何一次读操作都能读到数据最近一次写入的数据,但未要求与全局时钟的顺序一致。
``` ```
x.write(1) x.write(3) x.read() x.store(1) x.store(3) x.load()
T1 ---------+-----------+----------+-----> T1 ---------+-----------+----------+----->
T2 ---------------+----------------------> T2 ---------------+---------------------->
x.write(2) x.store(2)
或者 或者
x.write(1) x.write(3) x.read() x.store(1) x.store(3) x.load()
T1 ---------+-----------+----------+-----> T1 ---------+-----------+----------+----->
T2 ------+-------------------------------> T2 ------+------------------------------->
x.write(2) x.store(2)
``` ```
在顺序一致性的要求下,`x.read()` 必须读到最近一次写入的数据,因此 `x.write(2)` 与 `x.write(1)` 并无任何先后保障,即 只要 `T2` 的 `x.write(2)` 发生在 `x.write(3)` 之前即可。 在顺序一致性的要求下,`x.load()` 必须读到最近一次写入的数据,因此 `x.store(2)` 与 `x.store(1)` 并无任何先后保障,即 只要 `T2` 的 `x.store(2)` 发生在 `x.store(3)` 之前即可。
3. 因果一致性:它的要求进一步降低,只需要有因果关系的操作顺序得到保障,而非因果关系的操作顺序则不做要求。 3. 因果一致性:它的要求进一步降低,只需要有因果关系的操作顺序得到保障,而非因果关系的操作顺序则不做要求。
4. 最终一致性TODO: ```
a = 1 b = 2
T1 ----+-----------+---------------------------->
T2 ------+--------------------+--------+-------->
x.store(3) c = a + b y.load()
或者
a = 1 b = 2
T1 ----+-----------+---------------------------->
T2 ------+--------------------+--------+-------->
x.store(3) y.load() c = a + b
亦或者
b = 2 a = 1
T1 ----+-----------+---------------------------->
T2 ------+--------------------+--------+-------->
y.load() c = a + b x.store(3)
```
上面给出的三种例子都是属于因果一致的,因为整个过程中,只有 `c` 对 `a` 和 `b` 产生依赖,而 `x` 和 `y`
在此例子中表现为没有关系(但实际情况中我们需要更详细的信息才能确定 `x` 与 `y` 确实无关)
4. 最终一致性:是最弱的一致性要求,它只保障某个操作在未来的某个时间节点上会被观察到,但并未要求被观察到的时间。因此我们甚至可以对此条件稍作加强,例如规定某个操作被观察到的时间总是有界的。当然这已经不在我们的讨论范围之内了。
```
x.store(3) x.store(4)
T1 ----+-----------+-------------------------------------------->
T2 ---------+------------+--------------------+--------+-------->
x.read() x.read() x.read() x.read()
```
在上面的情况中,如果我们假设 x 的初始值为 0则 `T2` 中四次 `x.read()` 结果可能但不限于以下情况:
```
3 4 4 4 // x 的写操作被很快观察到
0 3 3 4 // x 的写操作被观察到的时间存在一定延迟
0 0 0 4 // 最后一次读操作读到了 x 的最终值,但此前的变化并未观察到
0 0 0 0 // 在当前时间段内 x 的写操作均未被观察到,但未来某个时间点上一定能观察到 x 为 4 的情况
```
### 内存顺序 ### 内存顺序
TODO: 为了追求极致的性能实现各种强度要求的一致性C++11 为原子操作定义了六种不同的内存顺序 `std::memory_order` 的选项,表达了四种多线程间的同步模型:
C++11 为原子操作定义了六种不同的内存顺序 `std::memory_order` 的选项, 1. 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 `std::memory_order_relaxed` 指定。我们来看一个例子:
表达了四种多线程间的同步模型:
- 宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过 `std::memory_order_relaxed` 指定。我们来看一个例子:
```cpp ```cpp
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
std::atomic<int> counter = {0}; std::atomic<int> counter = {0};
int main() {
std::vector<std::thread> vt; std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) { for (int i = 0; i < 100; ++i) {
vt.emplace_back([](){ vt.emplace_back([](){
@@ -407,21 +449,79 @@ C++11 为原子操作定义了六种不同的内存顺序 `std::memory_order`
t.join(); t.join();
} }
std::cout << "current counter:" << counter << std::endl; std::cout << "current counter:" << counter << std::endl;
return 0;
}
``` ```
- 释放/消费模型:在此模型中,我们开始限制进程间的操作顺序,如果某个线程依赖某个值,但另一个线程同时会对该值进行修改,即者依赖者。 2. 释放/消费模型:在此模型中,我们开始限制进程间的操作顺序,如果某个线程需要修改某个值,但另一个线程会对该值的某次操作产生依赖,即者依赖者。具体而言,线程 A 完成了三次对 `x` 的写操作,线程 `B` 仅依赖其中第三次 `x` 的写操作,与 `x` 的前两次写行为无关,则当 `A` 主动 `x.release()` 时候(即使用 `std::memory_order_release`),选项 `std::memory_order_consume` 能够确保 `B` 在调用 `x.load()` 时候观察到 `A` 中第三次对 `x` 的写操作。我们来看一个例子:
- 释放/获取模型在此模型下我们可以进一步限制不同线程间原子操作的顺序在释放release和获取acquire之间规定时序即发生在释放操作之前的写操作对其他线程的任何获取操作都是可见的即发生顺序happens-before ```cpp
std::atomic<int*> ptr;
int v;
std::thread producer([&]() {
int* p = new int(42);
v = 1024;
ptr.store(p, std::memory_order_release);
});
std::thread consumer([&]() {
int* p;
while(!(p = ptr.load(std::memory_order_consume)));
`std::memory_order_consume`、`std::memory_order_acquire`、`std::memory_order_release`、`std::memory_order_acq_rel` 这四种选项均为这两个模型服务的 TODO: 未写完 std::cout << "p: " << *p << std::endl;
std::cout << "v: " << v << std::endl;
});
producer.join();
consumer.join();
```
- 顺序一致模型:在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 `std::memory_order_seq_cst` 进行指定 3. 释放/获取模型:在此模型下,我们可以进一步加紧对不同线程间原子操作的顺序的限制,在释放 `std::memory_order_release` 和获取 `std::memory_order_acquire` 之间规定时序,即发生在释放操作之前的**所有**写操作对其他线程的任何获取操作都是可见的亦即发生顺序happens-before
## 7.6 事务内存 可以看到,`std::memory_order_release` 确保了它之后的写行为不会发生在释放操作之前,是一个向后的屏障,而 `std::memory_order_acquire` 确保了它之后的前的写行为,不会发生在该获取操作之后,是一个向前的屏障,对于选项 `std::memory_order_acq_rel` 而言,则结合了这两者的特点,唯一确定了一个内存屏障,使得当前线程对内存的读写不会被重排到此操作的前后。
TODO: C++20 放到第十章? 我们来看一个例子:
```cpp
std::vector<int> v;
std::atomic<int> flag = {0};
std::thread release([&]() {
v.push_back(42);
flag.store(1, std::memory_order_release);
});
std::thread acqrel([&]() {
int expected = 1; // must before compare_exchange_strong
while(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) {
expected = 1; // must after compare_exchange_strong
}
// flag has changed to 2
});
std::thread acquire([&]() {
while(flag.load(std::memory_order_acquire) < 2);
std::cout << v.at(0) << std::endl; // must be 42
});
release.join();
acqrel.join();
acquire.join();
```
在此例中我们使用了 `compare_exchange_strong`它便是比较交换原语Compare-and-swap primitive它有一个更弱的版本即 `compare_exchange_weak`,它允许即便交换成功,也仍然返回 `false` 失败。其原因是因为在某些平台上虚假故障导致的,具体而言,当 CPU 进行上下文切换时,另一线程加载同一地址产生的不一致。除此之外,`compare_exchange_strong` 的性能可能稍差于 `compare_exchange_weak`,但大部分情况下,`compare_exchange_strong` 应该被有限考虑。
4. 顺序一致模型:在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 `std::memory_order_seq_cst` 进行指定。最后来看一个例子:
```cpp
std::atomic<int> counter = {0};
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {
vt.emplace_back([](){
counter.fetch_add(1, std::memory_order_seq_cst);
});
}
for (auto& t : vt) {
t.join();
}
std::cout << "current counter:" << counter << std::endl;
```
这个例子与第一个宽松模型的例子本质上没有区别,仅仅只是将原子操作的内存顺序修改为了 `memory_order_seq_cst`,有兴趣的读者可以自行编写程序测量这两种不同内存顺序导致的性能差异。
## 总结 ## 总结
@@ -445,7 +545,7 @@ auto f = pool.enqueue([](int life) {
std::cout << f.get() << std::endl; std::cout << f.get() << std::endl;
``` ```
2. 请实现一个无锁版本的 FIFO 队列,提供 `enqueue` 和 `dequeue` 两个方法 2. 请使用 `std::atomic<book>` 实现一个互斥锁
[返回目录](./toc.md) | [上一章](./06-regex.md) | [下一章 文件系统](./08-filesystem.md) [返回目录](./toc.md) | [上一章](./06-regex.md) | [下一章 文件系统](./08-filesystem.md)

View File

@@ -73,6 +73,11 @@ TODO:
TODO: TODO:
## 事务内存
TODO:
## 总结 ## 总结
总的来说,终于在 C++20 中看到 Concepts/Ranges/Modules 这些令人兴奋的特性, 总的来说,终于在 C++20 中看到 Concepts/Ranges/Modules 这些令人兴奋的特性,

View File

@@ -83,7 +83,6 @@
+ 7.3 期物 + 7.3 期物
+ 7.4 条件变量 + 7.4 条件变量
+ 7.5 原子操作与内存模型 + 7.5 原子操作与内存模型
+ 7.6 事务内存
- [**第 8 章 文件系统**](./08-filesystem.md) - [**第 8 章 文件系统**](./08-filesystem.md)
+ 8.1 文档与链接 + 8.1 文档与链接
+ 8.2 `std::filesystem` + 8.2 `std::filesystem`
@@ -100,6 +99,7 @@
+ 10.2 Range + 10.2 Range
+ 10.3 Module + 10.3 Module
+ 10.4 Coroutine + 10.4 Coroutine
+ 10.5 事务内存
- [**附录 1进一步阅读的学习材料**](./appendix1.md) - [**附录 1进一步阅读的学习材料**](./appendix1.md)
- [**附录 2现代 C++ 的最佳实践**](./appendix2.md) - [**附录 2现代 C++ 的最佳实践**](./appendix2.md)

View File

@@ -6,6 +6,8 @@ struct A {
int y; int y;
long long z; long long z;
}; };
int main() {
std::atomic<A> a; std::atomic<A> a;
std::cout << std::boolalpha << a.is_lock_free() << std::endl; std::cout << std::boolalpha << a.is_lock_free() << std::endl;
return 0; return 0;

101
code/7/7.8.mem_order.cpp Normal file
View File

@@ -0,0 +1,101 @@
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
using namespace std;
using namespace std::chrono;
atomic<int> counter = {0};
const int N = 10000;
void relaxed_order() {
cout << "relaxed_order: " << endl;
vector<thread> vt;
for (int i = 0; i < N; ++i) {
vt.emplace_back([](){
counter.fetch_add(1, memory_order_relaxed);
});
}
auto t1 = high_resolution_clock::now();
for (auto& t : vt) {
t.join();
}
auto t2 = high_resolution_clock::now();
auto duration = ( t2 - t1 ).count();
cout << "relaxed order speed: " << duration / N << "ns" << endl;
}
void release_consume_order() {
cout << "release_consume_order: " << endl;
atomic<int*> ptr;
int v;
thread producer([&]() {
int* p = new int(42);
v = 1024;
ptr.store(p, memory_order_release);
});
thread consumer([&]() {
int* p;
while(!(p = ptr.load(memory_order_consume)));
cout << "p: " << *p << endl;
cout << "v: " << v << endl;
});
producer.join();
consumer.join();
}
void release_acquire_order() {
cout << "release_acquire_order: " << endl;
int v;
atomic<int> flag = {0};
thread release([&]() {
v = 42;
flag.store(1, memory_order_release);
});
thread acqrel([&]() {
int expected = 1; // must before compare_exchange_strong
while(!flag.compare_exchange_strong(expected, 2, memory_order_acq_rel)) {
expected = 1; // must after compare_exchange_strong
}
// flag has changed to 2
});
thread acquire([&]() {
while(flag.load(memory_order_acquire) < 2);
cout << "v: " << v << endl; // must be 42
});
release.join();
acqrel.join();
acquire.join();
}
void sequential_consistent_order() {
cout << "sequential_consistent_order: " << endl;
vector<thread> vt;
for (int i = 0; i < N; ++i) {
vt.emplace_back([](){
counter.fetch_add(1, memory_order_seq_cst);
});
}
auto t1 = high_resolution_clock::now();
for (auto& t : vt) {
t.join();
}
auto t2 = high_resolution_clock::now();
auto duration = ( t2 - t1 ).count();
cout << "sequential consistent speed: " << duration / N << "ns" << endl;
}
int main() {
relaxed_order();
release_consume_order();
release_acquire_order();
sequential_consistent_order();
return 0;
}

View File

@@ -1,21 +0,0 @@
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
std::atomic<int> counter = {0};
int main() {
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {
vt.emplace_back([](){
counter.fetch_add(1, std::memory_order_relaxed);
});
}
for (auto& t : vt) {
t.join();
}
std::cout << "current counter:" << counter << std::endl;
return 0;
}

16
exercises/7/7.1/Makefile Normal file
View File

@@ -0,0 +1,16 @@
#
# Makefile
#
# exercise solution 7.1 - chapter 7
# modern cpp tutorial
#
# created by changkun at changkun.de/modern-cpp
#
all: $(patsubst %.cpp, %.out, $(wildcard *.cpp))
%.out: %.cpp Makefile
clang++ $< -o $@ -std=c++2a -pedantic
clean:
rm *.out

44
exercises/7/7.2.mutex.cpp Normal file
View File

@@ -0,0 +1,44 @@
#include <atomic>
#include <thread>
#include <iostream>
class mutex {
std::atomic<bool> flag{false};
public:
void lock()
{
while (flag.exchange(true, std::memory_order_relaxed));
std::atomic_thread_fence(std::memory_order_acquire);
}
void unlock()
{
std::atomic_thread_fence(std::memory_order_release);
flag.store(false, std::memory_order_relaxed);
}
};
int a = 0;
int main() {
mutex mtx_a;
std::thread t1([&](){
mtx_a.lock();
a += 1;
mtx_a.unlock();
});
std::thread t2([&](){
mtx_a.lock();
a += 2;
mtx_a.unlock();
});
t1.join();
t2.join();
std::cout << a << std::endl;
return 0;
}