亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

RabbitMQ消息中間件示例詳解

2024-07-14 08:43:14
字體:
來源:轉載
供稿:網友

前言

RabbitMQ 是使用 Erlang 語言開發的消息中間件, 其遵循了高級消息隊列協議(Advanced Message Queuing Protocol, AMQP)。

與 Kafka 等消息隊列相比,RabbitMQ 最大的優勢在于其較高的可靠性:

  • 提供確認(ACK)和重傳機制保證消息完成消費, 消費者異常不會導致消息丟失
  • 提供消息持久化機制, broker 崩潰不會導致消息丟失
  • 集群模式下工作, 保證高可用

因為具有較高可靠性和一致性, RabbitMQ 可以勝任訂單處理、秒殺等一致性要求較高的業務場景。

RabbitMQ 概念與機制

RabbitMQ 中的概念模型:

  • Broker: 消息中間件實例, 可能是單個節點也可能是運行在多節點集群上的邏輯實體
  • 消息(Message): 消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標準消息頭以及其它自定義消息頭,用于定義RabbitMQ對消息行為。消息體是字節流,包含消息內容。
  • 連接(Connection): 客戶端與 Broker 之間的 TCP連接
  • 信道(Channel): Channel 是建立在 TCP 連接上的邏輯(虛擬)連接。多個 Channel 復用同一個 TCP 連接, 以避免建立 TCP 連接的巨大開銷。 RabbitMQ 官方要求每個線程使用獨立的 Channel, 禁止多個線程共用 Channel。
  • 生產者(Publisher): 發送消息的客戶端線程
  • 消費者(Consumer): 處理消息的客戶端線程
  • 交換機(Exchange): 交換機負責將消息投遞到相應的隊列
  • 隊列(Queue): 接收并保存交換機投遞的消息,直至被消費者成功消費。邏輯結構遵循先進先出FIFO。
  • 綁定(Binding): 將隊列(Queue)注冊到交換機(Exchange)的路由表
  • 虛擬主機(Vhost): 每個Broker下可建立多個vhost, 每個 vhost 可建立獨立的 Exchange、Queue、綁定及權限系統。同一個 Broker 下的 vhost 共享 Connection、Channel 和 用戶系統,就是說可以使用同一個用戶身份使用同一個 Channel 訪問不同 vhost。

交換機(Exchange)

生產者發送的消息會首先送到交換機(Exchange), 交換機根據自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊列中。

RabbitMQ中的四種標準交換機:

direct: 如果消息的 routing-key 與隊列的 binding-key 完全相同,direct類型的交換機則會將消息投遞到該隊列中。

  • 多個隊列可以使用相同的 binding-key 綁定到同一個 direct 交換機,direct 交換機會把消息投遞到所有 binding-key 與消息 routing-key 相同的隊列

topic: 允許隊列的 binding-key 中包含通配符*和#, topic 交換機會將消息投遞到 binding-key 與 routing-key 匹配的隊列中。

  • 通配符按照關鍵字進行匹配,如news.cn.a中的關鍵字是news、cn和a,即關鍵字按照.分割
  • #通配符匹配0個或多個關鍵字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
  • *通配符匹配一個關鍵字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a

fanout: fanout 交換機不進行任何匹配, 將消息投遞到所有綁定的隊列

header: header 交換機根據消息頭進行投遞,現在已較少使用

我們可以使用 RabbitMQ 的插件機制使用第三方交換機或自行開發交換機。如實現延時投遞的delayed-message-exchange。

消息頭中的delivery-mode可以設置為 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在處理持久化的消息時都會先將消息寫入磁盤中再進行下一步處理, 即使 RabbitMQ 崩潰也不會丟失。

消費者客戶端通常使用的channel.basicConsume使用推(push)模式投遞消息, 即當有新消息時 Broker 通過 channel 主動向客戶端發送消息。客戶端也可以使用channel.basicGet從 Broker 拉取消息。

ACK機制

RabbitMQ 提供了確認送達(acknowledge)機制保證消息被正確處理不會丟失。

確認送達的回執有三種:

  • ACK: 消息已被成功處理
  • NACK: 消息處理異常, 需要重新投遞
  • REJECT: 消息非法, 丟棄消息

RabbitMQ 的 Queue 可以設置 no_ack=true, 則消息被投遞后即刪除不等待回執。

channel.basicConsume 可以指定auto_ack模式,若auto_ack=true當客戶端收到完整消息后即會自動發出ACK回執,否則必須顯式的發出回執。

Java 代碼示例

首先安裝并啟動RabbitMQ實例, Mac用戶可以使用 Homebrew 進行安裝:

brew install rabbitmq

啟動服務:

brew services start rabbitmq

或者使用官方docker鏡像:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

RabbitMQ官網提供了Ubuntu、RPM以及Windows等多種平臺安裝方式。

RabbitMQ默認TCP端口為5672, Web控制臺默認端口15672。

在Maven中添加依賴:

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version></dependency>

編寫生產者:

package rabbit;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author finley */public class RabbitProducer { public static void main(String[] args) throws IOException, TimeoutException {  ConnectionFactory factory = new ConnectionFactory();  factory.setUsername("guest");  factory.setPassword("guest");  factory.setHost("localhost");  try (Connection conn = factory.newConnection();    Channel channel = conn.createChannel()) {   String exchangeName = "test-exchange";   channel.exchangeDeclare(exchangeName, "direct", true);   String routingKey = "hello";   byte[] msg = "hello world".getBytes();   AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();   propsBuilder.deliveryMode(2); // persistent   propsBuilder.priority(0); // normal   propsBuilder.contentType("text/plain");   channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);  } }}

編寫消費者:

package rabbit;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.*;/** * @author finley */public class RabbitConsumer { public static void main(String[] args) throws IOException, TimeoutException {  ConnectionFactory factory = new ConnectionFactory();  factory.setUsername("guest");  factory.setPassword("guest");  factory.setHost("localhost");  try (Connection conn = factory.newConnection();    Channel channel = conn.createChannel()) {   String exchangeName = "test-exchange";   channel.exchangeDeclare(exchangeName, "direct", true);   String queueName = channel.queueDeclare().getQueue();   String bindingKey = "hello";   channel.queueBind(queueName, exchangeName, bindingKey);   while(true) {    channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {      String routingKey = envelope.getRoutingKey();      String contentType = properties.getContentType();      String bodyStr = new String(body, "UTF-8");      System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);      long deliveryTag = envelope.getDeliveryTag();      channel.basicAck(deliveryTag, false);     }    });   }  } }}

RabbitMQ 的消息為字節, 可以將 Java 對象序列化后作為消息體發送。

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VeVb武林網的支持。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲a成v人在线观看| 奇米一区二区三区四区久久| 久久人人爽国产| 一区二区三区四区在线观看视频| 亚洲第一中文字幕| 亚洲iv一区二区三区| 欧美网站在线观看| 国产精品免费观看在线| 青青草一区二区| 国产欧美 在线欧美| 国产偷亚洲偷欧美偷精品| 97香蕉久久夜色精品国产| 国产不卡在线观看| 亚洲自拍欧美色图| 日本高清不卡在线| 亚洲热线99精品视频| 久久视频在线免费观看| 亚洲男人天堂2024| 亚洲一区二区日本| 亚洲第一页中文字幕| 日本久久久久久久久| 精品久久久在线观看| 久久久亚洲福利精品午夜| 日韩专区在线观看| 日韩高清a**址| 姬川优奈aav一区二区| 欧洲亚洲在线视频| 欧美xxxx18国产| 久久久亚洲国产| 日韩成人av在线| 国内精品久久久久| 亚洲第一级黄色片| 欧美性感美女h网站在线观看免费| 成人欧美一区二区三区在线湿哒哒| 亚洲aa中文字幕| 欧美精品情趣视频| 91国内精品久久| 国产一区二区黑人欧美xxxx| 在线观看国产精品91| 亚洲电影免费观看高清| 亚洲精品成人久久久| 国产v综合ⅴ日韩v欧美大片| 欧美性感美女h网站在线观看免费| 91精品国产高清| 成人精品在线视频| 国产成人一区二区三区电影| 91老司机精品视频| 国产香蕉一区二区三区在线视频| 精品视频www| 中文字幕亚洲激情| 日本国产一区二区三区| 欧美午夜电影在线| 韩国视频理论视频久久| 国产香蕉精品视频一区二区三区| 日韩成人在线视频网站| 欧美限制级电影在线观看| 久久成人一区二区| 国产精品91免费在线| 国产综合在线观看视频| 国产亚洲欧洲黄色| 国产精品日韩在线播放| 成人信息集中地欧美| 午夜精品久久17c| 国产区精品在线观看| 狠狠久久亚洲欧美专区| 成人黄色片网站| 欧美日韩亚洲成人| 亚洲精品免费网站| 性欧美xxxx交| 98精品国产高清在线xxxx天堂| 久久网福利资源网站| 久久久精品中文字幕| 亚洲国产精品国自产拍av秋霞| 欧美第一黄色网| 国产精品久久久久久久一区探花| 欧美天天综合色影久久精品| 久久久久国产精品www| 日韩欧美aⅴ综合网站发布| 国产精品一区二区在线| 国产一区二区三区欧美| 欧美国产第一页| 色综合亚洲精品激情狠狠| 久久不射热爱视频精品| 亚洲精品小视频| 日韩av影视在线| 一本大道久久加勒比香蕉| 亚洲国产成人在线播放| 亚洲999一在线观看www| 亚洲精品成人久久| 97成人在线视频| 亚洲综合社区网| 2020久久国产精品| 亚洲人av在线影院| 国产欧美日韩中文字幕| 欧美电影免费观看高清| 一区二区三区国产视频| 一本大道久久加勒比香蕉| 国产九九精品视频| 日本欧美一级片| 久久免费视频观看| 日韩女优在线播放| 亚洲男人天堂久| 日韩在线视频线视频免费网站| 久久青草福利网站| 久久精品在线播放| 国产女精品视频网站免费| 亚洲精品中文字幕女同| 国产狼人综合免费视频| 8090成年在线看片午夜| 中文字幕成人在线| 久久免费国产视频| 第一福利永久视频精品| 超碰日本道色综合久久综合| 欧美极品美女视频网站在线观看免费| 综合欧美国产视频二区| 91欧美激情另类亚洲| 日韩成人久久久| 欧美色视频日本高清在线观看| 粉嫩老牛aⅴ一区二区三区| 久久青草福利网站| 亚洲欧美日韩久久久久久| 日韩亚洲欧美中文在线| 日韩高清有码在线| 国产69精品久久久久99| 国产在线拍偷自揄拍精品| 久久久人成影片一区二区三区观看| 国产成人综合一区二区三区| 亚洲一级片在线看| 热re91久久精品国99热蜜臀| 欧美亚洲视频一区二区| 中文字幕精品影院| 狠狠色香婷婷久久亚洲精品| 欧美午夜精品久久久久久久| 成人欧美一区二区三区在线湿哒哒| 国内精品久久久久久影视8| 国产精品美女久久| 91视频8mav| 亚洲欧美一区二区三区四区| 久久久人成影片一区二区三区| 欧洲日本亚洲国产区| 95av在线视频| 欧美激情在线有限公司| 国产精品久久色| 色伦专区97中文字幕| 国产精品美腿一区在线看| 成人福利在线观看| 色综合老司机第九色激情| 欧美极品在线视频| 最近更新的2019中文字幕| 国语自产在线不卡| 国产成人一区二区| 成人黄色片网站| 国产精品看片资源| 欧美成人一二三| 川上优av一区二区线观看| 成人在线精品视频| 亚洲全黄一级网站| 在线中文字幕日韩| 日本午夜精品理论片a级appf发布| 欧美在线国产精品| 久久伊人精品天天| 欧美小视频在线观看| y97精品国产97久久久久久| 91国语精品自产拍在线观看性色|