亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

Rabbitmq延遲隊列實現定時任務的方法

2024-07-14 08:40:54
字體:
來源:轉載
供稿:網友

場景

開發中經常需要用到定時任務,對于商城來說,定時任務尤其多,比如優惠券定時過期、訂單定時關閉、微信支付2小時未支付關閉訂單等等,都需要用到定時任務,但是定時任務本身有一個問題,一般來說我們都是通過定時輪詢查詢數據庫來判斷是否有任務需要執行,也就是說不管怎么樣,我們需要先查詢數據庫,而且有些任務對時間準確要求比較高的,需要每秒查詢一次,對于系統小倒是無所謂,如果系統本身就大而且數據也多的情況下,這就不大現實了,所以需要其他方式的,當然實現的方式有多種多樣的,比如Redis實現定時隊列、基于優先級隊列的JDK延遲隊列、時間輪等。因為我們項目中本身就使用到了Rabbitmq,所以基于方便開發和維護的原則,我們使用了Rabbitmq延遲隊列來實現定時任務,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章 Spring boot集成RabbitMQ

Rabbitmq延遲隊列

Rabbitmq本身是沒有延遲隊列的,只能通過Rabbitmq本身隊列的特性來實現,想要Rabbitmq實現延遲隊列,需要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)

死信交換機

一個消息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是隊列,一個交換機可以對應很多隊列。

  1. 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
  2. 上面的消息的TTL到了,消息過期了。
  3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

死信交換機就是普通的交換機,只是因為我們把過期的消息扔進去,所以叫死信交換機,并不是說死信交換機是某種特定的交換機

消息TTL(消息存活時間)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。

byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。只是expiration字段是字符串參數,所以要寫個int類型的字符串: 當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統計到隊列的消息數中去

處理流程圖

Rabbitmq,延遲隊列,定時任務

創建交換機(Exchanges)和隊列(Queues)

創建死信交換機

Rabbitmq,延遲隊列,定時任務

如圖所示,就是創建一個普通的交換機,這里為了方便區分,把交換機的名字取為:delay

創建自動過期消息隊列

這個隊列的主要作用是讓消息定時過期的,比如我們需要2小時候關閉訂單,我們就需要把消息放進這個隊列里面,把消息過期時間設置為2小時

Rabbitmq,延遲隊列,定時任務

創建一個一個名為delay_queue1的自動過期的隊列,當然圖片上面的參數并不會讓消息自動過期,因為我們并沒有設置x-message-ttl參數,如果整個隊列的消息有消息都是相同的,可以設置,這里為了靈活,所以并沒有設置,另外兩個參數x-dead-letter-exchange代表消息過期后,消息要進入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過期后,進入死信交換機的routing-key,跟發送消息的routing-key一個道理,根據這個key將消息放入不同的隊列

創建消息處理隊列

這個隊列才是真正處理消息的隊列,所有進入這個隊列的消息都會被處理

Rabbitmq,延遲隊列,定時任務

消息隊列的名字為delay_queue2

消息隊列綁定到交換機

進入交換機詳情頁面,將創建的2個隊列(delay queue1和delay queue2)綁定到交換機上面

Rabbitmq,延遲隊列,定時任務

自動過期消息隊列的routing key 設置為delay

綁定delay queue2

Rabbitmq,延遲隊列,定時任務

delay queue2 的key要設置為創建自動過期的隊列的x-dead-letter-routing-key參數,這樣當消息過期的時候就可以自動把消息放入delay_queue2這個隊列中了

綁定后的管理頁面如下圖:

Rabbitmq,延遲隊列,定時任務

當然這個綁定也可以使用代碼來實現,只是為了直觀表現,所以本文使用的管理平臺來操作

發送消息

String msg = "hello word"; MessageProperties messageProperties = new MessageProperties();   messageProperties.setExpiration("6000");  messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());  Message message = new Message(msg.getBytes(), messageProperties);  rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代碼就是

messageProperties.setExpiration("6000"); 

設置了讓消息6秒后過期

注意:因為要讓消息自動過期,所以一定不能設置delay_queue1的監聽,不能讓這個隊列里面的消息被接受到,否則消息一旦被消費,就不存在過期了

接收消息

接收消息配置好delay_queue2的監聽就好了

package wang.raye.rabbitmq.demo1;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configurationpublic class DelayQueue {  /** 消息交換機的名字*/ public static final String EXCHANGE = "delay"; /** 隊列key1*/ public static final String ROUTINGKEY1 = "delay"; /** 隊列key2*/ public static final String ROUTINGKEY2 = "delay_key"; /**  * 配置鏈接信息  * @return  */ @Bean public ConnectionFactory connectionFactory() {  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);  connectionFactory.setUsername("kberp");  connectionFactory.setPassword("kberp");  connectionFactory.setVirtualHost("/");  connectionFactory.setPublisherConfirms(true); // 必須要設置  return connectionFactory; } /**   * 配置消息交換機  * 針對消費者配置   FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念   HeadersExchange :通過添加屬性key-value匹配   DirectExchange:按照routingkey分發到指定隊列   TopicExchange:多關鍵字匹配   */  @Bean  public DirectExchange defaultExchange() {   return new DirectExchange(EXCHANGE, true, false); }  /**  * 配置消息隊列2  * 針對消費者配置   * @return  */ @Bean public Queue queue() {   return new Queue("delay_queue2", true); //隊列持久  } /**  * 將消息隊列2與交換機綁定  * 針對消費者配置   * @return  */ @Bean  @Autowired public Binding binding() {   return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  }  /**  * 接受消息的監聽,這個監聽會接受消息隊列1的消息  * 針對消費者配置   * @return  */ @Bean  @Autowired public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {   SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());   container.setQueues(queue());   container.setExposeListenerChannel(true);   container.setMaxConcurrentConsumers(1);   container.setConcurrentConsumers(1);   container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認   container.setMessageListener(new ChannelAwareMessageListener() {   public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {    byte[] body = message.getBody();     System.out.println("delay_queue2 收到消息 : " + new String(body));     channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費    }   });   return container;  } }

在消息監聽中處理需要定時處理的任務就好了,因為Rabbitmq能發送消息,所以可以把任務特征碼發過來,比如關閉訂單就把訂單id發過來,這樣就避免了需要查詢一下那些訂單需要關閉而加重MySQL負擔了,畢竟一旦訂單量大的話,查詢本身也是一件很費IO的事情

總結

基于Rabbitmq實現定時任務,就是將消息設置一個過期時間,放入一個沒有讀取的隊列中,讓消息過期后自動轉入另外一個隊列中,監控這個隊列消息的監聽處來處理定時任務具體的操作

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产亚洲人成a一在线v站| 国产成人亚洲精品| 亚洲人成电影网站色| 欧美日韩国产999| 国产一区二区成人| 国产激情综合五月久久| 91色在线视频| 国产精品第三页| 欧美性在线视频| 亚洲第一国产精品| 亚洲级视频在线观看免费1级| 国产午夜精品视频免费不卡69堂| 精品视频偷偷看在线观看| 久久久亚洲网站| 91精品免费视频| 成人一区二区电影| 国产日本欧美一区二区三区| 国产精品久久久久秋霞鲁丝| 国产精品久久色| 最近2019中文字幕mv免费看| 国产精品一区=区| 日韩中文字幕视频在线观看| 久久好看免费视频| 久久99国产综合精品女同| 国产视频久久久| 成人啪啪免费看| 最近日韩中文字幕中文| 国模叶桐国产精品一区| 国产日韩欧美综合| 中文字幕av日韩| 日韩成人激情视频| 高清欧美一区二区三区| 97av在线视频| 97热在线精品视频在线观看| 国产欧美一区二区| 精品久久久香蕉免费精品视频| 欧美亚洲视频一区二区| 国产欧美精品va在线观看| 91久久精品视频| 欧美性猛交xxxx偷拍洗澡| 久久综合色影院| 欧美贵妇videos办公室| 中文字幕av一区二区| 亚洲欧美另类在线观看| 国产视频久久久久久久| 国产一区二区三区欧美| 国产精自产拍久久久久久蜜| 中文在线不卡视频| 国产婷婷成人久久av免费高清| 亚州精品天堂中文字幕| 亚洲www视频| 91国产精品视频在线| 日韩激情视频在线| 日韩av影视在线| 国产成人黄色av| 久久69精品久久久久久国产越南| 欧美日韩亚洲网| 日韩精品视频在线| 成人网在线免费看| 欧美日韩视频免费播放| 奇门遁甲1982国语版免费观看高清| 成人激情视频免费在线| 尤物yw午夜国产精品视频| 色爱精品视频一区| 清纯唯美亚洲激情| 热re91久久精品国99热蜜臀| 国产一区二区三区久久精品| 日韩在线不卡视频| 国产亚洲精品久久久久久777| 色多多国产成人永久免费网站| 欧美视频在线观看免费网址| 91亚洲精品一区二区| 成人黄色片网站| 国产精品一久久香蕉国产线看观看| 国产精品aaaa| 7m精品福利视频导航| 久久精品亚洲国产| 欧美另类在线播放| 日本精品视频在线观看| 中文字幕日韩精品在线观看| 欧美性xxxxx极品娇小| 日韩亚洲第一页| 91国产在线精品| 国产成人精彩在线视频九色| 国产香蕉一区二区三区在线视频| 久久精品精品电影网| 日韩av一区二区在线观看| 欧美成人免费一级人片100| 久久久精品欧美| 日韩成人在线视频| 欧美视频免费在线| 日韩成人在线免费观看| 91国产精品91| 欧美日韩国产激情| 欧美俄罗斯乱妇| 97国产在线观看| 精品国产拍在线观看| 亚洲精选中文字幕| 亚洲国产精品久久| 国产精品白嫩美女在线观看| 色噜噜狠狠色综合网图区| 一区二区三区四区视频| 91日韩在线播放| 久久亚洲春色中文字幕| 国产在线久久久| 97成人超碰免| 国产精品一区二区三| 一色桃子一区二区| 国产成人久久精品| 日本亚洲欧美三级| 国产精品久久久久久久av大片| 国产精品久久一区主播| 国产一区二区日韩精品欧美精品| 欧美黑人极品猛少妇色xxxxx| 精品亚洲一区二区三区在线播放| 伊人一区二区三区久久精品| 国内精品久久久久影院 日本资源| 成人免费自拍视频| 亚洲成人三级在线| 国产日韩欧美影视| 亚洲欧美日韩网| 97视频在线观看网址| 国产中文字幕日韩| 日韩中文字幕视频| 日韩视频精品在线| 97香蕉超级碰碰久久免费的优势| 国产精品久久婷婷六月丁香| 精品国产乱码久久久久酒店| 久热爱精品视频线路一| 久久免费视频在线| 日韩美女视频中文字幕| 91精品国产91久久| 亚洲综合日韩在线| 国产一区二区三区18| 欧美成人精品h版在线观看| 久久久久国产视频| 精品夜色国产国偷在线| 精品视频一区在线视频| 国产精品视频yy9099| 亚洲qvod图片区电影| 中文字幕亚洲字幕| 91久久久久久国产精品| 欧美日韩国产成人在线观看| 欧美在线中文字幕| 日韩国产欧美精品一区二区三区| 韩剧1988免费观看全集| 中文国产成人精品久久一| 91精品在线观看视频| 欧美激情性做爰免费视频| 国产欧美日韩免费看aⅴ视频| 两个人的视频www国产精品| 色婷婷成人综合| 国产精品免费视频久久久| 亚洲精品免费一区二区三区| 欧美综合在线观看| 日韩av电影免费观看高清| 欧美成人高清视频| 91免费国产视频| 日韩欧美成人精品| 91精品国产91久久久久久最新| 成人久久久久久久| 亚洲综合一区二区不卡| 欧美裸体xxxx极品少妇软件| 国产成人jvid在线播放|