rest_rpc
rest_rpc 是一个C++ 编写的RPC库,选择这个开源库作为学习目标的原因是其体量比较小,便于学习。学习过后,也重写了一个自己的版本:https://github.com/xiaoqieF/trpc ,抽取了原库中最重要的部分进行了实现,只是作为一个学习用的Demo,通过C++ 17标准将内部一些元编程进行了简化。
在这里我不会介绍RPC的原理,其实是很简单的一件事情。我认为作为一个RPC库最重要的是做好网络通信以及消息序列化这两件事情,rest_rpc库利用经典的ASIO库作为网络通信框架,使用msgpack库作为序列化工具。
函数和参数的序列化
首先是使用函数名称作为调用标识,服务端注册函数:
1 server.register_handler ("hello" , hello);
客户端调用:
1 client.call<ReturnType>("hello" , 1 , 2 );
通过对函数名字符串计算MD5得到func_id
作为函数调用标识,因此服务端需要建立func_id
到函数之间的映射,客户端调用时需要传递该字段。此外调用函数还需要将函数参数进行序列化然后传递。由于不同函数参数类型和数量均不同,因此将所有函数参数打包成std::tuple
然后通过msgpack库进行序列化。例如对于void f1(int, std::string)
,将传递msgpack对std::tuple<int, std::string>
进行打包后的结果。服务端和客户端之间通信消息包格式可以定义如下:
1 2 3 4 5 6 7 8 9 10 struct RpcHeader { uint64_t request_id; uint32_t body_len; uint32_t function_id; }; struct RpcMsg { RpcHeader header; std::string content; };
这里只是写一个Demo,因此删除了原库头部的magic字段,从而使得RpcHeader
结构体天然对齐并占据16字节长度,body_len
字段值即为后续content
的长度。
两个通信方向均采用上述消息包格式。对于客户端发往服务端的包而言,content
保存了调用函数参数的序列化结果;对于服务端发往客户端的包而言,content
保存了函数调用结果的序列化结果(实际上是std::tuple<int, Result>
),其中第一个字段为函数调用是否成功的标志,后续字段为结果。
如何注册和调用RPC函数
通过router.hpp
中的class Router
记录所有注册的函数:
1 2 3 4 5 6 7 private : using RpcFunc = std::function<std::string (std::string_view)>; using FuncMap = std::unordered_map<uint32_t , RpcFunc>; using FuncNameMap = std::unordered_map<uint32_t , std::string>; FuncMap func_map_; FuncNameMap func_name_map_;
通过哈希表记录已经注册的函数,其中哈希表的key是对函数名称计算MD5得来的。其中保存的函数RpcFunc
的参数和结果都是字符串,也就是通过msgpack序列化后的字符串。通过router.register_handler
进行函数注册:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 template <typename F>void register_handler (const std::string& name, F f) { uint32_t key = MD5::MD5Hash32 (name.data ()); func_name_map_.emplace (key, name); func_map_[key] = [f](std::string_view str) -> std::string { using args_tuple = typename FunctionTraits<F>::bare_params_type; msgpack::object_handle handle; try { auto params = msgpack_codec::unpack<args_tuple>(handle, str.data (), str.size ()); return call (f, std::move (params)); } catch (const std::exception& e) { return msgpack_codec::pack_args_to_str (FuncResultCode::FAIL, e.what ()); } }; }
注册的函数是一个lambda表达式,对实际注册的函数f
进行了包装,在内部将序列化的参数反序列化得到std::tuple
类型,并通过call
函数将该每个元素提取出来用于调用f
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 template <typename F, typename ... Args>static std::string call (const F& f, std::tuple<Args...> tp) { return call_helper (f, std::make_index_sequence<sizeof ...(Args)>{}, std::move (tp)); } template <typename F, size_t ... I, typename ... Args>static std::string call_helper (const F& f, const std::index_sequence<I...>&, std::tuple<Args...> tp) { if constexpr (std::is_same_v<std::result_of_t <F(Args...)>, void >) { f (std::move (std::get<I>(tp))...); return msgpack_codec::pack_args_to_str (FuncResultCode::OK); } else { auto res = f (std::move (std::get<I>(tp))...); return msgpack_codec::pack_args_to_str (FuncResultCode::OK, res); } }
这里再交给call_helper
函数进行处理是为了将std::tuple
中的值拆解开来,里面使用std::make_index_sequence
和转发一次是较为惯用的手法,实际上在C++17标准中通过std::apply
函数已经实现了上述过程。此外,通过FunctionTraits<F>
可以得到函数参数的类型,从而通过std::tuple
构建参数列表。这里主要还是讲逻辑,关于元编程细节后面可以较为细致的聊聊。
调用Rpc函数就比较简单了,就是在上述哈希表中找到函数调用并返回结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 std::string route (uint32_t key, std::string_view args) { std::string result; auto it = func_map_.find (key); if (it == func_map_.end ()) { result = msgpack_codec::pack_args_to_str (FuncResultCode::FAIL, "unknown function" ); } else { result = it->second (args); } if (result.size () > UINT32_MAX) { result = msgpack_codec::pack_args_to_str (FuncResultCode::FAIL, "result too long" ); } return result; }
网络通信
网络通信部分通过ASIO库来实现,其实是较为典型的服务器和客户端实现了,服务端主要在connection.hpp
和rpc_server.hpp
中实现。其中Connection
类维护单个TCP连接,读取客户端传来的消息包头部和消息体并通过上述Router
类找到RPC函数调用并将结果写入客户端。
RpcServer
类负责接受客户端的连接并分配新的Connection
对其进行服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void do_accept () { conn_.reset ( new Connection (&(io_service_pool_.next_io_service ()), &router_, timeout_seconds_)); acceptor_.async_accept (conn_->get_socket (), [this ](asio::error_code ec) { CLOG_TRACE ("one client come." ); if (!acceptor_.is_open ()) { return ; } if (ec) { CLOG_WARN ("{}: {}" , ec.value (), ec.message ()); } else { conn_->set_conn_id (conn_id_); conn_->start (); std::lock_guard lock (conn_mutex_); connections_.emplace (conn_id_++, conn_); CLOG_TRACE ("establish connection, id:{}." , conn_id_ - 1 ); } do_accept (); }); }
此外还有对已关闭连接的定期清理以及对信号的处理等,由于实现均比较典型,细节见代码。
FunctionTrait
最后补充一下FunctionTrait
的实现,对于上面的register_handler
函数:
1 2 template <typename F>void register_handler (const std::string& name, F f) ;
对于用户传入的可调用类型F
,需要通过元编程获取其中相关信息,在上面是获取它的参数类型列表,其实定义比较简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 template <typename T>struct FunctionTraits ;template <typename Ret, typename ... Args>struct FunctionTraits <Ret (Args...)> { using function_type = Ret (Args...); using return_type = Ret; using function_pointer = Ret (*)(Args...); using params_type = std::tuple<Args...>; using bare_params_type = std::tuple<std::decay_t <Args>...>; static constexpr size_t args_num = sizeof ...(Args); };
首先定义对于普通函数的相应类型提取,然后对不同类型的调用对象进行特化,并使其继承上述实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 template <typename Ret, typename ... Args>struct FunctionTraits <Ret (*)(Args...)> : FunctionTraits<Ret (Args...)> {};template <typename Ret, typename ... Args>struct FunctionTraits < std::function<Ret (Args...)>> : FunctionTraits<Ret (Args...)> {};template <typename Ret, typename Class, typename ... Args>struct FunctionTraits <Ret (Class::*)(Args...)> : FunctionTraits<Ret (Args...)> {};template <typename Ret, typename Class, typename ... Args>struct FunctionTraits <Ret (Class::*)(Args...) const > : FunctionTraits<Ret (Args...)> {};template <typename Callable>struct FunctionTraits : FunctionTraits<decltype (&Callable::operator ())> {};