亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

詳解Spring Cloud Stream使用延遲消息實現定時任務(RabbitMQ)

2024-07-14 08:43:28
字體:
來源:轉載
供稿:網友

我們在使用一些開源調度系統(比如:elastic-job等)的時候,對于任務的執行時間通常都是有規律性的,可能是每隔半小時執行一次,或者每天凌晨一點執行一次。然而實際業務中還存在另外一種定時任務,它可能需要一些觸發條件才開始定時,比如:編寫博文時候,設置2小時之后發送。對于這些開始時間不確定的定時任務,我們也可以通過Spring Cloud Stream來很好的處理。

為了實現開始時間不確定的定時任務觸發,我們將引入延遲消息的使用。RabbitMQ中提供了關于延遲消息的插件,所以本文就來具體介紹以下如何利用Spring Cloud Stream以及RabbitMQ輕松的處理上述問題。

動手試試

插件安裝

關于RabbitMQ延遲消息的插件介紹可以查看官方網站: https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

安裝方式很簡單,只需要在這個頁面: http://www.rabbitmq.com/community-plugins.html 中找到 rabbitmq_delayed_message_exchange 插件,根據您使用的RabbitMQ版本選擇對應的插件版本下載即可。

注意:只有RabbitMQ 3.6.x以上才支持

在下載好之后,解壓得到 .ez 結尾的插件包,將其復制到RabbitMQ安裝目錄下的 plugins 文件夾。

然后通過命令行啟用該插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

該插件在通過上述命令啟用后就可以直接使用,不需要重啟。

另外,如果您沒有啟用該插件,您可能為遇到類似這樣的錯誤:

ERROR 156 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=1

應用編碼

下面通過編寫一個簡單的例子來具體體會一下這個屬性的用法:

@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublic class TestApplication {  public static void main(String[] args) {    SpringApplication.run(TestApplication.class, args);  }  @Slf4j  @RestController  static class TestController {    @Autowired    private TestTopic testTopic;    /**     * 消息生產接口     *     * @param message     * @return     */    @GetMapping("/sendMessage")    public String messageWithMQ(@RequestParam String message) {      log.info("Send: " + message);      testTopic.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 5000).build());      return "ok";    }  }  /**   * 消息消費邏輯   */  @Slf4j  @Component  static class TestListener {    @StreamListener(TestTopic.INPUT)    public void receive(String payload) {      log.info("Received: " + payload);    }  }  interface TestTopic {    String OUTPUT = "example-topic-output";    String INPUT = "example-topic-input";    @Output(OUTPUT)    MessageChannel output();    @Input(INPUT)    SubscribableChannel input();  }}

內容很簡單,既包含了消息的生產,也包含了消息消費。在 /sendMessage 接口的定義中,發送了一條消息,一條消息的頭信息中包含了 x-delay 字段,該字段用來指定消息延遲的時間,單位為毫秒。所以上述代碼發送的消息會在5秒之后被消費。在消息監聽類 TestListener 中,對 TestTopic.INPUT 通道定義了 @StreamListener ,這里會對延遲消息做具體的邏輯。由于消息的消費是延遲的,從而變相實現了從消息發送那一刻起開始的定時任務。

在啟動應用之前,還要需要做一些必要的配置,下面分消息生產端和消費端做說明:

消息生產端

spring.cloud.stream.bindings.example-topic-output.destination=delay-topicspring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true

注意這里的一個新參數 spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用來開啟延遲消息的功能,這樣在創建exchange的時候,會將其設置為具有延遲特性的exchange,也就是用到上面我們安裝的延遲消息插件的功能。

消息消費端

spring.cloud.stream.bindings.example-topic-input.destination=delay-topicspring.cloud.stream.bindings.example-topic-input.group=testspring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true

在消費端也一樣,需要設置 spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true 。如果該參數不設置,將會出現類似下面的錯誤:

ERROR 9340 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay-topic' in vhost '/': received 'topic' but current is ''x-delayed-message'', class-id=40, method-id=10)

完成了上面配置之后,就可以啟動應用,并嘗試訪問 localhost:8080/sendMessage?message=hello 接口來發送一個消息到MQ中了。此時可以看到類似下面的日志:

2019-01-02 23:28:45.318 INFO 96164 --- [ctor-http-nio-3] c.d.s.TestApplication$TestController   : Send: hello2019-01-02 23:28:45.328 INFO 96164 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory    : Attempting to connect to: [localhost:5672]2019-01-02 23:28:45.333 INFO 96164 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory    : Created new connection: rabbitConnectionFactory.publisher#5c5f9a03:0/SimpleConnection@3278a728 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 53536]2019-01-02 23:28:50.349 INFO 96164 --- [ay-topic.test-1] c.d.stream.TestApplication$TestListener : Received: hello

從日志中可以看到, Send: hello 和 Received: hello 兩條輸出之間間隔了5秒,符合我們上面編碼設置的延遲時間。

深入思考

在代碼層面已經完成了定時任務,那么我們如何查看延遲的消息數等信息呢?

此時,我們可以打開RabbitMQ的Web控制臺,首先可以進入Exchanges頁面,看看這個特殊exchange,具體如下:

Spring,Cloud,Stream,延遲消息,定時任務,RabbitMQ

可以看到,這個exchange的Type類型是 x-delayed-message 。點擊該exchange的名稱,進入詳細頁面,就可以看到更多具體信息了:

Spring,Cloud,Stream,延遲消息,定時任務,RabbitMQ

代碼示例

本文示例讀者可以通過查看下面倉庫的中的 stream-delayed-message 項目:

Github

Gitee

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
91国偷自产一区二区三区的观看方式| 亚州国产精品久久久| 国产91在线播放九色快色| 色噜噜狠狠狠综合曰曰曰88av| 亚洲高清福利视频| 国产精品高潮呻吟久久av无限| 色yeye香蕉凹凸一区二区av| 国产成+人+综合+亚洲欧洲| 九九热视频这里只有精品| 57pao国产精品一区| 日本精品在线视频| 亚洲中国色老太| 欧美国产日韩一区二区在线观看| 国产欧美日韩最新| 啊v视频在线一区二区三区| 国产成人jvid在线播放| 亚洲精品久久久久久久久| 亚洲国产精品系列| 国产91在线视频| 国产精品久久久久久亚洲影视| 国产男女猛烈无遮挡91| 国产欧美精品一区二区三区介绍| 欧美另类第一页| 精品女同一区二区三区在线播放| 成人黄色影片在线| 国产精品免费久久久久影院| 91精品视频在线播放| 国产成人在线亚洲欧美| 亚洲男人天天操| 国产精品一区二区三区久久| 97人洗澡人人免费公开视频碰碰碰| 亚洲国产欧美久久| 国产精品国产三级国产专播精品人| 欧美激情国产日韩精品一区18| 亚洲欧美日韩一区二区在线| 国产日本欧美一区二区三区在线| 国产日韩综合一区二区性色av| 性欧美xxxx| 欧美激情精品久久久久久大尺度| 国产精品国语对白| 另类图片亚洲另类| 精品中文视频在线| 久久久精品一区二区三区| 日韩精品视频免费专区在线播放| 国产精品678| 国产精品高潮呻吟久久av黑人| 欧美大秀在线观看| 亚洲国产天堂久久综合| 国产欧美日韩免费看aⅴ视频| 国产精品第七影院| 成人h片在线播放免费网站| 91久久精品久久国产性色也91| 91国产精品视频在线| 日韩在线观看视频免费| 亚洲欧美一区二区精品久久久| 欧美韩日一区二区| 青青a在线精品免费观看| 久久影视电视剧免费网站| 亚洲午夜未删减在线观看| 亚洲第一二三四五区| 欧美一级视频在线观看| 国产精品福利在线观看| 国产成+人+综合+亚洲欧美丁香花| 伊人久久久久久久久久久| 日韩黄色高清视频| 亚洲男人天堂手机在线| 国产精品亚洲美女av网站| 精品国产老师黑色丝袜高跟鞋| 欧美亚洲成人xxx| 亚洲欧洲视频在线| 国产成人高清激情视频在线观看| 亚洲网在线观看| 尤物精品国产第一福利三区| 成人福利免费观看| 啪一啪鲁一鲁2019在线视频| 亚洲精品电影久久久| 国产一区二区三区丝袜| 欧美激情免费在线| 成人在线中文字幕| 久久夜精品va视频免费观看| 91久久中文字幕| 日本精品视频网站| 韩国视频理论视频久久| 国产精品久久一区主播| 久久久精品国产网站| 欧美极度另类性三渗透| 欧美日韩国产中文精品字幕自在自线| 91精品视频在线看| 国产亚洲在线播放| 欧美黑人xxxⅹ高潮交| 亚洲新中文字幕| 欧美在线激情视频| 欧美激情精品久久久久久蜜臀| 日韩美女免费视频| 午夜精品一区二区三区在线视频| 久久久久久国产精品三级玉女聊斋| 成人午夜在线视频一区| 97精品伊人久久久大香线蕉| 国产福利精品av综合导导航| 久久久亚洲影院你懂的| 精品国产自在精品国产浪潮| 久久久久久久久爱| 国产精品永久免费| 97超碰国产精品女人人人爽| 亚洲品质视频自拍网| 国产剧情日韩欧美| 国产精品中文久久久久久久| 日韩av一区二区在线| 亚洲aaa激情| 亚洲第一页中文字幕| 亚洲男人7777| 中文综合在线观看| 91久久综合亚洲鲁鲁五月天| 在线观看国产成人av片| 国产精品福利在线观看| 亚洲欧洲日产国产网站| 日韩中文有码在线视频| 亚洲国产精品专区久久| 国产不卡av在线免费观看| 日本91av在线播放| 国产精品视频一区二区三区四| 久久影视电视剧免费网站| 伦伦影院午夜日韩欧美限制| 国产精品夫妻激情| 欧美午夜精品久久久久久浪潮| 日韩黄色高清视频| 精品二区三区线观看| 欧美视频专区一二在线观看| 成人精品视频99在线观看免费| 色爱精品视频一区| 亚洲国产成人久久| 91在线观看免费| 久久精品中文字幕| 欧美亚洲第一区| 欧美极品在线播放| 国产精品久久久久久av下载红粉| 国产精品久久综合av爱欲tv| 日韩欧美成人区| 夜夜嗨av一区二区三区免费区| 一区二区亚洲精品国产| 成人免费网视频| 4k岛国日韩精品**专区| 国产精品劲爆视频| 在线亚洲午夜片av大片| 亚洲欧美国产精品久久久久久久| 欧美一区二区三区免费观看| 7m精品福利视频导航| 久久99久久99精品免观看粉嫩| 精品国产一区二区三区久久狼黑人| 亚洲mm色国产网站| 亚洲国产精品一区二区久| 久久精品中文字幕免费mv| 蜜臀久久99精品久久久无需会员| 亚洲综合成人婷婷小说| 欧美日韩一区二区三区在线免费观看| 精品一区二区三区电影| y97精品国产97久久久久久| 欧美在线欧美在线| 亚洲欧洲日产国码av系列天堂| 日韩久久免费视频| 亚洲国产精品久久久久| 欧美成人精品一区二区三区| 一本色道久久88精品综合| 欧美成人精品在线播放|