亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

Spring下ActiveMQ實戰

2019-11-14 15:10:29
字體:
來源:轉載
供稿:網友

    MessageQueue是分布式的系統里經常要用到的組件,一般來說,當需要把消息跨網段、跨集群的分發出去,就可以用這個。一些典型的示例就是:

    1、集群A中的消息需要發送給多個機器共享;

    2、集群A中消息需要主動推送,但彼此的網絡不是互通的(如集群A只有過HA才能被外界訪問);

    

    當然上面的幾個點,除了用MQ還有其它實現方式,但是MQ無疑是非常適合用來做這些事的。眾多MQ中,ActiveMQ是比較有名氣也很穩定的,它發送消息的成本非常廉價,支持Queue與Topic兩種消息機制。本文主要就是講如何在SPRing環境下配置此MQ:

 

1、場景假設

    現有機器兩臺Server、Worker需要進行異步通信,另有一臺ActiveMQ機器,關于MQ的配置信息存放在Zookeeper中,Zookeeper的節點有:

      - /mq/activemq/ip:mq的機器ip

      -/mq/activemq/port:這是mq的機器端口

 

2、Server的Spring xml配置

    Server主要的工作就是接受Worker消息,并發送消息給Worker。主要是定義了連接MQ的連接池,接受Worker消息的隊列worker,發送消息給Worker的隊列server:

 1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation=" 3         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd 4         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd"> 5  6     <!-- ActiveMQ連接池 --> 7     <bean id="conFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> 8         <property name="connectionFactory"> 9             <bean class="org.apache.activemq.ActiveMQConnectionFactory">10                 <property name="brokerURL">11                     <bean class="lekko.mq.util.MQPropertiesFactory" factory-method="getUrl" />12                 </property>13                 <property name="closeTimeout" value="60000" />14                 <!-- <property name="userName" value="admin" /> -->15                 <!-- <property name="passWord" value="admin" /> -->16                 <!-- <property name="optimizeAcknowledge" value="true" /> -->17                 <property name="optimizedAckScheduledAckInterval" value="10000" />18             </bean>19         </property>20     </bean>21 22 23     <!-- Worker任務消息 -->24     <bean id="taskWorkerTopic" class="org.apache.activemq.command.ActiveMQTopic">25         <constructor-arg value="worker_topic" />26     </bean>27     <!-- 任務監聽容器 -->28     <bean id="taskWorkerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">29         <property name="connectionFactory" ref="conFactory" />30         <property name="destination" ref="taskWorkerTopic" />31         <property name="messageListener">32             <bean class="lekko.mq.task.TaskWorkerListener" />33         </property>34         <property name="pubSubDomain" value="true" />35     </bean>36 37 38     <!-- Server任務消息 -->39     <bean id="taskServerTopic" class="org.apache.activemq.command.ActiveMQTopic">40         <constructor-arg value="server_topic" />41     </bean>    42     <!-- 任務消息發送模板 -->43     <bean id="taskServerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />44 45 </beans>

    一段一段地分析,ActiveMQ連接池這里,定義了連接的bean為“conFactory”,其中broberURL屬性是通過后臺java代碼的靜態方法來設置的,方便線上環境通過Java代碼動態地切換,稍后會介紹這塊代碼,你現在需要知道的是,它實際上返回的就是一個字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用后臺來管理連接信息,直接改成“<property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:port">”也是OK的。

    接下來,便是Worker消息隊列的定義,這里定義為“taskWorkerTopic”,類型是org.apache.activemq.command.ActiveMQTopic,(訂閱模式)它表示一個消息可以被多個機器收到并處理,其它的還有org.apache.activemq.command.ActiveMQQueue,(點對點模式)表示一個消息只能被一臺機器收到,當收到后消息就出隊列了,其它機器無法處理。它們都有一個構造參數constructor-arg,指定了消息隊列的名稱,一個MQ中一個消息隊列的名字是唯一的。

    Worker的消息隊列定義好了之后,就是接受Worker的里消息了,這里定義了“taskWorkerContainer”,其屬性分別定義了連接池、目標隊列、消息處理器(我們自己的Java類,后面再講),參數pubSubDomain用于指定是使用訂閱模式還是使用點對點模式,如果是ActiveMQTopic則要設置為true,默認是false。

    好了,Server現在已經可以通過自己定義的“lekko.mq.task.TaskWorkerListener”類接受并處理taskWorkerTopic的消息了。

    如法炮制,定義一個專門用于往Worker里發消息的隊列“taskServerTopic”,并定義發送消息的模板“taskServerTemplate”備用。

 

3、Server端的接收類與發送類

    lekko.mq.task.TaskWorkerListener便是一個接收類示例:

 1 package lekko.mq.task; 2  3 import javax.jms.Message; 4 import javax.jms.MessageListener; 5  6 import org.apache.activemq.command.ActiveMQObjectMessage; 7 import org.apache.log4j.Logger; 8 import org.springframework.stereotype.Service; 9 import lekko.mq.model.MessageModel;10 11 12 /**13  * Task消息監聽類14  * @author lekko15  */16 @Service17 public class TaskWorkerListener implements MessageListener {18 19     private Logger _logger = Logger.getLogger(TaskWorkerListener.class);20 21     @Override22     public void onMessage(Message message) {23         if (message instanceof ActiveMQObjectMessage) {24             ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;25             try {26                 onMessage((MessageModel) aMsg.getObject());27             } catch (Exception e) {28                 _logger.warn("Message:${} is not a instance of MessageModel.", e);29             }30         } else {31             _logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");32         }33     }34 35     /**36      * 處理消息37      * @param message 自定義消息實體38      */39     public void onMessage(MessageModel message) { ... }40 41 }

    這里給大家演示的并不是最基礎的知識,處理的消息是一個自定義的類“lekko.mq.model.MessageModel”,這個類怎么寫可以隨便整,反正就是一些你要傳遞的數據字段,但是記得要實現Serializable接口。如果你需要傳遞的僅僅是純字符串,那么直接在代碼的23行片,把message.toString()即可。這個類通過前面XML配置會處理來自“worker_topic”隊列中的消息。

    

    再就是發送類,實際上就是把前面的taskServiceTemplate拿來用就行了:

 1 package lekko.mq.task; 2  3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.beans.factory.annotation.Qualifier; 5 import org.springframework.jms.core.JmsTemplate; 6 import org.springframework.stereotype.Service; 7 import lekko.mq.model.MessageModel; 8  9 10 /**11  * 服務器任務消息分發12  * @author lekko13  */14 @Service15 public class TaskServerSender {16 17     @Autowired18     @Qualifier("taskServerTemplate")19     private JmsTemplate jmsTemplate;20 21     /**22      * 發送消息23      */24     public void sendMessage(MessageModel msg) {25         jmsTemplate.convertAndSend(msg);26     }27 28 }

    把這個類TaskServerSender注入到任意需要用到的地方,調用sendMessage方法即可。它會往前面定義的“server_topic”中塞消息,等Worker來取。

 

4、關于Zookeeper配置MQ連接信息

    Worker端的配置我這里不再闡述,因為它跟在Server端的配置太相像,區別就在于Server端是從worker_topic中取消息,往server_topic中寫消息;而Worker端的代碼則是反過來,往worker_topic中寫消息,從server_topic中取消息。

    那么如何使用Java代碼來控制ActiveMQ的配置消息呢:

 1 package lekko.mq.util; 2  3 import org.apache.zookeeper.ZooKeeper; 4 import org.apache.zookeeper.data.Stat; 5  6 /** 7  * 獲取MQ配置 8  * @author lekkoli 9  */10 public class MQPropertiesFactory {11     12     private static boolean isLoaded = false;13     private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";14     private static ZooKeeper _zk;15     private static String _ip;16     private static String _port;17 18     private static String getProperty(String path) throws Exception {19         if (_zk == null) {20             if (ZOOKEEPER_CLUST == null) {21                 throw new Exception("Zookeeper, Host /"" + ZOOKEEPER_CLUST + "/" is null!");22             }23             _zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);24         }25         Stat s = _zk.exists(path, false);26         if (s != null)27             return new String(_zk.getData(path, false, s));28         throw new Exception("Zookeeper, Path /"" + path + "/" is not exist!");29     }30 31     private static void load() throws Exception {32         if (!isLoaded) {33             _ip = getProperty("/mq/activemq/ip");34             _port = getProperty("/mq/activemq/port");35             isLoaded = true;36         }37     }38 39     public static String getUrl() throws Exception {40         load();41         StringBuilder failover = new StringBuilder();42         String[] ips = _ip.split(";"), ports = _port.split(";");43         for (int i = 0; i < ips.length; ++i) {44             failover.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");45         }46         failover.setLength(failover.length() - 1);47         String failovers = failover.toString();48         if (ips.length > 1) {49             failovers = "failover:(" + failovers + ")";50         }51         return failovers;52     }53 }

    上面的代碼需要解釋的地方跟MQ相關的不多,主要就是如果是mq集群,則格式是:failover:(tcp://192.168.1.117:1001,tcp://192.168.1.118:1001,tcp://xxx.xxx.xxx.xxx:port)。其它上面代碼沒有對Zookeeper集群都掛了的情況,做應急連接方案。當然,無論如何本節都不是全文的重點,但是多學一技何嘗不可?

    最近工作越來越忙,更新博客也是時有時無,但是我會堅持下去,還有許多工作中的點滴,在這里沉淀一下,也希望更進一步吧。

 

    轉載請注明原址:http://www.49028c.com/lekko/p/4940976.html 

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
青青草成人在线| 在线播放日韩专区| 91久久久久久国产精品| 欧美激情精品久久久久久| 18性欧美xxxⅹ性满足| 精品无码久久久久久国产| 国产日韩欧美视频| 久久久久久久久久久久av| 国产suv精品一区二区三区88区| 欧美电影在线观看高清| 久久五月天综合| 91精品综合久久久久久五月天| 5566成人精品视频免费| 2019中文字幕全在线观看| 国产一区二区三区在线视频| 久久久精品999| 久久人91精品久久久久久不卡| 91成人免费观看网站| 久久视频在线免费观看| 日韩性生活视频| 欧美激情日韩图片| 国产一区二区三区毛片| 国产精品视频公开费视频| 日本国产精品视频| 久久久久久这里只有精品| 亚洲国产日韩欧美综合久久| 国产精品视频永久免费播放| 国产午夜精品免费一区二区三区| 97视频com| 国产大片精品免费永久看nba| 日韩精品久久久久久久玫瑰园| 亚洲伊人成综合成人网| 日韩av中文字幕在线播放| 韩国视频理论视频久久| 久久精品久久精品亚洲人| 欧美亚洲另类视频| 国产欧美日韩中文字幕在线| 欧美贵妇videos办公室| 国产日韩一区在线| 成人在线一区二区| 久久久久久久激情视频| 国语自产精品视频在线看一大j8| 国产一区二区三区欧美| 久久精品99无色码中文字幕| 国产精品老女人精品视频| 国产欧美日韩中文| 欧美日韩国产中文字幕| 成人免费在线网址| 久久视频中文字幕| 911国产网站尤物在线观看| 国产精品久久久久久久7电影| 大胆欧美人体视频| 国产精品久久99久久| 欧美高清第一页| 亚洲精品日韩激情在线电影| 亚洲已满18点击进入在线看片| 久久久av电影| 亚洲电影中文字幕| 亚洲a在线播放| 国产精品爽爽ⅴa在线观看| 国产日韩欧美夫妻视频在线观看| 在线视频欧美日韩| 亚洲电影免费观看高清完整版在线观看| 日韩在线视频一区| 日韩中文字幕欧美| 成人中文字幕在线观看| 成人精品久久久| 国产成人av网| 亚洲高清一二三区| 日韩av手机在线| 欧美亚洲另类制服自拍| 亚洲精品日韩激情在线电影| 国产亚洲精品久久久| 在线国产精品播放| 欧美激情亚洲精品| 亚洲精品www久久久久久广东| 亚洲欧洲自拍偷拍| 色午夜这里只有精品| 欧美一区三区三区高中清蜜桃| 亚洲综合日韩中文字幕v在线| 日韩中文在线中文网三级| 精品自拍视频在线观看| 国产一区欧美二区三区| 亚洲精品资源美女情侣酒店| 97超碰国产精品女人人人爽| 欧美特级www| 国产精品女人久久久久久| 国产一区二区美女视频| 午夜欧美不卡精品aaaaa| 亚洲自拍偷拍色片视频| 亚洲国产精品成人一区二区| 国产精品久久久久久久久久尿| 国产欧美在线播放| 色综合老司机第九色激情| 欧美电影免费观看电视剧大全| 一个色综合导航| 亚洲精品在线视频| 热草久综合在线| 国产一区二区三区中文| 日韩欧美第一页| www国产精品视频| 久久国产精品久久久久| 91av国产在线| 国产一区二区欧美日韩| 国产精品久久久久aaaa九色| 91免费精品国偷自产在线| 亚洲精品视频中文字幕| 日韩免费观看在线观看| 欧美体内谢she精2性欧美| 在线观看日韩av| 精品视频在线导航| 欧美最猛性xxxx| 久久精品国产亚洲精品2020| 亚洲欧美日韩国产成人| 精品福利在线观看| 国产亚洲视频中文字幕视频| 91色琪琪电影亚洲精品久久| 中文在线资源观看视频网站免费不卡| 欧美极品欧美精品欧美视频| 欧美成人免费va影院高清| 成人免费福利视频| 91在线免费看网站| 国产精品18久久久久久麻辣| 欧美性猛交xxxx久久久| 日本免费久久高清视频| 成人免费自拍视频| 91久久国产精品| 午夜精品久久久久久久99黑人| 久久久91精品国产| 久久精品99国产精品酒店日本| 91久久精品国产91性色| 成人精品一区二区三区电影免费| 日韩国产激情在线| 国产专区欧美专区| 欧美性猛交xxxx富婆弯腰| 国产精品视频26uuu| 日韩中文字幕在线免费观看| 日韩av黄色在线观看| 欧美中文在线免费| 国产精品久久久久久久久久久新郎| 亚洲国产欧美一区| 色综合久综合久久综合久鬼88| 国产一区二区三区日韩欧美| 成人黄色午夜影院| 国产精品久久久久久亚洲影视| 欧美精品做受xxx性少妇| 亚洲成人在线网| 7m精品福利视频导航| 成人有码在线播放| 亚洲美女av网站| 在线视频日本亚洲性| 亚洲欧洲一区二区三区在线观看| 国产精品欧美日韩一区二区| 欧美日韩国产精品专区| 亚洲97在线观看| 欧美天天综合色影久久精品| 精品久久久久久中文字幕| 欧美日韩国产限制| 日韩精品极品毛片系列视频| 久久综合伊人77777尤物| 91经典在线视频| 亚洲精品成a人在线观看| 国产精品极品美女粉嫩高清在线| 成人黄色午夜影院|