@TOC
一.介绍
RPC(Remote Procedure Call)远程过程调⽤,简单来说就是客⼾端在不知道调⽤细节的情况下,调⽤远程计算机上的某个功能就像调⽤本地功能⼀样,其主要⽬标就是让构建分布式计算(应⽤)更容易,在提供强⼤的远程调⽤能⼒时不损失本地调⽤的语义简洁性。
二.安装
先安装依赖:
dev@dev-host:~/workspace$ sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev再进行brpc的安装:
xxxxxxxxxxdev@dev-host:~/workspace$ git clone https://github.com/apache/brpc.gitdev@dev-host:~/workspace$ cd brpc/dev@dev-host:~/workspace/brpc$ mkdir build && cd builddev@dev-host:~/workspace/brpc/build$ cmake -DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6dev@dev-host:~/workspace/brpc/build$ make && sudo make install三.使用到的类与接口介绍
3.1日志相关
因为brpc自带有日志输出模块,与websocket一样,无法更换除非改库内源码中所有的⽇志输出操作后重新编译库进⾏安装。我们这里主要是要禁用掉不需要的⽇志输出,避免大量日志输出占视线:
xxxxxxxxxxnamespace logging {enum LoggingDestination { LOG_TO_NONE = 0};struct BUTIL_EXPORT LoggingSettings {LoggingSettings(); LoggingDestination logging_dest;};bool InitLogging(const LoggingSettings& settings);}3.2protobuf相关类与接口
xxxxxxxxxxnamespace google {namespace protobuf { class PROTOBUF_EXPORT Closure { public: Closure() {} virtual ~Closure(); virtual void Run() = 0; }; inline Closure* NewCallback(void (*function)()); class PROTOBUF_EXPORT RpcController { bool Failed(); std::string ErrorText() ; }; }}3.3服务端使用到的接口
xxxxxxxxxxnamespace brpc {struct ServerOptions { //⽆数据传输,则指定时间后关闭连接 int idle_timeout_sec; // Default: -1 (disabled) int num_threads; // Default: #cpu-cores//....};enum ServiceOwnership { //添加服务失败时,服务器将负责删除服务对象 SERVER_OWNS_SERVICE, //添加服务失败时,服务器也不会删除服务对象 SERVER_DOESNT_OWN_SERVICE};class Server { int AddService(google::protobuf::Service* service,ServiceOwnership ownership); int Start(int port, const ServerOptions* opt); int Stop(int closewait_ms/*not used anymore*/); int Join(); //休眠直到ctrl+c按下,或者stop和join服务器 void RunUntilAskedToQuit();};class ClosureGuard { explicit ClosureGuard(google::protobuf::Closure* done); ~ClosureGuard() { if (_done) _done->Run(); }};} 3.4http相关类与接口
xclass URI { typedef butil::FlatMap<std::string, std::string> QueryMap; typedef QueryMap::const_iterator QueryIterator; int SetHttpURL(const std::string& url);// Returns 0 on success, -1 otherwise void set_path(const std::string& path) void set_host(const std::string& host) void set_port(int port) void SetHostAndPort(const std::string& host_and_optional_port); size_t RemoveQuery(const char* key);// Returns 1 on removed, 0 otherwise const std::string& host() const { return _host; } int port() const std::string& path() const std::string& user_info() const std::string& query() const; const std::string* GetQuery(const std::string& key) void SetQuery(const std::string& key, const std::string& value); QueryIterator QueryBegin() QueryIterator QueryEnd() size_t QueryCount()};
enum HttpMethod { HTTP_METHOD_DELETE = 0, HTTP_METHOD_GET = 1, HTTP_METHOD_HEAD = 2, HTTP_METHOD_POST = 3, HTTP_METHOD_PUT = 4,};
const char *HttpMethod2Str(HttpMethod http_method);bool Str2HttpMethod(const char* method_str, HttpMethod* method);static const int HTTP_STATUS_OK = 200;static const int HTTP_STATUS_BAD_REQUEST = 400;static const int HTTP_STATUS_UNAUTHORIZED = 401;static const int HTTP_STATUS_FORBIDDEN = 403;static const int HTTP_STATUS_NOT_FOUND = 404;static const int HTTP_STATUS_METHOD_NOT_ALLOWED = 405;static const int HTTP_STATUS_INTERNAL_SERVER_ERROR = 500;
class HttpHeader { const std::string& content_type() void set_content_type(const std::string& type) const std::string* GetHeader(const std::string& key) void SetHeader(const std::string& key, const std::string& value); const URI& uri() const { return _uri; } HttpMethod method() const { return _method; } void set_method(const HttpMethod method) int status_code() void set_status_code(int status_code);};
class Controller : public google::protobuf::RpcController { void set_timeout_ms(int64_t timeout_ms); void set_max_retry(int max_retry); void Reset(); google::protobuf::Message* response(); HttpHeader& http_response(); butil::IOBuf& response_attachment(); HttpHeader& http_request(); butil::IOBuf& request_attachment(); bool Failed(); std::string ErrorText(); using AfterRpcRespFnType = std::function<void(Controller* cntl,const google::protobuf::Message* req,const google::protobuf::Message* res)>; void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn);};
namespace butil {class IOBuf { //Returns 0 on success, -1 otherwise. int append(const std::string& s); // Convert all data in this buffer to a std::string. std::string to_string() const; void clear(); bool empty() const; size_t length() const;};3.5客户端相关接口
xxxxxxxxxxnamespace brpc {enum ProtocolType : int { PROTOCOL_UNKNOWN = 0, PROTOCOL_BAIDU_STD = 1, PROTOCOL_STREAMING_RPC = 2, PROTOCOL_HULU_PBRPC = 3, PROTOCOL_SOFA_PBRPC = 4, PROTOCOL_RTMP = 5, PROTOCOL_THRIFT = 6, PROTOCOL_HTTP = 7, PROTOCOL_PUBLIC_PBRPC = 8, PROTOCOL_NOVA_PBRPC = 9, PROTOCOL_REDIS = 10, PROTOCOL_NSHEAD_CLIENT = 11, PROTOCOL_NSHEAD = 12, PROTOCOL_HADOOP_RPC = 13, PROTOCOL_HADOOP_SERVER_RPC = 14, PROTOCOL_MONGO = 15, PROTOCOL_UBRPC_COMPACK = 16, PROTOCOL_DIDX_CLIENT = 17, PROTOCOL_MEMCACHE = 18, PROTOCOL_ITP = 19, PROTOCOL_NSHEAD_MCPACK = 20, PROTOCOL_DISP_IDL = 21, PROTOCOL_ERSDA_CLIENT = 22, PROTOCOL_UBRPC_MCPACK2 = 23, PROTOCOL_CDS_AGENT = 24, PROTOCOL_ESP = 25, PROTOCOL_H2 = 26};struct ChannelOptions { //请求连接超时时间 int32_t connect_timeout_ms;// Default: 200 (milliseconds) //rpc请求超时时间 int32_t timeout_ms;// Default: 500 (milliseconds) //最⼤重试次数 int max_retry;// Default: 3 //序列化协议类型 options.protocol = "baidu_std"; AdaptiveProtocolType protocol; //....};class Channel : public ChannelBase { //初始化接⼝,成功返回0; int Init(const char* server_addr_and_port, //192.168.xx.xx:9000 const ChannelOptions* options); void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request,google::protobuf::Message* response,google::protobuf::Closure* done);};inline ::google::protobuf::Closure* NewCallback(void (*function)());3.6定时任务相关
需要包含如下头文件: bthread/unstable.h , butil/time.h
xxxxxxxxxx// Return 0 on success, errno otherwise.int bthread_timer_add(bthread_timer_t* id,struct timespec abstime,void (*on_timer)(void*),void* arg);// Returns: 0 - exist & not-run;// 1 - still running;// EINVAL - not exist.int bthread_timer_del(bthread_timer_t id);namespace butil { timespec microseconds_from_now(int64_t milliseconds); timespec seconds_from_now(int64_t seconds);}四.使用样例
4.0编写proto文件
xxxxxxxxxxsyntax = "proto3";//声明语法版本package cal;//定义包名option cc_generic_services = true;//是否启用rpc服务
message AddRequest {//定义请求消息 int32 a = 1;//第一个参数 int32 b = 2;//第二个参数}
message AddResponse {//定义响应消息 int32 c = 1;//结果}
//这是一个http请求,不需要任何字段message HelloRequest {}
//这是一个http响应,不需要任何字段message HelloResponse {
}
service Calculator {//定义服务 rpc Add(AddRequest) returns (AddResponse);//定义rpc方法 rpc Hello(HelloRequest) returns (HelloResponse);//定义另一个rpc方法}编译后会生成两个文件cal.pb.cc与cal.pb.h
4.1同步服务端
根据上图我们可以得到服务端创建的一个大致流程:
0.重写Calculator类中的服务函数,在我们的例子中应该是Add与Hello这两个业务处理函数
1.定义重写类对象
2.定义服务器配置对象ServerOptions(我们这里只把idle_timeout_sec连接超时时间设置为-1表示不超时即可)
3.创建brpc服务器对象
4.使用brpc::Server提供的AddService注册我们上面重写的服务类
5.启动服务器
6.等待服务器退出
案例代码如下:
xxxxxxxxxx
class CalculatorService : public cal::Calculator{public: CalculatorService(){}; ~CalculatorService(){}; void Add(google::protobuf::RpcController* controller, const cal::AddRequest* request, cal::AddResponse* response, google::protobuf::Closure* done) override{ //当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用 brpc::ClosureGuard done_guard(done); int result = request->a() + request->b(); response->set_c(result); }};
int main() { //1.定义计算服务 CalculatorService service; //2.定义服务器配置对象ServerOptions brpc::ServerOptions options; options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时 //3.创建服务器对象-如果是堆上new出来的,则AddService需要设置为brpc::SERVER_OWNS_SERVICE,让服务器对象负责释放service brpc::Server server; //4.注册服务 int ret = server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE);//因为service是栈对象,所以这里需要设置SERVER_DOESNT_OWN_SERVICE,server会自动释放service if(ret != 0){ std::cerr << "AddService failed" << std::endl; return -1; } //5.启动服务器 if (server.Start(9000, &options)!= 0) { std::cerr << "Start server failed" << std::endl; return -1; } //6.等待服务器退出 server.RunUntilAskedToQuit(); return 0;}记住业务处理函数比如Add的声明周期是哪里调用done->Run()哪里才算结束,如果不调用就会导致业务函数始终不结束业务处理。
4.2同步客户端
客户端的创建流程大致如下: 1.创建服务端与客户端通信的信道channel 2.使用stub对象通过channel发起rpc调用 3.打印结果 示例代码如下:
xxxxxxxxxx
//简单的同步远程rpc调用示例int main(){ //1.定义并设置channel的配置 brpc::ChannelOptions options; options.protocol = "baidu_std"; //使用baidu_std协议 //2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路 brpc::Channel channel; channel.Init("192.168.30.128:9000", &options); //3.创建stub对象-用于发起rpc调用 cal::Calculator_Stub stub(&channel); //4.创建请求对象-用于设置rpc调用参数 cal::AddRequest request; request.set_a(10); request.set_b(20); //5.发起rpc调用-同步调用 cal::AddResponse response; brpc::Controller controller; stub.Add(&controller, &request, &response, NULL);//NULL表示同步调用 //检查调用是否成功 if(controller.Failed()){ std::cerr << "rpc远程调用失败: "<< controller.ErrorText() << std::endl; return -1; } //6.打印rpc调用的结果 std::cout << "a+b=" << response.c() << std::endl; return 0;}运行之后客户端会打印出a+b=30的字样,博主这里便不再演示了。
4.3异步客户端
我们上面使用stub.Add发起远端的Add请求时,最后一个参数填NULL表示同步调用,如果想要异步调用,就需要使用brpc为我们提供的NewCallback来创建一个Closure的done对象。然后将收到请求之后的逻辑写到该done的回调函数中:
xxxxxxxxxx
void callback(brpc::Controller* cntl,cal::AddRequest* request,cal::AddResponse* response) { std::unique_ptr<brpc::Controller> cntl_guard(cntl); std::unique_ptr<cal::AddRequest> req_guard(request); std::unique_ptr<cal::AddResponse> res_guard(response); if (cntl_guard->Failed()) { std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl; return; } //打印rpc调用的结果 std::cout << "a+b=" << response->c() << std::endl;}
//简单的异步远程rpc调用示例int main(){ //1.定义并设置channel的配置 brpc::ChannelOptions options; options.protocol = "baidu_std"; //使用baidu_std协议 //2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路 brpc::Channel channel; channel.Init("192.168.30.128:9000", &options); //3.创建stub对象-用于发起rpc调用 cal::Calculator_Stub stub(&channel); //4.创建请求对象-用于设置rpc调用参数 cal::AddRequest* request = new cal::AddRequest();//需要new,否则会有生命周期问题 request->set_a(10); request->set_b(20); //5.发起rpc调用-异步调用 cal::AddResponse* response = new cal::AddResponse(); brpc::Controller* controller = new brpc::Controller(); //补充:设置Controller的timeout时间,默认是3秒 controller->set_timeout_ms(4000); //6.设置回调函数 auto done = brpc::NewCallback(callback, controller, request, response); stub.Add(controller, request, response, done);//设置回调函数表示异步rpc调用 std::cout << "rpc调用已发出,继续干其他事情..." << std::endl; //7.等待rpc调用结果-键盘按下回车键退出程序 getchar(); return 0;}需要注意的是,brpc::NewCallback 不允许直接传入一个需要多个参数的 lambda 表达式作为回调。因为brpc::NewCallback 的工作方式类似于 std::bind。你提供一个函数(如你的 callback 函数)和它需要的所有参数(controller, request, response)。NewCallback 会将这些东西“打包”起来。当 RPC 完成后,brpc 框架只会调用这个包的 Run() 方法,然后 Run() 方法内部再用你当初提供的参数去调用你的 callback 函数。 但是实际上我们这样子去调用NewCallback也是不行的,虽然官方给的例子是可行的,但不知道我这里为什么不行:
xxxxxxxxxx auto done = brpc::NewCallback([controller,request,response](){ std::unique_ptr<brpc::Controller> cntl_guard(controller); std::unique_ptr<cal::AddRequest> req_guard(request); std::unique_ptr<cal::AddResponse> res_guard(response); if (cntl_guard->Failed()) { std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl; return; } //打印rpc调用的结果 std::cout << "a+b=" << response->c() << std::endl; });所以我们自己构建一个方法来支持这种方式的lamda表达式传入:
xxxxxxxxxx //因为google::protobuf::Closure的回调函数不允许传入多个参数,所以需要定义一个Object类来包装回调函数和参数 google::protobuf::Closure* ClosureFactory::create(callback_t &&cb){ ClosureFactory::Object::ptr obj = std::make_shared<ClosureFactory::Object>(); obj->callback = std::move(cb); return google::protobuf::NewCallback(ClosureFactory::asyncCallback, obj); } //异步回调函数执行 void ClosureFactory::asyncCallback(const ClosureFactory::Object::ptr obj){ obj->callback(); }这其实是我们后面封装的方法,它放在命名空间limerpc中,这时我们这样子写就可以传入上面的一个lamda表达式了:
xxxxxxxxxx //设置回调函数 auto done = limerpc::ClosureFactory::create([controller,request,response](){ std::unique_ptr<brpc::Controller> cntl_guard(controller); std::unique_ptr<cal::AddRequest> req_guard(request); std::unique_ptr<cal::AddResponse> res_guard(response); if (cntl_guard->Failed()) { std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl; return; } //打印rpc调用的结果 std::cout << "a+b=" << response->c() << std::endl; });4.4异步服务端
这其实就很简单了,我们上面说过,业务函数的声明周期是由done->Run()决定的,所以我们可以将原来的业务处理放到一个线程中,在线程处理完业务时调用done->Run()即可完成异步处理的逻辑:
xxxxxxxxxx
//异步简易rpc服务端class CalculatorService : public cal::Calculator{public: CalculatorService(){}; ~CalculatorService(){}; void Add(google::protobuf::RpcController* controller, const cal::AddRequest* request, cal::AddResponse* response, google::protobuf::Closure* done) override{ //使用多线程进行异步处理 std::thread thr([=](){ //当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用 brpc::ClosureGuard done_guard(done); int result = request->a() + request->b(); response->set_c(result); //模拟业务处理时间 std::this_thread::sleep_for(std::chrono::seconds(3)); }); thr.detach(); }};
int main() { //1.定义计算服务 CalculatorService service; //2.定义服务器配置对象ServerOptions brpc::ServerOptions options; options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时 //3.创建服务器对象-如果是堆上new出来的,则AddService需要设置为brpc::SERVER_OWNS_SERVICE,让服务器对象负责释放service brpc::Server server; //4.注册服务 int ret = server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE);//因为service是栈对象,所以这里需要设置SERVER_DOESNT_OWN_SERVICE,server会自动释放service if(ret != 0){ std::cerr << "AddService failed" << std::endl; return -1; } //5.启动服务器 if (server.Start(9000, &options)!= 0) { std::cerr << "Start server failed" << std::endl; return -1; } //6.等待服务器退出 server.RunUntilAskedToQuit(); return 0;}4.5http服务端
我们上面说过,http请求和响应的相关信息都存储在google::protobuf::RpcController*中,所以我们服务端要想能够响应http请求,还是需要先重写对应的http请求处理函数,然后在该重写函数中实现我们的业务逻辑即可:
xxxxxxxxxx
class CalculatorService : public cal::Calculator{public: CalculatorService(){}; ~CalculatorService(){}; void Add(google::protobuf::RpcController* controller, const cal::AddRequest* request, cal::AddResponse* response, google::protobuf::Closure* done) override{ //当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用 brpc::ClosureGuard done_guard(done); int result = request->a() + request->b(); response->set_c(result); }
void Hello(google::protobuf::RpcController* controller, const cal::HelloRequest* request, cal::HelloResponse* response, google::protobuf::Closure* done) override{ brpc::ClosureGuard done_guard(done);//切记不要忘记done->run() brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); //打印请求的http信息 std::cout << "请求方法:" << cntl->http_request().method() << std::endl; std::cout << "请求uri:" << cntl->http_request().uri() << std::endl; std::cout << "请求body:" << cntl->request_attachment().to_string() << std::endl; //设置响应的http信息 cntl->http_response().set_content_type("text/plain"); cntl->http_response().set_status_code(200);//设置响应码-200表示成功 cntl->response_attachment().append("回显:" + cntl->request_attachment().to_string()); }};
int main() { //1.定义计算服务 CalculatorService service; //2.定义服务器配置对象ServerOptions brpc::ServerOptions options; options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时 //3.创建服务器对象-如果是堆上new出来的,则AddService需要设置为brpc::SERVER_OWNS_SERVICE,让服务器对象负责释放service brpc::Server server; //4.注册服务 int ret = server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE);//因为service是栈对象,所以这里需要设置SERVER_DOESNT_OWN_SERVICE,server会自动释放service if(ret != 0){ std::cerr << "AddService failed" << std::endl; return -1; } //5.启动服务器 if (server.Start(9000, &options)!= 0) { std::cerr << "Start server failed" << std::endl; return -1; } //6.等待服务器退出 server.RunUntilAskedToQuit(); return 0;}4.6http客户端
这里相较于上面的远程rpc请求有些不同,它不需要再通过protobuf提供的stub对象发起请求,而是直接通过brpc创建的channel信道中的CallMethod方法发起http请求。请求路由应该与我们在proto中定义的服务类名称一致。比如在我们的示例中如若想要访问服务端的Hello服务,路由应该是这样的:
xxxxxxxxxx/Calculator/Hello还有一个比较特殊的点,我们也可以通过http客户端发起rpc请求,我们示例中的rpc服务为Add,首先需要将路由设置为:
xxxxxxxxxx/Calculator/Add其次我们的请求正文格式必须是json格式,请求的json正文格式必须与Add在proto文件中定义的AddRequest格式相同:
xxxxxxxxxxmessage AddRequest {//定义请求消息 int32 a = 1;//第一个参数 int32 b = 2;//第二个参数}请求的json正文示例:
xxxxxxxxxx{"a":10,"b":20}这样便可以访问远端的Add服务,最后服务端会返回json格式的响应内容写到brpc::Controller的response_attachment()中。客户端示例代码如下:
xxxxxxxxxx
//基于brpc的简单http客户端示例int main(){ //1.定义并设置channel的配置 brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_HTTP; //使用http协议 //2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路 brpc::Channel channel; channel.Init("192.168.30.128:9000", &options); //3.使用brpc::Controller来设置http请求的相关信息 brpc::Controller cntl; //3-1.设置http请求的url-需要和proto文件中定义的服务名称对应 cntl.http_request().uri().set_path("/Calculator/Add");//通过http访问rpc服务,正文内容必须为json格式 //cntl.http_request().uri().set_path("/Calculator/Hello");//通过http访问普通的http服务 //3-2.设置http请求的method-GET/POST/PUT/DELETE等 cntl.http_request().set_method(brpc::HTTP_METHOD_POST); //3-3.设置http请求的header-可选 cntl.http_request().SetHeader("Content-Type", "application/json"); //cntl.http_request().set_content_type("text/plain"); //3-4.设置http请求的body-可选 cntl.request_attachment().append(R"({"a":10,"b":20})");//json格式字符串 //cntl.request_attachment().append("hello world");//普通字符串 //4.直接使用channel发起http请求 channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);//NULL表示同步-此处会阻塞等待服务器的响应 if(cntl.Failed()){ std::cerr << "http请求失败: "<< cntl.ErrorText() << std::endl; return -1; } //5.打印http请求的结果 std::cout << "http请求成功,响应码=" << cntl.http_response().status_code() << std::endl; std::cout << "响应内容=" << cntl.response_attachment().to_string() << std::endl; return 0;}注释部分为普通http请求,非注释部分为http访问rpc服务的请求,读者可自行去掉或加上注释编译运行查看不同结果。 Makefile如下:
xxxxxxxxxx.PHONY: all cleanall: server client async_client async_server http_client http_server
server: server.cc cal.pb.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17client:client.cc cal.pb.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17async_client:async_client.cc cal.pb.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17async_server:async_server.cc cal.pb.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17http_client:http_client.cc cal.pb.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17http_server:http_server.cc cal.pb.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -std=c++17%.pb.cc: %.proto protoc --cpp_out=. $<clean: rm -f server client async_client async_server http_client http_server五.二次封装
因为brpc一般是使用在分布式系统上的。一个服务可能在不同的机器上都会提供,但是有的时候部分机器可能会挂掉,所以此时就需要将其移除,如果有新的服务器添加了此项服务,那么就需要将其管理起来,所以我们可以设计如下的类:
xxxxxxxxxx using ChannelPtr = std::shared_ptr<brpc::Channel>; // 服务信道管理类 class RpcChannels { public: using ptr = std::shared_ptr<RpcChannels>; RpcChannels(); // 获取服务信道 ChannelPtr get_channel(); // 增加服务信道 void add_channel(const std::string &addr); // 删除服务信道 void remove_channel(const std::string &addr);
private: std::mutex _mtx; // 互斥锁 uint32_t _idx; // 服务信道索引-轮询下标 std::vector<ChannelPtr> _channels; // 服务信道列表 std::unordered_map<std::string, ChannelPtr> _channels_map; // 服务信道映射表 };当然我们的服务不可能单单只有一个,所以我们要将不同服务的RpcChannels管理起来。而且有的时候并不是所有的服务器提供的所有的服务我们都需要关心,我们还需要管理哪些服务是需要关心的:
xxxxxxxxxx // 服务管理类 class SvcRpcChannels { public: using ptr = std::shared_ptr<SvcRpcChannels>; SvcRpcChannels() = default; // 设置服务关心 void set_match(const std::string &service_name); // 新增结点 void add_node(const std::string &service_name, const std::string &node_addr); // 删除结点 void remove_node(const std::string &service_name, const std::string &node_addr); // 获取服务信道 ChannelPtr get_channel(const std::string &service_name);
private: std::mutex _mtx; std::unordered_map<std::string, RpcChannels::ptr> _svc_channels_map; // 服务名称-服务信道管理映射表 };也就是说,根据上面的两个类我们可以知道,客户端访问远端rpc服务的流程在我们封装下变成了: 1.告诉SvcRpcChannels我客户端想要访问的服务 2.SvcRpcChannels根据客户端提供的消息,查找对应的RpcChannels 3.SvcRpcChannels找到了对应的RpcChannels,让RpcChannels给一个可用服务信道 4.RpcChannels通过RR轮询的方式,遍历自身的可用信道,保证返回效率的同时负载均衡。最后通过SvcRpcChannels的get_channel返回一个可用信道。 5.客户端收到可用服务信道,使用该信道发起http请求或远程rpc服务请求 这是客户端的相关类,当然我们再封装一个解决我们上面说的客户端异步调用无法直接传lamda表达式的工厂类:
xxxxxxxxxx // 异步回调⼯⼚类 class ClosureFactory { public: using callback_t = std::function<void()>; static google::protobuf::Closure *create(callback_t &&cb); private: struct Object { using ptr = std::shared_ptr<Object>; callback_t callback; }; static void asyncCallback(const Object::ptr obj); };最后我们再来封装一个服务端的工厂类:
xxxxxxxxxx // 服务器⼯⼚类 class RpcServer{ public: // 默认svc是堆上new出来的对象,将管理权移交给rpc服务器进⾏管理 static std::shared_ptr<brpc::Server> create(int port,google::protobuf::Service *svc); };完整封装如下:
xxxxxxxxxx//limerpc.h
namespace limerpc{ using ChannelPtr = std::shared_ptr<brpc::Channel>; // 服务信道管理类 class RpcChannels { public: using ptr = std::shared_ptr<RpcChannels>; RpcChannels(); // 获取服务信道 ChannelPtr get_channel(); // 增加服务信道 void add_channel(const std::string &addr); // 删除服务信道 void remove_channel(const std::string &addr);
private: std::mutex _mtx; // 互斥锁 uint32_t _idx; // 服务信道索引-轮询下标 std::vector<ChannelPtr> _channels; // 服务信道列表 std::unordered_map<std::string, ChannelPtr> _channels_map; // 服务信道映射表 }; // 服务管理类 class SvcRpcChannels { public: using ptr = std::shared_ptr<SvcRpcChannels>; SvcRpcChannels() = default; // 设置服务关心 void set_match(const std::string &service_name); // 新增结点 void add_node(const std::string &service_name, const std::string &node_addr); // 删除结点 void remove_node(const std::string &service_name, const std::string &node_addr); // 获取服务信道 ChannelPtr get_channel(const std::string &service_name);
private: std::mutex _mtx; std::unordered_map<std::string, RpcChannels::ptr> _svc_channels_map; // 服务名称-服务信道管理映射表 }; // 异步回调⼯⼚类 class ClosureFactory { public: using callback_t = std::function<void()>; static google::protobuf::Closure *create(callback_t &&cb); private: struct Object { using ptr = std::shared_ptr<Object>; callback_t callback; }; static void asyncCallback(const Object::ptr obj); }; // 服务器⼯⼚类 class RpcServer{ public: // 默认svc是堆上new出来的对象,将管理权移交给rpc服务器进⾏管理 static std::shared_ptr<brpc::Server> create(int port,google::protobuf::Service *svc); };} // namespace limerpcxxxxxxxxxx//limerpc.cc
namespace limerpc{ RpcChannels::RpcChannels():_idx(0){ } // 轮询获取服务信道 ChannelPtr RpcChannels::get_channel(){ std::unique_lock<std::mutex> lock(_mtx); size_t index = _idx % _channels.size(); _idx++; return _channels[index]; } // 增加服务信道 void RpcChannels::add_channel(const std::string &addr){ std::unique_lock<std::mutex> lock(_mtx); if(_channels_map.find(addr) != _channels_map.end()){ DBG("信道:{} 已存在,不再添加", addr); return; } //1.定义并设置channel的配置 brpc::ChannelOptions options; options.protocol = "baidu_std"; //使用baidu_std协议 //2.创建并初始化channel-channel可以理解为客⼾端到服务器的⼀条通信线路 ChannelPtr channel = std::make_shared<brpc::Channel>(); channel->Init(addr.c_str(), &options); //3.将channel添加到_channels中 _channels.push_back(channel); //4.将channel添加到_channels_map中 _channels_map[addr] = _channels.back(); } // 删除服务信道 void RpcChannels::remove_channel(const std::string &addr){ std::unique_lock<std::mutex> lock(_mtx); auto it = _channels_map.find(addr); //找不到直接返回 if(it == _channels_map.end()){ WRN("信道:{} 不存在,无法删除", addr); return; } //找到后从_channels中删除 auto item = std::find(_channels.begin(), _channels.end(), it->second); if (item != _channels.end()) { _channels.erase(item); } //从_channels_map中删除 _channels_map.erase(it); }
//设置服务关心 void SvcRpcChannels::set_match(const std::string &service_name){ std::unique_lock<std::mutex> lock(_mtx); _svc_channels_map[service_name] = std::make_shared<RpcChannels>(); } //新增结点 void SvcRpcChannels::add_node(const std::string &service_name, const std::string &node_addr){ //判断是否为关心的服务 auto it = _svc_channels_map.find(service_name); if(it == _svc_channels_map.end()){ DBG("服务:{} 未设置关注,忽略添加", service_name); return; } //增加服务信道 it->second->add_channel(node_addr); } //删除结点 void SvcRpcChannels::remove_node(const std::string &service_name, const std::string &node_addr){ //判断是否为关心的服务 auto it = _svc_channels_map.find(service_name); if(it == _svc_channels_map.end()){ DBG("服务:{} 未设置关注,忽略删除", service_name); return; } //删除服务信道 it->second->remove_channel(node_addr); } //获取服务信道 ChannelPtr SvcRpcChannels::get_channel(const std::string &service_name){ //判断是否为关心的服务 auto it = _svc_channels_map.find(service_name); if(it == _svc_channels_map.end()){ DBG("服务:{} 未设置关注,无法获取信道", service_name); return ChannelPtr(); } //获取服务信道 return it->second->get_channel(); }
//因为google::protobuf::Closure的回调函数不允许传入多个参数,所以需要定义一个Object类来包装回调函数和参数 google::protobuf::Closure* ClosureFactory::create(callback_t &&cb){ ClosureFactory::Object::ptr obj = std::make_shared<ClosureFactory::Object>(); obj->callback = std::move(cb); return google::protobuf::NewCallback(ClosureFactory::asyncCallback, obj); } //异步回调函数执行 void ClosureFactory::asyncCallback(const ClosureFactory::Object::ptr obj){ obj->callback(); }
//服务端创建 std::shared_ptr<brpc::Server> RpcServer::create(int port,google::protobuf::Service *svc){ //1.定义服务器配置对象ServerOptions brpc::ServerOptions options; options.idle_timeout_sec = -1;//设置超时时间为-1,表示不超时 //2.创建服务器对象 std::shared_ptr<brpc::Server> server = std::make_shared<brpc::Server>(); //3.注册服务 if (server->AddService(svc, brpc::SERVER_OWNS_SERVICE) != 0) { ERR("服务注册失败"); exit(-1);//直接退出 } //4.启动服务器 if (server->Start(port, &options) != 0) { ERR("服务启动失败"); exit(-1);//直接退出 } return server; }}一个简单的使用样例:
cal.proto:
xxxxxxxxxxsyntax = "proto3";//声明语法版本package cal;//定义包名option cc_generic_services = true;//是否启用rpc服务
message AddRequest {//定义请求消息 int32 a = 1;//第一个参数 int32 b = 2;//第二个参数}
message AddResponse {//定义响应消息 int32 c = 1;//结果}
//这是一个http请求,不需要任何字段message HelloRequest {}
//这是一个http响应,不需要任何字段message HelloResponse {
}
service Calculator {//定义服务 rpc Add(AddRequest) returns (AddResponse);//定义rpc方法 rpc Hello(HelloRequest) returns (HelloResponse);//定义另一个rpc方法}rpc_client.cc
xxxxxxxxxx
int main() { //初始化日志 limelog::limelog_init(); //创建服务管理类 limerpc::SvcRpcChannels svc_rpc_channels; //手动添加服务关心 svc_rpc_channels.set_match("Calculator"); //模拟服务发现 svc_rpc_channels.add_node("Calculator","192.168.30.128:9000"); //获取服务信道 auto channel = svc_rpc_channels.get_channel("Calculator"); //3.创建stub对象-用于发起rpc调用 cal::Calculator_Stub stub(channel.get()); //4.创建请求对象-用于设置rpc调用参数 cal::AddRequest* request = new cal::AddRequest();//需要new,否则会有生命周期问题 request->set_a(10); request->set_b(20); //5.发起rpc调用-异步调用 cal::AddResponse* response = new cal::AddResponse(); brpc::Controller* controller = new brpc::Controller(); //补充:设置Controller的timeout时间,默认是3秒 controller->set_timeout_ms(4000); //设置回调函数 auto done = limerpc::ClosureFactory::create([controller,request,response](){ std::unique_ptr<brpc::Controller> cntl_guard(controller); std::unique_ptr<cal::AddRequest> req_guard(request); std::unique_ptr<cal::AddResponse> res_guard(response); if (cntl_guard->Failed()) { std::cerr << "rpc远程调用失败: " << cntl_guard->ErrorText() << std::endl; return; } //打印rpc调用的结果 std::cout << "a+b=" << response->c() << std::endl; }); stub.Add(controller, request, response, done);//设置回调函数表示异步rpc调用 std::cout << "rpc调用已发出,继续干其他事情..." << std::endl; //等待rpc调用结果-键盘按下回车键退出程序 getchar(); return 0;}rpc_server.cc
xxxxxxxxxx
//异步简易rpc服务端class CalculatorService : public cal::Calculator{public: CalculatorService(){}; ~CalculatorService(){}; void Add(google::protobuf::RpcController* controller, const cal::AddRequest* request, cal::AddResponse* response, google::protobuf::Closure* done) override{ //使用多线程进行异步处理 std::thread thr([=](){ //当业务逻辑比较复杂时返回时机可能不同,所以这里需要使用done_guard来确保done在add函数执行完毕后自动被调用 brpc::ClosureGuard done_guard(done); int result = request->a() + request->b(); response->set_c(result); //模拟业务处理时间 std::this_thread::sleep_for(std::chrono::seconds(3)); }); thr.detach(); }};
int main(){ //定义计算服务 CalculatorService* service = new CalculatorService(); //通过服务器工厂类获取一个服务器实例 auto server = limerpc::RpcServer::create(9000, service); //等待服务器退出 server->RunUntilAskedToQuit(); return 0;}makefile:
xxxxxxxxxx.PHONY: all cleanall: server client
server: rpc_server.cc cal.pb.cc ../../source/limerpc.cc ../../source/limelog.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -lspdlog -lfmt -std=c++17client: rpc_client.cc cal.pb.cc ../../source/limerpc.cc ../../source/limelog.cc g++ $^ -o $@ -lprotobuf -lpthread -lbrpc -ldl -lleveldb -lssl -lcrypto -lgflags -lspdlog -lfmt -std=c++17%.pb.cc: %.proto protoc --cpp_out=. $<clean: rm -f server client

评论(已关闭)
评论已关闭