Spring AMQP: http://projects.spring.io/spring-amqp/
Demo provided by Spring: https://github.com/spring-projects/spring-amqp-samples
Chinese documentation: http://rabbitmq.mr-ping.com/
"Hello World!"
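Work queues
Publish/Subscribe
Routing
Topics (topic exchange)
RPC (remote procedure call)
(These six features are not illustrated here; see the Chinese documentation linked above and download Spring's demo to understand them.)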
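Queue durability and message persistence
// Producer
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) {
        String DURABLE_QUEUE_NAME = "queue_for_durable";
        String UNDURABLE_QUEUE_NAME = "queue_for_undurable";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.200.0.150");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123456");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare two queues. The second argument (durable) controls whether
            // the queue still exists after the RabbitMQ server restarts.
            channel.queueDeclare(DURABLE_QUEUE_NAME, true, false, false, null);
            channel.queueDeclare(UNDURABLE_QUEUE_NAME, false, false, false, null);
            // Whether the message itself survives a restart:
            // PERSISTENT_TEXT_PLAIN marks it as plain text and persistent.
            BasicProperties basicProperties = MessageProperties.PERSISTENT_TEXT_PLAIN;
            channel.basicPublish("", DURABLE_QUEUE_NAME, null, "text one".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish("", DURABLE_QUEUE_NAME, basicProperties, "text two".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish("", UNDURABLE_QUEUE_NAME, null, "text three".getBytes(StandardCharsets.UTF_8));
            channel.close();
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Run the code above. The RabbitMQ server now has two queues: the durable queue holds 2 messages and the transient queue holds 1. On the Linux server, restart the RabbitMQ service with the command: service rabbitmq-server restart. After the restart, check the queue status: only the durable queue remains, and it holds only 1 message. That is the message published with PERSISTENT_TEXT_PLAIN; the one published to the durable queue with null properties was not persistent and is lost.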
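The restart outcome described above follows from two independent flags: the queue's durable flag and the message's persistent flag. The sketch below is a toy in-memory model of that rule, not the RabbitMQ client API; the class DurabilityModel and all of its methods are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of what survives a broker restart. Hypothetical names throughout;
// this does not talk to a real RabbitMQ server.
class DurabilityModel {
    static final class Msg {
        final String body;
        final boolean persistent;
        Msg(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    static final class QueueState {
        final boolean durable;
        final List<Msg> messages = new ArrayList<>();
        QueueState(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, QueueState> queues = new LinkedHashMap<>();

    void declare(String name, boolean durable) {
        queues.putIfAbsent(name, new QueueState(durable));
    }

    void publish(String queueName, String body, boolean persistent) {
        queues.get(queueName).messages.add(new Msg(body, persistent));
    }

    // Simulated restart: non-durable queues vanish with everything in them;
    // durable queues keep only their persistent messages.
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (QueueState q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    int queueCount() {
        return queues.size();
    }

    // Number of messages in the named queue, or -1 if the queue does not exist.
    int depth(String queueName) {
        QueueState q = queues.get(queueName);
        return q == null ? -1 : q.messages.size();
    }

    public static void main(String[] args) {
        DurabilityModel broker = new DurabilityModel();
        broker.declare("queue_for_durable", true);
        broker.declare("queue_for_undurable", false);
        broker.publish("queue_for_durable", "text one", false);   // null properties
        broker.publish("queue_for_durable", "text two", true);    // PERSISTENT_TEXT_PLAIN
        broker.publish("queue_for_undurable", "text three", false);

        broker.restart();
        System.out.println("queues after restart: " + broker.queueCount());
        System.out.println("durable queue depth: " + broker.depth("queue_for_durable"));
    }
}
```

The model makes the point that a message is kept only when both conditions hold: it sits in a durable queue and it was published as persistent.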
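The ack (acknowledgement) mechanism
// Consumer
// Note: QueueingConsumer belongs to the older amqp-client API (removed in 5.x).
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) {
        String DURABLE_QUEUE_NAME = "queue_for_durable"; // the durable queue
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.200.0.150");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123456");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare the queue here too, so a consumer that starts before the
            // producer does not fail because the queue is missing.
            channel.queueDeclare(DURABLE_QUEUE_NAME, true, false, false, null);
            // Prefetch count: at most 1 unacknowledged message at a time.
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            /*
             * If the second argument (autoAck) is true, the server deletes the
             * message the moment it is delivered. If it is false, acknowledgement
             * is manual: the message is held on the server as unacknowledged, and
             * returns to the ready state if the consumer goes away without acking.
             */
            String resp = channel.basicConsume(DURABLE_QUEUE_NAME, false, consumer);
            System.out.println("======:" + resp);
            QueueingConsumer.Delivery delivery;
            while ((delivery = consumer.nextDelivery()) != null) {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Received message: " + message);
                // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // manually tell the server the task succeeded
            }
        } catch (IOException | TimeoutException | ShutdownSignalException
                | ConsumerCancelledException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Run the program and check the queue status on the RabbitMQ server: the message in the durable queue is in the Unacked state. Stop the program, and the message returns to the Ready state. Uncomment channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); and run the program again; the queue on the RabbitMQ server shows that the message has been removed.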
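The Ready and Unacked transitions observed above can be sketched as a small state machine. This is a toy in-memory model under stated assumptions, not the RabbitMQ client API: the class AckModel and its methods are hypothetical, and the prefetch argument only mimics the effect of basicQos.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy model of the Ready / Unacked states of one queue with one consumer.
// Hypothetical names throughout; no real broker is involved.
class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private final int prefetch;
    private long nextTag = 1;

    AckModel(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String body) {
        ready.addLast(body);
    }

    // Hands the next ready message to the consumer if the prefetch window has
    // room. Returns the delivery tag, or -1 when nothing can be delivered.
    long deliver() {
        if (ready.isEmpty() || unacked.size() >= prefetch) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, ready.pollFirst());
        return tag;
    }

    // Manual ack: the message is removed for good.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer stops without acking: every unacked message goes back to
    // the front of the ready list, oldest first.
    void consumerGone() {
        for (String body : unacked.descendingMap().values()) {
            ready.addFirst(body);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModel queue = new AckModel(1);
        queue.publish("text two");

        queue.deliver(); // consumer receives but never acks
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());

        queue.consumerGone(); // program stopped
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());

        long tag = queue.deliver(); // program restarted with basicAck enabled
        queue.ack(tag);
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

The three printed lines correspond to the three observations in the text: the message is unacked while the consumer holds it, ready again after the consumer stops, and gone once it is acked.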
Listening for messages
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <description>RabbitMQ connection configuration</description>

    <!-- Connection settings; addresses may list several servers, separated by commas -->
    <rabbit:connection-factory id="connectionFactory" addresses="10.200.0.150:5672" username="root" password="123456"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Message converter -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Message queues -->
    <rabbit:queue name="queue_for_durable"/>
    <rabbit:queue name="queue_for_undurable"/>

    <!-- Template -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="gsonConverter" exchange="directExchange"/>

    <!-- Exchange used by the sender: exact match on the full routing key -->
    <rabbit:direct-exchange name="directExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_for_durable" key="queue_for_durable"/>
            <rabbit:binding queue="queue_for_undurable" key="queue_for_undurable"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Message receiving: DirectListener.java only has to implement the onMessage method of the
         MessageListener interface; concurrency is the number of listeners processing concurrently -->
    <bean id="directListener" class="com.xiangshang.mq.listener.DirectListener"/>
    <rabbit:listener-container connection-factory="connectionFactory" message-converter="gsonConverter" concurrency="2" acknowledge="auto">
        <rabbit:listener queues="queue_for_durable,queue_for_undurable" ref="directListener"/>
    </rabbit:listener-container>
</beans>
Load the file above from a JUnit test and use the producer to send messages to the queues; the listener receives the data. (Tip: a listener that implements the ChannelAwareMessageListener interface can also use the ack mechanism; in that case set acknowledge="manual".)
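The direct exchange in this configuration delivers a message only to queues whose binding key equals the routing key exactly. The sketch below models that matching rule in memory; it is not Spring AMQP or RabbitMQ code, and DirectExchangeModel with its methods is a hypothetical name used only to illustrate the rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of direct-exchange routing: exact match between routing key and
// binding key. Hypothetical names; no broker or Spring container involved.
class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages routed to it
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Routes the message and returns how many queues received it.
    int publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(body);
        }
        return targets.size();
    }

    List<String> messages(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        // Same bindings as the XML: each queue is bound with its own name as key.
        exchange.bind("queue_for_durable", "queue_for_durable");
        exchange.bind("queue_for_undurable", "queue_for_undurable");

        System.out.println(exchange.publish("queue_for_durable", "hello")); // 1 queue matched
        System.out.println(exchange.publish("queue_for_", "lost"));         // 0: no partial match
    }
}
```

In this model a message whose routing key matches no binding is simply dropped, which is why the producer has to send with one of the two keys declared in the bindings.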
Publish/Subscribe
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <description>RabbitMQ connection configuration</description>

    <!-- Connection settings; addresses may list several servers, separated by commas -->
    <rabbit:connection-factory id="connectionFactory" addresses="10.200.0.150:5672" username="root" password="123456"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Message converter -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Message queues -->
    <rabbit:queue name="queue_for_durable"/>
    <rabbit:queue name="queue_for_undurable"/>

    <!-- Template -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="gsonConverter" exchange="fanoutExchange"/>

    <!-- Publish/subscribe -->
    <rabbit:fanout-exchange name="fanoutExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_for_durable"/>
            <rabbit:binding queue="queue_for_undurable"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
</beans>
Load the file above from a JUnit test and use the producer to send a message. On the RabbitMQ server, both queues bound to the fanout exchange receive one copy of the message.
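A fanout exchange ignores the routing key and copies each message to every bound queue, which is why both queues end up with one message. A minimal in-memory sketch of that behaviour follows; FanoutExchangeModel and its methods are hypothetical names, not part of Spring AMQP or the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of fanout routing: every bound queue gets a copy of every
// message. Hypothetical names; no broker involved.
class FanoutExchangeModel {
    // queue name -> messages routed to it
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but deliberately unused.
    // Returns how many queues received a copy.
    int publish(String routingKey, String body) {
        for (List<String> messages : boundQueues.values()) {
            messages.add(body);
        }
        return boundQueues.size();
    }

    int depth(String queueName) {
        return boundQueues.get(queueName).size();
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        exchange.bind("queue_for_durable");
        exchange.bind("queue_for_undurable");

        int copies = exchange.publish("any.key", "broadcast");
        System.out.println("copies delivered: " + copies);
        System.out.println("queue_for_durable depth: " + exchange.depth("queue_for_durable"));
        System.out.println("queue_for_undurable depth: " + exchange.depth("queue_for_undurable"));
    }
}
```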
Delayed message queue (see the Spring integration demo on the next page)
Priority queue (see the Spring integration demo on the next page)