C++ 多线程编程精要

CPU 亲和性

  Linux 可以运行在多处理器的机器上,为了维持多个CPU之间的负载均衡,线程可能会被OS调度到其它CPU上,这种情况下线程就无法利用原先CPU上边的缓存了,也就降低了CPU cache的命中率了。所谓的CPU亲和性,就是让线程在指定的CPU上长时间运行而不被调度到其它CPU上边,以提高CPU cache的命中率。
  在Linux中,可以使用pthread_setaffinity_np()为线程设置CPU的亲和性:

1
2
int pthread_setaffinity_np(pthread_t thread, size_t cpusetsize,
const cpu_set_t *cpuset);

  注意到,它的第一个参数类型是pthread_t,代表 Linux 线程的ID。在C++中,每个std::thread都对应着底层操作系统的一个线程,不过我们可以使用std::threadnative_handle()函数来返回它对应的 OS 线程的ID,在Linux系统中,这个函数的返回值类型是pthread_t
  下面的例子,我们让线程绑定到 CPU-0 上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <thread>
#include <chrono>
#include <sched.h>
int main()
{
std::thread t([]() {
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread #" << std::this_thread::get_id()
<< ": on CPU" << sched_getcpu() << std::endl;
}
});
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
pthread_setaffinity_np(t.native_handle(), sizeof(cpu_set_t), &cpuset);
t.join();
return 0;
}

改善 std::thread

  在工程实践中,你会发现std::thread并不像想象中的那么完美。一个明显的问题是,当std::thread即将析构时,你要人为地确保实际的线程已经结束运行了或者已经分离了(可以分别调用它的join()detach()成员函数),否则程序将会抛出std::terminate异常。当然,由于std::thread的析构函数没有保证正确地释放资源,这也同时导致了std::thread不是异常安全的。
  下面提供了一个ThreadRAII类,它在析构时会等待线程运行结束,或者直接分离线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ThreadRAII
{
public:
enum class DtorAction {join, detach};
template<typename... Ts>
ThreadRAII(DtorAction a, Ts&&... params)
: action_(a), thread_(std::forward<Ts>(params)...)
{
}
ThreadRAII(ThreadRAII &&) = default;
ThreadRAII &operator=(ThreadRAII &&) = default;
~ThreadRAII()
{
if (thread_.joinable()) {
if (action_ == DtorAction::join) {
thread_.join();
} else {
thread_.detach();
}
}
}
std::thread &get() { return thread_; }
private:
DtorAction action_;
std::thread thread_;
};

  创建ThreadRAII对象时,可以顺带指定它的析构行为,到底是等待线程运行结束呢还是直接分离线程。下面的例子中,程序退出时,ThreadRAII对象的析构函数会保证线程正常结束运行:

1
2
3
4
5
6
7
void sayHello() { cout << "Hello" << endl; }
int main()
{
ThreadRAII t{ThreadRAII::DtorAction::join, sayHello};
return 0;
}

任务并发

  除了std::thread,C++标准库还引入了std::async,提供了所谓的基于任务的并发,也可以说std::async在某些方面弥补std::thread的缺陷。std::thread一个恼人的地方是,你没办法直接地拿到任务的返回值,而必须借助一些迂回的方式:

1
2
3
4
5
6
7
8
void sum(const vector<int> &values, int *result)
{
*result = std::accumulate(values.cbegin(), values.cend(), 0);
}
int results = 0;
vector<int> values = {1, 2, 3, 4, 5};
thread t(sum, values, &results);
t.join();

  而std::async会返回一个std::future对象,通过这个对象就可以获得任务的返回值了,可以看到std::async让代码变得简洁得多了:

1
2
3
4
5
6
7
8
int sum(const vector<int> &values)
{
return std::accumulate(values.cbegin(), values.cend(), 0);
}
int results = 0;
vector<int> values = {1, 2, 3, 4, 5};
auto fut = std::async(sum, values);
cout << "results: " << fut.get() << endl;

理解 std::async 的行为

  当std::async启动一个任务时,它到底会不会创建一个新的线程呢?答案取决于你是怎样使用std::async的,当你把任务交给std::async去执行的时候,可以同时指定任务的启动方式:

  • std::launch::async表示异步执行,也就是说把任务放到新的线程中并且立即执行。
  • std::launch::deferred意味着任务会推迟执行,也就是说只有当你调用了std::future对象的get()wait()方法时,任务才会执行,这时std::async会把任务放到当前线程中执行。

  假设我们没有指定使用哪种启动方式,那么std::async会自动在这两种方式中选择一种,这种情况下,我们就无法知道任务到底会不会在新线程中执行了。然而这种不确定性一不小心就会给代码引入 bug,并且这种 bug 很难重现。例如,使用线程局部存储时,就可能遇到不可思议的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
thread_local int val = 0;
int getVal()
{
return val;
}
int main()
{
val = 100;
auto fut = std::async(getVal);
std::cout << fut.get() << std::endl;
return 0;
}

  任务的返回结果是不可预测的,因为它返回变量val的值。如果std::async采用的是std::launch::async方式,任务将返回100,否则将返回0,这使得代码中任何依赖这个结果的地方都变得不可确定了,很容易引入难以检测的bug。
  一种更好的做法是,使用std::async时总是让它异步执行,这可以减少不必要的麻烦,可以用一个函数来封装这个行为:

1
2
3
4
5
6
7
template <typename F, typename... Ts>
inline auto reallyAsync(F&& f, Ts&&... params)
{
return std::async(std::launch::async,
std::forward<F>(f),
std::forward<Ts>(params)...);
}

参考资料