本文所述程序實例主要實現在后端應用服務器上實時獲取STORM集群的運行信息和topology相關的提交和控制。對此,通過對STORM UI和CMD源碼的分析,得出可以通過其thrift接口調用實現這些功能。先下載一個thrift庫進行編碼和安裝。關于thrift可以參見這個地方。安裝完成后,從STORM源碼中將storm.thrift拷貝到thrift目錄下。輸入:
hrift -gen cpp storm.thrift
會得到一個gen-cpp目錄,里面就是thrift先關腳本的C++實現。我們先看storm.thrift文件接口:
service Nimbus { //TOPOLOGY上傳接口 void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology); void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options); void killTopology(1: string name); void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e); void activate(1: string name) ; void deactivate(1: string name); void rebalance(1: string name, 2: RebalanceOptions options); //TOPOLOGY JAR包上傳接口 string beginFileUpload(); void uploadChunk(1: string location, 2: binary chunk); void finishFileUpload(1: string location); string beginFileDownload(1: string file); binary downloadChunk(1: string id); //獲取NIMBUS的配置信息 string getNimbusConf(); //獲取STORM集群運行信息 ClusterSummary getClusterInfo(); //獲取TOPOLOGY的運行狀態信息 TopologyInfo getTopologyInfo(1: string id); //獲取TOPOLOGY對象信息 string getTopologyConf(1: string id); StormTopology getTopology(1: string id); StormTopology getUserTopology(1: string id);}
生成C++文件后,我們就可以對其接口進行調用,由于thrift c++框架是使用boost庫實現的,必須安裝boost庫依賴。實現的代碼如下:
#define HAVE_NETDB_H //使用網絡模塊的宏必須打開#include "Nimbus.h"#include "storm_types.h"#include <string>#include <iostream>#include <set>#include <transport/TSocket.h> #include <transport/TBufferTransports.h> #include <protocol/TBinaryProtocol.h> int test_storm_thrift(){ boost::shared_ptr<TSocket> tsocket(new TSocket("storm-nimbus-server", 6627)); boost::shared_ptr<TTransport> ttransport(new TFramedTransport(tsocket, 1024 * 512)); //此處必須使用TFramedTransport boost::shared_ptr<TProtocol> tprotocol(new TBinaryProtocol(ttransport)); try{ //創建一個nimbus客戶端對象 NimbusClient client(tprotocol); //打開通道 ttransport->open(); ClusterSummary summ; std::string conf; //對STORM的RPC調用,直接獲取信息,同步進行的。 client.getNimbusConf(conf); client.getClusterInfo(summ); //關閉通道 ttransport->close(); }catch(TException &tx){ printf("InvalidOperation: %s", tx.what()); }}
以上代碼就可以直接獲取nimbus的配置和集群信息,其他接口以此類推。值得注意的是storm.thrift to C++生成的storm_types.h文件里其中operator < 函數都未實現,所以必須手動進行添加實現,否則編譯會有問題。
此外,不僅僅C++可以實現STORM的控制,PHP和其他的語言也可以實現,只要thrift支持就OK。感興趣的讀者可以自己實現一下試試看。
新聞熱點
疑難解答
圖片精選