Reactor 模型
在聊事件循环具体实现之前,讲一下网络编程中常用的并发模型。Linux 下,对文件描述符的操作默认都是阻塞的,如调用accept
接受客户端连接时,是从已完成连接队列头返回下一个已完成的连接,如果没有客户端连接,进程会休眠(阻塞在调用处)。再如对文件描述符调用read
系统调用读取消息,如果对方没有发来消息则会一直阻塞。这种阻塞工作模式的缺点是显而易见的:同一时间只能服务一个客户,阻塞在一个系统调用时,其他客户无法和服务端连接。
现代操作系统提供了 IO 多路复用(IO multiplexing)技术来解决上述问题,IO 多路复用思路是用户向内核注册感兴趣的文件描述符,以及这些描述符上感兴趣的事件(如可读事件或可写事件),然后等待事件发生时内核通知用户。在类 Unix 操作系统中一般有select
,poll
等系统调用,在 Linux 上有更加优秀的epoll_wait
调用。
那么网络编程的模式就变成了如下的模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| while (!stop_server) { int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, -1); for (int i=0; i<number; ++i) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_address_len = sizeof client_address; int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_address_len); if (connfd < 0) { printf("accept error, errno is:%d\n", errno); continue; } ... } else if (events[i].events & EPOLLIN) { ... ret = read(...) ... } else if (events[i].events & EPOLLOUT) { ... ret = write(...) ... } } }
|
上述模式可以同时服务多个客户,在没有事件发生的时候程序阻塞在多路复用调用处,当感兴趣的事件发生时去处理相应的事件,由于发生事件是内核通知的,程序在read
或accept
调用后会立即返回,并重新等待事件发生。这种模式很好,实际上,Reactor 模式就是对上面的模式进行了封装,使得更加易用且不用考虑底层细节。
Reactor 模式主要有 Reactor 和 Event Handler 两个核心模块组成,其中 Reactor 负责监听事件,当事件发生时通过事件分发机制将其发给 Event Handler 进行处理。根据 Reactor 数量以及 Event Handler 是否由多个线程(或进程)进行处理,可以有以下几个典型方案(图来自某篇知乎文章,但好像不是作者画的,链接放在末尾):
单 Reactor 单线程模式其实就是上面示例程序的封装,如果是建立连接事件,就通过 Acceptor 来处理,如果是其它业务事件,就通过 Handler 对象来处理。这种方案缺点是无法利用 多核 CPU 的性能,接收连接和处理客户请求都在一个线程里。
单 Reactor 多线程将 Reactor 放在一个单独的线程中,当有事件发生时将其分发至不同的子线程进行事件处理,这些子线程一般使用线程池来实现。主线程只负责事件相关的部分,如新连接到来,读取文件描述符中的数据,向文件描述符中写数据,业务处理都放到单独的线程中进行处理,这样能够充分发挥多核 CPU 的能力。
多 Reactor 多线程模式减轻了主线程的工作负担,主线程只负责监听建立连接的请求,当新连接建立事件发生时,通过 Acceptor 获取到客户对应的文件描述符,将其分发给子线程,子线程内部也有一个事件循环,它负责对主线程分配的客户进行服务,读取数据、处理业务和发送数据。
Muduo 库支持单 Reactor 单线程和多 Reactor 多线程,对于后者采用 one loop per thread 的形式。下面分别介绍 Muduo 库对于这些模块的详细设计,其对事件循环的主要封装类为 EventLoop, Channel 和 Poller,其中 EventLoop 是事件循环的“驱动者”,它持有 Poller 对象,并通过其提供的 poll
方法进行事件循环(也就是上面的 IO 多路复用调用),当发生事件后调用 Channel 中的事件处理函数进行处理。
Channel
Channel 类对某个文件描述符、相关事件、处理事件的方法进行了封装,其主要构成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| class Channel : noncopyable{ public: typedef std::function<void()> EventCallback; typedef std::function<void(Timestamp )> ReadEventCallback;
Channel(EventLoop* loop, int fd); ~Channel() = default;
void handleEvent(Timestamp receiveTime); void setReadCallback(const ReadEventCallback& cb) { readCallback_ = cb; } void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; } void setCloseCallback(const EventCallback& cb) { closeCallback_ = cb; } void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }
int fd() const { return fd_; } int events() const { return events_; } void set_revents(int revent) { revents_ = revent; } bool isNoneEvent() const { return events_ == kNoneEvent; }
void tie(const std::shared_ptr<void>&); void enableReading() { events_ |= kReadEvent; update(); } void enableWriting() { events_ |= kWriteEvent; update(); } void disableWriting() { events_ &= ~kWriteEvent; update(); } void disableAll() { events_ = kNoneEvent; update(); } bool isWriting() const { return events_ & kWriteEvent; }
int index() const { return index_; } void setIndex(int idx) { index_ = idx; }
EventLoop* ownerLoop() { return loop_; } void remove();
std::string reventsToString() const;
private: void update(); void handleEventWithGuard(Timestamp receiveTime);
EventLoop* loop_; const int fd_; int events_; int revents_; int index_;
ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_;
std::weak_ptr<void> tie_; bool tied_;
static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; };
|
需要注意的有以下几点:
- 更新 Channel 中的感兴趣事件实际需要通过 EventLoop 进行,EventLoop 通过其持有的 Poller 来实际执行,Channel 和 Poller 并没有直接的联系。
1 2 3 4 5 6 7
| void Channel::update() { loop_->updateChannel(this); }
void Channel::remove() { loop_->removeChannel(this); }
|
handleEvent(Timestamp)
方法在事件发生(IO 多路复用返回)之后被 EventLoop 调用,根据发生事件的类型来调用相关的回调函数(用户注册的回调)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void Channel::handleEventWithGuard(Timestamp receiveTime) { if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { LOG_WARN << "Channel::handleEvent POLLHUP"; if (closeCallback_) { closeCallback_(); } } if (revents_ & POLLNVAL) { LOG_WARN << "Channel::handleEvent POLLNVAL"; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) { errorCallback_(); } } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { if (readCallback_) { readCallback_(receiveTime); } } if (revents_ & POLLOUT) { if (writeCallback_) { writeCallback_(); } } }
|
std::weak_ptr<void> tie_
成员用于持有一个对象的弱引用,这里是持有 TcpConnection
类的弱引用,这个类是使用本网络库的用户直接可见的,为了防止用户在处理事件时(使用Channel类时)意外的将 TcpConnection
类销毁(从而导致其持有的 Channel 也被销毁),因此在处理 Channel 中的事件时将 tie_
提升为强引用,这样保证安全。
1 2 3 4 5 6 7 8 9 10 11 12
| void Channel::handleEvent(Timestamp receiveTime) { std::shared_ptr<void > guard; if (tied_) { guard = tie_.lock(); if (guard) { handleEventWithGuard(receiveTime); } } else { handleEventWithGuard(receiveTime); } }
|
Poller
Poller 类是对 IO 多路复用系统调用的一个封装类,可以向其中注册感兴趣的 Channel,阻塞等待事件发生,事件发生之后返回发生的事件(通过 Channel 类)。为了同时支持 poll
和 epoll_wait
两种多路复用调用,将 Poller 类实现成一个抽象类,并定义好暴露给用户的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class Poller : noncopyable{ public: typedef std::vector<Channel*> ChannelList; explicit Poller(EventLoop* loop); virtual ~Poller();
virtual Timestamp poll(int timeouts, ChannelList* activeChannels) = 0; virtual void updateChannel(Channel* channel) = 0; virtual void removeChannel(Channel* channel) = 0; void assertInLoopThread();
private: EventLoop* ownerLoop_; };
|
具体的实现类为 EpollPoller 和 PollPoller,这里以 EpollPoller 为例说明主要的实现,其内部持有了 events_
成员用于保存发生的事件(作为 epoll_wait
的参数),持有 std::map<int, Channel*> channels_
成员记录注册的 Channel 对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class EpollPoller : public Poller { public: explicit EpollPoller(EventLoop* loop_); ~EpollPoller() override;
void updateChannel(Channel *channel) override; void removeChannel(Channel *channel) override; Timestamp poll(int timeouts, ChannelList *activeChannels) override; private: static const int kInitEventListSize = 16; void update(int operation, Channel* channel); void fillActiveChannels(int numEvent, ChannelList* activeChannels) const;
int epfd_; std::vector<epoll_event> events_; std::map<int, Channel*> channels_; };
|
poll
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Timestamp EpollPoller::poll(int timeouts, Poller::ChannelList *activeChannels) { int numEvents = ::epoll_wait(epfd_, &*events_.begin(), static_cast<int>(events_.size()), timeouts); Timestamp now = Timestamp::now(); if (numEvents > 0) { LOG_TRACE << numEvents << " events happened"; fillActiveChannels(numEvents, activeChannels); if (static_cast<size_t>(numEvents) == events_.size()) { events_.resize(events_.size() * 2); } } else if (numEvents == 0) { LOG_TRACE << "nothing happened"; } else { if (errno != EINTR) { LOG_FATAL << "EpollPoller::poll()"; } } return now; }
|
实际就是调用了 IO 多路复用 epoll_wait
,并将发生的事件填充到相应的 Channel 中。
EventLoop
基本功能
EventLoop 是整个事件循环的核心。核心功能是驱动 Poller 将事件循环跑起来,因此其简化版本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class EventLoop { public: typedef std::function<void()> Functor; EventLoop(); ~EventLoop();
void loop(); void quit();
void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
void updateChannel(Channel* channel); void removeChannel(Channel* channel);
private: void abortNotInLoopThread(); std::atomic_bool looping_; std::atomic_bool quit_; const pid_t threadId_;
std::unique_ptr<Poller> poller_; Timestamp pollReturnTime_; std::vector<Channel*> activeChannels_; };
|
- 为了避免复杂的竞争关系,Muduo 设计一个线程中只能存在一个 EventLoop 对象,通过
assertInLoopThread()
来保证在当前线程,将其放在其它代码前面可以保证后面的代码只在 loop 所在的线程调用,防止别的线程调用引发竞争。具体实现就是在 EventLoop 对象初始化时保存当前的线程 ID,在 assertInLoopThread()
中比较当前线程 ID 和之前保存的线程 ID。获取当前线程 ID 使用 Linux 提供的系统调用:
1
| t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
|
为了避免频繁系统调用带来的开销,实际上使用线程局部变量 __thread
标志来缓存线程的 ID,这里不展开说了。
- EventLoop 通过
std::unique_ptr
持有了一个 Poller 对象,在 loop
中调用 poller_->poll()
,返回结果放在 activeChannels_
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void EventLoop::loop() { assertInLoopThread(); looping_ = true; LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeout, &activeChannels_);
for (auto c : activeChannels_) { LOG_TRACE << "{" << c->reventsToString() << "}"; }
for (auto c : activeChannels_) { c->handleEvent(pollReturnTime_); } doPendingFunctors(); }
looping_ = false; LOG_TRACE << "EventLoop " << this << " stop looping"; }
|
唤醒事件循环
一般情况下,事件循环每次超时 kPollTimeout 后会唤醒一次,Muduo 中默认是 10 s,有时候我们希望立即在主循环中执行某个任务,也就是上面的doPendingFunctors()
,这时候需要唤醒事件循环。
这里唤醒事件循环机制通过 Linux 中的 eventfd
来实现,eventfd 是在 Linux 2.6.22 版本后引入的,专门用于事件通知。它可以创建一个文件描述符,这个文件描述符内容为一个计数器,对其write
能够增加计数,对其read
能够将计数清零,且这个文件描述符可以使用 IO 多路复用机制来监听。
使用 Channel 对 wakeupFd_ 进行封装,并在 EventLoop 构造函数中注册对其读事件的监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| private: int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_;
std::mutex functorMutex_; std::vector<Functor> pendingFunctors_; std::atomic_bool callingPendingFunctors_;
EventLoop::EventLoop() : looping_(false), quit_(false), threadId_(CurrentThread::tid()), poller_(new EpollPoller(this)), timerQueue_(std::make_unique<TimerQueue>(this)), wakeupFd_(detail::createEventFd()), wakeupChannel_(new Channel(this, wakeupFd_)), callingPendingFunctors_(false) { ... wakeupChannel_->setReadCallback([this](Timestamp t) { handleWakeup(); }); wakeupChannel_->enableReading(); }
|
唤醒和处理唤醒事件就是向 eventFd_
的读写操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| void EventLoop::handleWakeup() const { uint64_t one = 1; ssize_t n = ::read(wakeupFd_, &one, sizeof(one)); if (n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; } }
void EventLoop::wakeup() const { LOG_TRACE << "wakeup!"; uint64_t one = 1; ssize_t n = ::write(wakeupFd_, &one, sizeof(one)); if (n != sizeof(one)) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }
|
事件循环被唤醒之后会执行doPendingFunctors()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true;
{ std::lock_guard<std::mutex> lockGuard(functorMutex_); functors.swap(pendingFunctors_); }
for (auto& f : functors) { f(); } callingPendingFunctors_ = false; }
|
定时器
事件循环一般还要实现定时器效果,也就是在定时器到期后执行某段代码。这里实现了一个定时器队列,定时机制是通过 Linux 提供的 timerfd
实现的,相关的系统调用有以下 3 个:
1 2 3 4 5 6
| int timerfd_create(int clockid, int flags);
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
int timerfd_gettime(int fd, struct itimerspec *curr_value);
|
Muduo 通过std::set
(内部实现为红黑树)对定时器进行管理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class TimerQueue : noncopyable { public: typedef std::function<void()> TimerCallback; explicit TimerQueue(EventLoop* loop); ~TimerQueue(); void addTimer(const TimerCallback& cb, Timestamp when, double interval);
private: typedef std::pair<Timestamp, std::shared_ptr<Timer>> Entry; typedef std::set<Entry> TimerList;
void handleRead(); void addTimerInLoop(const std::shared_ptr<Timer>& t);
bool insert(const std::shared_ptr<Timer>& t); std::vector<Entry> getExpired(Timestamp now);
void reset(const std::vector<Entry>& expired, Timestamp now);
EventLoop* loop_; const int timerFd_; Channel timerFdChannel_; TimerList timers_; };
|
其中 Timer 类型是对超时时间、回调函数的简单封装,具体见源代码。和上面的wakeupFd_
一样,通过 Channel 对 timerFd_
进行封装,并注册对其读事件的监听:
1 2 3 4 5 6 7 8 9 10 11
| TimerQueue::TimerQueue(EventLoop *loop) : loop_(loop), timerFd_(createTimerFd()), timerFdChannel_(loop, timerFd_), timers_() { timerFdChannel_.setReadCallback([this](Timestamp t) { handleRead(); }); timerFdChannel_.enableReading(); }
|
在超时事件发生后,timerFd_
可读从而唤醒事件循环,并调用相关的处理函数:
1 2 3 4 5 6 7 8 9 10 11 12
| void TimerQueue::handleRead() { loop_->assertInLoopThread(); Timestamp now = Timestamp::now(); readTimerFd(timerFd_, now); std::vector<Entry> expired = getExpired(now);
for (const auto& entry : expired) { entry.second->run(); }
reset(expired, now); }
|
限于篇幅,一些其它的细节就不展开介绍了。总的来说,通过 Linux 中提供的 eventfd
和 timerfd
机制,将超时和唤醒事件都统一到 IO 多路复用也就是事件循环中处理,简化了设计。
总结
本来想一举将 Muduo 中事件循环的细节将清楚,但限于篇幅,这里还是只将重点的设计进行了介绍,详细的细节还是需要阅读源码甚至一步步调试。这里还是粘出我参照 Muduo 实现的一份网络库:https://github.com/xiaoqieF/CppNet.
参考文献
- https://github.com/xiaoqieF/CppNet.git
- https://github.com/chenshuo/muduo.git
- https://www.zhihu.com/question/26943938