亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 服務器 > Web服務器 > 正文

Spark自定義累加器的使用實例詳解

2024-09-01 13:53:05
字體:
來源:轉載
供稿:網友

累加器(accumulator)是Spark中提供的一種分布式的變量機制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調試時對作業執行過程中的事件進行計數。

累加器簡單使用

Spark內置的提供了Long和Double類型的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數的同時進行計數,最后計算剩下整數的和。

val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //統計奇數的個數 val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{  if(n%2!=0) accum.add(1L)   n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop() 

結果為:

sum: 20
accum: 5

這是結果正常的情況,但是在使用累加器的過程中如果對于spark的執行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。

自定義累加器

自定義累加器類型的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。官方同時給出了一個實現的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應用執行過程中的一些信息。例如,我們可以用這個類收集Spark處理數據時的一些細節,當然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規模要加以控制,不宜過大。

繼承AccumulatorV2類,并復寫它的所有方法

package sparkimport constant.Constantimport org.apache.spark.util.AccumulatorV2import util.getFieldFromConcatStringimport util.setFieldFromConcatStringopen class SessionAccmulator : AccumulatorV2<String, String>() {  private var result = Constant.SESSION_COUNT + "=0|"+      Constant.TIME_PERIOD_1s_3s + "=0|"+      Constant.TIME_PERIOD_4s_6s + "=0|"+      Constant.TIME_PERIOD_7s_9s + "=0|"+      Constant.TIME_PERIOD_10s_30s + "=0|"+      Constant.TIME_PERIOD_30s_60s + "=0|"+      Constant.TIME_PERIOD_1m_3m + "=0|"+      Constant.TIME_PERIOD_3m_10m + "=0|"+      Constant.TIME_PERIOD_10m_30m + "=0|"+      Constant.TIME_PERIOD_30m + "=0|"+      Constant.STEP_PERIOD_1_3 + "=0|"+      Constant.STEP_PERIOD_4_6 + "=0|"+      Constant.STEP_PERIOD_7_9 + "=0|"+      Constant.STEP_PERIOD_10_30 + "=0|"+      Constant.STEP_PERIOD_30_60 + "=0|"+      Constant.STEP_PERIOD_60 + "=0"  override fun value(): String {    return this.result  }  /**   * 合并數據   */  override fun merge(other: AccumulatorV2<String, String>?) {    if (other == null) return else {      if (other is SessionAccmulator) {        var newResult = ""        val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,            Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,            Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,            Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,            Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)        resultArray.forEach {          val oldValue = other.result.getFieldFromConcatString("|", it)          if (oldValue.isNotEmpty()) {            val newValue = oldValue.toInt() + 1            //找到原因,一直在循環賦予值,debug30分鐘 很煩            if (newResult.isEmpty()){              newResult = result.setFieldFromConcatString("|", it, newValue.toString())            }            //問題就在于這里,自定義沒有寫錯,合并錯了            newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())          }        }        result = newResult      }    }  }  override fun copy(): AccumulatorV2<String, String> {    val sessionAccmulator = SessionAccmulator()    sessionAccmulator.result = this.result    return sessionAccmulator  }  override fun add(p0: String?) {    val v1 = this.result    val v2 = p0    if (v2.isNullOrEmpty()){      return    }else{      var newResult = ""      val oldValue = v1.getFieldFromConcatString("|", v2!!)      if (oldValue.isNotEmpty()){        val newValue = oldValue.toInt() + 1        newResult = result.setFieldFromConcatString("|", v2, newValue.toString())      }      result = newResult    }  }  override fun reset() {    val newResult = Constant.SESSION_COUNT + "=0|"+        Constant.TIME_PERIOD_1s_3s + "=0|"+        Constant.TIME_PERIOD_4s_6s + "=0|"+        Constant.TIME_PERIOD_7s_9s + "=0|"+        Constant.TIME_PERIOD_10s_30s + "=0|"+        Constant.TIME_PERIOD_30s_60s + "=0|"+        Constant.TIME_PERIOD_1m_3m + "=0|"+        Constant.TIME_PERIOD_3m_10m + "=0|"+        Constant.TIME_PERIOD_10m_30m + "=0|"+        Constant.TIME_PERIOD_30m + "=0|"+        Constant.STEP_PERIOD_1_3 + "=0|"+        Constant.STEP_PERIOD_4_6 + "=0|"+        Constant.STEP_PERIOD_7_9 + "=0|"+        Constant.STEP_PERIOD_10_30 + "=0|"+        Constant.STEP_PERIOD_30_60 + "=0|"+        Constant.STEP_PERIOD_60 + "=0"    result = newResult  }  override fun isZero(): Boolean {    val newResult = Constant.SESSION_COUNT + "=0|"+        Constant.TIME_PERIOD_1s_3s + "=0|"+        Constant.TIME_PERIOD_4s_6s + "=0|"+        Constant.TIME_PERIOD_7s_9s + "=0|"+        Constant.TIME_PERIOD_10s_30s + "=0|"+        Constant.TIME_PERIOD_30s_60s + "=0|"+        Constant.TIME_PERIOD_1m_3m + "=0|"+        Constant.TIME_PERIOD_3m_10m + "=0|"+        Constant.TIME_PERIOD_10m_30m + "=0|"+        Constant.TIME_PERIOD_30m + "=0|"+        Constant.STEP_PERIOD_1_3 + "=0|"+        Constant.STEP_PERIOD_4_6 + "=0|"+        Constant.STEP_PERIOD_7_9 + "=0|"+        Constant.STEP_PERIOD_10_30 + "=0|"+        Constant.STEP_PERIOD_30_60 + "=0|"+        Constant.STEP_PERIOD_60 + "=0"    return this.result == newResult  }}

方法介紹

value方法:獲取累加器中的值

       merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進行合并的方法(下面介紹執行流程中將要用到)

        iszero方法:判斷是否為初始值

        reset方法:重置累加器中的值

        copy方法:拷貝累加器

spark中累加器的執行流程:

          首先有幾個task,spark engine就調用copy方法拷貝幾個累加器(不注冊的),然后在各個task中進行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執行最后將調用merge方法和各個task的結果累計器進行合并(此時被注冊的累加器是初始值)

總結

以上就是本文關于Spark自定義累加器的使用實例詳解的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復大家的。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲欧美制服综合另类| 欧美日韩精品在线| 久热精品视频在线观看| 亚洲精品mp4| 中文字幕国内精品| 欧美日韩爱爱视频| 亚洲视频在线观看免费| 亚洲石原莉奈一区二区在线观看| 欧亚精品在线观看| 欧美国产欧美亚洲国产日韩mv天天看完整| 亚洲天堂免费视频| 国产日韩在线亚洲字幕中文| 国产精品自拍偷拍| 国产一区二区三区精品久久久| 日本欧美一级片| 亚洲精品综合久久中文字幕| 欧美大片免费观看在线观看网站推荐| 日韩av在线网| 91国在线精品国内播放| 亚洲精品国偷自产在线99热| 欧美成人网在线| 亚洲第一精品电影| 国产美女精品视频免费观看| 欧美亚洲成人精品| 国产精品91在线观看| 国产欧美一区二区三区久久人妖| 日韩人在线观看| 日韩av免费在线播放| 日韩成人网免费视频| 国内揄拍国内精品| 亚洲国产三级网| 欧美电影免费播放| 51视频国产精品一区二区| 91久久国产综合久久91精品网站| 国产日韩av在线播放| 国产精品久久久久久久久免费看| 欧美日韩另类在线| 欧美老女人bb| 久久频这里精品99香蕉| 国产日韩欧美电影在线观看| 国产精品一区专区欧美日韩| 欧美超级免费视 在线| 亚洲国产成人精品久久久国产成人一区| 国产成人在线精品| 亚洲人成毛片在线播放| 中文字幕av一区二区三区谷原希美| 国产日韩欧美视频在线| 日韩网站免费观看高清| 亚洲999一在线观看www| 国产日韩欧美一二三区| 亚洲一区二区久久| 国产精品老女人视频| 亚洲成人av在线| 国产主播精品在线| 精品成人乱色一区二区| 91精品久久久久久久久青青| www.久久久久久.com| 亚洲一区中文字幕| 欧美有码在线视频| 97色在线视频| 欧美激情精品久久久久久大尺度| 亚洲最大成人在线| 亚洲人成啪啪网站| 亚洲少妇激情视频| 日韩视频精品在线| 亚洲视频axxx| 亚洲级视频在线观看免费1级| 中文字幕免费精品一区| 疯狂做受xxxx欧美肥白少妇| 久久久国产视频91| 91九色国产社区在线观看| 久久久久久成人精品| 国产精品久久久久久久久久ktv| 国产精品久久久久国产a级| 国内精品国产三级国产在线专| 国产99久久精品一区二区 夜夜躁日日躁| 亚洲激情在线观看| 日韩欧美精品中文字幕| 蜜臀久久99精品久久久无需会员| 亚洲xxx自由成熟| 国产专区精品视频| 9.1国产丝袜在线观看| 精品久久久香蕉免费精品视频| 亚洲理论片在线观看| xxxx性欧美| 91免费看视频.| 国产日韩欧美中文在线播放| 久热精品在线视频| 91老司机在线| 久久99久久99精品免观看粉嫩| 日韩欧美极品在线观看| 久久成人综合视频| 在线视频日韩精品| 欧美噜噜久久久xxx| 国产精品自产拍高潮在线观看| 视频在线一区二区| 亚洲国产精彩中文乱码av| 激情成人中文字幕| 亚洲精品资源美女情侣酒店| 欧美日韩国产精品| 亚洲激情在线观看| 精品电影在线观看| 日韩欧美在线视频观看| 成人av色在线观看| www.欧美精品一二三区| 国产精品永久免费视频| 国产免费久久av| 欧美电影免费观看网站| 日韩av三级在线观看| 久久综合伊人77777尤物| 欧美午夜女人视频在线| 欧美激情视频三区| 日韩欧美在线免费观看| 日韩影视在线观看| 成人av色在线观看| 中文字幕日韩在线播放| 97在线观看视频国产| 国产精品久久久久久久久久新婚| 在线看片第一页欧美| 91产国在线观看动作片喷水| 在线看欧美日韩| 亚洲精品国产精品乱码不99按摩| 欧美性理论片在线观看片免费| 欧美性在线视频| 日本国产高清不卡| 亚洲欧美另类人妖| 欧美成人精品不卡视频在线观看| 欧美成人久久久| 视频直播国产精品| 精品国产乱码久久久久久婷婷| 欧美性生交大片免费| 国产成人高清激情视频在线观看| 一区二区欧美日韩视频| 欧美成年人在线观看| 国产成人综合一区二区三区| 久久这里只有精品视频首页| 欧美刺激性大交免费视频| 亚洲国产精久久久久久久| 夜色77av精品影院| 欧美性极品少妇精品网站| 亚洲精品国产福利| 91视频国产一区| 久久99亚洲精品| 97人人爽人人喊人人模波多| 亚洲乱码国产乱码精品精| 日韩亚洲欧美中文高清在线| 在线视频日韩精品| 国产精品久久久久久久9999| 亚洲人成网7777777国产| 国产精品免费一区| 国产精品稀缺呦系列在线| 精品国产一区二区三区久久狼黑人| 欧美孕妇孕交黑巨大网站| 亚洲第一页中文字幕| 欧美自拍大量在线观看| 日韩欧美第一页| 国产97在线亚洲| 国产精彩精品视频| 亚洲伊人一本大道中文字幕| 欧美香蕉大胸在线视频观看| 日韩av免费观影| 国产性猛交xxxx免费看久久| 日韩成人在线免费观看| 777午夜精品福利在线观看|