C++ 实现阻塞队列

队列的异常安全

  考虑一下 STL 中 queue 的接口:

1
2
3
4
5
6
7
8
template <typename T, typename Container = std::deque<T>>
class queue
{
T& front();
void push( const T &elem );
void pop();
bool empty() const; // avoid undefined behaviour before calling front()
};

  我们知道,front()用来获得队列头部元素,而pop()则令头部元素出列。但是为什么不设计成这样呢:

1
2
3
4
5
template <typename T, typename Container = std::deque<T>>
class queue
{
T pop();
};

  其实这是为了保证异常安全(Exception Safe),像下面这个例子:

1
2
3
queue<Widget> q;
// ...
Widget value = q.top();

  设想一下,要是top()将元素出列,并且将这个元素赋值给 value 时,若其拷贝构造函数发生了异常,那么,这个元素就会永远丢失了。

关于实现的细节

  我们将设计一个 ThreadQueue,并且具有以下的特点:

  • 允许多个读者 ( Reader ) 和多个写者 ( Writer ) 并发地访问队列。
  • 元素出列的操作将会阻塞,直到队列不为空。
  • 元素出列时保证异常安全性。

  我们知道,在调用std::queue<T>front()之前,我们需要保证队列不为空,否则这种行为是未定义的。但是在多线程环境下,我们通常不需要front()操作,而只是调用pop()将元素返回。pop()操作是阻塞的,也就是说,在调用pop()时,要是队列没有元素,那么pop()将会阻塞,以等待新元素入列:

1
2
3
4
5
template <typename T, typename Container = std::queue<T>>
class ThreadQueue
{
T pop();
};

  但是,我们知道,pop()并不是异常安全的,那怎么实现异常安全的pop()呢?

1
2
3
4
5
template <typename T, typename Container = std::queue<T>>
class ThreadQueue
{
T pop( T &elem );
};

  在调用pop()之前,我们需要传递一个元素作为参数,以存储将要出列的元素:

1
2
3
4
ThreadQueue<Widget> q;
// ...
Widget widget;
q.pop( widget );

阻塞队列的实现

  阻塞队列实际上就是典型的生产者-消费者模型,可想而知,应当使用条件变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
template <typename T, typename Container = std::queue<T>>
class ThreadQueue
{
public:
using container_type = Container;
using value_type = typename Container::value_type;
using reference = typename Container::reference;
using const_reference = typename Container::const_reference;
using size_type = typename Container::size_type;
using mutex_type = std::mutex;
using condition_variable_type = std::condition_variable;
private:
Container queue_;
mutable mutex_type mutex_;
condition_variable_type cond_;
public:
ThreadQueue() = default;
ThreadQueue( const ThreadQueue & ) = delete;
ThreadQueue &operator=( const ThreadQueue & ) = delete;
void pop( reference elem )
{
std::unique_lock<mutex_type> lock( mutex_ );
cond_.wait( lock, [this]() { return !queue_.empty(); } );
elem = std::move( queue_.front() );
queue_.pop();
}
bool try_pop( reference elem )
{
std::unique_lock<mutex_type> lock( mutex_ );
if( queue_.empty() ) {
return false;
}
elem = std::move( queue_.front() );
queue_.pop();
return true;
}
bool empty() const
{
std::lock_guard<mutex_type> lock( mutex_ );
return queue_.empty();
}
size_type size() const
{
std::lock_guard<mutex_type> lock( mutex_ );
return queue_.size();
}
void push( const value_type &elem )
{
{
std::lock_guard<mutex_type> lock( mutex_ );
queue_.push( elem );
}
cond_.notify_one();
}
void push( value_type &&elem )
{
{
std::lock_guard<mutex_type> lock( mutex_ );
queue_.push( std::move( elem ) );
}
cond_.notify_one();
}
};

  注意到,要是队列是空的,那么pop()操作将会阻塞,因此,我们提供了一个try_pop()操作,要是队列是空的,调用try_pop()将会立即返回而不会阻塞。

生产者与消费者

  我们知道,ThreadQueue 可以用在多个生产者,多个消费者的场景下,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include "ThreadQueue.hpp"
#include <iostream>
#include <string>
#include <chrono>
#include <sstream>
#include <thread>
using namespace std;
ThreadQueue<string> messageQueue;
mutex print_mtx;
void provider( int product_num, int wait_seconds )
{
for( int i = 0; i < product_num; ++i )
{
string message( "Message-" );
message.append( to_string( i ) );
messageQueue.push( message );
this_thread::sleep_for( chrono::seconds( wait_seconds ) );
}
lock_guard<mutex> guard( print_mtx );
cout << "All works done!" << endl;
}
void consumer( int consumer_id )
{
while( true )
{
string message;
messageQueue.pop( message );
{
lock_guard<mutex> guard( print_mtx );
cout << "consumer-" << consumer_id << " receive: " << message << endl;
}
}
}
int main()
{
thread pvi{ provider, 10, 1 };
thread csm1{ consumer, 1 };
thread csm2{ consumer, 2 };
pvi.join();
csm1.join();
csm2.join();
return 0;
}
/**
$ clang++ -std=c++11 -o main ThreadQueueTest.cpp -pthread
$ ./main
consumer-1 receive: Message-0
consumer-2 receive: Message-1
consumer-1 receive: Message-2
consumer-2 receive: Message-3
consumer-1 receive: Message-4
consumer-2 receive: Message-5
consumer-1 receive: Message-6
consumer-2 receive: Message-7
consumer-1 receive: Message-8
consumer-2 receive: Message-9
All works done!
**/

参考资料