亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

利用Redis實現延時處理的方法實例

2024-07-14 08:43:52
字體:
來源:轉載
供稿:網友

背景

在開發中,往往會遇到一些關于延時任務的需求。例如

•生成訂單30分鐘未支付,則自動取消

•生成訂單60秒后,給用戶發短信

對上述的任務,我們給一個專業的名字來形容,那就是延時任務。

最近需要做一個延時處理的功能,主要是從kafka中消費消息后根據消息中的某個延時字段來進行延時處理,在實際的實現過程中有一些需要注意的地方,記錄如下。

實現過程

說到java中的定時功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下幾點:

  • Timer使用的是絕對時間,系統時間的改變會對Timer產生一定的影響;而ScheduledThreadPoolExecutor使用的是相對時間,所以不會有這個問題。
  • Timer使用單線程來處理任務,長時間運行的任務會導致其他任務的延時處理,而ScheduledThreadPoolExecutor可以自定義線程數量。
  • Timer沒有對運行時異常進行處理,一旦某個任務觸發運行時異常,會導致整個Timer崩潰,而ScheduledThreadPoolExecutor對運行時異常做了捕獲(可以在 afterExecute() 回調方法中進行處理),所以更加安全。

1、ScheduledThreadPoolExecutor決定了用ScheduledThreadPoolExecutor來進行實現,接下來就是代碼編寫啦(大體流程代碼)。

主要的延時實現如下:

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy());//從消息中取出延遲時間及相關信息的代碼略int delayTime = 0;executorService.scheduleWithFixedDelay(new Runnable() {  @Override  public void run() {   //具體操作邏輯  }},0,delayTime, TimeUnit.SECONDS);

其中NamedThreadFactory是我自定義的一個線程工廠,主要給線程池定義名稱及相關日志打印便于后續的問題分析,這里就不多做介紹了。拒絕策略也是采用默認的拒絕策略。

然后測試了一下,滿足目標需求的功能,可以做到延遲指定時間后執行,至此似乎功能就被完成了。

大家可能疑問,這也太簡單了有什么好說的,但是這種方式實現簡單是簡單但是存在一個潛在的問題,問題在哪呢,讓我們看一下ScheduledThreadPoolExecutor的源碼:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0,  TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

ScheduledThreadPoolExecutor由于它自身的延時和周期的特性,默認使用了DelayWorkQueue,而并不像我們平時使用的SingleThreadExecutor等構造是可以使用自己定義的LinkedBlockingQueue并且設置隊列大小,問題就出在這里。

DelayWrokQueue是一個無界隊列,而我們的目標數據源是kafka,也就是一個高并發高吞吐的消息隊列,很大可能在某一時間段有大量的消息過來從而導致OOM,在使用多線程時我們是肯定要考慮到OOM的可能性的,因為OOM帶來的后果往往比較嚴重,系統OOM臨時的解決辦法一般只能是重啟,可能會導致用戶數據丟失等不可能挽回的問題,所以從編碼設計階段要采用盡可能穩妥的手段來避免這些問題。

2、采用redis和線程結合

這一次換了思路,采用redis來幫助我們做緩沖,從而避免消息過多OOM的問題。

相關redis zset api:

//添加元素ZADD key score member [[score member] [score member] …]//根據分值及限制數量查詢ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]//從zset中刪除指定成員ZREM key member [member …]

我們采用redis基礎數據結構的zset結構,采用score來存儲我們目標發送時間的數值,整體處理流程如下:

  • 第一步數據存儲:9:10分從kafka接收了一條a的訂單消息,要求30分鐘后進行發貨通知,那我們就將當前時間加上30分鐘然后轉為時間戳作為a的score,key為a的訂單號存入redis中。代碼如下:
public void onMessage(String topic, String message) {  String orderId;		int delayTime = 0;  try {   Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {   }.getType());   if (msgMap.isEmpty()) {    return;   }   LOGGER.info("onMessage kafka content:{}", msgMap.toString());	 orderId = msgMap.get("orderId");   if(StringUtils.isNotEmpty(orderId)){    delayTime = Integer.parseInt(msgMap.get("delayTime"));    Calendar calendar = Calendar.getInstance();    //計算出預計發送時間    calendar.add(Calendar.MINUTE, delayTime);    long sendTime = calendar.getTimeInMillis();    RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);    LOGGER.info("orderId:{}---放入redis中等待發送---sendTime:{}", ---orderId:{}, sendTime);   }  } catch (Exception e) {   LOGGER.info("onMessage 延時發送異常:{}", e);  } }
  • 第二步數據處理:另起一個線程具體調度時間根據業務需求來定,我這里3分鐘執行一次,內部邏輯:從redis中取出一定量的zset數據,如何取呢,使用zset的zrangeByScore方法,根據數據的score進行排序,當然可以帶上時間段,這里從0到現在,來進行消費,需要注意的一點是,在取出數據后我們需要用zrem方法將取出的數據從zset中刪除,防止其他線程重復消費數據。在此之后進行接下來的發貨通知等相關邏輯。代碼如下:
public void run(){  //獲取批量大小  int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));  try {   //批量獲取離發送時間最近的orderNum條數據	 Calendar calendar = Calendar.getInstance();	 long now = calendar.getTimeInMillis();	 //獲取無限早到現在的事件key(防止上次批量數量小于放入數量,存在歷史數據未消費情況)	 Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);	 LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));   if (CollectionUtils.isNotEmpty(orders)){    //刪除key 防止重復發送    for (String orderId : orderIds) {     RedisUtils.getInstance().zrem(Constant.DELAY, orderId);    }	  //接下來執行發送等業務邏輯        }  } catch (Exception e) {   LOGGER.warn("task.run exception:{}", e);  } }

至此完成了依賴redis和線程完成了延時發送的功能。

結語

那么對上面兩種不同的實現方式進行一下優缺點比較:

  • 第一種方式實現簡單,不依賴外部組件,能夠快速的實現目標功能,但缺點也很明顯,需要在特定的場景下使用,如果是我這種消息量大的情況下使用很可能是有問題,當然在數據源消息不多的情況下不失為好的選擇。
  • 第二種方式實現稍微復雜一點,但是能夠適應消息量大的場景,采用redis的zset作為了“中間件”的效果,并且幫助我們進行延時的功能實現能夠較好的適應高并發場景,缺點在于在編寫的過程中需要考慮實際的因素較多,例如線程的執行周期時間,發送可能會有一定時間的延遲,批量數據大小的設置等等。

綜上是本人這次延時功能的實現過程的兩種實現方式的總結,具體采用哪種方式還需大家根據實際情況選擇,希望能給大家帶來幫助。ps:由于本人的技術能力有限,文章中可能出現技術描述不準確或者錯誤的情況懇請各位大佬指出,我立馬進行改正,避免誤導大家,謝謝!

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對VeVb武林網的支持。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
中文在线资源观看视频网站免费不卡| 亚洲国产精品国自产拍av秋霞| 亚洲欧美日韩国产中文| 日韩小视频网址| 日韩美女免费视频| 成人国产精品免费视频| 亲子乱一区二区三区电影| 精品亚洲aⅴ在线观看| 亚洲精品自在久久| 欧美在线中文字幕| 日韩精品欧美激情| 国产精品一区二区性色av| 国产成人亚洲综合| 欧美电影电视剧在线观看| 亚洲精品国产精品自产a区红杏吧| 日本久久中文字幕| 亚洲精品中文字幕av| 日韩精品丝袜在线| 亚洲最大av网站| 日韩av在线播放资源| 91精品视频免费观看| 欧美午夜无遮挡| 在线免费看av不卡| 欧美精品video| 日日狠狠久久偷偷四色综合免费| 久久亚洲春色中文字幕| 国产亚洲精品美女| 日韩欧美在线视频| 日本aⅴ大伊香蕉精品视频| 国产日韩中文字幕在线| 91免费人成网站在线观看18| 欧美精品久久久久久久久| 97视频在线观看亚洲| 97视频在线观看视频免费视频| 国产精品pans私拍| 国产va免费精品高清在线观看| 日韩在线视频一区| 亚洲福利视频久久| www国产精品com| 国产精品中文字幕在线观看| 欧美午夜美女看片| 亚洲福利视频免费观看| 91中文字幕在线| 日韩成人黄色av| 国产精品久久久久久久久久尿| 亚洲福利在线看| 自拍亚洲一区欧美另类| 欧美性精品220| 欧美精品在线免费播放| 91精品国产成人www| 成人在线视频福利| 亚洲人成五月天| 日韩成人免费视频| 日韩久久免费电影| 亚洲欧美日韩天堂一区二区| 国产不卡视频在线| 亚洲激情视频网| 久久人人爽人人| 亚洲级视频在线观看免费1级| 欧美黄色片视频| 最近2019中文字幕在线高清| 亚洲第一区在线| 久久99热精品| 欧美专区福利在线| 欧美激情精品久久久久久大尺度| 日韩一区二区久久久| 久久精品一本久久99精品| 成人在线小视频| 亚洲精品免费在线视频| 亚洲国产精品嫩草影院久久| 精品久久久久久久久久国产| 国产成人精品网站| 亚州av一区二区| 日韩av手机在线看| 国产亚洲精品综合一区91| 亚洲国产欧美一区二区三区久久| 日本精品久久久久久久| 91精品国产综合久久久久久久久| 色先锋资源久久综合5566| 中文字幕亚洲字幕| 欧美亚州一区二区三区| 精品亚洲va在线va天堂资源站| 欧美超级免费视 在线| 欧美性xxxx极品hd欧美风情| 亚洲jizzjizz日本少妇| 欧美黑人一级爽快片淫片高清| 国模吧一区二区三区| 欧美精品在线免费| 欧美日韩免费观看中文| 久久五月天综合| 91网在线免费观看| 91影院在线免费观看视频| 日韩在线一区二区三区免费视频| 精品国产一区二区三区久久狼5月| 国产亚洲精品久久久久久牛牛| 久久噜噜噜精品国产亚洲综合| 狠狠躁夜夜躁久久躁别揉| 国产精品久久91| 亚洲精品理论电影| 久久天天躁狠狠躁老女人| 九九九热精品免费视频观看网站| 亚洲天堂开心观看| 91精品久久久久久| 国产精品一区久久久| 九九热在线精品视频| 久久久久久久网站| 91在线观看免费| 亚洲国产精品久久久久| 欧美日韩性视频| 国产乱肥老妇国产一区二| 中文字幕在线观看亚洲| 中文字幕精品影院| 色婷婷综合成人| 中文字幕亚洲激情| 久久久久久久久久婷婷| 日韩一区二区三区xxxx| 亚洲精品中文字| 亚洲高清av在线| 国产成人精品综合| 在线成人一区二区| 欧美午夜久久久| 国产婷婷色综合av蜜臀av| 亚洲老板91色精品久久| 国产精品久久久久久久久借妻| 国产成人精品国内自产拍免费看| 日韩理论片久久| 亚洲999一在线观看www| 自拍视频国产精品| 亚洲一区二区三区视频播放| 色偷偷偷综合中文字幕;dd| 久久国产天堂福利天堂| 亚洲精品久久7777777| 亚洲色图50p| 91在线免费视频| 国产精品自拍视频| 日本高清不卡的在线| 亚洲性无码av在线| 精品一区二区三区三区| 91久久久久久| 色系列之999| 午夜精品福利视频| 亚洲欧美日韩综合| 国产成人高清激情视频在线观看| 国产精品亚洲аv天堂网| 在线观看成人黄色| 国产精品美女无圣光视频| 亚洲精美色品网站| 久久亚洲精品毛片| 久久夜精品香蕉| 欧美极品欧美精品欧美视频| 国产原创欧美精品| 国产精品无码专区在线观看| 红桃视频成人在线观看| 国内精品久久久久久久久| 亚洲一区二区中文字幕| 中文字幕视频一区二区在线有码| 伊人伊成久久人综合网小说| 日韩久久免费电影| 精品国产鲁一鲁一区二区张丽| 亚洲美女在线观看| 高清日韩电视剧大全免费播放在线观看| 丝袜情趣国产精品| 91产国在线观看动作片喷水| 日韩中文视频免费在线观看|