gRPC 的 C++ 动态线程池源码分析

固定线程池

  提到线程池,通常说的都是固定大小的线程池,固定线程池的原理是这样的:

  • 线程池由一个线程安全的队列,以及多个 worker 线程组成。
  • 可以有多个 producer 线程,它们负责提交任务给线程池。
  • 接收到新任务之后,线程池会唤醒某个 worker 线程,worker 线程醒来后会取出任务并执行。


  虽然固定线程池实现起来很简单,但却有着几个缺陷:

  • 无法动态扩容:worker 线程的个数是固定的,不能随着任务数的增长而增长。
  • 无法动态缩容:如果有很多 worker 线程处于空闲状态,就会造成资源的浪费。

动态线程池

  对于线程池,我们希望它可以动态增长,这样才不会造成任务队列的堆积,另一反面,也希望它能适当回收一些空闲的线程,以节省系统资源。

线程池接口

  gRPC 内部使用了动态线程池,它的接口长这样:

1
2
3
4
5
6
7
8
class DynamicThreadPool
{
public:
DynamicThreadPool(int reserve_threads);
~DynamicThreadPool();
void Add(const std::function<void()> &callback); // 提交任务
};

  那么DynamicThreadPool是怎样管理线程的呢?

  • DynamicThreadPool并不会限制线程的数量,理论上这意味着线程数量可以无限增长。
  • 构造函数接收一个参数reserve_threads,这个参数与线程池的缩容策略有关:它表示线程池最多只能有reserve_threads个空闲线程。也就是说,如果线程池的空闲线程数量多于这个值,那么多出来的那些线程就会被系统回收。

线程池的构造

  首先看看DynamicThreadPool的数据成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DynamicThreadPool
{
private:
std::mutex mu_; // 互斥锁,保护数据成员
std::condition_variable cv_; // 条件变量
std::condition_variable shutdown_cv_; // 条件变量,与线程池析构相关
bool shutdown_; // 线程池是否即将析构
std::queue<std::function<void()>> callbacks_; // 任务队列
int reserve_threads_; // 最大空闲线程数
int nthreads_; // 当前线程数
int threads_waiting_; // 空闲线程数
std::list<DynamicThread*> dead_threads_; // 保存已经终止的线程
};

  DynamicThreadPool的构造函数会先创建reserve_threads个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
DynamicThreadPool::DynamicThreadPool(int reserve_threads)
: shutdown_(false),
reserve_threads_(reserve_threads),
nthreads_(0),
threads_waiting_(0)
{
for (int i = 0; i < reserve_threads_; i++)
{
std::lock_guard<std::mutex> lock(mu_);
nthreads_++;
new DynamicThread(this); // 创建新线程
}
}

  可以看到,线程池不会直接使用std::thread,而是使用自己封装的DynamicThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class DynamicThreadPool
{
private:
class DynamicThread
{
public:
DynamicThread(DynamicThreadPool* pool);
~DynamicThread();
private:
DynamicThreadPool* pool_;
std::unique_ptr<std::thread> thd_;
void ThreadFunc();
};
}

  与std::thread相比,DynamicThread遵循了 RAII 的原则:DynamicThread在析构时,会调用线程的join()函数,用来确保正确地释放线程资源:

1
2
3
4
5
6
7
8
9
10
11
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool)
: pool_(pool),
thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this))
{
}
DynamicThreadPool::DynamicThread::~DynamicThread()
{
thd_->join();
thd_.reset();
}

线程池的析构

  DynamicThread在退出之前,会将自己添加到线程池的dead_threads中(在适当的时机,线程池会deletedead_threads中的所有线程,保证资源的释放)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void DynamicThreadPool::DynamicThread::ThreadFunc()
{
// 执行工作
pool_->ThreadFunc();
// 执行完工作,这时 std::thread 将要退出
std::unique_lock<std::mutex> lock(pool_->mu_);
pool_->nthreads_--; // 当前线程数减 1
// 将自己添加到 dead_threads 中
pool_->dead_threads_.push_back(this);
// 如果线程池正在析构 (休眠等待所有线程退出)
// 并且所有线程均已退出了,那就唤醒线程池
if ((pool_->shutdown_) && (pool_->nthreads_ == 0))
{
pool_->shutdown_cv_.notify_one();
}
}

  线程池的析构函数首先会唤醒所有休眠的线程,然后等待所有线程都退出,之后再调用reapThreads()清理掉所有线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
DynamicThreadPool::~DynamicThreadPool()
{
std::unique_lock<std::mutex> lock_(mu_);
shutdown_ = true;
// 唤醒所有线程
cv_.notify_all();
// 等待所有线程都退出
while (nthreads_ != 0)
{
shutdown_cv_.wait(lock_);
}
// 清理掉所有终止的线程
ReapThreads(&dead_threads_);
}
void DynamicThreadPool::ReapThreads(std::list<DynamicThread*> *tlist)
{
for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t))
{
delete *t;
}
}

任务的提交与执行

  线程池的Add()函数用来提交任务,任务会被放到任务队列中,并唤醒一个空闲的线程去处理任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void DynamicThreadPool::Add(const std::function<void ()> &callback)
{
std::lock_guard<std::mutex> lock(mu_);
// 将任务添加到任务队列中
callbacks_.push(callback);
// 如果没有空闲的线程,就创建新的线程
if (threads_waiting_ == 0)
{
nthreads_++;
new DynamicThread(this);
}
else
{
// 唤醒一个空闲的线程
cv_.notify_one();
}
// 释放掉已经终止的线程
if (!dead_threads_.empty())
{
ReapThreads(&dead_threads_);
}
}

  而ThreadFunc()则展示了线程消费任务的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void DynamicThreadPool::ThreadFunc()
{
for (;;)
{
std::unique_lock<std::mutex> lock(mu_);
// 如果任务队列为空,那就让自己休眠
if (!shutdown_ && callbacks_.empty())
{
// 如果已经有足够多的空闲线程,那么就退出自己
if (threads_waiting_ >= reserve_threads_)
{
break;
}
threads_waiting_++;
cv_.wait(lock); // 进入休眠
threads_waiting_--;
}
// 判断 shutdown 之前需要保证所有任务都被执行完
if (!callbacks_.empty())
{
auto cb = callbacks_.front();
callbacks_.pop();
lock.unlock();
cb();
}
else if (shutdown_)
{
break;
}
}
}

  完整的代码可以参照这里:grpc_dynamic_thread_pool