亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 服務器 > Web服務器 > 正文

淺談Spark RDD API中的Map和Reduce

2024-09-01 13:53:11
字體:
來源:轉載
供稿:網友

RDD是什么?

RDD是Spark中的抽象數據結構類型,任何數據在Spark中都被表示為RDD。從編程的角度來看,RDD可以簡單看成是一個數組。和普通數組的區別是,RDD中的數據是分區存儲的,這樣不同分區的數據就可以分布在不同的機器上,同時可以被并行處理。因此,Spark應用程序所做的無非是把需要處理的數據轉換為RDD,然后對RDD進行一系列的變換和操作從而得到結果。本文為第一部分,將介紹Spark RDD中與Map和Reduce相關的API中。

如何創建RDD?

RDD可以從普通數組創建出來,也可以從文件系統或者HDFS中的文件創建出來。

舉例:從普通數組創建RDD,里面包含了1到9這9個數字,它們分別在3個分區中。

scala> val a = sc.parallelize(1 to 9, 3)a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12

舉例:讀取文件README.md來創建RDD,文件中的每一行就是RDD中的一個元素

scala> val b = sc.textFile("README.md")b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

雖然還有別的方式可以創建RDD,但在本文中我們主要使用上述兩種方式來創建RDD以說明RDD的API。

map

map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

舉例:

scala> val a = sc.parallelize(1 to 9, 3)scala> val b = a.map(x => x*2)scala> a.collectres10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> b.collectres11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每個元素都乘以2來產生一個新的RDD。

mapPartitions

mapPartitions是map的一個變種。map的輸入函數是應用于RDD中每個元素,而mapPartitions的輸入函數是應用于每個分區,也就是把每個分區中的內容作為整體來處理的。
它的函數定義為:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即為輸入函數,它處理每個分區里面的內容。每個分區中的內容將以Iterator[T]傳遞給輸入函數f,f的輸出結果是Iterator[U]。最終的RDD由所有分區經過輸入函數處理后的結果合并起來的。

舉例:

scala> val a = sc.parallelize(1 to 9, 3)scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {  var res = List[(T, T)]()   var pre = iter.next while (iter.hasNext) {    val cur = iter.next;     res .::= (pre, cur) pre = cur;  }   res.iterator}scala> a.mapPartitions(myfunc).collectres0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函數myfunc是把分區中一個元素和它的下一個元素組成一個Tuple。因為分區中最后一個元素沒有下一個元素了,所以(3,4)和(6,7)不在結果中。

mapPartitions還有些變種,比如mapPartitionsWithContext,它能把處理過程中的一些狀態信息傳遞給用戶指定的輸入函數。還有mapPartitionsWithIndex,它能把分區的index傳遞給用戶指定的輸入函數。

mapValues

mapValues顧名思義就是輸入函數應用于RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函數只適用于元素為KV對的RDD。

舉例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)scala> val b = a.map(x => (x.length, x))scala> b.mapValues("x" + _ + "x").collectres5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith

mapWith是map的另外一個變種,map只需要一個輸入函數,而mapWith有兩個輸入函數。它的定義如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一個函數constructA是把RDD的partition index(index從0開始)作為輸入,輸出為新類型A;

第二個函數f是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函數的輸出),輸出類型為U。

舉例:把partition index 乘以10,然后加上2作為新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) x.mapWith(a => a * 10)((a, b) => (b + 2)).collect res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMap

與map類似,區別是原RDD中的元素經map處理后只能生成一個元素,而原RDD中的元素經flatmap處理后可生成多個元素來構建新RDD。

舉例:對原RDD中的每個元素x產生y個元素(從1到y,y為元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)scala> val b = a.flatMap(x => 1 to x)scala> b.collectres12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

flatMapWith

flatMapWith與mapWith很類似,都是接收兩個函數,一個函數把partitionIndex作為輸入,輸出是一個新類型A;另外一個函數是以二元組(T,A)作為輸入,輸出為一個序列,這些序列里面的元素組成了新的RDD。它的定義如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

舉例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collectres58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,8, 2, 9)

flatMapValues

flatMapValues類似于mapValues,不同的在于flatMapValues應用于元素為KV對的RDD中Value。每個一元素的Value被輸入函數映射為一系列的值,然后這些值再與原RDD中的Key組成一系列新的KV對。

舉例

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))scala> val b = a.flatMapValues(x=>x.to(5))scala> b.collectres3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每個元素的值被轉換為一個序列(從其當前值到5),比如第一個KV對(1,2), 其值2被轉換為2,3,4,5。然后其再與原KV對中Key組成一系列新的KV對(1,2),(1,3),(1,4),(1,5)。

reduce

reduce將RDD中元素兩兩傳遞給輸入函數,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函數直到最后只有一個值為止。

舉例

scala> val c = sc.parallelize(1 to 10)scala> c.reduce((x, y) => x + y)res4: Int = 55

上述例子對RDD中的元素求和。

reduceByKey

顧名思義,reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然后與原RDD中的Key組成一個新的KV對。

舉例:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))scala> a.reduceByKey((x,y) => x + y).collectres7: Array[(Int, Int)] = Array((1,2), (3,10))

上述例子中,對Key相同的元素的值求和,因此Key為3的兩個元素被轉為了(3,10)。

Reference

本文中的部分例子來自:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

總結

以上就是本文關于淺談Spark RDD API中的Map和Reduce的全部內容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復大家的。感謝朋友們對本站的支持!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产成人精品久久| 亚洲综合中文字幕68页| 国自在线精品视频| 欧美成人免费全部观看天天性色| 国产精品成人va在线观看| 日韩电影第一页| 亚洲国产精彩中文乱码av| 欧美日韩中国免费专区在线看| 精品国产美女在线| 欧美成人免费在线视频| 欧美视频在线观看免费网址| 91欧美激情另类亚洲| 日韩精品欧美激情| 亚洲第一页在线| 亚洲第一中文字幕在线观看| 久久精品2019中文字幕| 欧美精品在线免费播放| 国产日韩欧美成人| 久久99视频精品| 亚洲黄页网在线观看| 国产亚洲福利一区| 国产免费观看久久黄| 国产一区二区三区精品久久久| 国产91九色视频| 成人精品一区二区三区电影黑人| 日韩欧美一区视频| 国产精品视频免费观看www| 精品国产美女在线| 亚洲欧美国产日韩中文字幕| 久久免费观看视频| 久久中文字幕在线视频| 久久久噜久噜久久综合| 日韩欧美国产网站| 91精品国产色综合久久不卡98口| 亚洲视屏在线播放| 亚洲美女视频网| 国产精品国产福利国产秒拍| 最近2019中文字幕在线高清| 欧美又大又硬又粗bbbbb| 琪琪第一精品导航| 国产精品免费一区豆花| 日本精品va在线观看| 2024亚洲男人天堂| 欧美与黑人午夜性猛交久久久| 亚洲精品网站在线播放gif| 国产精品视频一区二区高潮| 国产区精品在线观看| 国产欧美日韩中文字幕在线| 欧美日韩国产中文字幕| 国内精品中文字幕| 欧美在线观看一区二区三区| 少妇激情综合网| 欧美精品www在线观看| 亚洲精品电影久久久| 日韩视频在线观看免费| 久久久精品一区二区三区| 欧美黄色免费网站| 亚洲午夜色婷婷在线| 亚洲第一视频网站| 久久天天躁狠狠躁老女人| 亚洲一区二区日本| 国产精品视频26uuu| 色先锋资源久久综合5566| 国产精品色午夜在线观看| 日韩在线观看免费全集电视剧网站| 久久精品久久久久电影| 日韩精品视频在线观看网址| 国产精品自在线| 福利一区视频在线观看| 欧美肥老妇视频| 91牛牛免费视频| 欧美在线视频一二三| 国产主播欧美精品| 日韩精品一区二区三区第95| 久久99精品久久久久久琪琪| 超碰精品一区二区三区乱码| 欧美在线不卡区| 久久免费视频在线观看| 在线a欧美视频| 亚洲天堂网站在线观看视频| 亚洲人成免费电影| 日韩成人在线电影网| 韩国欧美亚洲国产| 亚洲自拍偷拍第一页| 4438全国亚洲精品在线观看视频| 成人妇女淫片aaaa视频| 久久久久成人精品| 国产一区二区三区丝袜| 亚洲免费视频一区二区| 日韩欧美高清视频| 久久综合亚洲社区| 日韩欧美国产激情| 亚洲国产日韩一区| 日韩av成人在线观看| 国产精品免费视频久久久| 在线成人免费网站| 国产精品∨欧美精品v日韩精品| 欧美专区日韩视频| 欧美午夜精品久久久久久人妖| 亚洲成人久久久久| 亚洲精品成人久久| 日本精品性网站在线观看| 色悠悠久久88| 超碰97人人做人人爱少妇| 亚洲精品福利资源站| 一区二区三区国产在线观看| 欧美性xxxxxxxxx| 97成人超碰免| 一区二区中文字幕| 69影院欧美专区视频| 国产精品日日摸夜夜添夜夜av| 欧美黑人极品猛少妇色xxxxx| 日韩亚洲成人av在线| 狠狠色狠狠色综合日日五| 久久久久久久影院| 欧美一级视频在线观看| 亚洲最大av网| 亚洲国产日韩欧美在线99| 欧美在线视频网站| 在线视频国产日韩| 亚洲成人a**站| 日本国产高清不卡| 亚洲国产成人精品久久| 国产日韩综合一区二区性色av| 国产精品美女在线| 欧美日韩国产丝袜另类| 国产日韩亚洲欧美| 亚洲精品国精品久久99热| 8090成年在线看片午夜| 欧美黑人狂野猛交老妇| 一本色道久久综合狠狠躁篇怎么玩| 亚洲色图美腿丝袜| 夜夜躁日日躁狠狠久久88av| 亚洲精品日韩丝袜精品| 视频在线观看99| 隔壁老王国产在线精品| 亚洲国产精品悠悠久久琪琪| 91久久夜色精品国产网站| 日产精品99久久久久久| 97精品国产91久久久久久| 国产精品第二页| 91av福利视频| 亚洲国产欧美精品| 国产美女搞久久| 久久亚洲春色中文字幕| 久久香蕉国产线看观看av| 欧美放荡办公室videos4k| 欧美视频免费在线观看| 最近2019免费中文字幕视频三| 亚洲精品电影久久久| 波霸ol色综合久久| 欧美日韩日本国产| 午夜精品一区二区三区在线播放| 97视频在线观看网址| 亚洲性夜色噜噜噜7777| 国产成人精品日本亚洲| 亚洲欧洲激情在线| 欧美日韩精品在线观看| 97视频在线观看网址| 久久久久国产精品一区| 久久久久久久久久久成人| 精品国产91乱高清在线观看| 亚洲精品久久久久中文字幕二区| 久久久久中文字幕2018|