C++ 并发编程入门

本文介绍 Modern C++ 标准库提供的关于多线程编程的工具。如标题所示,本文定位是入门,不讨论内存序、无锁编程等并发编程的高级主题。本文内容主要参考《C++ 并发编程实战(第二版)》一书,对并发编程感兴趣推荐阅读原书。

1. 线程

C++ 中使用线程的方式非常简单,只需要包含 thread 头文件,然后向 thread 构造函数传入可调用函数和实参即可。一些示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <functional>
#include <iostream>
#include <thread>

void hello() { std::cout << "Hello Concurrent World!\n"; }

struct HelloThread {
void operator()() const { std::cout << "Hello Thread!" << std::endl; }
};

void bind_sample(int a, int b) { std::cout << a << " + " << b << " = " << a + b << std::endl; }

void increase(int &i) { ++i; }

class AddTen {
public:
void add(int value) { std::cout << value << " + " << ten << " = " << value + ten << std::endl; }

private:
int ten{10};
};

int main() {
std::thread t1(hello); // 无参函数
t1.join();

std::thread t2(HelloThread{}); // 可调用对象
t2.join();

std::thread t3([]() { std::cout << "Hello Lambda!" << std::endl; }); // lambda 表达式
t3.join();

std::thread t4(bind_sample, 1, 2); // 有参函数
t4.join();

auto f = std::bind(bind_sample, std::placeholders::_1, 4);
std::thread t5(f, 5); // bind
t5.join();

int i = 0;
std::thread t6(increase, std::ref(i)); // 传入引用类型参数
t6.join();
std::cout << "i = " << i << std::endl;

AddTen obj;
std::thread t7(&AddTen::add, &obj, 1); // 传入对象调用指定函数
t7.join();
return 0;
}

运行结果如下:

1
2
3
4
5
6
7
Hello Concurrent World!
Hello Thread!
Hello Lambda!
1 + 2 = 3
5 + 4 = 9
i = 1
1 + 10 = 11

上面的代码示例了如何传入各种类型的函数、可调用对象等构造 thread ,以在单独的线程中运行任务。需要注意的是,传入的参数默认是进行拷贝的,因此,如果函数形参类型是引用,需要使用 std::ref() 以指定引用语义,如上述代码 41 行所示。

线程构造完成后,即自动在新线程中运行指定的任务。此时需要使用 t.join() 进行汇合,表示等待线程执行完成。也可以使用 t.detach() 表示不等待线程执行完成。同时 thread 对象是不可拷贝的,只具有移动语义。可以使用 vector 等容器管理线程对象。

2. 锁

当多个线程并发读写某个数据时,可能会出现错误。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <thread>

int n = 0;
constexpr int total = 100000;

void add() {
for (int i = 0; i < total; ++i) {
++n;
}
}

int main() {
std::thread t1(add);
std::thread t2(add);
t1.join();
t2.join();

std::cout << n << std::endl;
return 0;
}

使用 g++ xxx.cpp 编译上述源文件(不使用任何编译优化选项)并多次运行可执行文件,可以发现每次程序的输出都不确定:

1
2
3
4
5
6
7
8
$ ./a.out 
112924
$ ./a.out
188328
$ ./a.out
135646
$ ./a.out
131277

输出并不是预期的 200000 ,而似乎是随机数。这是因为多个线程并发读写同一个变量(数据竞争),导致了不合理的数据状态。解决这一问题的最简单方法就是加锁。

C++ 中锁的使用非常简单,只需要包含 mutex 头文件,然后在进入临界区之前调用 lock 函数;在离开临界区时调用 unlock 函数,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <mutex>
#include <thread>

int n = 0;
std::mutex m;
constexpr int total = 100000;

void add() {
for (int i = 0; i < total; ++i) {
m.lock(); // 进入临界区
++n;
m.unlock(); // 离开临界区
}
}

除了 mutex 类外,C++ 还提供了 shared_mutex,也即读写锁,可用于读多写少的场景,以提高并发度。

上述基于锁的写法虽然解决了数据竞争的问题,但是必须保证对 lockunlock 方法配对调用,否则就会导致锁一直被某个线程占用,阻塞其它线程。C++ 中提倡的写法是基于 RAII 的锁管理,包括 lock_guardunique_lockshared_lock。这些类是一个包装类,可以指定这些类在构造函数中调用 lock 方法,在析构函数中自动调用 unlock 方法对锁进行管理。这样,程序员就不需要在每个可能的退出分支中调用 unlock 函数,降低出现 bug 的概率。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <mutex>
#include <thread>

int n = 0;
std::mutex m;
constexpr int total = 100000;

void add() {
for (int i = 0; i < total; ++i) {
std::lock_guard lock(m); // RAII 管理锁
// 或者 std::unique_lock lock(m);
// 或者 std::scoped_lock lock(m);
++n;
}
}

当需要对多个锁进行加锁时,为了防止死锁,需要按照相同的顺序进行加锁。C++ 提供了 std::lock 函数以解决这一问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <mutex>
#include <thread>

int n = 0;
std::mutex m1;
std::mutex m2;
constexpr int total = 100000;

void add() {
for (int i = 0; i < total; ++i) {
std::unique_lock lock1(m1, std::defer_lock); // std::defer_lock 表示推迟上锁
std::unique_lock lock2(m2, std::defer_lock);
std::lock(lock1, lock2); // 按固定顺序上锁 避免死锁
++n;
}
}

3. call_once

在实际项目中有一个常见的需求:某个对象只需要构造一次,但是其消耗的资源较大,我们希望只有在真的需要用到它时在进行构造。这一需求类似于线程安全的单例模式。在此我们不考虑其究竟有几种并发安全的写法,而只介绍一种实用的、C++ 语言也支持的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

void heavyInit() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Resource prepared!" << std::endl;
}

std::once_flag work_once_flag;

void work() {
std::call_once(work_once_flag, heavyInit); // 保证 heavyInit 函数只被调用一次
// do work using the prepared resource
}

int main() {
int workers = 10;
std::vector<std::thread> threads;
for (int i = 0; i < workers; ++i) {
threads.emplace_back(work);
}

for (auto &t : threads) {
t.join();
}
return 0;
}

代码非常简单,只需要包含 mutex 头文件,定义 std::call_once_flag 类型变量,并使用 std::call_once 方法,传入对应的 std::call_once_flag 和方法名和参数,即可保证某方法只被多个线程调用一次,且保证并发安全。

4. 条件变量

在实际编程中,我们经常遇到一种情况:一个线程准备数据;另一个线程等待数据,待数据准备好后对其进行处理。如果仅使用上述已经介绍的知识,我们可以写出类似下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

bool ok = false;
std::mutex m;

void provide() {
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // prepare data
std::unique_lock lock(m); // 加锁以保证写 ok 的并发安全性
ok = true;
lock.unlock();
}

void consume() {
while (true) {
std::unique_lock lock(m); // 加锁以保证读取到正确的 ok 值
if (ok) {
break;
}
lock.unlock();
std::cout << "comsume thread waked without data..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// data prepared now, handle it
}

int main() {
std::thread t1(provide);
std::thread t2(consume);
t1.join();
t2.join();
return 0;
}

核心思路是:使用一个标记变量,和一个保护该变量的锁。生产数据的线程完成数据生产后,将标记变量赋值为 true;消费线程使用轮询+休眠策略,直到标记变量变为 true 后,进行数据处理。

这么写虽然能够解决问题,但是不够优雅:消费线程不应该使用轮询+休眠策略,而是应该一直处于休眠中,待生产线程准备好数据后,通知消费进程即可,消费线程唤醒进行数据处理工作。实际上,条件变量就是为了解决这个问题,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::condition_variable cv;
bool ok = false;
std::mutex m;

void provide() {
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // prepare data
std::unique_lock lock(m); // 加锁以保证写 ok 的并发安全性
ok = true;
cv.notify_one(); // 通知 consume 线程数据已准备好
lock.unlock();
}

void consume() {
std::unique_lock lock(m); // 加锁以保证读取到正确的 ok 值
cv.wait(lock, [&]() { // wait 函数传入 lambda 表达式,表示等待 lambda 表达式条件满足时,唤醒线程
if (!ok) {
std::cout << "comsume thread waked without data..." << std::endl;
}
return ok;
});
// data prepared now, handle it
}

int main() {
std::thread t1(provide);
std::thread t2(consume);
t1.join();
t2.join();
return 0;
}

使用条件变量时,仍需要锁对临界区进行保护。cv.wait 函数传入锁和线程等待的条件,以避免假唤醒。此外,条件变量还有 wait_forwait_until 等接口,能够实现线程的限时等待。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std::chrono_literals;

auto one_day = 24h;
auto half_an_hour = 30min;
auto max_time_between_messages = 30ms;

std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop() {
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
std::cout << "timer begin..." << std::endl;
std::unique_lock<std::mutex> lk(m);
while (!done) {
if (cv.wait_until(lk, timeout) == std::cv_status::timeout) { // 指定等待时间
break; // 超时 退出循环
}
std::cout << "wake up while timer not out..." << std::endl;
}
std::cout << "done = " << done << std::endl;
return done;
}

void notify() {
for (int i = 0; i < 10; ++i) {
cv.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

int main() {
auto start = std::chrono::high_resolution_clock::now();
std::thread t(notify);
wait_loop();
t.join();
auto stop = std::chrono::high_resolution_clock::now();
std::cout << "do_something() took " << std::chrono::duration<double>(stop - start).count() << " seconds" << std::endl;
return 0;
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ ./a.out 
timer begin...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
wake up while timer not out...
done = 0
do_something() took 0.500422 seconds

使用条件变量,我们还可以实现并发安全的队列(本段代码主要来自《C++ 并发编程实战(第二版)》一书):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// 线程安全的队列
template <typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond; // 条件变量

public:
threadsafe_queue() {}

threadsafe_queue(threadsafe_queue const &other) {
std::lock_guard<std::mutex> lk(other.mut); // 拷贝构造函数,对数据源加锁
data_queue = other.data_queue;
}

void push(T new_value) {
std::lock_guard<std::mutex> lk(mut); // 加锁,保证对 data_queue 访问的安全性
data_queue.push(new_value);
data_cond.notify_one(); // 通知调用 wait_and_pop 的线程
}

// 阻塞方法
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); }); // 等待期待的条件:队列非空
value = data_queue.front();
data_queue.pop();
}

// 阻塞方法
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); }); // 等待期待的条件:队列非空
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

// 非阻塞方法
bool try_pop(T &value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) return false; // 队列空,返回 false
value = data_queue.front();
data_queue.pop();
return true;
}

// 非阻塞方法
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty()) return std::shared_ptr<T>(); // 队列空,返回空指针
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

int main() {
std::mutex cout_mutex;

threadsafe_queue<int> q;
std::vector<std::thread> threads;
int n = 10;
for (int i = 0; i < n; ++i) {
if (i % 2 == 0) {
threads.emplace_back(std::thread([&]() {
for (int i = 0; i < 10; ++i) {
q.push(i);
}
}));
} else {
threads.emplace_back(std::thread([&]() {
int value;
for (int i = 0; i < 10; ++i) {
q.wait_and_pop(value);
}
}));
}
}
for (auto &t : threads) {
t.join();
}
std::cout << "q.empty() = " << q.empty() << std::endl;
}

5. 异步任务

除了上述提供的较为低级的并发编程工具,C++ 也提供了一些较为高级的接口,能让我们迅速的使用多线程技术以充分利用多核 CPU 的能力。

C++ 的 future 头文件中提供了 std::async 方法,能够直接让系统开启一个线程执行指定的任务:

1
2
3
4
5
6
7
8
9
10
11
12
#include <future>
#include <iostream>

int find_the_answer_to_ltuae() { return 42; }

void do_other_stuff() {}

int main() {
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout << "The answer is " << the_answer.get() << std::endl; // get 方法获取异步任务返回的结果
}

运行结果如下:

1
2
$ ./a.out 
The answer is 42

async 可以看作时 thread 的封装,提供了更简单的使用方法供用户使用。因此,调用 async 方法时,和 thread 的构造函数类似,参数传递为拷贝语义,具体说明见下面代码(本段代码主要来自《C++ 并发编程实战(第二版)》一书):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <future>
#include <string>

struct X {
void foo(int, std::string const &);

std::string bar(std::string const &);
};

X x;
auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用 p->foo(42, "hello"),p 是指向 x 的指针
auto f2 = std::async(&X::bar, x, "goodbye"); // 调用 tmpx.bar("goodbye"), tmpx 是 x 的拷贝副本

struct Y {
double operator()(double);
};

Y y;
auto f3 = std::async(Y(), 3.141); // 调用 tmpy(3.141),tmpy 通过 Y 的移动构造函数得到
auto f4 = std::async(std::ref(y), 2.718); // 调用 y(2.718)

X baz(X &);
auto f6 = std::async(baz, std::ref(x)); // 调用 baz(x)

class move_only {
public:
move_only();
move_only(move_only &&);
move_only(move_only const &) = delete;
move_only &operator=(move_only &&);
move_only &operator=(move_only const &) = delete;
void operator()();
};

auto f5 = std::async(move_only()); // 调用 tmp(),tmp 是通过 std::move(move_only()) 构造得到

利用 async 接口,我们能够方便地实现一个多线程版本的快速排序算法(本段代码主要来自《C++ 并发编程实战(第二版)》一书):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <algorithm>
#include <future>
#include <iostream>
#include <list>
#include <thread>

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.size() <= 1) {
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const &pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(), [&](T const &t) { return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
std::future<std::list<T> > new_lower(std::async(&parallel_quick_sort<T>, std::move(lower_part))); // 开辟另一线程
auto new_higher(parallel_quick_sort(std::move(input)));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

int main() {
std::list<int> lst{-1, 12, 3, 3, 9, 32, 0, -9};
auto sorted_lst = parallel_quick_sort(lst);
for (auto &elem : sorted_lst) {
std::cout << elem << " ";
}
std::cout << std::endl;
return 0;
}

除了 asyncfuture 头文件还提供了 std::promisestd::packaged_task 等高级编程接口,这里不具体介绍,可参考 https://zh.cppreference.com/w/cpp/thread#future

6. 原子类型

除了上述工具,C++ 还提供了相对而言轻量级、高性能的原子类型。对应本文开始的代码,使用原子类型能够在基本不改变原有代码的情况下得到预期的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> n = 0; // 原子类型
constexpr int total = 100000;

void add() {
for (int i = 0; i < total; ++i) {
++n; // 并发读写安全
}
}

int main() {
std::thread t1(add);
std::thread t2(add);
t1.join();
t2.join();

std::cout << n << std::endl;
return 0;
}

运行结果:

1
2
$ ./a.out 
200000

使用原子类型,也可以方便地解决前文提到的生产者消费者问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

std::vector<int> data;
std::atomic_bool data_ready(false); // 原子类型

void reader_thread() {
while (!data_ready.load()) { // 并发安全读
std::cout << "not prepared yet" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::cout << "The answer=" << data[0] << "\n";
}

void writer_thread() {
data.push_back(42);
data_ready = true; // 并发安全写
}

int main() {
std::thread t1(reader_thread);
std::thread t2(writer_thread);
t1.join();
t2.join();
return 0;
}

关于原子类型更多信息可参考 https://zh.cppreference.com/w/cpp/atomic/atomic

7. 总结

本文仅仅是关于 C++ 并发编程的最简单介绍,介绍了线程、锁、RAII 管理锁、条件变量、原子类型、异步任务等的使用方法。内存模型、内存序、并行算法库等内容将在以后介绍。


C++ 并发编程入门
https://arcsin2.cloud/2024/04/19/C-并发编程入门/
作者
arcsin2
发布于
2024年4月19日
许可协议