谈谈 Libevent 的应用层 buffer

  非阻塞网络编程通常有一个难点,就是如何管理应用层的缓冲区:从用户态来讲,每个 socket 都必须有一个输入缓冲区和一个输出缓冲区。为什么需要缓冲区的?让我们举个例子理解一下:

  • 假设某个 socket 可读,并且程序已经从这个 socket 读完了数据,有 10KB 数据。但是需要 100KB 才构成一条完整的消息。那么这 10KB 数据怎么办呢?可以先把它暂时放在输入缓冲区中,等到凑齐了 100KB 再一起处理。
  • 假设程序需要发送 100KB 的数据,但是调用write()最多写入了 10KB,那么剩下的 90KB 数据怎么办呢?可以先 append 到输出缓冲区中,等到下次 socket 变得可写时再发送出去。

缓冲区处理

  很幸运,Libevent 提供了bufferevent,让缓冲区的处理变得很简单。bufferevent的结构大概是这样的,它包含一个 socket 描述符,以及一个输入缓冲区和一个输出缓冲区:

1
2
3
4
5
6
struct bufferevent {
evutil_socket_t fd; // socket 描述符
evbuffer *input; // 输入缓冲区
evbuffer *output; // 输出缓冲区
// ...
};

  可以使用bufferevent_socket_new()创建bufferevent结构:

1
2
3
4
5
6
7
struct bufferevent *bufferevent_socket_new(
struct event_base *base, // 事件循环
evutil_socket_t fd, // socket 描述符, 必须先设置成非阻塞的
enum bufferevent_options options // 选项
);
void bufferevent_free(struct bufferevent *bev);

  其中的options可以设置成BEV_OPT_CLOSE_ON_FREE,也就是说当调用bufferevent_free(),相应的 socket 描述符也会被close()掉。
  bufferevent会自动帮我们管理应用层的缓冲区,那么它具体是怎样起作用的呢?

  • 如果 socket 可读,bufferevent会自动读取 socket 中的数据,并放到输入缓冲区中。
  • 如果 socket 可写,bufferevent会自动将输出缓冲区中的数据写到 socket 中。

  为了让bufferevent自动帮我们管理缓冲区,还有一个条件,那就是要开启它的读功能写功能

1
2
bufferevent_enable(b, EV_READ); // 开启读功能
bufferevent_enable(b, EV_WRITE); // 开启写功能

  开启读功能之后,如果 socket 可读,bufferevent才会自动读取 socket 中的数据到输入缓冲区中,写功能的作用也同理。默认情况下,使用bufferevent_socket_new()创建bufferevent之后,其实它已经自动开启了写功能了。


  一个bufferevent可以设置三个回调函数,分别是读取回调、写入回调和事件回调。可以调用bufferevent_setcb()设置相应的回调函数:

1
2
3
4
5
6
7
void bufferevent_setcb(
struct bufferevent *bufev, // bufferevent 指针
bufferevent_data_cb readcb, // 读取回调
bufferevent_data_cb writecb, // 写入回调
bufferevent_event_cb eventcb, // 事件回调
void *cbarg // 传递给回调函数的参数
);

  那么这三个回调函数什么时候才会被调用呢?

  • 输入缓冲区的数据大于或等于输入低水位时,读取回调就会被调用。默认情况下,输入低水位的值是 0,也就是说,只要 socket 变得可读,就会调用读取回调。
  • 输出缓冲区的数据小于或等于输出低水位时,写入回调就会被调用。默认情况下,输出低水位的值是 0,也就是说,只有当输出缓冲区的数据都发送完了,才会调用写入回调。因此,默认情况下的写入回调也可以理解成为 write complete callback。
  • 当连接建立、连接关闭、连接超时或者连接发生错误时,则会调用事件回调。

  除此之外,我们还可以设置bufferevent输入高水位,那么什么是输入高水位呢?默认情况下,bufferevent的输入缓冲区是可以无限增长的,但有时候我们想限制一个 TCP 连接的流量,这时候就可以设置一个输入高水位,这样就能限制输入缓冲区的大小了,保证它不会超过输入高水位。可以使用bufferevent_setwatermark()设置水位线:

1
2
void bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark);

  譬如说,我们设置某个bufferevent的输入高水位为 128 MB:

1
bufferevent_setwatermark(b, EV_READ, 0, 128 * 1024 * 1024);


  最后,让我们编写一个简单的 TCP Echo Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <arpa/inet.h>
#include <string.h>
#include <iostream>
void echo_read_cb(struct bufferevent *bev, void *ctx)
{
struct evbuffer *input = bufferevent_get_input(bev); // 输入缓存区
struct evbuffer *output = bufferevent_get_output(bev); // 输出缓存区
// 将输入缓冲区的数据移动到输出缓冲区
evbuffer_add_buffer(output, input);
}
void echo_event_cb(struct bufferevent *bev, short events, void *ctx)
{
if (events & BEV_EVENT_ERROR)
{
int err = EVUTIL_SOCKET_ERROR();
std::cerr << "Got an error from bufferevent: "
<< evutil_socket_error_to_string(err)
<< std::endl;
}
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
{
bufferevent_free(bev);
}
}
void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *address, int socklen, void *arg)
{
// 设置 socket 为非阻塞
evutil_make_socket_nonblocking(fd);
auto *base = evconnlistener_get_base(listener);
auto *b = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(b, echo_read_cb, nullptr, echo_event_cb, nullptr);
bufferevent_enable(b, EV_READ|EV_WRITE);
}
int main()
{
short port = 8000;
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(port);
auto *base = event_base_new();
auto *listener = evconnlistener_new_bind(
base, accept_conn_cb, nullptr,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
reinterpret_cast<struct sockaddr *>(&sin), sizeof(sin));
if (listener == nullptr)
{
std::cerr << "Couldn't create listener" << std::endl;
return 1;
}
event_base_dispatch(base);
return 0;
}

参考资料