Muduo网络库实现(三):TCP服务器

One loop per thread

EventLoopThread

为了实现之前所说的多Reactor多线程模型,每个线程里含有且只含有一个事件循环(EventLoop),首先需要对这种线程做一个简单的包装,也就是这里实现的 EventLoopThread 类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class EventLoopThread : noncopyable {
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
explicit EventLoopThread(ThreadInitCallback cb = ThreadInitCallback{});
~EventLoopThread();
EventLoop* startLoop();

private:
void threadFunc();

EventLoop* loop_;
std::unique_ptr<std::thread> thread_;
std::mutex mutex_;
std::condition_variable cond_;
ThreadInitCallback cb_;
};

这个类的用户调用startLoop方法来开启一个新的线程,并在该线程里启动一个事件循环EventLoop,这个方法返回该事件循环的地址(指针)。核心在于线程函数threadFunc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
EventLoop *EventLoopThread::startLoop() {
assert(thread_ == nullptr);
thread_ = std::make_unique<std::thread>(&EventLoopThread::threadFunc, this);
{
std::unique_lock<std::mutex> lock(mutex_);
while (loop_ == nullptr) {
cond_.wait(lock);
}
}
return loop_;
}

void EventLoopThread::threadFunc() {
EventLoop loop;
if (cb_) {
cb_(&loop);
}
{
std::lock_guard<std::mutex> lockGuard(mutex_);
loop_ = &loop;
cond_.notify_one();
}
loop.loop();
}

threadFunc函数在运行时栈上生成了一个EventLoop类实例,调用用户指定的回调函数,之后调用loop.loop()运行事件循环。

EventLoopThreadPool

通过EventLoopThreadPool类将上面的线程实现一个线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class EventLoopThreadPool : noncopyable {
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
explicit EventLoopThreadPool(EventLoop* baseLoop);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback{});
EventLoop* getNextLoop();

private:
EventLoop* baseLoop_;
bool started_;
int numThreads_;
size_t next_;
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;
};

start方法创建若干个EventLoopThread线程,并保存它们的EventLoop地址,需要注意的是这里可以创建numThreads_为0的实例,这时候线程池中不含有线程,只含有一个主事件循环,模型也退化成单线程单 Reactor 模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void EventLoopThreadPool::start(const EventLoopThreadPool::ThreadInitCallback &cb) {
assert(!started_);
// 保证在 baseLoop 所在的线程执行,避免多个线程执行当前函数导致竞争
baseLoop_->assertInLoopThread();

started_ = true;
for (int i=0; i<numThreads_; ++i) {
threads_.emplace_back(new EventLoopThread(cb));
loops_.emplace_back(threads_[i]->startLoop());
}
if (numThreads_ == 0 && cb) {
cb(baseLoop_);
}
}

用户通过getNextLoop方法可以通过 round-robin 的方式获取到线程池中的一个事件循环:

1
2
3
4
5
6
7
8
9
10
11
EventLoop *EventLoopThreadPool::getNextLoop() {
baseLoop_->assertInLoopThread();
EventLoop* loop = baseLoop_; // 如果没有 subLoop,则返回 baseLoop

if (!loops_.empty()) {
// round-robin
loop = loops_[next_];
next_ = (next_ + 1) % loops_.size();
}
return loop;
}

TCP 服务器

Acceptor

为了实现 TCP 服务器,首先需要开启监听套接字,对某个端口进行监听并接受客户端的连接,在 Reactor 模型中,这个行为通过 Acceptor 进行封装,并将其放在主事件循环中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Acceptor : noncopyable {
public:
typedef std::function<void(int sockfd, const InetAddress&)> NewConnectionCallback;
Acceptor(EventLoop* loop, const InetAddress& listenAddr);
~Acceptor();

void setNewConnectionCallback(const NewConnectionCallback& cb) {
newConnectionCallback_ = cb;
}
bool listening() const { return listening_; }
void listen();

private:
void handleRead(); // 新连接到来后回调

EventLoop* loop_;
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listening_;
};

这个类持有了一个Socket对象,这个类是对套接字基本操作进行了一个简单的封装,细节见源代码,这里就不展开介绍了。为了让这个连接套接字统一放到事件循环中进行事件监听,还持有了一个Channel类的对象,在连接套接字发生可读事件时(也就是客户端请求连接到达时)会调用handleRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void Acceptor::handleRead() {
loop_->assertInLoopThread();
InetAddress peerAddr(0);
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd > 0) {
if (newConnectionCallback_) {
newConnectionCallback_(connfd, peerAddr);
} else {
LOG_TRACE << "no newConnectionCallback";
::close(connfd);
}
} else {
LOG_ERROR << "accept() error";
if (errno == EMFILE) {
LOG_ERROR << "sockfd reached limit";
}
}
}

handleRead方法中(也就是客户端连接来到时),调用accept方法接受客户端连接并通过回调newConnectionCallback_将获得的新套接字文件描述符connfd和客户端地址peerAddr传递出去。

TcpConnection

TcpConnection对一次 TCP 连接进行封装,这个类是 Muduo 网络库对使用者开放的类,可以通过TcpConnection来接收和发送消息、关闭连接等。TcpConnection本身通过共享指针来管理,这也是整个库中实现最复杂的类之一,在头文件中的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection>
{
public:
TcpConnection(EventLoop* loop,
std::string name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();

EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddr() const { return localAddr_; }
const InetAddress& peerAddr() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; }

void send(const void* message, size_t len);
void send(const std::string& message);
void send(Buffer* buf);

void shutdown();
void setTcpNoDelay(bool on);

void setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}
void setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}
void setWriteCompleteCallback(const WriteCompleteCallback& cb) {
writeCompleteCallback_ = cb;
}
void setCloseCallback(const CloseCallback& cb) {
closeCallback_ = cb;
}
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb) {
highWaterMarkCallback_ = cb;
}

void connectEstablished();
void connectDestroyed();

private:
enum StateE {
kDisconnected,
kConnecting,
kConnected,
kDisconnecting
};

void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();

EventLoop* loop_;
std::string name_;
std::atomic<StateE> state_;
std::unique_ptr<Channel> channel_; // 持有 Channel
std::unique_ptr<Socket> socket_; // 持有 Socket

InetAddress localAddr_;
InetAddress peerAddr_;

ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;

size_t highWaterMark_;
Buffer inputBuffer_; // 用户读缓冲区
Buffer outputBuffer_; // 用户写缓冲区
};

可以看到这个类持有了一个Socket对象和该套接字文件描述符对应的Channel对象,而且还持有了两个Buffer对象,这两个缓冲区分别是用户输入和输出缓冲区(在应用层的缓冲区),缓冲区的设计是仿照Netty网络库中的,这里也不详细介绍了。用户可以通过这个类设置一些相关的事件回调函数,如连接建立和断开的回调函数、套接字接收到消息的回调函数、写缓冲区写完毕回调函数等。

TcpConnection还有两个公开的特别的函数,分别是connectEstablishedconnectDestroyed函数,这两个函数都只能在TcpConnection自身所在的线程调用,也就是loop_事件循环所在的线程,对应到多线程多 Reactor 模型中就是子线程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void TcpConnection::connectEstablished() {
loop_->assertInLoopThread();
assert(state_ == kConnecting);
state_ = kConnected;
channel_->tie(shared_from_this()); // 防止 TcpConnection 被销毁了, channel_ 还在处理事件
channel_->enableReading();

connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed() {
loop_->assertInLoopThread();
if (state_ == kConnected) {
state_ = kDisconnected;
channel_->disableAll();
connectionCallback_(shared_from_this());
}
channel_->remove();
}

调用 connectEstablished 之后就开启了该连接的事件监听,调用 connectDestroyed 之后就从事件循环中移除了这个连接。

我们关注一下 TCP 连接上数据接收和发送过程。

  • 接收数据

通过Channel注册接收数据回调handleRead,当事件循环通知当前 TCP 数据连接有数据到来时,会调用handleRead回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TcpConnection::handleRead(Timestamp receiveTime) {
loop_->assertInLoopThread();
int saveErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &saveErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
errno = saveErrno;
LOG_ERROR << "TcpConnection::handleRead";
handleError();
}
}

这个函数逻辑很简单,就是将套接字内核缓冲区的数据读取到用户inputBuffer_中,并通过messageCallback_回调通知用户数据的到来。这里需要注意的是,Epoll 采用的是 LT 触发方式,也就是只要套接字内核读缓冲区里还存在数据,事件循环就会一直通知,也就会一直调用handleRead函数进行处理,因此在handleRead尽量一次性就将套接字内核缓冲区的数据全部读出至用户缓冲区。

  • 发送数据

用户调用send函数向对方套接字发送数据,send函数将数据发送放到TcpConnection所在的事件循环子线程中(这样避免数据竞争):

1
2
3
4
5
6
7
8
9
10
11
void TcpConnection::send(const std::string &message) {
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(message.c_str(), message.size());
} else {
loop_->runInLoop([this, &message]() {
sendInLoop(message.c_str(), message.size());
});
}
}
}

在子线程中运行sendInLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void TcpConnection::sendInLoop(const void *message, size_t len) {
loop_->assertInLoopThread();
ssize_t wrote = 0;
size_t remaining = len;
// 如果缓冲区没有待发数据,直接调用 write 写一次数据
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
wrote = ::write(channel_->fd(), message, len);
if (wrote >= 0) {
remaining = len - wrote;
if (remaining == 0 && writeCompleteCallback_) {
loop_->queueInLoop([this]() {
writeCompleteCallback_(shared_from_this());
});
}
} else {
wrote = 0;
if (errno != EWOULDBLOCK) {
/// FIXME: SIGPIPE
LOG_ERROR << "TcpConnection::sendInLoop";
}
}
}
assert(remaining <= len);
// 一次没写完
if (remaining > 0) {
LOG_TRACE << "write more data";
size_t oldLen = outputBuffer_.readableBytes();
// 高水位
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_ && highWaterMarkCallback_) {
loop_->queueInLoop([this, cur = oldLen+remaining]() {
highWaterMarkCallback_(shared_from_this(), cur);
});
}
outputBuffer_.append(static_cast<const char*>(message) + wrote, remaining);
if (!channel_->isWriting()) {
channel_->enableWriting(); // 关注 POLLOUT 事件
}
}
}

逻辑略微复杂一些,当前 TCP 连接的用户写缓冲区没有数据时,直接调用write系统调用写一次数据(实际上是将用户数据复制到套接字内核写缓冲区中),如果成功就返回(这是写入数据不多时经常发生的情况,套接字内核写缓冲区有很大的空间)。如果没有一次性将数据完全复制到套接字的内核写缓冲区中(一般是套接字内核写缓冲区还有很多数据没来得及发送到对方),这时就将数据保存在用户写缓冲区outputBuffer_中,并开启事件循环对套接字可写事件的监听。

开启事件循环对写事件的监听后,当套接字内核写缓冲区数据变少之后通过可写事件通知,这时会调用handleWrite回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void TcpConnection::handleWrite() {
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting();
if (writeCompleteCallback_) {
loop_->queueInLoop([this]() {
writeCompleteCallback_(shared_from_this());
});
}
if (state_ == kDisconnecting) {
shutdownInLoop();
}
} else {
LOG_TRACE << "write more data";
}
} else {
LOG_ERROR << "TcpConnection::handleWrite";
}
} else {
LOG_TRACE << "Connection is down, no more writing";
}
}

这个函数将用户写缓冲区的数据复制到套接字内核写缓冲区中,当用户写缓冲区数据写完之后,关闭事件循环对可写事件的监听,不然会一直触发调用handleWrite函数,这也是采用 LT 触发方式处理写事件需要注意的点。

  • 关闭连接

在上面接收数据的函数handleRead中,如果读取到 0 字节(也就是接收到对方发送的 Fin 分节)就会调用handleClose函数来断开连接:

1
2
3
4
5
6
7
8
void TcpConnection::handleClose() {
loop_->assertInLoopThread();
LOG_TRACE << "TcpConnection::handleClose state = " << state_;
state_ = kDisconnected;
channel_->disableAll();
connectionCallback_(shared_from_this());
closeCallback_(shared_from_this());
}

TcpServer

TcpServer结合上面的Acceptor,EventLoopThreadPool以及TcpConnection来实现了一个多线程 TCP 服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class TcpServer : noncopyable {
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
TcpServer(EventLoop* loop, const InetAddress& listenAddr, std::string name);
~TcpServer();

const std::string& hostPort() const { return hostPort_; }
const std::string& name() const { return name_; }

void setThreadNum(int numThreads);
void setThreadInitCallback(const ThreadInitCallback& cb) {
threadInitCallback_ = cb;
}
void setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}
void setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}
void setWriteCompleteCallback (const WriteCompleteCallback& cb) {
writeCompleteCallback_ = cb;
}

// not thread safe, call it in loop thread
void start();

private:
void newConnection(int sockfd, const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);
void removeConnectionInLoop(const TcpConnectionPtr& conn);

EventLoop* loop_;
const std::string hostPort_;
const std::string name_;
std::unique_ptr<Acceptor> acceptor_;
std::unique_ptr<EventLoopThreadPool> threadPool_;

ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;

std::atomic_bool started_;
int nextConnId_;
std::unordered_map<std::string, TcpConnectionPtr> connections_;
};

用户调用start函数开启事件循环子线程池,并开启Acceptor的监听:

1
2
3
4
5
6
7
8
9
10
void TcpServer::start() {
loop_->assertInLoopThread();
if (!started_) {
started_ = true;
threadPool_->start(threadInitCallback_);
}
if (!acceptor_->listening()) {
acceptor_->listen();
}
}

Acceptor上发生新连接时调用回调函数newConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) {
loop_->assertInLoopThread(); // 在主线程
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[32];
snprintf(buf, sizeof(buf), ":%s#%d", hostPort_.c_str(), nextConnId_);
++ nextConnId_;
std::string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(detail::getLocalAddr(sockfd));
TcpConnectionPtr conn = std::make_shared<TcpConnection>(ioLoop,
connName, sockfd, localAddr, peerAddr);
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback([this](const TcpConnectionPtr& p) {
removeConnection(p);
});
ioLoop->runInLoop([conn]() {
conn->connectEstablished();
});
}

这个回调发生在主线程中,从子线程池中获取到一个子线程的事件循环,并初始化一个TcpConnection对象实例放在该事件循环中,设置好相应的回调函数并在子线程事件循环中执行conectEstablished开启事件循环对于该连接事件的监听。

参考文献

  1. https://github.com/xiaoqieF/CppNet.git
  2. https://github.com/chenshuo/muduo.git