累加器(accumulator)是Spark中提供的一種分布式的變量機制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調試時對作業執行過程中的事件進行計數。
累加器簡單使用
Spark內置的提供了Long和Double類型的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數的同時進行計數,最后計算剩下整數的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //統計奇數的個數 val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{ if(n%2!=0) accum.add(1L) n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop()
結果為:
sum: 20
accum: 5
這是結果正常的情況,但是在使用累加器的過程中如果對于spark的執行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。
自定義累加器
自定義累加器類型的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。官方同時給出了一個實現的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應用執行過程中的一些信息。例如,我們可以用這個類收集Spark處理數據時的一些細節,當然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規模要加以控制,不宜過大。
繼承AccumulatorV2類,并復寫它的所有方法
package sparkimport constant.Constantimport org.apache.spark.util.AccumulatorV2import util.getFieldFromConcatStringimport util.setFieldFromConcatStringopen class SessionAccmulator : AccumulatorV2<String, String>() { private var result = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" override fun value(): String { return this.result } /** * 合并數據 */ override fun merge(other: AccumulatorV2<String, String>?) { if (other == null) return else { if (other is SessionAccmulator) { var newResult = "" val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m, Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m, Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60) resultArray.forEach { val oldValue = other.result.getFieldFromConcatString("|", it) if (oldValue.isNotEmpty()) { val newValue = oldValue.toInt() + 1 //找到原因,一直在循環賦予值,debug30分鐘 很煩 if (newResult.isEmpty()){ newResult = result.setFieldFromConcatString("|", it, newValue.toString()) } //問題就在于這里,自定義沒有寫錯,合并錯了 newResult = newResult.setFieldFromConcatString("|", it, newValue.toString()) } } result = newResult } } } override fun copy(): AccumulatorV2<String, String> { val sessionAccmulator = SessionAccmulator() sessionAccmulator.result = this.result return sessionAccmulator } override fun add(p0: String?) { val v1 = this.result val v2 = p0 if (v2.isNullOrEmpty()){ return }else{ var newResult = "" val oldValue = v1.getFieldFromConcatString("|", v2!!) if (oldValue.isNotEmpty()){ val newValue = oldValue.toInt() + 1 newResult = result.setFieldFromConcatString("|", v2, newValue.toString()) } result = newResult } } override fun reset() { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" result = newResult } override fun isZero(): Boolean { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" return this.result == newResult }}
方法介紹
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進行合并的方法(下面介紹執行流程中將要用到)
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
spark中累加器的執行流程:
首先有幾個task,spark engine就調用copy方法拷貝幾個累加器(不注冊的),然后在各個task中進行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執行最后將調用merge方法和各個task的結果累計器進行合并(此時被注冊的累加器是初始值)
總結
以上就是本文關于Spark自定義累加器的使用實例詳解的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復大家的。
新聞熱點
疑難解答
圖片精選