背景
在開發中,往往會遇到一些關于延時任務的需求。例如
•生成訂單30分鐘未支付,則自動取消
•生成訂單60秒后,給用戶發短信
對上述的任務,我們給一個專業的名字來形容,那就是延時任務。
最近需要做一個延時處理的功能,主要是從kafka中消費消息后根據消息中的某個延時字段來進行延時處理,在實際的實現過程中有一些需要注意的地方,記錄如下。
實現過程
說到java中的定時功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下幾點:
1、ScheduledThreadPoolExecutor決定了用ScheduledThreadPoolExecutor來進行實現,接下來就是代碼編寫啦(大體流程代碼)。
主要的延時實現如下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy());//從消息中取出延遲時間及相關信息的代碼略int delayTime = 0;executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //具體操作邏輯 }},0,delayTime, TimeUnit.SECONDS);
其中NamedThreadFactory是我自定義的一個線程工廠,主要給線程池定義名稱及相關日志打印便于后續的問題分析,這里就不多做介紹了。拒絕策略也是采用默認的拒絕策略。
然后測試了一下,滿足目標需求的功能,可以做到延遲指定時間后執行,至此似乎功能就被完成了。
大家可能疑問,這也太簡單了有什么好說的,但是這種方式實現簡單是簡單但是存在一個潛在的問題,問題在哪呢,讓我們看一下ScheduledThreadPoolExecutor的源碼:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
ScheduledThreadPoolExecutor由于它自身的延時和周期的特性,默認使用了DelayWorkQueue,而并不像我們平時使用的SingleThreadExecutor等構造是可以使用自己定義的LinkedBlockingQueue并且設置隊列大小,問題就出在這里。
DelayWrokQueue是一個無界隊列,而我們的目標數據源是kafka,也就是一個高并發高吞吐的消息隊列,很大可能在某一時間段有大量的消息過來從而導致OOM,在使用多線程時我們是肯定要考慮到OOM的可能性的,因為OOM帶來的后果往往比較嚴重,系統OOM臨時的解決辦法一般只能是重啟,可能會導致用戶數據丟失等不可能挽回的問題,所以從編碼設計階段要采用盡可能穩妥的手段來避免這些問題。
2、采用redis和線程結合
這一次換了思路,采用redis來幫助我們做緩沖,從而避免消息過多OOM的問題。
相關redis zset api:
//添加元素ZADD key score member [[score member] [score member] …]//根據分值及限制數量查詢ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]//從zset中刪除指定成員ZREM key member [member …]
我們采用redis基礎數據結構的zset結構,采用score來存儲我們目標發送時間的數值,整體處理流程如下:
public void onMessage(String topic, String message) { String orderId; int delayTime = 0; try { Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() { }.getType()); if (msgMap.isEmpty()) { return; } LOGGER.info("onMessage kafka content:{}", msgMap.toString()); orderId = msgMap.get("orderId"); if(StringUtils.isNotEmpty(orderId)){ delayTime = Integer.parseInt(msgMap.get("delayTime")); Calendar calendar = Calendar.getInstance(); //計算出預計發送時間 calendar.add(Calendar.MINUTE, delayTime); long sendTime = calendar.getTimeInMillis(); RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId); LOGGER.info("orderId:{}---放入redis中等待發送---sendTime:{}", ---orderId:{}, sendTime); } } catch (Exception e) { LOGGER.info("onMessage 延時發送異常:{}", e); } }
public void run(){ //獲取批量大小 int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100")); try { //批量獲取離發送時間最近的orderNum條數據 Calendar calendar = Calendar.getInstance(); long now = calendar.getTimeInMillis(); //獲取無限早到現在的事件key(防止上次批量數量小于放入數量,存在歷史數據未消費情況) Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum); LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds)); if (CollectionUtils.isNotEmpty(orders)){ //刪除key 防止重復發送 for (String orderId : orderIds) { RedisUtils.getInstance().zrem(Constant.DELAY, orderId); } //接下來執行發送等業務邏輯 } } catch (Exception e) { LOGGER.warn("task.run exception:{}", e); } }
至此完成了依賴redis和線程完成了延時發送的功能。
結語
那么對上面兩種不同的實現方式進行一下優缺點比較:
綜上是本人這次延時功能的實現過程的兩種實現方式的總結,具體采用哪種方式還需大家根據實際情況選擇,希望能給大家帶來幫助。ps:由于本人的技術能力有限,文章中可能出現技術描述不準確或者錯誤的情況懇請各位大佬指出,我立馬進行改正,避免誤導大家,謝謝!
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對VeVb武林網的支持。
新聞熱點
疑難解答
圖片精選