Muduo网络库实现(一):异步日志

关于日志

一般应用程序都会有一个日志模块,因为程序一旦发布或部署,就不允许直接通过调试来排查问题了,这时候日志成为观察程序运行情况的重要工具。

一般一条日志至少需要打印的信息有:日期、日志等级、打印日志的文件名和行号、日志内容等,在 Muduo 日志库中,还打印了产生日志的线程 ID。下面是我重写 Muduo 库后日志输出示例:

1
2
20230425 20:32:21.265016Z 14061 [TRACE] EpollPoller.cc:29 - updateChannel, fd = 4 events = 3
20230425 20:32:21.265059Z 14061 [TRACE] EventLoop.cc:45 - EventLoop created 0x7ffca56388a0 in thread: 14061

日志的最初原型是将一些信息打印到控制台,实际情况是将日志输出到文件中,根据写日志的时机不同,分为同步日志异步日志

  • 同步日志

同步日志是指产生日志的同时就将其写入至文件中,这种方式优点是设计简单、能够及时的将日志输出到文件中,缺点是会频繁的触发 IO 操作,且对写日志的线程阻塞。在客户端程序中,这种方式很常见,写入几条日志的时间相比于用户的反应时间来说可以忽略不计,用户很难察觉这点性能损失。即使是在 UI 线程中,只要日志量不大的情况下也不会造成什么影响。

  • 异步日志

对于服务端程序而言,使用同步写日志的方式会影响性能,服务器线程阻塞时间越长,其为客户端提供服务的时间就越短,同时为多个客户端提供服务的能力也越差。现代计算机的 CPU 一般都是具有多个核心的,因此服务端程序常采用多线程程序来尽量发挥每一个核心的能力,异步日志为了减少写日志对线程的阻塞,采用一个单独的线程来向磁盘写入日志,其它线程只是产生日志并将其交给写磁盘线程。

异步日志是一个典型的生产者消费者模型,分为日志前端和日志后端。前端就是生产者,也就是产生日志的一方,后端是日志的消费者,负责将日志写入磁盘,通常使用一个单独的线程来完成。Muduo 网络库实现了一个简单的异步日志系统,下面介绍这个日志系统的设计实现。

日志前端

FixedBuffer 设计

FixedBuffer实现了一个固定大小的缓冲区,其中使用了非类型模板参数,用于产生不同大小的固定缓冲区,设计比较简单,不详细描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
namespace CppNet {
namespace detail {
const int kSmallBuffer = 4000;
const int kLargeBuffer = 4000 * 1000;

// 使用非类型模板参数
template<int SIZE>
class FixedBuffer : noncopyable {
public:
FixedBuffer() : cur_(data_) {}
~FixedBuffer() = default;

void append(const char* buf, size_t len) {
if (static_cast<size_t>(avail()) > len) {
memcpy(cur_, buf, len);
cur_ += len;
}
}

const char* data() const {
return data_;
}
int length() const {
return cur_ - data_;
}
char* current() {
return cur_;
}
int avail() const {
return end() - cur_;
}
void add(size_t len) {
cur_ += len;
}
void reset() {
cur_ = data_;
}
void bzero() {
::bzero(data_, sizeof(data_));
}
std::string toString() const {
return std::string(data_, length());
}

private:
const char* end() const {
return data_ + sizeof(data_);
}
char data_[SIZE];
char* cur_;
};
}
}

LogStream 设计

LogStream 使用上面的 FixedBuffer<kSmallBuffer> 做了一个单条日志缓冲区(默认是4000字节大小),通过重写<<运算符来实现所有基本数据类型的格式化输出,在 LogStream.h 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
namespace CppNet {
class LogStream : noncopyable{
typedef LogStream self;
public:
typedef detail::FixedBuffer<detail::kSmallBuffer> Buffer;

self& operator<<(bool v) {
buffer_.append(v ? "1" : "0", 1);
return *this;
}
self& operator<<(short);
self& operator<<(unsigned short);
self& operator<<(int);
self& operator<<(unsigned int);
self& operator<<(long);
self& operator<<(unsigned long);
self& operator<<(long long);
self& operator<<(unsigned long long);

self& operator<<(float v);
self& operator<<(double);
self& operator<<(char v) {
buffer_.append(&v, 1);
return *this;
}
self& operator<<(const char* v) {
buffer_.append(v, strlen(v));
return *this;
}
self& operator<<(const void* v);
self& operator<<(const std::string& v) {
buffer_.append(v.c_str(), v.size());
return *this;
}

void append(const char* data, size_t len) {
buffer_.append(data, len);
}
const Buffer& buffer() const {
return buffer_;
}
void resetBuffer() {
buffer_.reset();
}
private:
static const int kMaxNumericSize = 48;
template<typename T>
void formatInteger(T); // 这里模板实现不在头文件可以,因为没有其他类外部的访问(private的模板函数可以在cpp实现)
Buffer buffer_;
};

实现上,整数类型(包括有符号和无符号)的格式化输出通过调用 formatInteger 来实现,浮点类型的格式化输出通过调用snprintf函数来实现。读者可以在实现文件中查看具体实现,源代码链接在文末给出。

为了实现用户自定义格式输出,定义了一个很小的格式化类Fmt,并定义 operator<<(LogStream& s, const Fmt& fmt) 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Fmt {
public:
template<typename T>
Fmt(const char* fmt, T val);

const char* data() const {
return buf_;
}
int length() const {
return length_;
}

private:
char buf_[32];
int length_;
};

// 在 Fmt 构造函数中通过 snprintf 进行数据格式化
template<typename T>
Fmt::Fmt(const char *fmt, T val) {
length_ = snprintf(buf_, sizeof(buf_), fmt, val);
assert(static_cast<size_t>(length_) < sizeof(buf_));
}

inline LogStream& operator<<(LogStream& s, const Fmt& fmt) {
s.append(fmt.data(), fmt.length());
return s;
}

Logging 设计

Logging.h 和 Logging.cc 文件定义和实现了用户使用的接口,包括 Logger 类和一系列宏,在 C++ 中,日志库中通过定义宏来输出行号、文件名等是十分常见的操作。日志生成和输出是通过 Logger 类的构造和析构来完成的。

首先,Logger 类采用 pImpl 手法将实现进行了隐藏,从而减少了文件之间的依赖,pImpl 手法在我之前的博客中有提到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Logger {
public:
enum LogLevel {
TRACE,
DEBUG,
INFO,
WARN,
ERROR,
FATAL,
NUM_LOG_LEVELS,
};

Logger(const char* file, int line);
Logger(const char* file, int line, LogLevel level);
~Logger();
LogStream& stream();

public:
static LogLevel logLevel() { return g_logLevel; }
static void setLogLevel(LogLevel level) { g_logLevel = level; }

typedef void (*OutputFunc)(const char* msg, int len);
typedef void (*FlushFunc)();
static void setOutput(OutputFunc o) { g_output = o; }
static void setFlush(FlushFunc f) { g_flush = f; }

private:
static LogLevel g_logLevel;
static OutputFunc g_output;
static FlushFunc g_flush;

class Impl;
std::unique_ptr<Impl> impl;
};

上述类的接口很简单,用户可以通过三个类静态方法来设置日志等级、日志输出接口以及刷新缓冲区接口。具体看看实现类(在 Logging.cc 文件中):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Logger::Impl {
public:
typedef Logger::LogLevel LogLevel;
Impl(LogLevel level, int old_errno, const char* file, int line)
: time_(Timestamp::now()),
stream_(),
level_(level),
line_(line),
basename_(file)
{
formatTime();
CurrentThread::tid();
stream_.append(CurrentThread::tidString(), CurrentThread::t_tidLen);
stream_.append(LogLevelName[level], 8);
if (old_errno != 0) {
stream_ << strerror_tl(old_errno) << " (errno=" << old_errno << ") ";
}
stream_.append(basename_.data(), basename_.size());
stream_ << ':' << line_ << " - ";
}

void formatTime();

Timestamp time_;
LogStream stream_;
LogLevel level_;
int line_;
SourceFile basename_;
};

可以看到实现类持有了一个 LogStream 成员,用来保存日志具体内容,在 Logger 构造时会构造 Impl 对象,在其构造函数中会将当前的时间、线程 ID、日志等级以及打印日志所在文件名以及行号格式化输出至 stream_ 中。

Logger 类的析构函数中,在 stream_ 中加入换行符 \n 并调用 g_output 输出函数进行日志的输出:

1
2
3
4
5
6
7
8
9
Logger::~Logger() {
stream() << '\n';
const LogStream::Buffer & buffer = impl->stream_.buffer();
g_output(buffer.data(), buffer.length());
if (impl->level_ == FATAL) {
g_flush();
abort();
}
}

默认是向控制台输出和刷新:

1
2
3
4
5
6
Logger::OutputFunc Logger::g_output = [](const char* msg, int len) {
fwrite(msg, 1, len, stdout); // ok, thread safe
};
Logger::FlushFunc Logger::g_flush = []() {
fflush(stdout);
};

最巧妙的地方是定义一系列的宏来自动的完成上述日志生成、输出:

1
2
3
4
5
6
7
8
9
#define LOG_TRACE if(CppNet::Logger::logLevel() <= CppNet::Logger::TRACE) \
CppNet::Logger(__FILE__, __LINE__, CppNet::Logger::TRACE).stream()
#define LOG_DEBUG if(CppNet::Logger::logLevel() <= CppNet::Logger::DEBUG) \
CppNet::Logger(__FILE__, __LINE__, CppNet::Logger::DEBUG).stream()
#define LOG_INFO if(CppNet::Logger::logLevel() <= CppNet::Logger::INFO) \
CppNet::Logger(__FILE__, __LINE__).stream()
#define LOG_WARN CppNet::Logger(__FILE__, __LINE__, CppNet::Logger::WARN).stream()
#define LOG_ERROR CppNet::Logger(__FILE__, __LINE__, CppNet::Logger::ERROR).stream()
#define LOG_FATAL CppNet::Logger(__FILE__, __LINE__, CppNet::Logger::FATAL).stream()

用户使用方法:

1
LOG_DEBUG << "hello" << Fmt("%lf", 3.4);

这条表达式生成的 Logger 对象实例会在当前行构造并析构,因此日志生成并输出的时序是:

  1. 调用 Logger 类的构造函数从而调用其内部实现类 Impl 的构造函数,将时间、线程 ID、日志等级等输入到 stream_ 中。
  2. 执行用户的 << 操作,将用户自己的信息输入到 stream_ 中。
  3. 调用 Logger 类的析构函数输出一个 \nstream_ 中,并调用 g_output 函数进行日志的输出。

以上就是日志前端的所有内容,默认的日志输出地是控制台。

日志后端

日志后端有自己的缓冲区,使用的是双缓冲区,实际实现上使用了 4 个缓冲区。缓冲区使用FixedBuffer<kLargeBuffer> 对象(大小为4000*1000字节),缓冲区使用std::unique_ptr管理,通过移动语义来实现缓冲区之间的交换,避免了缓冲区内容的复制成本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class AsyncLogging : noncopyable{
public:
AsyncLogging(std::string base, size_t rollSize, int flushInterval=3);
~AsyncLogging() {
if (running_) {
stop();
}
}

void append(const char* logLine, int len); // 日志前端(生产者)调用该函数写入
void start() {
running_ = true;
thread_ = std::make_unique<std::thread>(&AsyncLogging::theadFunc, this);
}
void stop() {
if (running_) {
running_ = false;
cond_.notify_one();
thread_->join();
}
}

private:
void theadFunc(); // 消费缓冲区中的日志

typedef detail::FixedBuffer<detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;

const int flushInterval_;
bool running_;
std::string basename_;
size_t rollSize_;

std::unique_ptr<std::thread> thread_;
std::mutex mutex_;
std::condition_variable cond_;

BufferPtr currentBuffer_; // 前端使用的第一个缓冲区
BufferPtr nextBuffer_; // 如果日志写入速度很快,短时间写满第一个缓冲区,则使用第二个缓冲区
BufferVector buffers_; // 存储已经写满的缓冲区
};

首先看前端(生产者)调用的写日志函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void AsyncLogging::append(const char *logLine, int len) {
std::lock_guard<std::mutex> lockGuard(mutex_);
// 当前缓冲区空间足够
if (currentBuffer_->avail() > len) {
currentBuffer_->append(logLine, len);
} else {
buffers_.emplace_back(currentBuffer_.release());
if (nextBuffer_) {
// 使用另一个缓冲区
currentBuffer_ = std::move(nextBuffer_); // now nextBuffer_ == nullptr
} else {
// 两个缓冲区都满了,重新分配一个缓冲区,很少发生
currentBuffer_ = std::make_unique<Buffer>();
}
currentBuffer_->append(logLine, len);
cond_.notify_one();
}
}

可能同时多个线程向日志后端提交日志,因此 append 函数加锁,其逻辑如下:首先写 currentBuffer_ 缓冲区,写满之后使用 nextBuffer_ 缓冲区,两个都写满时再重新分配新的缓冲区。所有写满的缓冲区都移入buffers_ 中保存,等待消费者写入磁盘。当写满一个缓冲区后就立即唤醒条件变量,从而让消费者线程将缓冲区写入磁盘。

日志后端的线程函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void AsyncLogging::theadFunc() {
assert(running_ == true);
LogFile output(basename_, rollSize_);
BufferPtr newBuffer1 = std::make_unique<Buffer>();
BufferPtr newBuffer2 = std::make_unique<Buffer>();
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);

while (running_) {
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());

{
std::unique_lock<std::mutex> uniqueLock(mutex_);
if (buffers_.empty()) {
// 如果后端 buffers 没有数据,等待 3 s
cond_.wait_for(uniqueLock, std::chrono::seconds(flushInterval_));
}
buffers_.emplace_back(currentBuffer_.release());
currentBuffer_ = std::move(newBuffer1);
buffersToWrite.swap(buffers_);
if (!nextBuffer_) {
nextBuffer_ = std::move(newBuffer2); // 此时 buffers_size() >= 2
}
}

for (const auto& b : buffersToWrite) {
output.append(b->data(), b->length());
}
if (buffersToWrite.size() > 2) {
buffersToWrite.resize(2);
}
if (!newBuffer1) {
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset(); // reset newBuffer1
}
if (!newBuffer2) {
// 说明 newBuffer2 给了 nextBuffer_, buffersToWrite 必然还有 1 个 buffer
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}

其中 LogFile 是对写入磁盘文件的一个封装,它能在文件大小超过 rollSize_ 后自动写一个新的文件,这样防止单个日志文件过大。调用其 append 函数 flush 函数能够写入磁盘和刷新磁盘缓冲区,其详细设计可以参考源码。

线程函数每次在条件变量上休眠,唤醒条件有两个:(1) 生产者唤醒;(2) 超时。超时时间为 3 s,因此最长每 3 s就会将缓冲区的日志写入磁盘一次。这里注意临界区里将当前在写的缓冲区加入了 buffers_swapbuffersToWritebuffers_,而不是将整个写入磁盘的操作也放进临界区里,提高效率。

之后调用output.append()来将日志写入磁盘,并调用output.flush()来刷新 IO 的内核缓冲区,确保日志写入磁盘中。

总结

本篇博客介绍了 Muduo 库中的日志模块设计,是一个典型的异步日志,上面代码均出自我重写的版本中:https://github.com/xiaoqieF/CppNet.git

参考文献

  1. https://github.com/xiaoqieF/CppNet.git
  2. https://github.com/chenshuo/muduo.git
  3. 《C++ 服务器开发精髓》