Apollo是apache旗下的基金項目,它是以Apache ActiveMQ5.x為基礎,采用全新的線程和消息調度架構重新實現的消息中間件,針對多核處理器進行了優化處理,它的速度更快、更可靠、更易于維護。apollo與ActiveQQ一樣支持多協議:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介紹MQTT協議的使用。
關于ActiveMQ5請參考:http://activemq.apache.org,本文只介紹Apollo在windows下安裝和應用,Apollo的詳細文檔請參考官網:http://activemq.apache.org/apollo/documentation/user-manual.html.
進入http://activemq.apache.org/apollo/download.html,下載windows版本的壓縮包,并解壓到自己工作目錄(如:E:/apache-apollo-1.7),并創建環境變量APOLLO_HOME=E:/apache-apollo-1.7。如果操作是系統是Windows Vista或更高版本,則需要安裝Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。
進入E:/apache-apollo-1.7之下的bin目錄,打開cmd窗口,執行命令:apollo create E:/apollo_broker,命令執行成功后,在E盤下會有apollo_broker目錄,在其下有個bin目錄,其中有兩個文件:apollo-broker.cmd和apollo-broker-service.exe,第一個是通過cmd命令啟動apollo服務的,第二個是創建window服務的。
cmd命令啟動:apollo-broker run,啟動成功可以在瀏覽器中查看運行情況(http://127.0.0.1:61680/,默認用戶名/密碼:admin/passWord);
windows服務啟動:執行apollo-broker-service.exe,創建windows服務,就可以以windows服務的方式啟動apollo服務。
MQTT協議有眾多客戶端實現,相關請參考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。
本文采用eclipse的paho客戶端實現(https://eclipse.org/paho/)。a.javascript客戶端:https://eclipse.org/paho/clients/js/
將Javascript客戶端項目下載下來,并在其項目根目錄下執行mvn命令,進行編譯,生成target目錄,其下生成mqttws31.js、mqttws31-min.js兩個js文件,將其拷貝到自己項目相關目錄下,并在頁面中引用,即可實現javascript客戶端的消息訂閱和發布,demo代碼如下:
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId");
// 61623是ws連接的默認端口,可以在apollo中間件中進行配置(關于apollo的配置請參考:http://activemq.apache.org/apollo/documentation/user-manual.html)
// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
// connect the client
client.connect({userName:'admin',password:'password',onSuccess:onConnect});
// called when the client connects
function onConnect() { // 連接成功后的處理
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("/topic/event"); // 訂閱消息的主題
var message = new Paho.MQTT.Message("Hello,this is a test");
message.destinationName = "/topic/event";
client.send(message); // 發送消息
}
// called when the client loses its connection
function onConnectionLost(responSEObject) { // 連接丟失后的處理
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:"+responseObject.errorMessage);
}
}
// called when a message arrives
function onMessageArrived(message) { // 消息接收成功后的處理
console.log("onMessageArrived:"+message.payloadString);
}b. java客戶端實現
paho目前只支持J2SE和安卓,下載地址:https://eclipse.org/paho/clients/java/,我們采用maven方式。
maven庫地址:
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots
maven dependency:<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.1</version>
</dependency>
說明:版本為1.0.0或0.9.0時,其jar包根本加載不進來,最后搜到1.0.1版本才可以正常使用。
java端實現:public interface IMessage {
String getHost();
Integer getPort();
Integer getQos();
String getTopic();
String getClientId();
String getContent();
byte[] getContentBytes();
Map<String,Object> getOption();
Object getSender();
Date getSendTime();
}public final class MessagePRocessingCenter {
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class);
protected static final String BROKER_PREFIX="tcp://";
protected static final String BROKER_HOST="localhost";
protected static final int PORT=61613;
protected static final int QOS=2;
protected static final String TOPIC="/topic/event";
protected static final String CLIENT_ID="clientId";
protected static final String MQ_USER="admin";
protected static final String MQ_PASSWORD="password";
public static void send(IMessage message){
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic();
int qos=null == message.getQos()?QOS: message.getQos();
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost());
int port=null == message.getPort()?PORT:message.getPort();
broker+=":"+port;
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId();
Map<String,Object> opts=message.getOption();
String user=MQ_USER;
String password=MQ_PASSWORD;
if(null != opts){
if(null != opts.get("userName")){
user=opts.get("userName").toString();
}
if(null != opts.get("password")){
password=opts.get("password").toString();
}
}
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(user);
connOpts.setPassword(password.toCharArray());
connOpts.setCleansession(true);
sampleClient.connect(connOpts);
MqttMessage mqm = new MqttMessage(message.getContentBytes());
mqm.setQos(qos);
sampleClient.publish(topic, mqm);
sampleClient.disconnect();
} catch(MqttException me) {
logger.info("********************* send message exception :");
logger.info("********************* reason : " + me.getReasonCode());
logger.info("********************* msg : " + me.getMessage());
logger.info("********************* loc : " + me.getLocalizedMessage());
logger.info("********************* cause : " + me.getCause());
logger.info("********************* excep : " + me);
me.printStackTrace();
}
}
public static void send(Set<IMessage> set){
for(IMessage message:set){
send(message);
}
}
}
至此,MQTT協議已部署完畢,java端可以發布消息,而javascript端則可以訂閱并接收到java端發布的信息。
本文只是依照官網手冊而實現的簡單應用,講解不一定十分準確,有什么不對的地方還請多多指點,更詳細的應用請參考官網文檔:
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html
eclipse paho:https://eclipse.org/paho/
新聞熱點
疑難解答