开源代码阅读:rest_rpc

rest_rpc

rest_rpc是一个C++ 编写的RPC库,选择这个开源库作为学习目标的原因是其体量比较小,便于学习。学习过后,也重写了一个自己的版本:https://github.com/xiaoqieF/trpc,抽取了原库中最重要的部分进行了实现,只是作为一个学习用的Demo,通过C++ 17标准将内部一些元编程进行了简化。

在这里我不会介绍RPC的原理,其实是很简单的一件事情。我认为作为一个RPC库最重要的是做好网络通信以及消息序列化这两件事情,rest_rpc库利用经典的ASIO库作为网络通信框架,使用msgpack库作为序列化工具。

函数和参数的序列化

首先是使用函数名称作为调用标识,服务端注册函数:

1
server.register_handler("hello", hello);

客户端调用:

1
client.call<ReturnType>("hello", 1, 2);

通过对函数名字符串计算MD5得到func_id作为函数调用标识,因此服务端需要建立func_id到函数之间的映射,客户端调用时需要传递该字段。此外调用函数还需要将函数参数进行序列化然后传递。由于不同函数参数类型和数量均不同,因此将所有函数参数打包成std::tuple然后通过msgpack库进行序列化。例如对于void f1(int, std::string),将传递msgpack对std::tuple<int, std::string>进行打包后的结果。服务端和客户端之间通信消息包格式可以定义如下:

1
2
3
4
5
6
7
8
9
10
struct RpcHeader {
uint64_t request_id;
uint32_t body_len;
uint32_t function_id;
};

struct RpcMsg {
RpcHeader header;
std::string content;
};

这里只是写一个Demo,因此删除了原库头部的magic字段,从而使得RpcHeader结构体天然对齐并占据16字节长度,body_len字段值即为后续content的长度。

两个通信方向均采用上述消息包格式。对于客户端发往服务端的包而言,content保存了调用函数参数的序列化结果;对于服务端发往客户端的包而言,content保存了函数调用结果的序列化结果(实际上是std::tuple<int, Result>),其中第一个字段为函数调用是否成功的标志,后续字段为结果。

如何注册和调用RPC函数

通过router.hpp中的class Router记录所有注册的函数:

1
2
3
4
5
6
7
private:
using RpcFunc = std::function<std::string(std::string_view)>;
using FuncMap = std::unordered_map<uint32_t, RpcFunc>;
using FuncNameMap = std::unordered_map<uint32_t, std::string>;

FuncMap func_map_;
FuncNameMap func_name_map_;

通过哈希表记录已经注册的函数,其中哈希表的key是对函数名称计算MD5得来的。其中保存的函数RpcFunc的参数和结果都是字符串,也就是通过msgpack序列化后的字符串。通过router.register_handler进行函数注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template <typename F>
void register_handler(const std::string& name, F f) {
uint32_t key = MD5::MD5Hash32(name.data());
func_name_map_.emplace(key, name);
func_map_[key] = [f](std::string_view str) -> std::string {
using args_tuple = typename FunctionTraits<F>::bare_params_type;
msgpack::object_handle handle; // 该对象保存了 unpack 的结果,销毁时会使结果销毁
try {
auto params = msgpack_codec::unpack<args_tuple>(handle, str.data(), str.size());
return call(f, std::move(params));
} catch (const std::exception& e) {
return msgpack_codec::pack_args_to_str(FuncResultCode::FAIL, e.what());
}
};
}

注册的函数是一个lambda表达式,对实际注册的函数f进行了包装,在内部将序列化的参数反序列化得到std::tuple类型,并通过call函数将该每个元素提取出来用于调用f

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template <typename F, typename... Args>
static std::string call(const F& f, std::tuple<Args...> tp) {
return call_helper(f, std::make_index_sequence<sizeof...(Args)>{}, std::move(tp));
}

template <typename F, size_t... I, typename... Args>
static std::string call_helper(const F& f,
const std::index_sequence<I...>&,
std::tuple<Args...> tp) {
if constexpr (std::is_same_v<std::result_of_t<F(Args...)>, void>) {
f(std::move(std::get<I>(tp))...);
return msgpack_codec::pack_args_to_str(FuncResultCode::OK);
} else {
auto res = f(std::move(std::get<I>(tp))...);
return msgpack_codec::pack_args_to_str(FuncResultCode::OK, res);
}
}

这里再交给call_helper函数进行处理是为了将std::tuple中的值拆解开来,里面使用std::make_index_sequence和转发一次是较为惯用的手法,实际上在C++17标准中通过std::apply函数已经实现了上述过程。此外,通过FunctionTraits<F>可以得到函数参数的类型,从而通过std::tuple构建参数列表。这里主要还是讲逻辑,关于元编程细节后面可以较为细致的聊聊。

调用Rpc函数就比较简单了,就是在上述哈希表中找到函数调用并返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 根据 key 找到相应函数并调用
std::string route(uint32_t key, std::string_view args) {
std::string result;
auto it = func_map_.find(key);
if (it == func_map_.end()) {
result = msgpack_codec::pack_args_to_str(FuncResultCode::FAIL, "unknown function");
} else {
result = it->second(args);
}
if (result.size() > UINT32_MAX) {
result = msgpack_codec::pack_args_to_str(FuncResultCode::FAIL, "result too long");
}
return result;
}

网络通信

网络通信部分通过ASIO库来实现,其实是较为典型的服务器和客户端实现了,服务端主要在connection.hpprpc_server.hpp中实现。其中Connection类维护单个TCP连接,读取客户端传来的消息包头部和消息体并通过上述Router类找到RPC函数调用并将结果写入客户端。

RpcServer类负责接受客户端的连接并分配新的Connection对其进行服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void do_accept() {
conn_.reset(
new Connection(&(io_service_pool_.next_io_service()), &router_, timeout_seconds_));
acceptor_.async_accept(conn_->get_socket(), [this](asio::error_code ec) {
CLOG_TRACE("one client come.");
if (!acceptor_.is_open()) {
return;
}
if (ec) {
CLOG_WARN("{}: {}", ec.value(), ec.message());
} else {
conn_->set_conn_id(conn_id_);
conn_->start(); // 开启连接的服务
std::lock_guard lock(conn_mutex_);
connections_.emplace(conn_id_++, conn_);
CLOG_TRACE("establish connection, id:{}.", conn_id_ - 1);
}
do_accept();
});
}

此外还有对已关闭连接的定期清理以及对信号的处理等,由于实现均比较典型,细节见代码。

FunctionTrait

最后补充一下FunctionTrait的实现,对于上面的register_handler函数:

1
2
template <typename F>
void register_handler(const std::string& name, F f);

对于用户传入的可调用类型F,需要通过元编程获取其中相关信息,在上面是获取它的参数类型列表,其实定义比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
template <typename T>
struct FunctionTraits;

template <typename Ret, typename... Args>
struct FunctionTraits<Ret(Args...)> {
using function_type = Ret(Args...);
using return_type = Ret;
using function_pointer = Ret (*)(Args...);

using params_type = std::tuple<Args...>;
using bare_params_type = std::tuple<std::decay_t<Args>...>;
static constexpr size_t args_num = sizeof...(Args);
};

首先定义对于普通函数的相应类型提取,然后对不同类型的调用对象进行特化,并使其继承上述实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 对函数指针特化
template <typename Ret, typename... Args>
struct FunctionTraits<Ret (*)(Args...)> : FunctionTraits<Ret(Args...)> {};

// 对std::function特化
template <typename Ret, typename... Args>
struct FunctionTraits<std::function<Ret(Args...)>> : FunctionTraits<Ret(Args...)> {};

// 对成员函数特化
template <typename Ret, typename Class, typename... Args>
struct FunctionTraits<Ret (Class::*)(Args...)> : FunctionTraits<Ret(Args...)> {};

// 对成员函数特化
template <typename Ret, typename Class, typename... Args>
struct FunctionTraits<Ret (Class::*)(Args...) const> : FunctionTraits<Ret(Args...)> {};

// 对重载了函数运算符的对象特化
template <typename Callable>
struct FunctionTraits : FunctionTraits<decltype(&Callable::operator())> {};