亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

淺談Springboot整合RocketMQ使用心得

2024-07-13 10:16:44
字體:
來源:轉載
供稿:網友

一、阿里云官網---幫助文檔

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官網步驟,創建Topic、申請發布(生產者)、申請訂閱(消費者)

二、代碼

1、配置:

public class MqConfig {  /**   * 啟動測試之前請替換如下 XXX 為您的配置   */  public static final String PUBLIC_TOPIC = "test";//公網測試  public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";  public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";  public static final String ACCESS_KEY = "123";  public static final String SECRET_KEY = "123";  public static final String TAG = "";  public static final String THREAD_NUM = "25";//消費端線程數  /**   * ONSADDR 請根據不同Region進行配置   * 公網測試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet   * 公有云生產: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal   * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal   * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal   */  public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";}

ONSADDR 阿里云用 公有云生產,測試用公網

不同的業務可以設置不同的tag,但是如果發送消息量大的話,建議新建TOPIC

2、生產者

方式1:

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd"><beans>  <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"     init-method="start" destroy-method="shutdown">    <property name="properties">      <map>        <entry key="ProducerId" value="" /> <!-- PID,請替換 -->        <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,請替換 -->        <entry key="SecretKey" value="" /> <!-- SECRET_KEY,請替換 -->        <!--PropertyKeyConst.ONSAddr 請根據不同Region進行配置         公網測試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet         公有云生產: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal         杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal         深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->        <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>      </map>    </property>  </bean></beans>

啟動方式1,在使用類的全局里設置:

//初始化生產者  private ApplicationContext ctx;  private ProducerBean producer;  @Value("${producerConfig.enabled}")//開關,spring配置項,true為開啟,false關閉  private boolean producerConfigEnabled;  @PostConstruct  public void init(){    if (true == producerConfigEnabled) {      ctx = new ClassPathXmlApplicationContext("producer.xml");      producer = (ProducerBean) ctx.getBean("producer");    }  }

PS:最近發現一個坑,如果producer用上面這種方式啟動的話,一旦啟動的多了,會造成fullGC,所以可以換成下面這種注解方式啟動,在用到的地方手動start、shutdown

方式2:配置類(不需要xml)

@Configurationpublic class ProducerBeanConfig {  @Value("${openservices.ons.producerBean.producerId}")  private String producerId;  @Value("${openservices.ons.producerBean.accessKey}")  private String accessKey;  @Value("${openservices.ons.producerBean.secretKey}")  private String secretKey;  private ProducerBean producerBean;  @Value("${openservices.ons.producerBean.ONSAddr}")  private String ONSAddr;  @Bean  public ProducerBean oneProducer() {    ProducerBean producerBean = new ProducerBean();    Properties properties = new Properties();    properties.setProperty(PropertyKeyConst.ProducerId, producerId);    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);    producerBean.setProperties(properties);    return producerBean;  }}

PS:經過這次雙11發現,以上2種方式在大數據量,多線程情況下都不太適用, 性能很差,所以推薦用3

方式3:(不需要xml)

@Componentpublic class ProducerBeanSingleTon {  @Value("${openservices.ons.producerBean.producerId}")  private String producerId;  @Value("${openservices.ons.producerBean.accessKey}")  private String accessKey;  @Value("${openservices.ons.producerBean.secretKey}")  private String secretKey;  @Value("${openservices.ons.producerBean.ONSAddr}")  private String ONSAddr;  private static Producer producer;  private static class SingletonHolder {    private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();  }  private ProducerBeanSingleTon (){}  public static final ProducerBeanSingleTon getInstance() {    return SingletonHolder.INSTANCE;  }  @PostConstruct  public void init(){    // producer 實例配置初始化    Properties properties = new Properties();    //您在控制臺創建的Producer ID    properties.setProperty(PropertyKeyConst.ProducerId, producerId);    // AccessKey 阿里云身份驗證,在阿里云服務器管理控制臺創建    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);    // SecretKey 阿里云身份驗證,在阿里云服務器管理控制臺創建    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);    //設置發送超時時間,單位毫秒    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");    // 設置 TCP 接入域名(此處以公共云生產環境為例)    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);    producer = ONSFactory.createProducer(properties);    // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可    producer.start();  }  public Producer getProducer(){    return producer;  }}

spring配置

spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5DialectconsumerConfig.enabled = trueproducerConfig.enabled = true #方式1:scheduling.enabled = false#方式2、3:rocketMQ /u516C/u7F51/u914D/u7F6Eopenservices.ons.producerBean.producerId = pidopenservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.ONSAddr = 公網、杭州公有云生產

方式1投遞消息代碼:

 try {   String jsonC = JsonUtils.toJson(elevenMessage);   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());   SendResult sendResult = producer.send(message);   if (sendResult != null) {     logger.info(".Send mq message success!”;   } else {     logger.warn(".sendResult is null.........");   }   } catch (Exception e) {      logger.warn("DoubleElevenAllPreService");      Thread.sleep(1000);//如果有異常,休眠1秒   }

方式2投遞消息代碼:(可以每發1000個啟動/關閉一次)

   producerBean.start();try {   String jsonC = JsonUtils.toJson(elevenMessage);   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());   SendResult sendResult = producer.send(message);   if (sendResult != null) {     logger.info(".Send mq message success!”;   } else {     logger.warn(".sendResult is null.........");   }   } catch (Exception e) {      logger.warn("DoubleElevenAllPreService");      Thread.sleep(1000);//如果有異常,休眠1秒   }   producerBean.shutdown();

方式3:投遞消息

 try {   String jsonC = JsonUtils.toJson(elevenMessage);   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());   Producer producer = ProducerBeanSingleTon.getInstance().getProducer();   SendResult sendResult = producer.send(message);   if (sendResult != null) {     logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”;   } else {     logger.warn("DoubleElevenMidService.sendResult is null.........");   }   } catch (Exception e) {     logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e);     Thread.sleep(1000);//如果有異常,休眠1秒   }

發送消息的代碼一定要捕獲異常,不然會重復發送。

這里的TOPIC用自己創建的,elevenMessage是要發送的內容,我這里是自己建的對象

3、消費者

配置啟動類:

@Configuration@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)public class ConsumerConfig {  private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());  @Bean  public Consumer consumerFactory(){//不同消費者 這里不能重名    Properties consumerProperties = new Properties();    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);    consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);    consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);    //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);    Consumer consumer = ONSFactory.createConsumer(consumerProperties);    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new對應的監聽器    consumer.start();    logger.info("ConsumerConfig start success.");        return consumer;  }}

CID和ONSADDR一點要選對,用自己的,消費者線程數等可以在這里配置

創建消息監聽器類,消費消息:

@Componentpublic class MessageListener implements MessageListener {  private Logger logger = LoggerFactory.getLogger("remind");  protected static ElevenReposity elevenReposity;  @Resource  public void setElevenReposity(ElevenReposity elevenReposity){    MessageListener .elevenReposity=elevenReposity;  }  @Override  public Action consume(Message message, ConsumeContext consumeContext) {    if(message.getTopic().equals("自己的TOPIC")){//避免消費到其他消息 json轉換報錯      try {      byte[] body = message.getBody();      String res = new String(body);            //res 是生產者傳過來的消息內容        //業務代碼      }else{        logger.warn("!");      }      } catch (Exception e) {        logger.error("MessageListener.consume error:" + e.getMessage(), e);      }      logger.info("MessageListener.Receive message”);      //如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater      return Action.CommitMessage;    }else{      logger.warn();      return Action.ReconsumeLater;    }  }

注意,由于消費者是多線程的,所以對象要用static+set注入,把對象的級別提升到進程,這樣多個線程就可以共用,但是無法調用父類的方法和變量

Springboot整合RocketMQ,Springboot,RocketMQ

消費者狀態可以查看消費者是否連接成功,消費是否延遲,消費速度等

重置消費位點可以清空所有消息

三、注意事項

1、發送的消息體 最大為256KB

2、消息最多存在3天

3、消費端默認線程數是20

4、如果運行過程中出現java掛掉或者cpu占用異常高,可以在發送消息的時候,每發送1000條讓線程休息1s

5、本地測試或啟動的時候,把ONSADDR換成公網,不然報錯無法啟動

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美又大又硬又粗bbbbb| 亚洲精品资源美女情侣酒店| 亚洲免费视频一区二区| 最近免费中文字幕视频2019| 免费91麻豆精品国产自产在线观看| 国产日韩精品一区二区| xxx成人少妇69| 国产精品视频播放| 亚洲成人网在线观看| 欧美性xxxx极品hd满灌| 日韩美女av在线| 国产97在线|日韩| 91在线观看免费高清完整版在线观看| 久久国产精品影视| 亚洲精美色品网站| 成人黄色免费看| 欧美国产日韩xxxxx| 成人黄色免费片| 国产精品久久久精品| 久久久久久97| 亚洲毛片在线免费观看| 久久久人成影片一区二区三区| 久久久久亚洲精品国产| 精品国产一区av| 亚洲free性xxxx护士hd| 国产精品视频免费在线观看| 亚洲精品国产精品国产自| 色综合色综合久久综合频道88| 7777kkkk成人观看| 中文字幕在线亚洲| 国产精品久久久久久搜索| 日韩av在线免费观看| 久久精品91久久香蕉加勒比| 日韩电影免费在线观看| 亚洲国产婷婷香蕉久久久久久| 久久精品精品电影网| 成人欧美一区二区三区黑人| 国产一区二区色| 欧美性极品xxxx做受| 亚洲毛片一区二区| 色先锋久久影院av| 成人在线精品视频| 国产一区二区三区欧美| 日韩国产高清污视频在线观看| 欧美亚洲在线视频| 91av在线视频观看| 亚洲视频在线视频| 97免费视频在线| 国内精品久久久久影院 日本资源| 日韩欧美一区二区在线| 国产97在线|日韩| 亚洲白拍色综合图区| 久久精品久久久久久国产 免费| 久久久久久久亚洲精品| 欧美激情综合色| 成人h视频在线| 国产91成人video| 久久久欧美精品| 亚洲a成v人在线观看| 国产一区二区免费| 日韩中文第一页| 亚洲人成在线观看| 奇门遁甲1982国语版免费观看高清| 久久精品人人爽| xvideos亚洲人网站| 中文字幕国产精品久久| 国产亚洲欧洲黄色| 一区二区欧美在线| 大量国产精品视频| 97免费中文视频在线观看| 亚洲欧美中文字幕| 这里精品视频免费| 久久久久国色av免费观看性色| 欧美富婆性猛交| 欧美在线免费视频| 久久人人爽人人爽人人片av高请| 久久香蕉精品香蕉| 日韩中文字幕视频在线| 亚洲人成网站色ww在线| 久久久综合免费视频| 亚洲跨种族黑人xxx| 欧洲成人午夜免费大片| 国产精品96久久久久久| 97在线日本国产| 97超碰蝌蚪网人人做人人爽| 成人网欧美在线视频| 7m第一福利500精品视频| 一区二区三区国产视频| 久久久久久有精品国产| 在线国产精品播放| 欧美xxxx18国产| 日韩视频在线一区| 亚洲第一精品电影| 一本一本久久a久久精品牛牛影视| 性欧美在线看片a免费观看| 国产在线精品成人一区二区三区| 免费91麻豆精品国产自产在线观看| 久久亚洲国产成人| 美女av一区二区| 亚洲最大成人在线| 国产精品com| 久久久成人精品视频| 日韩欧美在线播放| 久久精品91久久久久久再现| 日本精品免费一区二区三区| 成人黄色午夜影院| 在线日韩精品视频| 国产日本欧美一区二区三区| 正在播放亚洲1区| 国产精品视频最多的网站| 精品国产一区二区在线| 欧美午夜影院在线视频| www.亚洲天堂| 97色在线播放视频| 欧美另类极品videosbest最新版本| 日韩经典一区二区三区| 欧美成人自拍视频| 亚洲一区亚洲二区亚洲三区| 成人亚洲激情网| 精品久久久久久电影| 亚洲free性xxxx护士白浆| 欧美精品激情blacked18| 国产亚洲福利一区| 91九色单男在线观看| 狠狠操狠狠色综合网| 欧美日韩国产限制| 精品小视频在线| 久久久久久久久久久免费精品| 欧美激情2020午夜免费观看| 国外视频精品毛片| 亚洲成年网站在线观看| 欧美交受高潮1| 欧美视频在线观看免费网址| 欧美一区二区三区……| 亚洲伊人久久综合| 日韩成人久久久| 亚洲精品自拍偷拍| 超碰日本道色综合久久综合| 精品无人区太爽高潮在线播放| 亚洲最大的免费| 欧美性xxxxhd| 久久久电影免费观看完整版| 欧美日韩国产一区二区| 国产中文字幕日韩| 欧美成人免费在线视频| 色悠悠久久88| 国模吧一区二区三区| 国产成人精品久久二区二区| 国产一级揄自揄精品视频| 欧美中文在线字幕| 午夜精品久久久久久99热| 美乳少妇欧美精品| 欧洲美女7788成人免费视频| 欧美高清在线播放| 免费99精品国产自在在线| 91精品国产高清自在线看超| 欧美中文字幕在线观看| 性色av一区二区三区| 91情侣偷在线精品国产| 欧美大片欧美激情性色a∨久久| 欧美肥婆姓交大片| 日韩美女写真福利在线观看| 在线视频日本亚洲性| 国产精品日韩一区|