亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 服務器 > Web服務器 > 正文

Spark的廣播變量和累加器使用方法代碼示例

2024-09-01 13:53:04
字體:
來源:轉載
供稿:網友

一、廣播變量和累加器

通常情況下,當向Spark操作(如map,reduce)傳遞一個函數時,它會在一個遠程集群節點上執行,它會使用函數中所有變量的副本。這些變量被復制到所有的機器上,遠程機器上并沒有被更新的變量會向驅動程序回傳。在任務之間使用通用的,支持讀寫的共享變量是低效的。盡管如此,Spark提供了兩種有限類型的共享變量,廣播變量和累加器。

1.1 廣播變量:

廣播變量允許程序員將一個只讀的變量緩存在每臺機器上,而不用在任務之間傳遞變量。廣播變量可被用于有效地給每個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播算法來分發變量,進而減少通信的開銷。

Spark的動作通過一系列的步驟執行,這些步驟由分布式的shuffle操作分開。Spark自動地廣播每個步驟每個任務需要的通用數據。這些廣播數據被序列化地緩存,在運行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地創建廣播變量才有用。

通過在一個變量v上調用SparkContext.broadcast(v)可以創建廣播變量。廣播變量是圍繞著v的封裝,可以通過value方法訪問這個變量。舉例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.valueres0: Array[Int] = Array(1, 2, 3)

在創建了廣播變量之后,在集群上的所有函數中應該使用它來替代使用v.這樣v就不會不止一次地在節點之間傳輸了。另外,為了確保所有的節點獲得相同的變量,對象v在被廣播之后就不應該再修改。

1.2 累加器:

累加器是僅僅被相關操作累加的變量,因此可以在并行中被有效地支持。它可以被用來實現計數器和總和。Spark原生地只支持數字類型的累加器,編程者可以添加新類型的支持。如果創建累加器時指定了名字,可以在Spark的UI界面看到。這有利于理解每個執行階段的進程。(對于python還不支持)

累加器通過對一個初始化了的變量v調用SparkContext.accumulator(v)來創建。在集群上運行的任務可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程序能夠讀取它的值,通過累加器的value方法。

下面的代碼展示了如何把一個數組中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")accum: spark.Accumulator[Int] = 0scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)...10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.valueres2: Int = 10

盡管上面的例子使用了內置支持的累加器類型Int,但是開發人員也可以通過繼承AccumulatorParam類來創建它們自己的累加器類型。AccumulatorParam接口有兩個方法:
zero方法為你的類型提供一個0值。
addInPlace方法將兩個值相加。
假設我們有一個代表數學vector的Vector類。我們可以向下面這樣實現:

object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = {  Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = {  v1 += v2 }}// Then, create an Accumulator of this type:val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在Scala里,Spark提供更通用的累加接口來累加數據,盡管結果的類型和累加的數據類型可能不一致(例如,通過收集在一起的元素來創建一個列表)。同時,SparkContext..accumulableCollection方法來累加通用的Scala的集合類型。

累加器僅僅在動作操作內部被更新,Spark保證每個任務在累加器上的更新操作只被執行一次,也就是說,重啟任務也不會更新。在轉換操作中,用戶必須意識到每個任務對累加器的更新操作可能被不只一次執行,如果重新執行了任務和作業的階段。

累加器并沒有改變Spark的惰性求值模型。如果它們被RDD上的操作更新,它們的值只有當RDD因為動作操作被計算時才被更新。因此,當執行一個惰性的轉換操作,比如map時,不能保證對累加器值的更新被實際執行了。下面的代碼片段演示了此特性:

val accum = sc.accumulator(0)data.map { x => accum += x; f(x) }//在這里,accum的值仍然是0,因為沒有動作操作引起map被實際的計算.

二.Java和Scala版本的實戰演示

2.1 Java版本:

/** * 實例:利用廣播進行黑名單過濾! * 檢查新的數據 根據是否在廣播變量-黑名單內,從而實現過濾數據。 */public class BroadcastAccumulator { /**  * 創建一個List的廣播變量  *  */ private static volatile Broadcast<List<String>> broadcastList = null; /**  * 計數器!  */ private static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) {  SparkConf conf = new SparkConf().setMaster("local[2]").    setAppName("WordCountOnlineBroadcast");  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));  /**   * 注意:分發廣播需要一個action操作觸發。   * 注意:廣播的是Arrays的asList 而非對象的引用。廣播Array數組的對象引用會出錯。   * 使用broadcast廣播黑名單到每個Executor中!   */  broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));  /**   * 累加器作為全局計數器!用于統計在線過濾了多少個黑名單!   * 在這里實例化。   */  accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");  JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);  /**   * 這里省去flatmap因為名單是一個個的!   */  JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {   @Override   public Tuple2<String, Integer> call(String word) {    return new Tuple2<String, Integer>(word, 1);   }  });  JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {   @Override   public Integer call(Integer v1, Integer v2) {    return v1 + v2;   }  });  /**   * Funtion里面 前幾個參數是 入參。   * 后面的出參。   * 體現在call方法里面!   *   */  wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {   @Override   public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {    rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {     @Override     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {      if (broadcastList.value().contains(wordPair._1)) {       /**        * accumulator不僅僅用來計數。        * 可以同時寫進數據庫或者緩存中。        */       accumulator.add(wordPair._2);       return false;      }else {       return true;      }     };     /**      * 廣播和計數器的執行,需要進行一個action操作!      */    }).collect();    System.out.println("廣播器里面的值"+broadcastList.value());    System.out.println("計時器里面的值"+accumulator.value());    return null;   }  });  jsc.start();  jsc.awaitTermination();  jsc.close(); } }

2.2 Scala版本

package com.Streamingimport java.utilimport org.apache.spark.streaming.{Duration, StreamingContext}import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}import org.apache.spark.broadcast.Broadcast/** * Created by lxh on 2016/6/30. */object BroadcastAccumulatorStreaming { /** * 聲明一個廣播和累加器! */ private var broadcastList:Broadcast[List[String]] = _ private var accumulator:Accumulator[Int] = _ def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest") val sc = new SparkContext(sparkConf) /**  * duration是ms  */ val ssc = new StreamingContext(sc,Duration(2000)) // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark")) broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark")) accumulator= ssc.sparkContext.accumulator(0,"broadcasttest") /**  * 獲取數據!  */ val lines = ssc.socketTextStream("localhost",9999) /**  * 1.flatmap把行分割成詞。  * 2.map把詞變成tuple(word,1)  * 3.reducebykey累加value  * (4.sortBykey排名)  * 4.進行過濾。 value是否在累加器中。  * 5.打印顯示。  */ val words = lines.flatMap(line => line.split(" ")) val wordpair = words.map(word => (word,1)) wordpair.filter(record => {broadcastList.value.contains(record._1)}) val pair = wordpair.reduceByKey(_+_) /**  * 這個pair 是PairDStream<String, Integer>  * 查看這個id是否在黑名單中,如果是的話,累加器就+1  *//* pair.foreachRDD(rdd => {  rdd.filter(record => {  if (broadcastList.value.contains(record._1)) {   accumulator.add(1)   return true  } else {   return false  }  }) })*/ val filtedpair = pair.filter(record => {  if (broadcastList.value.contains(record._1)) {   accumulator.add(record._2)   true  } else {   false  }  }).print println("累加器的值"+accumulator.value) // pair.filter(record => {broadcastList.value.contains(record._1)}) /* val keypair = pair.map(pair => (pair._2,pair._1))*/ /**  * 如果DStream自己沒有某個算子操作。就通過轉化transform!  */ /* keypair.transform(rdd => {  rdd.sortByKey(false)//TODO })*/ pair.print() ssc.start() ssc.awaitTermination() }}

總結

以上就是本文關于Spark的廣播變量和累加器使用方法代碼示例的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復大家。感謝朋友們對VEVB武林網網站的支持。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产精品视频xxxx| 庆余年2免费日韩剧观看大牛| 国产精品久久久久秋霞鲁丝| 91社区国产高清| 欧美—级a级欧美特级ar全黄| 亚洲欧美激情精品一区二区| 久久精品国产成人| 成人免费看黄网站| 国产精品美女久久久久av超清| 欧美成人h版在线观看| 色中色综合影院手机版在线观看| 久久精品99久久久香蕉| 欧美亚州一区二区三区| 欧美成人全部免费| 国产精品麻豆va在线播放| 一区二区三区国产视频| 欧美激情中文字幕乱码免费| 久久久爽爽爽美女图片| 在线亚洲国产精品网| 亚洲日韩欧美视频| 亚洲国产日韩欧美在线图片| 最近中文字幕mv在线一区二区三区四区| 亚洲永久免费观看| 国产精品中文久久久久久久| 欧美性色视频在线| 亚洲黄一区二区| 欧美精品制服第一页| 2019国产精品自在线拍国产不卡| 午夜精品视频网站| 欧美另类69精品久久久久9999| 日本精品视频网站| 亚洲欧洲偷拍精品| 国产精品视频自在线| 国产欧美精品一区二区三区-老狼| 欧美日韩国产影院| 92裸体在线视频网站| 91亚洲人电影| 国产精品亚洲第一区| 国产精品高潮呻吟久久av无限| 国产精品久久久久久久久久| 国产视频999| 国产欧美精品一区二区| 亚洲级视频在线观看免费1级| 欧美中文字幕视频| 久久国产加勒比精品无码| 亚洲免费视频在线观看| 欧美日韩国产综合视频在线观看中文| 欧美国产日韩二区| 欧美三级欧美成人高清www| 色综合色综合网色综合| 91精品国产91久久| 久久久成人av| 亚洲区免费影片| 欧美大片在线影院| 日韩久久午夜影院| 亚洲欧洲国产伦综合| 96sao精品视频在线观看| 欧美精品18videos性欧| 亚洲精品国偷自产在线99热| 777国产偷窥盗摄精品视频| 国产专区欧美专区| 91精品国产色综合久久不卡98口| 日韩精品在线观看一区二区| 亚洲精品中文字幕女同| 亚洲高清一二三区| 色综合视频一区中文字幕| 最好看的2019的中文字幕视频| 国产婷婷色综合av蜜臀av| 中文字幕日韩专区| 亚洲精品综合久久中文字幕| 一区二区三区视频免费| 精品久久香蕉国产线看观看gif| 亚洲片在线资源| 亚洲福利视频久久| 精品国产精品三级精品av网址| 国产精品久久久久aaaa九色| 久久亚洲精品毛片| 久久久久久久999精品视频| 亚洲新中文字幕| 2025国产精品视频| 中文字幕亚洲欧美日韩高清| 亚洲一区第一页| 91青草视频久久| 不卡av在线播放| 亚洲免费人成在线视频观看| 国产精品久久久久久久久久免费| 国产欧美亚洲精品| 日韩精品极品在线观看播放免费视频| 欧美在线影院在线视频| 5278欧美一区二区三区| 日韩电视剧免费观看网站| 国产亚洲精品成人av久久ww| 欧美日韩成人在线视频| 国产va免费精品高清在线观看| 91精品国产精品| 7777kkkk成人观看| 色狠狠久久aa北条麻妃| 亚洲日韩欧美视频| 国产欧美一区二区三区四区| 在线观看精品自拍私拍| 亚洲人成在线观| 日韩禁在线播放| 国产一区av在线| 欧美午夜精品在线| 国产精品海角社区在线观看| 精品二区三区线观看| 国产在线播放91| 91在线视频导航| 亚洲欧美日韩在线一区| 亚洲第一综合天堂另类专| 欧美视频一区二区三区…| 久久91精品国产91久久跳| 国产精品劲爆视频| 成人激情视频小说免费下载| www.xxxx欧美| 51色欧美片视频在线观看| 中文字幕在线成人| 亚洲综合中文字幕68页| 欧美国产精品va在线观看| 精品中文字幕在线观看| 欧亚精品在线观看| 精品久久久久久电影| 色琪琪综合男人的天堂aⅴ视频| 欧美另类99xxxxx| 国产精品中文字幕久久久| 91欧美精品午夜性色福利在线| 日韩欧美999| 高清一区二区三区四区五区| 亚洲最大成人在线| 九九精品视频在线观看| 日韩av片电影专区| 欧美久久精品午夜青青大伊人| 欧美激情区在线播放| 一区二区三区在线播放欧美| 亚洲欧美精品伊人久久| 91国自产精品中文字幕亚洲| 亚洲男人第一av网站| 2021国产精品视频| 欧美日韩国产精品专区| 色悠悠久久久久| 日韩专区中文字幕| 九九热视频这里只有精品| 欧美日本高清视频| 成人精品一区二区三区电影黑人| 久久99精品久久久久久青青91| 亚洲区免费影片| 日韩欧美在线第一页| 精品久久久久久电影| 亚洲日韩欧美视频一区| 国模吧一区二区三区| 亚洲无亚洲人成网站77777| 日韩高清电影免费观看完整版| 91精品久久久久久久久久另类| 热草久综合在线| 欧美洲成人男女午夜视频| 欧美日韩国产精品一区二区三区四区| 国产综合在线视频| 亚洲第一免费网站| 亚洲国产精品人久久电影| 国产日韩亚洲欧美| 在线观看日韩视频| 国产精品扒开腿爽爽爽视频| 国产一区二区色| 日韩中文字幕国产精品|