see #2: revision of parallelism and concurrency

This commit is contained in:
Changkun Ou
2019-07-16 14:38:40 +02:00
parent 864ef221e6
commit 5aff35b0f7
11 changed files with 359 additions and 202 deletions

View File

@@ -78,12 +78,12 @@
+ `std::regex_match` + `std::regex_match`
+ `std::match_results` + `std::match_results`
- [**Chapter 07 Parallelism and Concurrency**](./07-thread.md) - [**Chapter 07 Parallelism and Concurrency**](./07-thread.md)
+ 7.1 `std::thread` + 7.1 Basic of Parallelism
+ 7.2 `std::mutex` and `std::unique_lock` + 7.2 Mutex and Critical Section
+ 7.3 `std::future` and `std::packaged_task` + 7.3 Futures
+ 7.4 `std::condition_variable` + 7.4 Condition Variable
+ 7.5 `std::atomic` and memory order + 7.5 Atomic Operation and Memory Order
+ 7.6 Transactional memory + 7.6 Transactional Memory
+ 7.7 Coroutine + 7.7 Coroutine
- [**Chapter 08 File System**](./08-filesystem.md) - [**Chapter 08 File System**](./08-filesystem.md)
+ 8.1 Documents and links + 8.1 Documents and links

View File

@@ -6,54 +6,78 @@ order: 7
# 第 7 章 并行与并发 # 第 7 章 并行与并发
> 内容修订中
[TOC] [TOC]
## 7.1 线程与并行 ## 7.1 线程与并行
### std::thread ### std::thread
`std::thread` 用于创建一个执行的线程实例,所以它是一切并发编程的基础,使用时需要包含 `<thread>` 头文件,它提供了很多基本的线程操作,例如`get_id()`来获取所创建线程的线程 ID例如使用 `join()` 来加入一个线程等等,例如: `std::thread` 用于创建一个执行的线程实例,所以它是一切并发编程的基础,使用时需要包含 `<thread>` 头文件,
它提供了很多基本的线程操作,例如 `get_id()` 来获取所创建线程的线程 ID
例如使用 `join()` 来加入一个线程等等,例如:
```cpp ```cpp
#include <iostream> #include <iostream>
#include <thread> #include <thread>
void foo() {
std::cout << "hello world" << std::endl;
}
int main() { int main() {
std::thread t(foo); std::thread t([](){
std::cout << "hello world." << std::endl;
});
t.join(); t.join();
return 0; return 0;
} }
``` ```
## 7.2 std::mutex, std::unique\_lock ## 7.2 互斥量与临界区
我们在操作系统的相关知识中已经了解过了有关并发技术的基本知识mutex 就是其中的核心之一。C++11引入了 mutex 相关的类,其所有相关的函数都放在 `<mutex>` 头文件中。 我们在操作系统、亦或是数据库的相关知识中已经了解过了有关并发技术的基本知识,`mutex` 就是其中的核心之一。
C++11 引入了 `mutex` 相关的类,其所有相关的函数都放在 `<mutex>` 头文件中。
`std::mutex` 是 C++11 中最基本的 `mutex` 类,通过实例化 `std::mutex` 可以创建互斥量,而通过其成员函数 `lock()` 可以进行上锁,`unlock()` 可以进行解锁。但是在在实际编写代码的过程中,最好不去直接调用成员函数,因为调用成员函数就需要在每个临界区的出口处调用 `unlock()`,当然,还包括异常。这时候 C++11 还为互斥量提供了一个 RAII 语法的模板类`std::lock_gurad`。RAII 在不失代码简洁性的同时,很好的保证了代码的异常安全性。 `std::mutex` 是 C++11 中最基本的 `mutex` 类,通过实例化 `std::mutex` 可以创建互斥量,
而通过其成员函数 `lock()` 可以进行上锁,`unlock()` 可以进行解锁。
但是在在实际编写代码的过程中,最好不去直接调用成员函数,
因为调用成员函数就需要在每个临界区的出口处调用 `unlock()`,当然,还包括异常。
这时候 C++11 还为互斥量提供了一个 RAII 语法的模板类 `std::lock_gurad`
RAII 在不失代码简洁性的同时,很好的保证了代码的异常安全性。
在 RAII 用法下,对于临界区的互斥量的创建只需要在作用域的开始部分,例如: 在 RAII 用法下,对于临界区的互斥量的创建只需要在作用域的开始部分,例如:
```cpp ```cpp
void some_operation(const std::string &message) { #include <iostream>
static std::mutex mutex; #include <thread>
std::lock_guard<std::mutex> lock(mutex);
// ...操作 int v = 1;
// 当离开这个作用域的时候互斥锁会被析构同时unlock互斥锁 void critical_section(int change_v) {
// 因此这个函数内部的可以认为是临界区 static std::mutex mtx;
std::lock_guard<std::mutex> lock(mtx);
// 执行竞争操作
v = change_v;
// 离开此作用域后 mtx 会被释放
}
int main() {
std::thread t1(critical_section, 2), t2(critical_section, 3);
t1.join();
t2.join();
std::cout << v << std::endl;
return 0;
} }
``` ```
由于 C++保证了所有栈对象在声明周期结束时会被销毁,所以这样的代码也是异常安全的。无论 `some_operation()` 正常返回、还是在中途抛出异常,都会引发堆栈回退,也就自动调用了 `unlock()` 由于 C++ 保证了所有栈对象在声明周期结束时会被销毁,所以这样的代码也是异常安全的。
无论 `critical_section()` 正常返回、还是在中途抛出异常,都会引发堆栈回退,也就自动调用了 `unlock()`
`std::unique_lock` 则相对于 `std::lock_guard` 出现的,`std::unique_lock` 更加灵活,`std::unique_lock` 的对象会以独占所有权(没有其他的 `unique_lock` 对象同时拥有某个 `mutex` 对象的所有权)的方式管理 `mutex` 对象上的上锁和解锁的操作。所以在并发编程中,推荐使用 `std::unique_lock` `std::unique_lock` 则相对于 `std::lock_guard` 出现的,`std::unique_lock` 更加灵活,
`std::unique_lock` 的对象会以独占所有权(没有其他的 `unique_lock` 对象同时拥有某个 `mutex` 对象的所有权)
的方式管理 `mutex` 对象上的上锁和解锁的操作。所以在并发编程中,推荐使用 `std::unique_lock`
`std::lock_guard` 不能显式的调用 `lock``unlock``std::unique_lock` 可以在声明后的任意位置调用 可以缩小锁的作用范围,提供更高的并发度。 `std::lock_guard` 不能显式的调用 `lock``unlock``std::unique_lock` 可以在声明后的任意位置调用
可以缩小锁的作用范围,提供更高的并发度。
如果你用到了条件变量 `std::condition_variable::wait` 则必须使用 `std::unique_lock` 作为参数。 如果你用到了条件变量 `std::condition_variable::wait` 则必须使用 `std::unique_lock` 作为参数。
@@ -62,121 +86,156 @@ void some_operation(const std::string &message) {
```cpp ```cpp
#include <iostream> #include <iostream>
#include <thread> #include <thread>
#include <mutex>
std::mutex mtx; int v = 1;
void block_area() { void critical_section(int change_v) {
static std::mutex mtx;
std::unique_lock<std::mutex> lock(mtx); std::unique_lock<std::mutex> lock(mtx);
//...临界区 // 执行竞争操作
v = change_v;
std::cout << v << std::endl;
// 将锁进行释放
lock.unlock(); lock.unlock();
   //...some other code
lock.lock(); // can lock again // 在此期间,任何人都可以抢夺 v 的持有权
// 开始另一组竞争操作,再次加锁
lock.lock();
v += 1;
std::cout << v << std::endl;
} }
int main() { int main() {
std::thread thd1(block_area); std::thread t1(critical_section, 2), t2(critical_section, 3);
t1.join();
thd1.join(); t2.join();
return 0; return 0;
} }
``` ```
## 7.3 std::future, std::packaged\_task ## 7.3 期物
`std::future` 则是提供了一个访问异步操作结果的途径,这句话很不好理解。为了理解这个特性,我们需要先理解一下在 C++11之前的多线程行为。 期物Future表现为 `std::future`,它提供了一个访问异步操作结果的途径,这句话很不好理解。
为了理解这个特性,我们需要先理解一下在 C++11 之前的多线程行为。
试想,如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务,并返回我一个结果。而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的结果,所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。 试想,如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务,并返回我一个结果。
而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的结果,
所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。
在 C++11 的 `std::future` 被引入之前,通常的做法是:创建一个线程A在线程A里启动任务 B当准备完毕后发送一个事件并将结果保存在全局变量中。而主函数线程 A 里正在做其他的事情,当需要结果的时候,调用一个线程等待函数来获得执行的结果。 在 C++11 的 `std::future` 被引入之前,通常的做法是:
创建一个线程 A在线程 A 里启动任务 B当准备完毕后发送一个事件并将结果保存在全局变量中。
而主函数线程 A 里正在做其他的事情,当需要结果的时候,调用一个线程等待函数来获得执行的结果。
而 C++11 提供的 `std::future` 简化了这个流程,可以用来获取异步任务的结果。自然地,我们很容易能够想象到把它作为一种简单的线程同步手段。 而 C++11 提供的 `std::future` 简化了这个流程,可以用来获取异步任务的结果。
自然地我们很容易能够想象到把它作为一种简单的线程同步手段即屏障barrier
此外,`std::packaged_task` 可以用来封装任何可以调用的目标,从而用于实现异步的调用。例如: 为了看一个例子,我们这里额外使用 `std::packaged_task`,它可以用来封装任何可以调用的目标,从而用于实现异步的调用。
举例来说:
```cpp ```cpp
#include <iostream> #include <iostream>
#include <future> #include <future>
#include <thread> #include <thread>
int main() int main() {
{
// 将一个返回值为7的 lambda 表达式封装到 task 中 // 将一个返回值为7的 lambda 表达式封装到 task 中
// std::packaged_task 的模板参数为要封装函数的类型 // std::packaged_task 的模板参数为要封装函数的类型
std::packaged_task<int()> task([](){return 7;}); std::packaged_task<int()> task([](){return 7;});
// 获得 task 的 future // 获得 task 的期物
std::future<int> result = task.get_future(); // 在一个线程中执行 task std::future<int> result = task.get_future(); // 在一个线程中执行 task
std::thread(std::move(task)).detach(); std::cout << "Waiting..."; std::thread(std::move(task)).detach();
result.wait(); std::cout << "waiting...";
result.wait(); // 在此设置屏障,阻塞到期物的完成
// 输出执行结果 // 输出执行结果
std::cout << "Done!" << std:: endl << "Result is " << result.get() << '\n'; std::cout << "done!" << std:: endl << "future result is " << result.get() << std::endl;
return 0;
} }
``` ```
在封装好要调用的目标后,可以使用 `get_future()` 来获得一个 `std::future` 对象,以便之后实施线程同步。 在封装好要调用的目标后,可以使用 `get_future()` 来获得一个 `std::future` 对象,以便之后实施线程同步。
## 7.4 std::condition_variable ## 7.4 条件变量
`std::condition_variable` 是为了解决死锁而生的。当互斥操作不够用而引入的。比如,线程可能需要等待某个条件为真才能继续执行,而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。所以,`condition_variable` 实例被创建出现主要就是用于唤醒等待线程从而避免死锁。`std::condition_variable``notify_one()` 用于唤醒一个线程;`notify_all()` 则是通知所有线程。下面是一个生产者和消费者模型的例子: 条件变量 `std::condition_variable` 是为了解决死锁而生当互斥操作不够用而引入的。
比如,线程可能需要等待某个条件为真才能继续执行,
而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。
所以,`condition_variable` 实例被创建出现主要就是用于唤醒等待线程从而避免死锁。
`std::condition_variable``notify_one()` 用于唤醒一个线程;
`notify_all()` 则是通知所有线程。下面是一个生产者和消费者模型的例子:
```cpp ```cpp
#include <condition_variable> #include <queue>
#include <chrono>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include <queue> #include <condition_variable>
#include <chrono>
int main()
{ int main() {
// 生产者数量
std::queue<int> produced_nums; std::queue<int> produced_nums;
// 互斥锁 std::mutex mtx;
std::mutex m; std::condition_variable cv;
// 条件变量 bool notified = false; // 通知信号
std::condition_variable cond_var;
// 结束标志
bool done = false;
// 通知标志
bool notified = false;
// 生产者线程 // 生产者
std::thread producer([&]() { auto producer = [&]() {
for (int i = 0; i < 5; ++i) { for (int i = 0; ; i++) {
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(900));
// 创建互斥锁 std::unique_lock<std::mutex> lock(mtx);
std::unique_lock<std::mutex> lock(m); std::cout << "producing " << i << std::endl;
std::cout << "producing " << i << '\n';
produced_nums.push(i); produced_nums.push(i);
notified = true; notified = true;
// 通知一个线程 cv.notify_all(); // 此处也可以使用 notify_one
cond_var.notify_one();
} }
done = true; };
notified = true; // 消费者
cond_var.notify_one(); auto consumer = [&]() {
}); while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 消费者线程 while (!notified) { // avoid spurious wakeup
std::thread consumer([&]() { cv.wait(lock);
std::unique_lock<std::mutex> lock(m);
while (!done) {
while (!notified) { // 循环避免虚假唤醒
cond_var.wait(lock);
} }
// 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 消费者慢于生产者
lock.lock();
while (!produced_nums.empty()) { while (!produced_nums.empty()) {
std::cout << "consuming " << produced_nums.front() << '\n'; std::cout << "consuming " << produced_nums.front() << std::endl;
produced_nums.pop(); produced_nums.pop();
} }
notified = false; notified = false;
} }
}); };
producer.join(); // 分别在不同的线程中运行
consumer.join(); std::thread p(producer);
std::thread cs[2];
for (int i = 0; i < 2; ++i) {
cs[i] = std::thread(consumer);
}
p.join();
for (int i = 0; i < 2; ++i) {
cs[i].join();
}
return 0;
} }
``` ```
值得一提的是,在生产者中我们虽然可以使用 `notify_one()`,但实际上并不建议在此处使用,
因为在多消费者的情况下,我们的消费者实现中简单放弃了锁的持有,这使得可能让其他消费者
争夺此锁,从而更好的利用多个消费者之间的并发。话虽如此,但实际上因为 `std::mutex` 的排他性,
我们根本无法期待多个消费者能真正意义上的并行消费队列的中生产的内容,我们仍需要粒度更细的手段。
## 7.5 原子操作与内存一致性
## 7.6 事务内存
## 总结 ## 总结
C++11 语言层提供了并发编程的相关支持,本节简单的介绍了 `std::thread`/`std::mutex`/`std::future` 这些并发编程中不可回避的重要工具。 C++11 语言层提供了并发编程的相关支持,本节简单的介绍了 `std::thread`/`std::mutex`/`std::future` 这些并发编程中不可回避的重要工具。

View File

@@ -78,13 +78,12 @@
+ `std::regex_match` + `std::regex_match`
+ `std::match_results` + `std::match_results`
- [**第 7 章 并行与并发**](./07-thread.md) - [**第 7 章 并行与并发**](./07-thread.md)
+ 7.1 `std::thread` + 7.1 并发基础
+ 7.2 `std::mutex``std::unique_lock` + 7.2 互斥量与临界区
+ 7.3 `std::future``std::packaged_task` + 7.3 期物
+ 7.4 `std::condition_variable` + 7.4 条件变量
+ 7.5 `std::atomic` 与内存顺序 + 7.5 原子操作与内存一致性
+ 7.6 事务内存 + 7.6 事务内存
+ 7.7 协程
- [**第 8 章 文件系统**](./08-filesystem.md) - [**第 8 章 文件系统**](./08-filesystem.md)
+ 8.1 文档与链接 + 8.1 文档与链接
+ 8.2 `std::filesystem` + 8.2 `std::filesystem`

View File

@@ -1,37 +0,0 @@
//
// 7.1.cpp
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
// 线程支持库
#include <iostream>
#include <future>
#include <thread>
void foo() {
std::cout << "hello world" << std::endl;
}
void foo2() {
// 将一个返回值为7的 lambda 表达式封装到 task 中
// std::packaged_task 的模板参数为要封装函数的类型
std::packaged_task<int()> task([](){return 7;});
// 获得 task 的 future
std::future<int> result = task.get_future(); // 在一个线程中执行 task
std::thread(std::move(task)).detach(); std::cout << "Waiting...";
result.wait();
// 输出执行结果
std::cout << "Done!" << std:: endl << "Result is " << result.get() << '\n';
}
int main() {
std::thread t(foo);
foo2();
t.join();
return 0;
}

View File

@@ -0,0 +1,19 @@
//
// 7.1.thread.basic.cpp
// chapter 7 parallelism and concurrency
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
#include <iostream>
#include <thread>
int main() {
std::thread t([](){
std::cout << "hello world." << std::endl;
});
t.join();
return 0;
}

View File

@@ -1,63 +0,0 @@
//
// 7.2.cpp
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
// 生产者消费者模型
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
int main()
{
// 生产者数量
std::queue<int> produced_nums;
// 互斥锁
std::mutex m;
// 条件变量
std::condition_variable cond_var;
// 结束标志
bool done = false;
// 通知标志
bool notified = false;
// 生产者线程
std::thread producer([&]() {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1));
// 创建互斥锁
std::unique_lock<std::mutex> lock(m);
std::cout << "producing " << i << '\n';
produced_nums.push(i);
notified = true;
// 通知一个线程
cond_var.notify_one();
}
done = true;
notified = true;
cond_var.notify_one();
});
// 消费者线程
std::thread consumer([&]() {
std::unique_lock<std::mutex> lock(m);
while (!done) {
while (!notified) { // 循环避免虚假唤醒
cond_var.wait(lock);
}
while (!produced_nums.empty()) {
std::cout << "consuming " << produced_nums.front() << '\n';
produced_nums.pop();
}
notified = false;
}
});
producer.join();
consumer.join();
}

View File

@@ -0,0 +1,32 @@
//
// 7.2.critical.section.a.cpp
// chapter 7 parallelism and concurrency
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
#include <iostream>
#include <thread>
int v = 1;
void critical_section(int change_v) {
static std::mutex mtx;
std::lock_guard<std::mutex> lock(mtx);
// do contention operations
v = change_v;
// mtx will be destructed when exit this region
}
int main() {
std::thread t1(critical_section, 2), t2(critical_section, 3);
t1.join();
t2.join();
std::cout << v << std::endl;
return 0;
}

View File

@@ -0,0 +1,39 @@
//
// 7.3.critical.section.b.cpp
// chapter 7 parallelism and concurrency
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
#include <iostream>
#include <thread>
int v = 1;
void critical_section(int change_v) {
static std::mutex mtx;
std::unique_lock<std::mutex> lock(mtx);
// do contention operations
v = change_v;
std::cout << v << std::endl;
// release the lock
lock.unlock();
// during this period,
// others are allowed to acquire v
// start another group of contention operations
// lock again
lock.lock();
v += 1;
std::cout << v << std::endl;
}
int main() {
std::thread t1(critical_section, 2), t2(critical_section, 3);
t1.join();
t2.join();
return 0;
}

31
code/7/7.4.futures.cpp Normal file
View File

@@ -0,0 +1,31 @@
//
// 7.2.critical.section.a.cpp
// chapter 7 parallelism and concurrency
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
#include <iostream>
#include <thread>
#include <iostream>
#include <future>
#include <thread>
void foo() {
}
int main() {
// pack a lambda expression that returns 7 into a std::packaged_task
std::packaged_task<int()> task([](){return 7;});
// get the future of task
std::future<int> result = task.get_future(); // run task in a thread
std::thread(std::move(task)).detach();
std::cout << "waiting...";
result.wait(); // block until future has arrived
// output result
std::cout << "done!" << std:: endl << "future result is " << result.get() << std::endl;
return 0;
}

View File

@@ -0,0 +1,64 @@
//
// 7.5.cpp
// chapter 7 parallelism and concurrency
// modern c++ tutorial
//
// created by changkun at changkun.de
// https://github.com/changkun/modern-cpp-tutorial
//
#include <queue>
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
int main() {
std::queue<int> produced_nums;
std::mutex mtx;
std::condition_variable cv;
bool notified = false; // notification sign
auto producer = [&]() {
for (int i = 0; ; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::unique_lock<std::mutex> lock(mtx);
std::cout << "producing " << i << std::endl;
produced_nums.push(i);
notified = true;
cv.notify_all();
}
};
auto consumer = [&]() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
while (!notified) { // avoid spurious wakeup
cv.wait(lock);
}
// temporal unlock to allow producer produces more rather than
// let consumer hold the lock until its consumed.
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // consumer is slower
lock.lock();
if (!produced_nums.empty()) {
std::cout << "consuming " << produced_nums.front() << std::endl;
produced_nums.pop();
}
notified = false;
}
};
std::thread p(producer);
std::thread cs[2];
for (int i = 0; i < 2; ++i) {
cs[i] = std::thread(consumer);
}
p.join();
for (int i = 0; i < 2; ++i) {
cs[i].join();
}
return 0;
}

14
code/7/Makefile Normal file
View File

@@ -0,0 +1,14 @@
#
# modern cpp tutorial
#
# created by changkun at changkun.de
# https://github.com/changkun/modern-cpp-tutorial
#
all: $(patsubst %.cpp, %.out, $(wildcard *.cpp))
%.out: %.cpp Makefile
clang++ $< -o $@ -std=c++2a -pedantic
clean:
rm *.out