One loop per thread
EventLoopThread
为了实现之前所说的多Reactor多线程模型,每个线程里含有且只含有一个事件循环(EventLoop),首先需要对这种线程做一个简单的包装,也就是这里实现的 EventLoopThread 类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class EventLoopThread : noncopyable { public: typedef std::function<void(EventLoop*)> ThreadInitCallback; explicit EventLoopThread(ThreadInitCallback cb = ThreadInitCallback{}); ~EventLoopThread(); EventLoop* startLoop();
private: void threadFunc();
EventLoop* loop_; std::unique_ptr<std::thread> thread_; std::mutex mutex_; std::condition_variable cond_; ThreadInitCallback cb_; };
|
这个类的用户调用startLoop
方法来开启一个新的线程,并在该线程里启动一个事件循环EventLoop
,这个方法返回该事件循环的地址(指针)。核心在于线程函数threadFunc
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| EventLoop *EventLoopThread::startLoop() { assert(thread_ == nullptr); thread_ = std::make_unique<std::thread>(&EventLoopThread::threadFunc, this); { std::unique_lock<std::mutex> lock(mutex_); while (loop_ == nullptr) { cond_.wait(lock); } } return loop_; }
void EventLoopThread::threadFunc() { EventLoop loop; if (cb_) { cb_(&loop); } { std::lock_guard<std::mutex> lockGuard(mutex_); loop_ = &loop; cond_.notify_one(); } loop.loop(); }
|
threadFunc
函数在运行时栈上生成了一个EventLoop
类实例,调用用户指定的回调函数,之后调用loop.loop()
运行事件循环。
EventLoopThreadPool
通过EventLoopThreadPool
类将上面的线程实现一个线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class EventLoopThreadPool : noncopyable { public: typedef std::function<void(EventLoop*)> ThreadInitCallback; explicit EventLoopThreadPool(EventLoop* baseLoop); ~EventLoopThreadPool(); void setThreadNum(int numThreads) { numThreads_ = numThreads; } void start(const ThreadInitCallback& cb = ThreadInitCallback{}); EventLoop* getNextLoop();
private: EventLoop* baseLoop_; bool started_; int numThreads_; size_t next_; std::vector<std::unique_ptr<EventLoopThread>> threads_; std::vector<EventLoop*> loops_; };
|
start
方法创建若干个EventLoopThread
线程,并保存它们的EventLoop
地址,需要注意的是这里可以创建numThreads_
为0的实例,这时候线程池中不含有线程,只含有一个主事件循环,模型也退化成单线程单 Reactor 模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void EventLoopThreadPool::start(const EventLoopThreadPool::ThreadInitCallback &cb) { assert(!started_); baseLoop_->assertInLoopThread();
started_ = true; for (int i=0; i<numThreads_; ++i) { threads_.emplace_back(new EventLoopThread(cb)); loops_.emplace_back(threads_[i]->startLoop()); } if (numThreads_ == 0 && cb) { cb(baseLoop_); } }
|
用户通过getNextLoop
方法可以通过 round-robin 的方式获取到线程池中的一个事件循环:
1 2 3 4 5 6 7 8 9 10 11
| EventLoop *EventLoopThreadPool::getNextLoop() { baseLoop_->assertInLoopThread(); EventLoop* loop = baseLoop_;
if (!loops_.empty()) { loop = loops_[next_]; next_ = (next_ + 1) % loops_.size(); } return loop; }
|
TCP 服务器
Acceptor
为了实现 TCP 服务器,首先需要开启监听套接字,对某个端口进行监听并接受客户端的连接,在 Reactor 模型中,这个行为通过 Acceptor
进行封装,并将其放在主事件循环中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class Acceptor : noncopyable { public: typedef std::function<void(int sockfd, const InetAddress&)> NewConnectionCallback; Acceptor(EventLoop* loop, const InetAddress& listenAddr); ~Acceptor();
void setNewConnectionCallback(const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } bool listening() const { return listening_; } void listen();
private: void handleRead();
EventLoop* loop_; Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listening_; };
|
这个类持有了一个Socket
对象,这个类是对套接字基本操作进行了一个简单的封装,细节见源代码,这里就不展开介绍了。为了让这个连接套接字统一放到事件循环中进行事件监听,还持有了一个Channel
类的对象,在连接套接字发生可读事件时(也就是客户端请求连接到达时)会调用handleRead
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void Acceptor::handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr(0); int connfd = acceptSocket_.accept(&peerAddr); if (connfd > 0) { if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); } else { LOG_TRACE << "no newConnectionCallback"; ::close(connfd); } } else { LOG_ERROR << "accept() error"; if (errno == EMFILE) { LOG_ERROR << "sockfd reached limit"; } } }
|
在handleRead
方法中(也就是客户端连接来到时),调用accept
方法接受客户端连接并通过回调newConnectionCallback_
将获得的新套接字文件描述符connfd
和客户端地址peerAddr
传递出去。
TcpConnection
TcpConnection
对一次 TCP 连接进行封装,这个类是 Muduo 网络库对使用者开放的类,可以通过TcpConnection
来接收和发送消息、关闭连接等。TcpConnection
本身通过共享指针来管理,这也是整个库中实现最复杂的类之一,在头文件中的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection> { public: TcpConnection(EventLoop* loop, std::string name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection();
EventLoop* getLoop() const { return loop_; } const std::string& name() const { return name_; } const InetAddress& localAddr() const { return localAddr_; } const InetAddress& peerAddr() const { return peerAddr_; } bool connected() const { return state_ == kConnected; }
void send(const void* message, size_t len); void send(const std::string& message); void send(Buffer* buf);
void shutdown(); void setTcpNoDelay(bool on);
void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback& cb) { highWaterMarkCallback_ = cb; }
void connectEstablished(); void connectDestroyed();
private: enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
void handleRead(Timestamp receiveTime); void handleWrite(); void handleClose(); void handleError(); void sendInLoop(const void* message, size_t len); void shutdownInLoop();
EventLoop* loop_; std::string name_; std::atomic<StateE> state_; std::unique_ptr<Channel> channel_; std::unique_ptr<Socket> socket_;
InetAddress localAddr_; InetAddress peerAddr_;
ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; HighWaterMarkCallback highWaterMarkCallback_; CloseCallback closeCallback_;
size_t highWaterMark_; Buffer inputBuffer_; Buffer outputBuffer_; };
|
可以看到这个类持有了一个Socket
对象和该套接字文件描述符对应的Channel
对象,而且还持有了两个Buffer
对象,这两个缓冲区分别是用户输入和输出缓冲区(在应用层的缓冲区),缓冲区的设计是仿照Netty网络库中的,这里也不详细介绍了。用户可以通过这个类设置一些相关的事件回调函数,如连接建立和断开的回调函数、套接字接收到消息的回调函数、写缓冲区写完毕回调函数等。
TcpConnection
还有两个公开的特别的函数,分别是connectEstablished
和connectDestroyed
函数,这两个函数都只能在TcpConnection
自身所在的线程调用,也就是loop_
事件循环所在的线程,对应到多线程多 Reactor 模型中就是子线程中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void TcpConnection::connectEstablished() { loop_->assertInLoopThread(); assert(state_ == kConnecting); state_ = kConnected; channel_->tie(shared_from_this()); channel_->enableReading();
connectionCallback_(shared_from_this()); }
void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { state_ = kDisconnected; channel_->disableAll(); connectionCallback_(shared_from_this()); } channel_->remove(); }
|
调用 connectEstablished
之后就开启了该连接的事件监听,调用 connectDestroyed
之后就从事件循环中移除了这个连接。
我们关注一下 TCP 连接上数据接收和发送过程。
通过Channel
注册接收数据回调handleRead
,当事件循环通知当前 TCP 数据连接有数据到来时,会调用handleRead
回调函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void TcpConnection::handleRead(Timestamp receiveTime) { loop_->assertInLoopThread(); int saveErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &saveErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = saveErrno; LOG_ERROR << "TcpConnection::handleRead"; handleError(); } }
|
这个函数逻辑很简单,就是将套接字内核缓冲区的数据读取到用户inputBuffer_
中,并通过messageCallback_
回调通知用户数据的到来。这里需要注意的是,Epoll 采用的是 LT 触发方式,也就是只要套接字内核读缓冲区里还存在数据,事件循环就会一直通知,也就会一直调用handleRead
函数进行处理,因此在handleRead
尽量一次性就将套接字内核缓冲区的数据全部读出至用户缓冲区。
用户调用send
函数向对方套接字发送数据,send
函数将数据发送放到TcpConnection
所在的事件循环子线程中(这样避免数据竞争):
1 2 3 4 5 6 7 8 9 10 11
| void TcpConnection::send(const std::string &message) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { sendInLoop(message.c_str(), message.size()); } else { loop_->runInLoop([this, &message]() { sendInLoop(message.c_str(), message.size()); }); } } }
|
在子线程中运行sendInLoop
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| void TcpConnection::sendInLoop(const void *message, size_t len) { loop_->assertInLoopThread(); ssize_t wrote = 0; size_t remaining = len; if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { wrote = ::write(channel_->fd(), message, len); if (wrote >= 0) { remaining = len - wrote; if (remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop([this]() { writeCompleteCallback_(shared_from_this()); }); } } else { wrote = 0; if (errno != EWOULDBLOCK) { LOG_ERROR << "TcpConnection::sendInLoop"; } } } assert(remaining <= len); if (remaining > 0) { LOG_TRACE << "write more data"; size_t oldLen = outputBuffer_.readableBytes(); if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop([this, cur = oldLen+remaining]() { highWaterMarkCallback_(shared_from_this(), cur); }); } outputBuffer_.append(static_cast<const char*>(message) + wrote, remaining); if (!channel_->isWriting()) { channel_->enableWriting(); } } }
|
逻辑略微复杂一些,当前 TCP 连接的用户写缓冲区没有数据时,直接调用write
系统调用写一次数据(实际上是将用户数据复制到套接字内核写缓冲区中),如果成功就返回(这是写入数据不多时经常发生的情况,套接字内核写缓冲区有很大的空间)。如果没有一次性将数据完全复制到套接字的内核写缓冲区中(一般是套接字内核写缓冲区还有很多数据没来得及发送到对方),这时就将数据保存在用户写缓冲区outputBuffer_
中,并开启事件循环对套接字可写事件的监听。
开启事件循环对写事件的监听后,当套接字内核写缓冲区数据变少之后通过可写事件通知,这时会调用handleWrite
回调函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void TcpConnection::handleWrite() { loop_->assertInLoopThread(); if (channel_->isWriting()) { ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes()); if (n > 0) { outputBuffer_.retrieve(n); if (outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); if (writeCompleteCallback_) { loop_->queueInLoop([this]() { writeCompleteCallback_(shared_from_this()); }); } if (state_ == kDisconnecting) { shutdownInLoop(); } } else { LOG_TRACE << "write more data"; } } else { LOG_ERROR << "TcpConnection::handleWrite"; } } else { LOG_TRACE << "Connection is down, no more writing"; } }
|
这个函数将用户写缓冲区的数据复制到套接字内核写缓冲区中,当用户写缓冲区数据写完之后,关闭事件循环对可写事件的监听,不然会一直触发调用handleWrite
函数,这也是采用 LT 触发方式处理写事件需要注意的点。
在上面接收数据的函数handleRead
中,如果读取到 0 字节(也就是接收到对方发送的 Fin 分节)就会调用handleClose
函数来断开连接:
1 2 3 4 5 6 7 8
| void TcpConnection::handleClose() { loop_->assertInLoopThread(); LOG_TRACE << "TcpConnection::handleClose state = " << state_; state_ = kDisconnected; channel_->disableAll(); connectionCallback_(shared_from_this()); closeCallback_(shared_from_this()); }
|
TcpServer
TcpServer
结合上面的Acceptor
,EventLoopThreadPool
以及TcpConnection
来实现了一个多线程 TCP 服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class TcpServer : noncopyable { public: typedef std::function<void(EventLoop*)> ThreadInitCallback; TcpServer(EventLoop* loop, const InetAddress& listenAddr, std::string name); ~TcpServer();
const std::string& hostPort() const { return hostPort_; } const std::string& name() const { return name_; }
void setThreadNum(int numThreads); void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = cb; } void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } void setWriteCompleteCallback (const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; }
void start();
private: void newConnection(int sockfd, const InetAddress& peerAddr); void removeConnection(const TcpConnectionPtr& conn); void removeConnectionInLoop(const TcpConnectionPtr& conn);
EventLoop* loop_; const std::string hostPort_; const std::string name_; std::unique_ptr<Acceptor> acceptor_; std::unique_ptr<EventLoopThreadPool> threadPool_;
ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; ThreadInitCallback threadInitCallback_;
std::atomic_bool started_; int nextConnId_; std::unordered_map<std::string, TcpConnectionPtr> connections_; };
|
用户调用start
函数开启事件循环子线程池,并开启Acceptor
的监听:
1 2 3 4 5 6 7 8 9 10
| void TcpServer::start() { loop_->assertInLoopThread(); if (!started_) { started_ = true; threadPool_->start(threadInitCallback_); } if (!acceptor_->listening()) { acceptor_->listen(); } }
|
在Acceptor
上发生新连接时调用回调函数newConnection
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) { loop_->assertInLoopThread(); EventLoop* ioLoop = threadPool_->getNextLoop(); char buf[32]; snprintf(buf, sizeof(buf), ":%s#%d", hostPort_.c_str(), nextConnId_); ++ nextConnId_; std::string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); InetAddress localAddr(detail::getLocalAddr(sockfd)); TcpConnectionPtr conn = std::make_shared<TcpConnection>(ioLoop, connName, sockfd, localAddr, peerAddr); connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback([this](const TcpConnectionPtr& p) { removeConnection(p); }); ioLoop->runInLoop([conn]() { conn->connectEstablished(); }); }
|
这个回调发生在主线程中,从子线程池中获取到一个子线程的事件循环,并初始化一个TcpConnection
对象实例放在该事件循环中,设置好相应的回调函数并在子线程事件循环中执行conectEstablished
开启事件循环对于该连接事件的监听。
参考文献
- https://github.com/xiaoqieF/CppNet.git
- https://github.com/chenshuo/muduo.git