很遺憾,spark的設計架構并不是為了高并發請求而設計的,我們嘗試在網絡條件不好的集群下,進行100并發的查詢,在壓測3天后發現了內存泄露。
a)在進行大量小SQL的壓測過程中發現,有大量的activejob在spark ui上一直處于pending狀態,且永遠不結束,如下圖所示
b)并且發現driver內存爆滿
c)用內存分析分析工具分析了下
短時間內 SPARK 提交大量的SQL ,而且SQL里面存在大量的 union與join的情形,會創建大量的event對象,使得這里的 event數量超過10000個event ,一旦超過10000個event就開始丟棄 event,而這個event是用來回收 資源的,丟棄了 資源就無法回收了。 針對UI頁面的這個問題,我們將這個隊列長度的限制給取消了。
抓包發現
這些event是通過post方法傳遞的,并寫入到隊列里
但是也是由一個單線程進行postToAll的
但是在高并發情況下,單線程的postToAll的速度沒有post的速度快,會導致隊列堆積的event越來越多,如果是持續性的高并發的SQL查詢,這里就會導致內存泄露
接下來我們在分析下postToAll的方法里面,那個路徑是最慢的,導致事件處理最慢的邏輯是那個?
可能您都不敢相信,通過jstack抓取分析,程序大部分時間都阻塞在記錄日志上
可以通過禁用這個地方的log來提升event的速度
log4j.logger.org.apache.spark.scheduler=ERROR
說道這里,Cleaner的設計應該算是spark最糟糕的設計。spark的ContextCleaner是用于回收與清理已經完成了的 廣播boradcast,shuffle數據的。但是高并發下,我們發現這個地方積累的數據會越來越多,最終導致driver內存跑滿而掛掉。
l我們先看下,是如何觸發內存回收的
沒錯,就是通過System.gc() 回收的內存,如果我們在jvm里配置了禁止執行System.gc,這個邏輯就等于廢掉(而且有很多jvm的優化參數一般都推薦配置禁止system.gc 參數)
lclean過程
這是一個單線程的邏輯,而且每次清理都要協同很多機器一同清理,清理速度相對來說比較慢,但是SQL并發很大的時候,產生速度超過了清理速度,整個driver就會發生內存泄露。而且brocadcast如果占用內存太多,也會使用非常多的本地磁盤小文件,我們在測試中發現,高持續性并發的情況下本地磁盤用于存儲blockmanager的目錄占據了我們60%的存儲空間。
我們再來分析下 clean里面,那個邏輯最慢
真正的瓶頸在于blockManagerMaster里面的removeBroadcast,因為這部分邏輯是需要跨越多臺機器的。
針對這種問題,
l我們在SQL層加了一個SQLWAITING邏輯,判斷了堆積長度,如果堆積長度超過了我們的設定值,我們這里將阻塞新的SQL的執行。堆積長度可以通過更改conf目錄下的ya100_env_default.sh中的ydb.sql.waiting.queue.size的值來設置。
l建議集群的帶寬要大一些,萬兆網絡肯定會比千兆網絡的清理速度快很多。
l給集群休息的機會,不要一直持續性的高并發,讓集群有間斷的機會。
l增大spark的線程池,可以調節conf下的spark-defaults.conf的如下值來改善。
發現spark,hive,lucene都非常鐘愛使用threadlocal來管理臨時的session對象,期待SQL執行完畢后這些對象能夠自動釋放,但是與此同時spark又使用了線程池,線程池里的線程一直不結束,這些資源一直就不釋放,時間久了內存就堆積起來了。
針對這個問題,延云修改了spark關鍵線程池的實現,更改為每1個小時,強制更換線程池為新的線程池,舊的線程數能夠自動釋放。
您會發現,隨著請求的session變多,spark會在hdfs和本地磁盤創建海量的磁盤目錄,最終會因為本地磁盤與hdfs上的目錄過多,而導致文件系統和整個文件系統癱瘓。在YDB里面我們針對這種情況也做了處理。
為什么會有這些對象在里面,我們看下源碼
多達10萬多個JDOPersistenceManager
通過debug工具監控發現,spark的listerner隨著時間的積累,通知(post)速度運來越慢
發現所有代碼都卡在了onpostevent上
jstack的結果如下
研究下了調用邏輯如下,發現是循環調用listerners,而且listerner都是空執行才會產生上面的jstack截圖
通過內存發現有30多萬個linterner在里面
發現都是大多數都是同一個listener,我們核對下該處源碼
最終定位問題
確系是這個地方的BUG ,每次創建JDBC連接的時候 ,spark就會增加一個listener, 時間久了,listener就會積累越來越多 針對這個問題 我簡單的修改了一行代碼,開始進入下一輪的壓測
測試發現,即使只有1條記錄,使用 spark進行一次SQL查詢也會耗時1秒,對很多即席查詢來說1秒的等待,對用戶體驗非常不友好。針對這個問題,我們在spark與hive的細節代碼上進行了局部調優,調優后,響應時間由原先的1秒縮減到現在的200~300毫秒。
以下是我們改動過的地方
另外使用Hadoop namenode HA的同學會注意到,如果第一個namenode是standby狀態,這個地方會更慢,就不止一秒,所以除了改動源碼外,如果使用namenode ha的同學一定要注意,將active狀態的node一定要放在前面。
頻繁的hiveConf初始化,需要讀取core-default.xml,hdfs-default.xml,yarn-default.xml
,maPReduce-default.xml,hive-default.xml等多個xml文件,而這些xml文件都是內嵌在jar包內的。
第一,解壓這些jar包需要耗費較多的時間,第二每次都對這些xml文件解析也耗費時間。
lconfiguration的序列化,采用了壓縮的方式進行序列化,有全局鎖的問題
lconfiguration每次序列化,傳遞了太多了沒用的配置項了,1000多個配置項,占用60多Kb。我們剔除了不是必須傳輸的配置項后,縮減到44個配置項,2kb的大小。
由于SPARK-3015 的BUG,spark的cleaner 目前為單線程回收模式。
大家留意spark源碼注釋
其中的單線程瓶頸點在于廣播數據的cleaner,由于要跨越很多臺機器,需要通過akka進行網絡交互。如果回收并發特別大,SPARK-3015 的bug報告會出現網絡擁堵,導致大量的 timeout出現。
為什么回收量特變大呢? 其實是因為cleaner 本質是通過system.gc(),定期執行的,默認積累30分鐘或者進行了gc后才觸發cleaner,這樣就會導致瞬間,大量的akka并發執行,集中釋放,網絡不瞬間癱瘓才不怪呢。但是單線程回收意味著回收速度恒定,如果查詢并發很大,回收速度跟不上cleaner的速度,會導致cleaner積累很多,會導致進程OOM(YDB做了修改,會限制前臺查詢的并發)。不論是OOM還是限制并發都不是我們希望看到的,所以針對高并發情況下,這種單線程的回收速度是滿足不了高并發的需求的。對于官方的這樣的做法,我們表示并不是一個完美的cleaner方案。并發回收一定要支持,只要解決akka的timeout問題即可。所以這個問題要仔細分析一下,akka為什么會timeout,是因為cleaner占據了太多的資源,那么我們是否可以控制下cleaner的并發呢?比如說使用4個并發,而不是默認將全部的并發線程都給占滿呢?這樣及解決了cleaner的回收速度,也解決了akka的問題不是更好么?針對這個問題,我們最終還是選擇了修改spark的ContextCleaner對象,將廣播數據的回收 改成多線程的方式,但現在了線程的并發數量,從而解決了該問題。
新聞熱點
疑難解答