Muduo网络库实现(二):事件循环

Reactor 模型

在聊事件循环具体实现之前,讲一下网络编程中常用的并发模型。Linux 下,对文件描述符的操作默认都是阻塞的,如调用accept接受客户端连接时,是从已完成连接队列头返回下一个已完成的连接,如果没有客户端连接,进程会休眠(阻塞在调用处)。再如对文件描述符调用read系统调用读取消息,如果对方没有发来消息则会一直阻塞。这种阻塞工作模式的缺点是显而易见的:同一时间只能服务一个客户,阻塞在一个系统调用时,其他客户无法和服务端连接。

现代操作系统提供了 IO 多路复用(IO multiplexing)技术来解决上述问题,IO 多路复用思路是用户向内核注册感兴趣的文件描述符,以及这些描述符上感兴趣的事件(如可读事件或可写事件),然后等待事件发生时内核通知用户。在类 Unix 操作系统中一般有select,poll等系统调用,在 Linux 上有更加优秀的epoll_wait调用。

那么网络编程的模式就变成了如下的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
while (!stop_server) {
int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, -1); // 没有事件发生时阻塞在这里
// 发生的事件在 events 中
for (int i=0; i<number; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
// 新客户连接事件发生
struct sockaddr_in client_address;
socklen_t client_address_len = sizeof client_address;
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_address_len);
if (connfd < 0) {
printf("accept error, errno is:%d\n", errno);
continue;
}
...
// 注册新客户文件描述符上感兴趣的事件
} else if (events[i].events & EPOLLIN) {
// 之前注册的读事件
...
ret = read(...)
...
} else if (events[i].events & EPOLLOUT) {
// 文件描述符可写
...
ret = write(...)
...
}
}
}

上述模式可以同时服务多个客户,在没有事件发生的时候程序阻塞在多路复用调用处,当感兴趣的事件发生时去处理相应的事件,由于发生事件是内核通知的,程序在readaccept调用后会立即返回,并重新等待事件发生。这种模式很好,实际上,Reactor 模式就是对上面的模式进行了封装,使得更加易用且不用考虑底层细节。

Reactor 模式主要有 Reactor 和 Event Handler 两个核心模块组成,其中 Reactor 负责监听事件,当事件发生时通过事件分发机制将其发给 Event Handler 进行处理。根据 Reactor 数量以及 Event Handler 是否由多个线程(或进程)进行处理,可以有以下几个典型方案(图来自某篇知乎文章,但好像不是作者画的,链接放在末尾):

  • 单 Reactor 单线程

单 Reactor 单线程模式其实就是上面示例程序的封装,如果是建立连接事件,就通过 Acceptor 来处理,如果是其它业务事件,就通过 Handler 对象来处理。这种方案缺点是无法利用 多核 CPU 的性能,接收连接和处理客户请求都在一个线程里。

  • 单 Reactor 多线程

单 Reactor 多线程将 Reactor 放在一个单独的线程中,当有事件发生时将其分发至不同的子线程进行事件处理,这些子线程一般使用线程池来实现。主线程只负责事件相关的部分,如新连接到来,读取文件描述符中的数据,向文件描述符中写数据,业务处理都放到单独的线程中进行处理,这样能够充分发挥多核 CPU 的能力。

  • 多 Reactor 多线程

多 Reactor 多线程模式减轻了主线程的工作负担,主线程只负责监听建立连接的请求,当新连接建立事件发生时,通过 Acceptor 获取到客户对应的文件描述符,将其分发给子线程,子线程内部也有一个事件循环,它负责对主线程分配的客户进行服务,读取数据、处理业务和发送数据。

Muduo 库支持单 Reactor 单线程和多 Reactor 多线程,对于后者采用 one loop per thread 的形式。下面分别介绍 Muduo 库对于这些模块的详细设计,其对事件循环的主要封装类为 EventLoop, Channel 和 Poller,其中 EventLoop 是事件循环的“驱动者”,它持有 Poller 对象,并通过其提供的 poll 方法进行事件循环(也就是上面的 IO 多路复用调用),当发生事件后调用 Channel 中的事件处理函数进行处理。

Channel

Channel 类对某个文件描述符、相关事件、处理事件的方法进行了封装,其主要构成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class Channel : noncopyable{
public:
typedef std::function<void()> EventCallback;
typedef std::function<void(Timestamp )> ReadEventCallback;

Channel(EventLoop* loop, int fd);
~Channel() = default;

void handleEvent(Timestamp receiveTime); // 事件处理函数
void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; }
void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }

int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revent) { revents_ = revent; }
bool isNoneEvent() const { return events_ == kNoneEvent; }

void tie(const std::shared_ptr<void>&);
void enableReading() {
events_ |= kReadEvent;
update();
}
void enableWriting() {
events_ |= kWriteEvent;
update();
}
void disableWriting() {
events_ &= ~kWriteEvent;
update();
}
void disableAll() {
events_ = kNoneEvent;
update();
}
bool isWriting() const { return events_ & kWriteEvent; }

// for poller
int index() const { return index_; }
void setIndex(int idx) { index_ = idx; }

EventLoop* ownerLoop() { return loop_; }
void remove();

std::string reventsToString() const;

private:
void update();
void handleEventWithGuard(Timestamp receiveTime);

EventLoop* loop_; // Channel 所处的事件循环(一个Channel不能在多个 loop 中使用)
const int fd_; // 事件的文件描述符
int events_; // 感兴趣的事件
int revents_; // 实际发生的事件
int index_; // 保存自己在 poller 的 fd 列表中的索引(或当前状态),便于查找更新当前 Channel

// 事件处理函数
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;

std::weak_ptr<void> tie_; // 用于保存 Channel 持有者(TcpConnection)的弱引用,防止在处理事件时持有者被销毁
bool tied_;

static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
};

需要注意的有以下几点:

  1. 更新 Channel 中的感兴趣事件实际需要通过 EventLoop 进行,EventLoop 通过其持有的 Poller 来实际执行,Channel 和 Poller 并没有直接的联系。
1
2
3
4
5
6
7
void Channel::update() {
loop_->updateChannel(this);
}

void Channel::remove() {
loop_->removeChannel(this);
}
  1. handleEvent(Timestamp)方法在事件发生(IO 多路复用返回)之后被 EventLoop 调用,根据发生事件的类型来调用相关的回调函数(用户注册的回调)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void Channel::handleEventWithGuard(Timestamp receiveTime) {
if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) {
LOG_WARN << "Channel::handleEvent POLLHUP";
if (closeCallback_) {
closeCallback_();
}
}
if (revents_ & POLLNVAL) {
LOG_WARN << "Channel::handleEvent POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL)) {
if (errorCallback_) {
errorCallback_();
}
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
if (readCallback_) {
readCallback_(receiveTime);
}
}
if (revents_ & POLLOUT) {
if (writeCallback_) {
writeCallback_();
}
}
}
  1. std::weak_ptr<void> tie_ 成员用于持有一个对象的弱引用,这里是持有 TcpConnection 类的弱引用,这个类是使用本网络库的用户直接可见的,为了防止用户在处理事件时(使用Channel类时)意外的将 TcpConnection 类销毁(从而导致其持有的 Channel 也被销毁),因此在处理 Channel 中的事件时将 tie_ 提升为强引用,这样保证安全。
1
2
3
4
5
6
7
8
9
10
11
12
void Channel::handleEvent(Timestamp receiveTime) {
std::shared_ptr<void > guard;
if (tied_) {
// 保证持有者(TcpConnection)还在
guard = tie_.lock();
if (guard) {
handleEventWithGuard(receiveTime);
}
} else {
handleEventWithGuard(receiveTime);
}
}

Poller

Poller 类是对 IO 多路复用系统调用的一个封装类,可以向其中注册感兴趣的 Channel,阻塞等待事件发生,事件发生之后返回发生的事件(通过 Channel 类)。为了同时支持 pollepoll_wait 两种多路复用调用,将 Poller 类实现成一个抽象类,并定义好暴露给用户的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Poller : noncopyable{
public:
typedef std::vector<Channel*> ChannelList;
explicit Poller(EventLoop* loop);
virtual ~Poller();

// 阻塞等待事件发生,在事件发生或到期后通过 activeChannels 返回发生的事件
virtual Timestamp poll(int timeouts, ChannelList* activeChannels) = 0;
// 更新事件
virtual void updateChannel(Channel* channel) = 0;
virtual void removeChannel(Channel* channel) = 0;
void assertInLoopThread();

private:
EventLoop* ownerLoop_;
};

具体的实现类为 EpollPoller 和 PollPoller,这里以 EpollPoller 为例说明主要的实现,其内部持有了 events_ 成员用于保存发生的事件(作为 epoll_wait 的参数),持有 std::map<int, Channel*> channels_ 成员记录注册的 Channel 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class EpollPoller : public Poller {
public:
explicit EpollPoller(EventLoop* loop_);
~EpollPoller() override;

void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;
Timestamp poll(int timeouts, ChannelList *activeChannels) override;
private:
static const int kInitEventListSize = 16;
void update(int operation, Channel* channel);
void fillActiveChannels(int numEvent, ChannelList* activeChannels) const;

int epfd_;
std::vector<epoll_event> events_; // 用来存已经发生的事件
std::map<int, Channel*> channels_;
};

poll 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Timestamp EpollPoller::poll(int timeouts, Poller::ChannelList *activeChannels) {
int numEvents = ::epoll_wait(epfd_, &*events_.begin(),
static_cast<int>(events_.size()), timeouts);
Timestamp now = Timestamp::now();
if (numEvents > 0) {
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);
if (static_cast<size_t>(numEvents) == events_.size()) {
events_.resize(events_.size() * 2);
}
} else if (numEvents == 0) {
LOG_TRACE << "nothing happened";
} else {
if (errno != EINTR) {
LOG_FATAL << "EpollPoller::poll()";
}
}
return now;
}

实际就是调用了 IO 多路复用 epoll_wait,并将发生的事件填充到相应的 Channel 中。

EventLoop

基本功能

EventLoop 是整个事件循环的核心。核心功能是驱动 Poller 将事件循环跑起来,因此其简化版本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class EventLoop {
public:
typedef std::function<void()> Functor;
EventLoop();
~EventLoop();

void loop(); // 启动事件循环
void quit();

void assertInLoopThread() {
if (!isInLoopThread()) {
abortNotInLoopThread(); // 终止程序
}
}
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

// 被 Channel 调用,进而调用 poller 的相关方法
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);

private:
void abortNotInLoopThread();

std::atomic_bool looping_;
std::atomic_bool quit_;
const pid_t threadId_;

std::unique_ptr<Poller> poller_;
Timestamp pollReturnTime_;
std::vector<Channel*> activeChannels_; // 当前有事件发生的 Channel
};
  1. 为了避免复杂的竞争关系,Muduo 设计一个线程中只能存在一个 EventLoop 对象,通过 assertInLoopThread() 来保证在当前线程,将其放在其它代码前面可以保证后面的代码只在 loop 所在的线程调用,防止别的线程调用引发竞争。具体实现就是在 EventLoop 对象初始化时保存当前的线程 ID,在 assertInLoopThread() 中比较当前线程 ID 和之前保存的线程 ID。获取当前线程 ID 使用 Linux 提供的系统调用:
1
t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));

为了避免频繁系统调用带来的开销,实际上使用线程局部变量 __thread 标志来缓存线程的 ID,这里不展开说了。

  1. EventLoop 通过 std::unique_ptr 持有了一个 Poller 对象,在 loop 中调用 poller_->poll(),返回结果放在 activeChannels_中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void EventLoop::loop() {
assertInLoopThread();
looping_ = true;
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_) {
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeout, &activeChannels_);

for (auto c : activeChannels_) {
LOG_TRACE << "{" << c->reventsToString() << "}";
}

for (auto c : activeChannels_) {
c->handleEvent(pollReturnTime_);
}
doPendingFunctors(); // 事件循环唤醒之后处理在 loop 中排队的事件
}

looping_ = false;
LOG_TRACE << "EventLoop " << this << " stop looping";
}

唤醒事件循环

一般情况下,事件循环每次超时 kPollTimeout 后会唤醒一次,Muduo 中默认是 10 s,有时候我们希望立即在主循环中执行某个任务,也就是上面的doPendingFunctors(),这时候需要唤醒事件循环。

这里唤醒事件循环机制通过 Linux 中的 eventfd 来实现,eventfd 是在 Linux 2.6.22 版本后引入的,专门用于事件通知。它可以创建一个文件描述符,这个文件描述符内容为一个计数器,对其write能够增加计数,对其read能够将计数清零,且这个文件描述符可以使用 IO 多路复用机制来监听。

使用 Channel 对 wakeupFd_ 进行封装,并在 EventLoop 构造函数中注册对其读事件的监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// EventLoop.h
private:
int wakeupFd_; // 使用 eventfd 实现的事件循环唤醒机制
std::unique_ptr<Channel> wakeupChannel_;

std::mutex functorMutex_;
std::vector<Functor> pendingFunctors_; // 事件循环中等待执行的函数(醒来立即执行)
std::atomic_bool callingPendingFunctors_;

// EventLoop.cc
EventLoop::EventLoop()
: looping_(false),
quit_(false),
threadId_(CurrentThread::tid()),
poller_(new EpollPoller(this)),
timerQueue_(std::make_unique<TimerQueue>(this)),
wakeupFd_(detail::createEventFd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
callingPendingFunctors_(false)
{
...
wakeupChannel_->setReadCallback([this](Timestamp t) {
handleWakeup();
});
wakeupChannel_->enableReading();
}

唤醒和处理唤醒事件就是向 eventFd_ 的读写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void EventLoop::handleWakeup() const {
uint64_t one = 1;
ssize_t n = ::read(wakeupFd_, &one, sizeof(one));
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}

void EventLoop::wakeup() const {
LOG_TRACE << "wakeup!";
uint64_t one = 1;
ssize_t n = ::write(wakeupFd_, &one, sizeof(one));
if (n != sizeof(one)) {
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}

事件循环被唤醒之后会执行doPendingFunctors()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;
callingPendingFunctors_ = true;

// 将队列中的 Functors 交换出来再执行,减小临界区大小
{
std::lock_guard<std::mutex> lockGuard(functorMutex_);
functors.swap(pendingFunctors_);
}

for (auto& f : functors) {
f();
}
callingPendingFunctors_ = false;
}

定时器

事件循环一般还要实现定时器效果,也就是在定时器到期后执行某段代码。这里实现了一个定时器队列,定时机制是通过 Linux 提供的 timerfd 实现的,相关的系统调用有以下 3 个:

1
2
3
4
5
6
// 创建一个 timerfd,返回文件描述符
int timerfd_create(int clockid, int flags);
// 设置超时时间,在到期之后文件描述符变为可读
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
// 获取指定 timerfd 距离下一次超时还剩的时间
int timerfd_gettime(int fd, struct itimerspec *curr_value);

Muduo 通过std::set(内部实现为红黑树)对定时器进行管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class TimerQueue : noncopyable {
public:
typedef std::function<void()> TimerCallback;
explicit TimerQueue(EventLoop* loop);
~TimerQueue();
// 添加一个定时器
void addTimer(const TimerCallback& cb, Timestamp when, double interval);

private:
typedef std::pair<Timestamp, std::shared_ptr<Timer>> Entry; // TODO: use unique_ptr
typedef std::set<Entry> TimerList;

void handleRead();
void addTimerInLoop(const std::shared_ptr<Timer>& t);

bool insert(const std::shared_ptr<Timer>& t);
std::vector<Entry> getExpired(Timestamp now);

void reset(const std::vector<Entry>& expired, Timestamp now);

EventLoop* loop_;
const int timerFd_;
Channel timerFdChannel_;
TimerList timers_;
};

其中 Timer 类型是对超时时间、回调函数的简单封装,具体见源代码。和上面的wakeupFd_一样,通过 Channel 对 timerFd_ 进行封装,并注册对其读事件的监听:

1
2
3
4
5
6
7
8
9
10
11
TimerQueue::TimerQueue(EventLoop *loop)
: loop_(loop),
timerFd_(createTimerFd()),
timerFdChannel_(loop, timerFd_),
timers_()
{
timerFdChannel_.setReadCallback([this](Timestamp t) {
handleRead();
});
timerFdChannel_.enableReading();
}

在超时事件发生后,timerFd_ 可读从而唤醒事件循环,并调用相关的处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
void TimerQueue::handleRead() {
loop_->assertInLoopThread();
Timestamp now = Timestamp::now();
readTimerFd(timerFd_, now); // 清除 timerFd_ 的可读事件
std::vector<Entry> expired = getExpired(now); // 获取所有超时的定时器

for (const auto& entry : expired) {
entry.second->run(); // 执行定时器函数
}

reset(expired, now);
}

限于篇幅,一些其它的细节就不展开介绍了。总的来说,通过 Linux 中提供的 eventfdtimerfd 机制,将超时和唤醒事件都统一到 IO 多路复用也就是事件循环中处理,简化了设计。

总结

本来想一举将 Muduo 中事件循环的细节将清楚,但限于篇幅,这里还是只将重点的设计进行了介绍,详细的细节还是需要阅读源码甚至一步步调试。这里还是粘出我参照 Muduo 实现的一份网络库:https://github.com/xiaoqieF/CppNet.

参考文献

  1. https://github.com/xiaoqieF/CppNet.git
  2. https://github.com/chenshuo/muduo.git
  3. https://www.zhihu.com/question/26943938