Senlin's Blog


  • 分类

  • 归档

  • 标签

  • 关于

在 Boost.Asio 中使用协程

发表于 2017-10-03   |   分类于 网络编程   |   阅读次数

  从 1.54.0 版本开始,Boost.Asio 开始支持协程。异步编程是复杂的,协程可以让我们以同步的方式编写出异步的代码,在提高代码可读性的同时又不会丢失性能。
  在 Boost.Asio 要怎样才能使用协程呢?可以使用boost::asio::spawn()开启一个协程:

1
2
3
4
5
6
boost::asio::spawn(strand, echo);
void echo(boost::asio::yield_context yield) // 协程
{
// ...
}

  spawn()的第一个参数可以是io_service,也可以是strand(如果需要在多线程中保证同步,就需要使用strand,具体可以参见 [浅谈 Boost.Asio 的多线程模型])


  协程可以提供代码可读性,例如,如果没有使用协程,那么我们需要编写很多回调函数:

1
2
3
4
5
6
7
void handleRead(boost::system::error_code ec, std::size_t bytes_transferred)
{
// ...
}
socket.async_read_some(boost::asio::buffer(buffer, buffer.size()),
handleRead);

  使用协程之后,就不需要回调函数了:

1
2
3
4
5
6
7
8
9
10
11
try
{
std::size_t n = socket_.async_read_some(
boost::asio::buffer(buffer, buffer.size()),
yield
);
}
catch (std::exception& e)
{
// ...
}

  上面的代码,如果出现错误就会抛出异常。当然我们也可以使用错误码替代异常:

1
2
3
4
5
boost::system::error_code ec;
std::size_t n = socket_.async_read_some(
boost::asio::buffer(data, data.size()),
yield[ec]
);

阅读全文 »

浅谈 Boost.Asio 的多线程模型

发表于 2017-09-17   |   分类于 网络编程   |   阅读次数

  Boost.Asio 有两种支持多线程的方式,第一种方式比较简单:在多线程的场景下,每个线程都持有一个io_service,并且每个线程都调用各自的io_service的run()方法。
  另一种支持多线程的方式:全局只分配一个io_service,并且让这个io_service在多个线程之间共享,每个线程都调用全局的io_service的run()方法。

每个线程一个 I/O Service

  让我们先分析第一种方案:在多线程的场景下,每个线程都持有一个io_service (通常的做法是,让线程数和 CPU 核心数保持一致)。那么这种方案有什么特点呢?

  • 在多核的机器上,这种方案可以充分利用多个 CPU 核心。
  • 某个 socket 描述符并不会在多个线程之间共享,所以不需要引入同步机制。
  • 在 event handler 中不能执行阻塞的操作,否则将会阻塞掉io_service所在的线程。

  下面我们实现了一个AsioIOServicePool,封装了线程池的创建操作 [完整代码]:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class AsioIOServicePool
{
public:
using IOService = boost::asio::io_service;
using Work = boost::asio::io_service::work;
using WorkPtr = std::unique_ptr<Work>;
AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency())
: ioServices_(size),
works_(size),
nextIOService_(0)
{
for (std::size_t i = 0; i < size; ++i)
{
works_[i] = std::unique_ptr<Work>(new Work(ioServices_[i]));
}
for (std::size_t i = 0; i < ioServices_.size(); ++i)
{
threads_.emplace_back([this, i] ()
{
ioServices_[i].run();
});
}
}
AsioIOServicePool(const AsioIOServicePool &) = delete;
AsioIOServicePool &operator=(const AsioIOServicePool &) = delete;
// 使用 round-robin 的方式返回一个 io_service
boost::asio::io_service &getIOService()
{
auto &service = ioServices_[nextIOService_++];
if (nextIOService_ == ioServices_.size())
{
nextIOService_ = 0;
}
return service;
}
void stop()
{
for (auto &work: works_)
{
work.reset();
}
for (auto &t: threads_)
{
t.join();
}
}
private:
std::vector<IOService> ioServices_;
std::vector<WorkPtr> works_;
std::vector<std::thread> threads_;
std::size_t nextIOService_;
};

  AsioIOServicePool使用起来也很简单:

1
2
3
4
5
6
7
8
9
10
std::mutex mtx; // protect std::cout
AsioIOServicePool pool;
boost::asio::steady_timer timer{pool.getIOService(), std::chrono::seconds{2}};
timer.async_wait([&mtx] (const boost::system::error_code &ec)
{
std::lock_guard<std::mutex> lock(mtx);
std::cout << "Hello, World! " << std::endl;
});
pool.stop();

一个 I/O Service 与多个线程

  另一种方案则是先分配一个全局io_service,然后开启多个线程,每个线程都调用这个io_service的run()方法。这样,当某个异步事件完成时,io_service就会将相应的 event handler 交给任意一个线程去执行。
  然而这种方案在实际使用中,需要注意一些问题:

  • 在 event handler 中允许执行阻塞的操作 (例如数据库查询操作)。
  • 线程数可以大于 CPU 核心数,譬如说,如果需要在 event handler 中执行阻塞的操作,为了提高程序的响应速度,这时就需要提高线程的数目。
  • 由于多个线程同时运行事件循环(event loop),所以会导致一个问题:即一个 socket 描述符可能会在多个线程之间共享,容易出现竞态条件 (race condition)。譬如说,如果某个 socket 的可读事件很快发生了两次,那么就会出现两个线程同时读同一个 socket 的问题 (可以使用strand解决这个问题)。

  下面实现了一个线程池,在每个 worker 线程中执行io_service的run()方法 [完整代码]:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class AsioThreadPool
{
public:
AsioThreadPool(int threadNum = std::thread::hardware_concurrency())
: work_(new boost::asio::io_service::work(service_))
{
for (int i = 0; i < threadNum; ++i)
{
threads_.emplace_back([this] () { service_.run(); });
}
}
AsioThreadPool(const AsioThreadPool &) = delete;
AsioThreadPool &operator=(const AsioThreadPool &) = delete;
boost::asio::io_service &getIOService()
{
return service_;
}
void stop()
{
work_.reset();
for (auto &t: threads_)
{
t.join();
}
}
private:
boost::asio::io_service service_;
std::unique_ptr<boost::asio::io_service::work> work_;
std::vector<std::thread> threads_;
};

阅读全文 »

gRPC 的 C++ 动态线程池源码分析

发表于 2017-09-09   |   分类于 并发编程   |   阅读次数

固定线程池

  提到线程池,通常说的都是固定大小的线程池,固定线程池的原理是这样的:

  • 线程池由一个线程安全的队列,以及多个 worker 线程组成。
  • 可以有多个 producer 线程,它们负责提交任务给线程池。
  • 接收到新任务之后,线程池会唤醒某个 worker 线程,worker 线程醒来后会取出任务并执行。


  虽然固定线程池实现起来很简单,但却有着几个缺陷:

  • 无法动态扩容:worker 线程的个数是固定的,不能随着任务数的增长而增长。
  • 无法动态缩容:如果有很多 worker 线程处于空闲状态,就会造成资源的浪费。

动态线程池

  对于线程池,我们希望它可以动态增长,这样才不会造成任务队列的堆积,另一反面,也希望它能适当回收一些空闲的线程,以节省系统资源。

线程池接口

  gRPC 内部使用了动态线程池,它的接口长这样:

1
2
3
4
5
6
7
8
class DynamicThreadPool
{
public:
DynamicThreadPool(int reserve_threads);
~DynamicThreadPool();
void Add(const std::function<void()> &callback); // 提交任务
};

  那么DynamicThreadPool是怎样管理线程的呢?

  • DynamicThreadPool并不会限制线程的数量,理论上这意味着线程数量可以无限增长。
  • 构造函数接收一个参数reserve_threads,这个参数与线程池的缩容策略有关:它表示线程池最多只能有reserve_threads个空闲线程。也就是说,如果线程池的空闲线程数量多于这个值,那么多出来的那些线程就会被系统回收。

线程池的构造

  首先看看DynamicThreadPool的数据成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DynamicThreadPool
{
private:
std::mutex mu_; // 互斥锁,保护数据成员
std::condition_variable cv_; // 条件变量
std::condition_variable shutdown_cv_; // 条件变量,与线程池析构相关
bool shutdown_; // 线程池是否即将析构
std::queue<std::function<void()>> callbacks_; // 任务队列
int reserve_threads_; // 最大空闲线程数
int nthreads_; // 当前线程数
int threads_waiting_; // 空闲线程数
std::list<DynamicThread*> dead_threads_; // 保存已经终止的线程
};

  DynamicThreadPool的构造函数会先创建reserve_threads个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
DynamicThreadPool::DynamicThreadPool(int reserve_threads)
: shutdown_(false),
reserve_threads_(reserve_threads),
nthreads_(0),
threads_waiting_(0)
{
for (int i = 0; i < reserve_threads_; i++)
{
std::lock_guard<std::mutex> lock(mu_);
nthreads_++;
new DynamicThread(this); // 创建新线程
}
}

  可以看到,线程池不会直接使用std::thread,而是使用自己封装的DynamicThread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class DynamicThreadPool
{
private:
class DynamicThread
{
public:
DynamicThread(DynamicThreadPool* pool);
~DynamicThread();
private:
DynamicThreadPool* pool_;
std::unique_ptr<std::thread> thd_;
void ThreadFunc();
};
}

  与std::thread相比,DynamicThread遵循了 RAII 的原则:DynamicThread在析构时,会调用线程的join()函数,用来确保正确地释放线程资源:

1
2
3
4
5
6
7
8
9
10
11
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool)
: pool_(pool),
thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this))
{
}
DynamicThreadPool::DynamicThread::~DynamicThread()
{
thd_->join();
thd_.reset();
}

阅读全文 »

谈谈 Libevent 的应用层 buffer

发表于 2017-08-20   |   分类于 网络编程   |   阅读次数

  非阻塞网络编程通常有一个难点,就是如何管理应用层的缓冲区:从用户态来讲,每个 socket 都必须有一个输入缓冲区和一个输出缓冲区。为什么需要缓冲区的?让我们举个例子理解一下:

  • 假设某个 socket 可读,并且程序已经从这个 socket 读完了数据,有 10KB 数据。但是需要 100KB 才构成一条完整的消息。那么这 10KB 数据怎么办呢?可以先把它暂时放在输入缓冲区中,等到凑齐了 100KB 再一起处理。
  • 假设程序需要发送 100KB 的数据,但是调用write()最多写入了 10KB,那么剩下的 90KB 数据怎么办呢?可以先 append 到输出缓冲区中,等到下次 socket 变得可写时再发送出去。

缓冲区处理

  很幸运,Libevent 提供了bufferevent,让缓冲区的处理变得很简单。bufferevent的结构大概是这样的,它包含一个 socket 描述符,以及一个输入缓冲区和一个输出缓冲区:

1
2
3
4
5
6
struct bufferevent {
evutil_socket_t fd; // socket 描述符
evbuffer *input; // 输入缓冲区
evbuffer *output; // 输出缓冲区
// ...
};

  可以使用bufferevent_socket_new()创建bufferevent结构:

1
2
3
4
5
6
7
struct bufferevent *bufferevent_socket_new(
struct event_base *base, // 事件循环
evutil_socket_t fd, // socket 描述符, 必须先设置成非阻塞的
enum bufferevent_options options // 选项
);
void bufferevent_free(struct bufferevent *bev);

  其中的options可以设置成BEV_OPT_CLOSE_ON_FREE,也就是说当调用bufferevent_free(),相应的 socket 描述符也会被close()掉。
  bufferevent会自动帮我们管理应用层的缓冲区,那么它具体是怎样起作用的呢?

  • 如果 socket 可读,bufferevent会自动读取 socket 中的数据,并放到输入缓冲区中。
  • 如果 socket 可写,bufferevent会自动将输出缓冲区中的数据写到 socket 中。

  为了让bufferevent自动帮我们管理缓冲区,还有一个条件,那就是要开启它的读功能和写功能:

1
2
bufferevent_enable(b, EV_READ); // 开启读功能
bufferevent_enable(b, EV_WRITE); // 开启写功能

  开启读功能之后,如果 socket 可读,bufferevent才会自动读取 socket 中的数据到输入缓冲区中,写功能的作用也同理。默认情况下,使用bufferevent_socket_new()创建bufferevent之后,其实它已经自动开启了写功能了。


  一个bufferevent可以设置三个回调函数,分别是读取回调、写入回调和事件回调。可以调用bufferevent_setcb()设置相应的回调函数:

1
2
3
4
5
6
7
void bufferevent_setcb(
struct bufferevent *bufev, // bufferevent 指针
bufferevent_data_cb readcb, // 读取回调
bufferevent_data_cb writecb, // 写入回调
bufferevent_event_cb eventcb, // 事件回调
void *cbarg // 传递给回调函数的参数
);

  那么这三个回调函数什么时候才会被调用呢?

  • 当输入缓冲区的数据大于或等于输入低水位时,读取回调就会被调用。默认情况下,输入低水位的值是 0,也就是说,只要 socket 变得可读,就会调用读取回调。
  • 当输出缓冲区的数据小于或等于输出低水位时,写入回调就会被调用。默认情况下,输出低水位的值是 0,也就是说,只有当输出缓冲区的数据都发送完了,才会调用写入回调。因此,默认情况下的写入回调也可以理解成为 write complete callback。
  • 当连接建立、连接关闭、连接超时或者连接发生错误时,则会调用事件回调。

  除此之外,我们还可以设置bufferevent的输入高水位,那么什么是输入高水位呢?默认情况下,bufferevent的输入缓冲区是可以无限增长的,但有时候我们想限制一个 TCP 连接的流量,这时候就可以设置一个输入高水位,这样就能限制输入缓冲区的大小了,保证它不会超过输入高水位。可以使用bufferevent_setwatermark()设置水位线:

1
2
void bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark);

  譬如说,我们设置某个bufferevent的输入高水位为 128 MB:

1
bufferevent_setwatermark(b, EV_READ, 0, 128 * 1024 * 1024);

阅读全文 »

Libevent 编程指南

发表于 2017-08-12   |   分类于 网络编程   |   阅读次数

如何安装

  Libevent 是一个高性能,跨平台的 C 语言网络库。Libevent 当前最新的版本是 2.1.8,下面是在 Linux 安装 Libevent 的步骤:

1
2
3
4
5
6
$ wget https://github.com/libevent/libevent/releases/download/release-2.1.8-stable/libevent-2.1.8-stable.tar.gz
$ tar -xzvf libevent-2.1.8-stable.tar.gz
$ cd libevent-2.1.8-stable
$ ./configure
$ make
$ sudo make install

基本概念

  Libevent是基于 Reactor 模式的网络库,在 Reactor 模式中,通常都有一个事件循环(Event Loop),在 Libevent 中,这个事件循环就是event_base结构体:

1
2
3
struct event_base *event_base_new(void); // 创建事件循环
void event_base_free(struct event_base *base); // 销毁事件循环
int event_base_dispatch(struct event_base *base); // 运行事件循环

  通常来说,事件循环主要有两个作用:

  • 用来管理事件,比如说添加我们感兴趣的事件,修改事件或删除事件。
  • 用来轮询它管理的所有事件,如果发现有事件活跃 (avtive),就调用相应的回调函数去处理事件。

  Libevent 使用event结构体来代表事件,可以使用event_new()创建一个事件:

1
2
3
4
5
struct event *event_new(struct event_base *base, // 事件循环
evutil_socket_t fd, // 文件描述符
short what, // 事件类型
event_callback_fn cb, // 回调函数
void *arg); // 传递给回调函数的参数

  创建一个事件之后,要怎么把它加入到事件循环呢?可以使用event_add()函数:

1
2
int event_add(struct event *ev, // 事件
const struct timeval *tv); // 超时时间

  默认情况下,当一个事件变得活跃时,Libevent 会执行这个事件的回调函数,但同时也会将这个事件从事件循环中移除,例如,下面的程序,定时器只会触发一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <event2/event.h>
#include <iostream>
#include <string>
void timer_cb(evutil_socket_t fd, short what, void *arg)
{
auto str = static_cast<std::string *>(arg);
std::cout << *str << std::endl;
}
int main()
{
std::string str = "Hello, World!";
auto *base = event_base_new();
struct timeval five_seconds = {1, 0};
auto *ev = event_new(base, -1, EV_TIMEOUT, timer_cb, (void *)&str);
event_add(ev, &five_seconds);
event_base_dispatch(base);
event_free(ev);
event_base_free(base);
return 0;
}

  那要怎么样才能让事件不被移除呢?当创建事件时,在事件类型加上EV_PERSIST就可以。让我们修改上面的程序,让定时器每秒就触发一次:

1
auto *ev = event_new(base, -1, EV_TIMEOUT|EV_PERSIST, timer_cb, (void *)&str);

阅读全文 »
1…456…13

高性能

61 日志
13 分类
14 标签
GitHub 知乎
© 2015 - 2022
由 Hexo 强力驱动
主题 - NexT.Mist
  |   总访问量: