假設用戶編寫了以下Thrift文件:
struct LogInfo {
1: required string name,
2: optional string content,
}
service LogSender {
void SendLog(1:list<LogInfo> loglist);
}
用戶使用命令“thrift –gen cpp example.thrift”可生成C++代碼,該代碼包含以下文件:
example_constants.h
example_constants.cpp
example_types.h //struct定義
example_types.cpp //struct實現
LogSender.h //service定義
LogSender.cpp //service實現和LogSenderClient實現
LogSender_server.skeleton.cpp //一個實例RPC Server
用戶可以這樣編寫Client:
shared_ptr socket(new TSocket(“8.8.8.8″, 9090));
shared_ptr transport(new TBufferedTransport(socket));
shared_ptr PRotocol(new TBinaryProtocol(transport));
LogSenderClient client(protocol);
try {
transport->open();
vector<LogInfo> logInfos;
LogInfo logInfo(“image”, “10:9:0 visit:xxxxxx”);
logInfos.push_back(logInfo);
…..
client.SendLog(logInfos);
transport->close();
} catch (TException &tx) {
printf(“ERROR: %s/n”, tx.what());
}
為了深入分析這段代碼,我們看一下client.SendLog()函數的內部實現(在LogSender.cpp中):
void LogSenderClient::SendLog(const std::vector<LogInfo> & loglist)
{
send_SendLog(loglist);
recv_SendLog();
}
void LogSenderClient::send_SendLog(const std::vector<LogInfo> & loglist)
{
int32_t cseqid = 0;
oprot_->writeMessageBegin(“SendLog”, ::apache::thrift::protocol::T_CALL, cseqid);
LogSender_SendLog_pargs args;
args.loglist = &loglist;
args.write(oprot_);
oprot_->writeMessageEnd();
oprot_->getTransport()->flush();
oprot_->getTransport()->writeEnd();
}
void LogSenderClient::recv_SendLog()
{
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
iprot_->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
…..
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
……
}
if (fname.compare(“SendLog”) != 0) {
……
}
LogSender_SendLog_presult result;
result.read(iprot_);
iprot_->readMessageEnd();
iprot_->getTransport()->readEnd();
return;
}
閱讀上面的代碼,可以看出,RPC函數SendLog()實際上被轉化成了兩個函數:send_SendLog和recv_SendLog,分別用于發送數據和接收結果。數據是以消息的形式表示的,消息頭部是RPC函數名,消息內容是RPC函數的參數。
我們再進一步分析RPC Server端,一個server的編寫方法(在LogSender.cpp中)如下:
shared_ptr protocolFactory(new TBinaryProtocolFactory());
shared_ptr handler(new LogSenderHandler());
shared_ptr processor(new LogSenderProcessor(handler));
shared_ptr serverTransport(new TServerSocket(9090));
shared_ptr transportFactory(new TBufferedTransportFactory());
TSimpleServer server(processor,
serverTransport,
transportFactory,
protocolFactory);
printf(“Starting the server…/n”);
server.serve();
Server端最重要的類是LogSenderProcessor,它內部有一個映射關系processMap_,保存了所有RPC函數名到函數實現句柄的映射,對于LogSender而言,它只保存了一個RPC映射關系:
processMap_[" SendLog"] = &LogSenderProcessor::process_SendLog;
其中,process_SendLog是一個函數指針,它的實現如下:
void LogSenderProcessor::process_SendLog(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)
{
LogSender_SendLog_args args;
args.read(iprot);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
LogSender_SendLog_result result;
try {
iface_->SendLog(args.loglist);//調用用戶編寫的函數
} catch (const std::exception& e) {
……
}
oprot->writeMessageBegin(“SendLog”, ::apache::thrift::protocol::T_REPLY, seqid);
result.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->flush();
oprot->getTransport()->writeEnd();
}
LogSenderProcessor中一個最重要的函數是process(),它是服務器的主體函數,服務器端(socket server)監聽到客戶端有請求到達后,會檢查消息類型,并檢查processMap_映射,找到對應的消息處理函數,并調用之(注意,這個地方可以采用各種并發模型,比如one-request-one-thread,thread pool等)。
通過上面的分析可以看出,Thrift最重要的組件是編譯器(采用C++編寫),它為用戶生成了網絡通信相關的代碼,從而大大減少了用戶的編碼工作。
新聞熱點
疑難解答