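# Naming convention: container.[queue trait or exchange type].platform.purpose
# @container: queue, exchange
# @queue trait: non-durable marker (undurable), delay queue (delay), priority queue (priority)
# @exchange type: direct, topic, fanout, headers
# @platform: xiangshang, xiangqian, …
# @purpose: what the queue or exchange is used for
# e.g. message queue (queue.xiangshang.message), delayed message queue (queue.delay.xiangshang.message), general-purpose exchange (exchange.direct.xiangshang.common)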
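The convention above can be captured in a small helper so that names are assembled rather than typed by hand. This is only a sketch: the class `RmqNames` and its methods are hypothetical and not part of the original project; only the segment order and the allowed trait and type values come from the convention itself.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not from the original project) that builds names per the convention. */
final class RmqNames {
    private static final List<String> QUEUE_TRAITS = Arrays.asList("undurable", "delay", "priority");
    private static final List<String> EXCHANGE_TYPES = Arrays.asList("direct", "topic", "fanout", "headers");

    private RmqNames() {}

    /** Builds queue.[trait].platform.purpose; pass null or "" as trait for a plain queue. */
    static String queue(String trait, String platform, String purpose) {
        if (trait == null || trait.isEmpty()) {
            return String.join(".", "queue", platform, purpose);
        }
        if (!QUEUE_TRAITS.contains(trait)) {
            throw new IllegalArgumentException("unknown queue trait: " + trait);
        }
        return String.join(".", "queue", trait, platform, purpose);
    }

    /** Builds exchange.type.platform.purpose; the type must be one of the four exchange types. */
    static String exchange(String type, String platform, String purpose) {
        if (!EXCHANGE_TYPES.contains(type)) {
            throw new IllegalArgumentException("unknown exchange type: " + type);
        }
        return String.join(".", "exchange", type, platform, purpose);
    }

    public static void main(String[] args) {
        System.out.println(queue(null, "xiangshang", "message"));      // queue.xiangshang.message
        System.out.println(queue("delay", "xiangshang", "message"));   // queue.delay.xiangshang.message
        System.out.println(exchange("direct", "xiangshang", "common")); // exchange.direct.xiangshang.common
    }
}
```

Rejecting unknown traits and types early keeps misspelled names from silently creating new queues or exchanges on the broker.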
Required JARs
<properties>
    <spring.amqp.version>1.6.6.RELEASE</spring.amqp.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-messaging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
Connection configuration (rabbitmq-config.properties)
# RabbitMQ connection factory settings
rmq.addresses=10.200.0.150:5672
rmq.username=root
rmq.password=123456

# Naming convention: container.[queue trait or exchange type].platform.purpose
# @container: queue, exchange
# @queue trait: non-durable marker (undurable), delay queue (delay), priority queue (priority)
# @exchange type: direct, topic, fanout, headers
# @platform: xiangshang, xiangqian, …
# @purpose: what the queue or exchange is used for
# e.g. message queue (queue.xiangshang.message), delayed message queue (queue.delay.xiangshang.message), general-purpose exchange (exchange.direct.xiangshang.common)
rmq.queue.xiangshang.test=queue.xiangshang.test
rmq.queue.undurable.xiangshang.test=queue.undurable.xiangshang.test
rmq.queue.priority.xiangshang.test=queue.priority.xiangshang.test
rmq.queue.delay.xiangshang.test=queue.delay.xiangshang.test

rmq.exchange.direct.xiangshang.test=exchange.direct.xiangshang.test
rmq.exchange.fanout.xiangshang.test=exchange.fanout.xiangshang.test
rmq.exchange.headers.xiangshang.test=exchange.headers.xiangshang.test
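Property keys are case-sensitive, and the XML configuration resolves the connection settings through `${rmq.addresses}`, `${rmq.username}` and `${rmq.password}`. A mis-cased key such as `rmq.passWord` therefore leaves the placeholder unresolved. The following is a minimal sketch of a startup check using only `java.util.Properties`; the class `RmqConfigCheck` and its method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical check (not from the original project) for the keys the XML placeholders need. */
final class RmqConfigCheck {
    static final String[] REQUIRED_KEYS = {"rmq.addresses", "rmq.username", "rmq.password"};

    private RmqConfigCheck() {}

    /** Parses properties text; a real application would load the file from the classpath instead. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent; lookups are exact and case-sensitive. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "rmq.addresses=10.200.0.150:5672\n"
              + "rmq.username=root\n"
              + "rmq.passWord=123456\n"); // deliberately mis-cased key
        System.out.println(missingKeys(props)); // [rmq.password]
    }
}
```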
XML configuration (rabbitConfiguration.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <description>RabbitMQ connection service configuration</description>

    <context:property-placeholder location="classpath:rabbitmq-config.properties" />

    <!-- Connection configuration -->
    <rabbit:connection-factory id="connectionFactory" addresses="${rmq.addresses}"
                               username="${rmq.username}" password="${rmq.password}" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Message converter (the bean id says gson, but the class is the Jackson 2 JSON converter) -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Template; its default exchange is the direct exchange -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                     message-converter="gsonConverter"
                     exchange="${rmq.exchange.direct.xiangshang.test}" />

    <!-- Queues begin -->
    <rabbit:queue name="${rmq.queue.xiangshang.test}" durable="true" />
    <rabbit:queue name="${rmq.queue.undurable.xiangshang.test}" durable="false" />
    <rabbit:queue name="${rmq.queue.priority.xiangshang.test}" durable="false">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <value type="java.lang.Integer">10</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:queue name="${rmq.queue.delay.xiangshang.test}">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value type="java.lang.Long">60000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="${rmq.exchange.direct.xiangshang.test}" />
            <entry key="x-dead-letter-routing-key" value="${rmq.queue.xiangshang.test}" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- Queues end -->

    <!-- Exchanges begin -->
    <rabbit:direct-exchange name="${rmq.exchange.direct.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" key="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" key="${rmq.queue.undurable.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.priority.xiangshang.test}" key="${rmq.queue.priority.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}" key="${rmq.queue.delay.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Publish/subscribe -->
    <rabbit:fanout-exchange name="${rmq.exchange.fanout.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Header-based routing; header names are case-sensitive and must match what the producer sets -->
    <rabbit:headers-exchange name="${rmq.exchange.headers.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}">
                <rabbit:binding-arguments>
                    <entry key="x-match" value="all" />
                    <entry key="operator" value="xsjf" />
                    <entry key="sex" value="male" />
                </rabbit:binding-arguments>
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:headers-exchange>
    <!-- Exchanges end -->

    <!-- Listeners begin -->
    <!-- Receiving: DirectListener.java implements ChannelAwareMessageListener and its onMessage method
         (the channel is needed for manual acknowledgement); concurrency is the number of concurrent consumers -->
    <bean id="directListener" class="com.xiangshang.mq.listener.DirectListener" />
    <rabbit:listener-container connection-factory="connectionFactory" message-converter="gsonConverter"
                               concurrency="1" acknowledge="manual">
        <rabbit:listener queues="${rmq.queue.xiangshang.test},${rmq.queue.undurable.xiangshang.test}"
                         ref="directListener" />
    </rabbit:listener-container>
    <!-- Listeners end -->
</beans>
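The headers-exchange binding in the configuration above uses `x-match=all`, which means every listed header must be present on the message with an equal value. The sketch below is a simplified model of that rule written in plain Java, not the broker's actual implementation; `HeadersMatchSketch` is a hypothetical name, and details such as presence-only matching are left out. It shows why a header name that differs only in case from the binding key prevents the message from being routed.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of headers-exchange matching (x-match = all / any). */
final class HeadersMatchSketch {
    private HeadersMatchSketch() {}

    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // x-match and other x- arguments are not match criteria
            }
            // Header names and values are compared exactly, including case.
            boolean hit = arg.getValue() != null && arg.getValue().equals(headers.get(arg.getKey()));
            if (any && hit) {
                return true;  // "any": one matching header is enough
            }
            if (!any && !hit) {
                return false; // "all": one mismatch rejects the message
            }
        }
        return !any;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("operator", "xsjf");
        binding.put("sex", "male");

        Map<String, Object> sameCase = new HashMap<>();
        sameCase.put("operator", "xsjf");
        sameCase.put("sex", "male");

        Map<String, Object> otherCase = new HashMap<>();
        otherCase.put("Operator", "xsjf"); // differs from the binding key only in case
        otherCase.put("sex", "male");

        System.out.println(matches(binding, sameCase));  // true
        System.out.println(matches(binding, otherCase)); // false
    }
}
```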
Client-side sending (DirectQueueTest.java example)
package com.xiangshang.mq.test;

import java.util.Random;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.BeansException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DirectQueueTest {

    private static String RMQ_QUEUE_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_DELAY_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST;

    private static ConfigurableApplicationContext context = null;
    private static AmqpTemplate rabbitTemplate = null;

    static {
        RMQ_QUEUE_XIANGSHANG_TEST = "queue.xiangshang.test";
        RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST = "queue.undurable.xiangshang.test";
        RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST = "queue.priority.xiangshang.test";
        RMQ_QUEUE_DELAY_XIANGSHANG_TEST = "queue.delay.xiangshang.test";
        RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST = "exchange.direct.xiangshang.test";
        RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST = "exchange.fanout.xiangshang.test";
        RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST = "exchange.headers.xiangshang.test";
        try {
            context = new ClassPathXmlApplicationContext("rabbitConfiguration.xml");
        } catch (BeansException e) {
            e.printStackTrace();
        }
        rabbitTemplate = context.getBean("rabbitTemplate", AmqpTemplate.class);
    }

    public static void main(String[] args) {
        demo1();
    }

    /**
     * Send messages to queues.
     */
    public static void demo1() {
        String sendMsg = "queue message." + String.valueOf(new Random().nextDouble());
        /*
         * The template's default exchange is configured as exchange.direct.xiangshang.test.
         * Send non-persistent messages: they are lost when the RabbitMQ server restarts.
         */
        MessageProperties mp1 = new MessageProperties();
        mp1.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        Message msg1 = new Message(sendMsg.getBytes(), mp1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        // Same effect as the line above, with the exchange named explicitly
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST, RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, msg1);
        /*
         * Send persistent messages: after a RabbitMQ restart, messages in durable queues
         * are recovered (messages are persistent by default).
         */
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, sendMsg);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Broadcast: every queue bound to the fanout exchange receives the message.
     * A fanout exchange ignores the routing key.
     */
    public static void demo2() {
        String sendMsg = "broadcast message." + String.valueOf(new Random().nextDouble());
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST, RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Priority queue.
     */
    public static void demo3() {
        for (int i = 0; i < 10; i++) {
            int priority = new Random().nextInt(10); // 0..9, within the queue's x-max-priority of 10
            String sendMsg = "queue message." + String.valueOf(new Random().nextDouble()) + "[" + priority + "]";
            System.out.println(sendMsg);
            MessageProperties mp = new MessageProperties();
            mp.setPriority(priority);
            Message msg = new Message(sendMsg.getBytes(), mp);
            rabbitTemplate.convertAndSend(RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST, msg);
        }
    }

    /**
     * Delayed message queue.
     * 1. RabbitMQ has no queue that sorts itself automatically; messages are not ordered by expiry time.
     * 2. A queue is first in, first out, so expiry is driven by the message at the head: if the second
     *    message expires before the first, it still does not leave the queue before the first.
     *    e.g. with expiry times of 30s, 10s and 20s in that order, nothing leaves the queue until 30s have passed.
     * 3. Delayed messaging should therefore follow these rules:
     *    3.1 Each delayed business case declares its own exchange and queue, and documents what they are for.
     *    3.2 A delay queue should set a maximum message TTL, plus the exchange and queue to forward to on expiry.
     *    3.3 Headers use x-match=all; the headers in the binding must be identical to those set by the producer.
     */
    public static void demo4() {
        for (int i = 0; i < 3; i++) {
            int delay = (i + 1) * 2 * 1000;
            String sendMsg = "queue message." + String.valueOf(new Random().nextDouble()) + "[" + delay + "]";
            System.out.println(sendMsg);
            MessageProperties mp = new MessageProperties();
            mp.setExpiration(String.valueOf(delay)); // per-message expiry in milliseconds
            mp.setHeader("operator", "xsjf");
            mp.setHeader("sex", "male");
            Message msg = new Message(sendMsg.getBytes(), mp);
            rabbitTemplate.convertAndSend(RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST, "", msg);
        }
    }
}
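The head-of-line behaviour described in the `demo4` comment can be worked through without a broker. The sketch below is a plain-Java model under stated assumptions, not RabbitMQ code: all messages are enqueued at time 0, a message leaves the queue only when it is at the head and has expired, and the effective TTL is the smaller of the per-message expiration and the queue's `x-message-ttl`. The class `DelayQueueSketch` and its method are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical model of when messages leave a FIFO delay queue with per-message TTLs. */
final class DelayQueueSketch {
    private DelayQueueSketch() {}

    /**
     * @param ttlsMillis per-message TTLs in enqueue order (all enqueued at t = 0)
     * @param queueTtlMillis the queue-level TTL; the lower of the two values applies
     * @return the time in milliseconds at which each message leaves the queue
     */
    static long[] releaseTimes(long[] ttlsMillis, long queueTtlMillis) {
        long[] released = new long[ttlsMillis.length];
        long headFreeAt = 0; // time at which the previous message left the head
        for (int i = 0; i < ttlsMillis.length; i++) {
            long expiresAt = Math.min(ttlsMillis[i], queueTtlMillis);
            // A message cannot leave before the one in front of it has left.
            headFreeAt = Math.max(headFreeAt, expiresAt);
            released[i] = headFreeAt;
        }
        return released;
    }

    public static void main(String[] args) {
        // The example from the comment: 30s, 10s, 20s with a 60s queue TTL.
        System.out.println(Arrays.toString(releaseTimes(new long[] {30000, 10000, 20000}, 60000)));
        // [30000, 30000, 30000]

        // demo4 sends 2s, 4s, 6s in increasing order, so nothing is held back.
        System.out.println(Arrays.toString(releaseTimes(new long[] {2000, 4000, 6000}, 60000)));
        // [2000, 4000, 6000]
    }
}
```

This is why rule 3.1 recommends a dedicated queue per delayed business case: when every message in a queue carries the same delay, enqueue order and expiry order coincide and no message is held back by the one in front of it.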
Message listener (DirectListener.java example)
package com.xiangshang.mq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

public class DirectListener implements ChannelAwareMessageListener {

    /* (non-Javadoc)
     * @see org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage(org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("DirectListener received message: " + new String(message.getBody()));
        // Acknowledge success
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // Negative acknowledgement that requeues the message:
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        Thread.sleep(1000);
    }
}