亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁(yè) > 開(kāi)發(fā) > Java > 正文

利用Redis實(shí)現(xiàn)延時(shí)處理的方法實(shí)例

2024-07-14 08:43:52
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

背景

在開(kāi)發(fā)中,往往會(huì)遇到一些關(guān)于延時(shí)任務(wù)的需求。例如

•生成訂單30分鐘未支付,則自動(dòng)取消

•生成訂單60秒后,給用戶發(fā)短信

對(duì)上述的任務(wù),我們給一個(gè)專業(yè)的名字來(lái)形容,那就是延時(shí)任務(wù)。

最近需要做一個(gè)延時(shí)處理的功能,主要是從kafka中消費(fèi)消息后根據(jù)消息中的某個(gè)延時(shí)字段來(lái)進(jìn)行延時(shí)處理,在實(shí)際的實(shí)現(xiàn)過(guò)程中有一些需要注意的地方,記錄如下。

實(shí)現(xiàn)過(guò)程

說(shuō)到j(luò)ava中的定時(shí)功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下幾點(diǎn):

  • Timer使用的是絕對(duì)時(shí)間,系統(tǒng)時(shí)間的改變會(huì)對(duì)Timer產(chǎn)生一定的影響;而ScheduledThreadPoolExecutor使用的是相對(duì)時(shí)間,所以不會(huì)有這個(gè)問(wèn)題。
  • Timer使用單線程來(lái)處理任務(wù),長(zhǎng)時(shí)間運(yùn)行的任務(wù)會(huì)導(dǎo)致其他任務(wù)的延時(shí)處理,而ScheduledThreadPoolExecutor可以自定義線程數(shù)量。
  • Timer沒(méi)有對(duì)運(yùn)行時(shí)異常進(jìn)行處理,一旦某個(gè)任務(wù)觸發(fā)運(yùn)行時(shí)異常,會(huì)導(dǎo)致整個(gè)Timer崩潰,而ScheduledThreadPoolExecutor對(duì)運(yùn)行時(shí)異常做了捕獲(可以在 afterExecute() 回調(diào)方法中進(jìn)行處理),所以更加安全。

1、ScheduledThreadPoolExecutor決定了用ScheduledThreadPoolExecutor來(lái)進(jìn)行實(shí)現(xiàn),接下來(lái)就是代碼編寫啦(大體流程代碼)。

主要的延時(shí)實(shí)現(xiàn)如下:

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy());//從消息中取出延遲時(shí)間及相關(guān)信息的代碼略int delayTime = 0;executorService.scheduleWithFixedDelay(new Runnable() {  @Override  public void run() {   //具體操作邏輯  }},0,delayTime, TimeUnit.SECONDS);

其中NamedThreadFactory是我自定義的一個(gè)線程工廠,主要給線程池定義名稱及相關(guān)日志打印便于后續(xù)的問(wèn)題分析,這里就不多做介紹了。拒絕策略也是采用默認(rèn)的拒絕策略。

然后測(cè)試了一下,滿足目標(biāo)需求的功能,可以做到延遲指定時(shí)間后執(zhí)行,至此似乎功能就被完成了。

大家可能疑問(wèn),這也太簡(jiǎn)單了有什么好說(shuō)的,但是這種方式實(shí)現(xiàn)簡(jiǎn)單是簡(jiǎn)單但是存在一個(gè)潛在的問(wèn)題,問(wèn)題在哪呢,讓我們看一下ScheduledThreadPoolExecutor的源碼:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0,  TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

ScheduledThreadPoolExecutor由于它自身的延時(shí)和周期的特性,默認(rèn)使用了DelayWorkQueue,而并不像我們平時(shí)使用的SingleThreadExecutor等構(gòu)造是可以使用自己定義的LinkedBlockingQueue并且設(shè)置隊(duì)列大小,問(wèn)題就出在這里。

DelayWrokQueue是一個(gè)無(wú)界隊(duì)列,而我們的目標(biāo)數(shù)據(jù)源是kafka,也就是一個(gè)高并發(fā)高吞吐的消息隊(duì)列,很大可能在某一時(shí)間段有大量的消息過(guò)來(lái)從而導(dǎo)致OOM,在使用多線程時(shí)我們是肯定要考慮到OOM的可能性的,因?yàn)镺OM帶來(lái)的后果往往比較嚴(yán)重,系統(tǒng)OOM臨時(shí)的解決辦法一般只能是重啟,可能會(huì)導(dǎo)致用戶數(shù)據(jù)丟失等不可能挽回的問(wèn)題,所以從編碼設(shè)計(jì)階段要采用盡可能穩(wěn)妥的手段來(lái)避免這些問(wèn)題。

2、采用redis和線程結(jié)合

這一次換了思路,采用redis來(lái)幫助我們做緩沖,從而避免消息過(guò)多OOM的問(wèn)題。

相關(guān)redis zset api:

//添加元素ZADD key score member [[score member] [score member] …]//根據(jù)分值及限制數(shù)量查詢ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]//從zset中刪除指定成員ZREM key member [member …]

我們采用redis基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)的zset結(jié)構(gòu),采用score來(lái)存儲(chǔ)我們目標(biāo)發(fā)送時(shí)間的數(shù)值,整體處理流程如下:

  • 第一步數(shù)據(jù)存儲(chǔ):9:10分從kafka接收了一條a的訂單消息,要求30分鐘后進(jìn)行發(fā)貨通知,那我們就將當(dāng)前時(shí)間加上30分鐘然后轉(zhuǎn)為時(shí)間戳作為a的score,key為a的訂單號(hào)存入redis中。代碼如下:
public void onMessage(String topic, String message) {  String orderId;		int delayTime = 0;  try {   Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {   }.getType());   if (msgMap.isEmpty()) {    return;   }   LOGGER.info("onMessage kafka content:{}", msgMap.toString());	 orderId = msgMap.get("orderId");   if(StringUtils.isNotEmpty(orderId)){    delayTime = Integer.parseInt(msgMap.get("delayTime"));    Calendar calendar = Calendar.getInstance();    //計(jì)算出預(yù)計(jì)發(fā)送時(shí)間    calendar.add(Calendar.MINUTE, delayTime);    long sendTime = calendar.getTimeInMillis();    RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);    LOGGER.info("orderId:{}---放入redis中等待發(fā)送---sendTime:{}", ---orderId:{}, sendTime);   }  } catch (Exception e) {   LOGGER.info("onMessage 延時(shí)發(fā)送異常:{}", e);  } }
  • 第二步數(shù)據(jù)處理:另起一個(gè)線程具體調(diào)度時(shí)間根據(jù)業(yè)務(wù)需求來(lái)定,我這里3分鐘執(zhí)行一次,內(nèi)部邏輯:從redis中取出一定量的zset數(shù)據(jù),如何取呢,使用zset的zrangeByScore方法,根據(jù)數(shù)據(jù)的score進(jìn)行排序,當(dāng)然可以帶上時(shí)間段,這里從0到現(xiàn)在,來(lái)進(jìn)行消費(fèi),需要注意的一點(diǎn)是,在取出數(shù)據(jù)后我們需要用zrem方法將取出的數(shù)據(jù)從zset中刪除,防止其他線程重復(fù)消費(fèi)數(shù)據(jù)。在此之后進(jìn)行接下來(lái)的發(fā)貨通知等相關(guān)邏輯。代碼如下:
public void run(){  //獲取批量大小  int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));  try {   //批量獲取離發(fā)送時(shí)間最近的orderNum條數(shù)據(jù)	 Calendar calendar = Calendar.getInstance();	 long now = calendar.getTimeInMillis();	 //獲取無(wú)限早到現(xiàn)在的事件key(防止上次批量數(shù)量小于放入數(shù)量,存在歷史數(shù)據(jù)未消費(fèi)情況)	 Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);	 LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));   if (CollectionUtils.isNotEmpty(orders)){    //刪除key 防止重復(fù)發(fā)送    for (String orderId : orderIds) {     RedisUtils.getInstance().zrem(Constant.DELAY, orderId);    }	  //接下來(lái)執(zhí)行發(fā)送等業(yè)務(wù)邏輯        }  } catch (Exception e) {   LOGGER.warn("task.run exception:{}", e);  } }

至此完成了依賴redis和線程完成了延時(shí)發(fā)送的功能。

結(jié)語(yǔ)

那么對(duì)上面兩種不同的實(shí)現(xiàn)方式進(jìn)行一下優(yōu)缺點(diǎn)比較:

  • 第一種方式實(shí)現(xiàn)簡(jiǎn)單,不依賴外部組件,能夠快速的實(shí)現(xiàn)目標(biāo)功能,但缺點(diǎn)也很明顯,需要在特定的場(chǎng)景下使用,如果是我這種消息量大的情況下使用很可能是有問(wèn)題,當(dāng)然在數(shù)據(jù)源消息不多的情況下不失為好的選擇。
  • 第二種方式實(shí)現(xiàn)稍微復(fù)雜一點(diǎn),但是能夠適應(yīng)消息量大的場(chǎng)景,采用redis的zset作為了“中間件”的效果,并且?guī)椭覀冞M(jìn)行延時(shí)的功能實(shí)現(xiàn)能夠較好的適應(yīng)高并發(fā)場(chǎng)景,缺點(diǎn)在于在編寫的過(guò)程中需要考慮實(shí)際的因素較多,例如線程的執(zhí)行周期時(shí)間,發(fā)送可能會(huì)有一定時(shí)間的延遲,批量數(shù)據(jù)大小的設(shè)置等等。

綜上是本人這次延時(shí)功能的實(shí)現(xiàn)過(guò)程的兩種實(shí)現(xiàn)方式的總結(jié),具體采用哪種方式還需大家根據(jù)實(shí)際情況選擇,希望能給大家?guī)?lái)幫助。ps:由于本人的技術(shù)能力有限,文章中可能出現(xiàn)技術(shù)描述不準(zhǔn)確或者錯(cuò)誤的情況懇請(qǐng)各位大佬指出,我立馬進(jìn)行改正,避免誤導(dǎo)大家,謝謝!

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)VeVb武林網(wǎng)的支持。


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到JAVA教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
国产精品久久久久久久久动漫| 精品视频中文字幕| 9.1麻豆精品| 电影在线一区| 一区二区三区中文字幕在线观看| 久久国产露脸精品国产| 久久久久免费精品| 高潮一区二区三区乱码| 日韩中文字幕免费视频| 亚洲石原莉奈一区二区在线观看| 影音先锋男人资源站| 免费大片黄在线观看视频网站| 久久精品黄色片| 四虎精品影院在线观看视频| 老司机aⅴ在线精品导航| 亚洲欧美日韩一二三区| 欧美黄色一区二区| 欧美交换国产一区内射| 欧美黄色一区二区三区| 欧美丰满熟妇xxxxx| 岛国精品在线| 日韩精品999| 在线观看免费视频综合| 性色a∨人人爽网站| 亚洲精品乱码久久久久久不卡| 91久久国产综合久久91| 欧美久久久久中文字幕| 黄色成人精品网站| 男男一级淫片免费播放| 大桥未久一区二区三区| 婷婷婷国产在线视频| 国产免费黄色网址| 亚洲国产精品高清| 大陆极品少妇内射aaaaa| 媚黑女一区二区| 国产午夜精品一区二区| 色综合视频在线| 精品少妇人妻av一区二区| 国产精品a成v人在线播放| 另类尿喷潮videofree| 91视频.com| 国产乱码精品一区二区三区忘忧草| 毛片网站在线免费观看| 香蕉成人在线| 91亚洲精品视频在线观看| 人妻精品一区一区三区蜜桃91| 6080午夜伦理| 日韩黄色在线免费观看| 国产美女高潮一区二区三区| 国产一区二区在线免费观看| 福利在线免费| 疯狂试爱三2浴室激情视频| 国产最顶级的黄色片在线免费观看| 91原色影院| 国产精品乱子久久久久| 欧美在线视频一区二区三区| 中文字幕欧美日韩一区二区| 蜜桃传媒视频第一区入口在线看| 97精品视频在线看| 欧美顶级毛片在线播放| 国产精品中文字幕亚洲欧美| 最近2019中文字幕大全第二页| 亚洲精品一区二区三区影院| 日韩福利二区| 欧洲精品中文字幕| 牲欧美videos精品| 久久久久久久久久久久久91| 日韩欧美激情电影| 欧美日本黄视频| 在线日韩欧美| 日本一级淫片演员| 国产精品视频yy9299一区| 色先锋最新资源| 日产精品99久久久久久| 亚洲成色777777在线观看影院| 欧亚一区二区| 久久国产中文字幕| 亚洲图片小说综合| 欧美日韩中文字幕视频| 国产精品 欧美 日韩| 国产私拍精品| 黄色免费在线网站| 免费成人看片网址| 久久这里只有精品1| 一级黄色大片网站| 在线观看视频h| 日本一区二区不卡高清更新| 亚洲午夜精品久久久久久性色| 福利社在线免费视频| 日韩国产成人精品| av噜噜色噜噜久久| 亚洲欧美国内爽妇网| 亚洲天堂中文字幕在线| 在线播放日韩导航| 日韩av在线导航| 日韩成人激情视频| 亚洲国产视频网站| xfav资源| 亚洲日本韩国在线| 99re视频在线播放| 亚洲视频在线播放| 亚洲成人免费av| 亚洲不卡在线观看| 福利资源在线观看| 小说区视频区图片区| 国产精品伊人久久| 欧美精品高清视频| 一本大道伊人av久久综合| 影音先锋男人在线| 久久精品国产一区二区三区不卡| 欧美日韩国产高清一区二区| 亚洲成人在线视频网站| 人成福利视频在线观看| 日本成片免费高清| 欧美在线91| 国产欧美日韩视频| 欧美精品久久久久久久自慰| 中文字幕欧美在线| 李宗瑞系列合集久久| 美女不穿衣服的网站| 日韩高清av一区二区三区| 久久精品国产亚洲av无码娇色| 在线视频欧美日韩| 欧美艳星brazzers| 欧美激情在线一区| 传媒视频在线| 美女18一级毛片一品久道久久综合| 在线观看不卡av| av福利在线| av高清久久久| 欧美精品tushy高清| 国产精品久久久久久久久动漫| 国产又粗又长又爽| 日韩三级视频在线看| 国产91在线播放九色| 人妻中文字幕一区二区三区| 久久综合导航| 激情自拍一区| 中文在线一区二区三区| 国产亚洲精品久久久久久无几年桃| 色视频在线免费| 黑人与娇小精品av专区| 亚洲欧美日韩综合国产aⅴ| 中文字幕日韩三级片| 一区二区三区四区影院| 午夜久久久久久噜噜噜噜| 日韩中文字幕1| 欧美精品久久久久久久久46p| 男人c女人视频| 亚洲在线久久| 日本亚洲一区二区| 国产youjizz在线| 色哟哟亚洲精品一区二区| 中文字幕中文字幕一区三区| 亚洲韩日在线| 国产精彩视频在线| 国产精品美女久久久久av爽李琼| 视频二区不卡| 在线看的黄色网址| 日本精品视频在线播放| 欧美一区二区麻豆红桃视频| 日韩高清在线播放| 91影院在线免费观看视频| 麻豆成人免费视频| 国产aⅴ精品一区二区四区| 人妻人人澡人人添人人爽| 最新日韩av| 日韩女优制服丝袜电影| 一二三四区精品视频| 久久99蜜桃精品久久久久小说| 97香蕉久久夜色精品国产| 海角社区69精品视频| 欧美极品视频在线观看| 国产91足控脚交在线观看| 亚洲色图视频在线观看| 中文字幕一区二区三区四区欧美| 亚洲一级片免费| 欧美成人一区二区三区在线观看| 日本一级在线观看| 欧美精品一区二区三区久久| 在线免费看av| 国产精品久久久久免费a∨大胸| 亚洲午夜女主播在线直播| 在线成人亚洲| 乱妇乱女熟妇熟女网站| 粉嫩aⅴ一区二区三区四区| 日日摸日日碰夜夜爽无码| 性xxxfreexxxx性欧美| 国产伦子伦对白在线播放观看| 亚洲一区二区三区综合| 男人操女人逼免费视频| 白丝女仆被免费网站| 中文在线免费一区三区高中清不卡| 欧美激情aaaa| 日日干日日操| 欧美成人自拍| 青娱乐国产视频| 色天天色综合| 草草视频在线| 日韩一级完整毛片| 麻豆视频免费看| 狠狠躁夜夜躁人人躁婷婷91| 婷婷六月国产精品久久不卡| 一级一片免费看| 国产一区二区三区久久| 轻轻草在线视频| 国产精品一区2区| 咪咪色在线视频| 日本xxxxxxxxx18| 无码aⅴ精品一区二区三区| 91精品国产91久久久久久不卡| 蜜桃av鲁一鲁一鲁一鲁俄罗斯的| 国产写真视频在线观看| 91在线视频网址| 蜜芽在线视频| 四虎884aa成人精品| 国产亚洲视频在线| 久久一综合视频| 黄色片一区二区| 亚洲午夜成aⅴ人片| 不卡的av在线| 鲁大师影院一区二区三区| 国产精品手机视频| 无码人妻少妇色欲av一区二区| 四虎国产成人免费观看| 成年视频在线观看| 国产主播自拍av| 欧美俄罗斯乱妇| 女同激情久久av久久| 在线视频这里只有精品| 2021天堂中文幕一二区在线观| 欧美精品一区在线观看| 国产一区二区三区四区| 色综合99久久久无码国产精品| 亚洲高清在线免费观看| 另类专区欧美蜜桃臀第一页| 国产精品成人观看视频国产奇米| 网友自拍视频在线| 亚洲激情二区| 天堂在线亚洲| 欧美少妇一级片| 黄色大片在线看| 国产女18毛片多18精品| 国产黄色一级片| 特大巨黑人吊性xxx视频| 欧美一级免费在线| 欧美aa在线观看| 国产高清免费在线| 夜夜爽8888| 精品国产伦一区二区三区观看说明| 午夜国产精品视频| 欧美一级特黄aaaaaa在线看片| 亚洲国产精品视频在线观看| 日韩欧美电影在线观看| 久久亚洲电影天堂| 天天av天天翘| 欧美日免费三级在线| 日本成人在线不卡视频| 黄在线免费观看| 日韩伦理在线视频| 亚洲日本欧美在线| 久久中文字幕免费| 亚洲精品国产精品国自产观看浪潮| 麻豆精品免费视频入口| 成人资源视频网站免费| 乳奴隷乳フ辱| 国产精品一区二区免费看| 一区二区激情| 亚洲电影观看| 亚洲人成人77777线观看| 欧美亚洲免费在线一区| 成年人小视频网站| 免费的黄网站在线观看| 性折磨bdsm欧美激情另类| 久久精品免视看国产成人| 高清视频在线观看一区| 欧美日韩免费观看一区| 国产视频在线观看一区二区| 91av手机在线| 成人精品视频| 无限资源日本好片| 国产精品一区二区三区免费视频| 日本亚洲欧美天堂免费| 91精品久久久久久蜜臀| 刘亦菲久久免费一区二区| 日韩精品电影一区二区三区| 9l视频自拍蝌蚪9l视频| 最新中文字幕在线视频| 性欧美8khd高清极品| 精品一区二区三区免费毛片爱| 国产a级片免费观看| 肉丝一区二区| 天天操天天干天天爽| 在线中文字幕视频| 亚洲一区二区电影| 在线观看黄色av| 日韩精品99| 91老司机福利在线| 欧美亚洲国产成人精品| 亚洲成a人片在线www| 精品毛片网大全| 欧美激情图片小说| 丰满少妇在线观看| 亚洲精品中文字幕乱码三区不卡| 福利一区二区免费视频| 免费黄色小视频在线观看| 中文字幕91视频| 中文字幕1区2区| 少妇一级淫免费观看| 九色porny自拍视频| 中文字幕一二三区在线观看| 91沈先生作品| 国产无遮挡在线观看| 日韩一级片一区二区| 永久免费网站在线| 狠狠入ady亚洲精品经典电影| 青青草免费观看视频| 久久亚洲中文字幕无码| 男女爱爱视频免费| 国产片乱18免费| 久久久久久久性潮| 精品国产av一区二区| 写真福利精品福利在线观看| 日本一区二区在线免费观看| 久久久久国产精品嫩草影院| 色内内免费视频播放| 亚洲欧美制服另类日韩| 99久久婷婷这里只有精品|