亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 服務器 > Web服務器 > 正文

SparkGraphx計算指定節點的N度關系節點源碼

2024-09-01 13:53:06
字體:
來源:轉載
供稿:網友

直接上代碼:

package horizon.graphx.utilimport java.security.InvalidParameterExceptionimport horizon.graphx.util.CollectionUtil.CollectionHelperimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.storage.StorageLevelimport scala.collection.mutable.ArrayBufferimport scala.reflect.ClassTag/** * Created by yepei.ye on 2017/1/19. * Description:用于在圖中為指定的節點計算這些節點的N度關系節點,輸出這些節點與源節點的路徑長度和節點id */object GraphNdegUtil { val maxNDegVerticesCount = 10000 val maxDegree = 1000 /** * 計算節點的N度關系 * * @param edges * @param choosedVertex * @param degree * @tparam ED * @return */ def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = { val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D), StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER) aggNdegreedVertices(simpleGraph, choosedVertex, degree) } def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int, sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = { val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter) val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD])))).persist(StorageLevel.MEMORY_AND_DISK_SER) val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices).map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER) flated.unpersist(blocking = false) ndegs.unpersist(blocking = false) val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _).map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap)) matched.unpersist(blocking = false) VertexRDD(grouped) } def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],              choosedVertex: RDD[VertexId],              degree: Int,              sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true              ): VertexRDD[Map[Int, Set[VertexId]]] = { if (degree < 1) {  throw new InvalidParameterException("度參數錯誤:" + degree) } val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER) var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))  .subgraph(vpred = (_, a) => a._1 <= maxDegree)  //去掉大節點  .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {  DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) //初始化要發消息的節點 }).mapEdges(_ => 0).cache() //簡化邊屬性 choosedVertex.unpersist(blocking = false) var i = 0 var prevG: Graph[DegVertex[VD], Int] = null var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null while (i < degree + 1) {  prevG = g  //發第i+1輪消息  newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)  g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) => if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()  prevG.unpersistVertices(blocking = false)  prevG.edges.unpersist(blocking = false)  newVertexRdd.unpersist(blocking = false)  i += 1 } newVertexRdd.unpersist(blocking = false) val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER) initVertex.unpersist() g.unpersist(blocking = false) VertexRDD(maped) } private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD]) private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = { val addOne = msg.map(e => (e._1, e._2 + 1)) val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne) oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg) } private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] = degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)]) case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)]) private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]], sendFilter: (VD, VD) => Boolean): Unit = { try {  val src = e.srcAttr  val dst = e.dstAttr  //只有dst是ready狀態才接收消息  if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init) && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {  if (sendFilter(src.attr, dst.attr)) {   e.sendToDst(reduceVertexIds(src.degVertices))  }  if (sendFilter(dst.attr, dst.attr)) {   e.sendToSrc(reduceVertexIds(dst.degVertices))  }  } } catch {  case ex: Exception =>  println(s"==========error found: exception:${ex.getMessage}," +   s"edgeTriplet:(srcId:${e.srcId},srcAttr:(${e.srcAttr.attr},${e.srcAttr.init},${e.srcAttr.degVertices.size}))," +   s"dstId:${e.dstId},dstAttr:(${e.dstAttr.attr},${e.dstAttr.init},${e.dstAttr.degVertices.size}),attr:${e.attr}")  ex.printStackTrace()  throw ex } } private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] = ArrayBuffer() ++= ids.reduceByKey(Math.min) private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean = a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices) private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = { val aKeys = a.map(e => e._1).toSet val bKeys = b.map(e => e._1).toSet if (aKeys.size != bKeys.size || aKeys.isEmpty) return false aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty }}

其中sortResult方法里對Traversable[(K,V)]類型的集合使用了reduceByKey方法,這個方法是自行封裝的,使用時需要導入,代碼如下:

/** * Created by yepei.ye on 2016/12/21. * Description: */object CollectionUtil { /** * 對具有Traversable[(K, V)]類型的集合添加reduceByKey相關方法 * * @param collection * @param kt * @param vt * @tparam K * @tparam V */ implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) { def reduceByKey(f: (V, V) => V): Traversable[(K, V)] = collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => (a._1, f(a._2, b._2))) } /**  * reduceByKey的同時,返回被reduce掉的元素的集合  *  * @param f  * @return  */ def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {  val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()  val newSeq = collection.groupBy(_._1).map {  case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => {   val newValue: V = f(a._2, b._2)   val reducedValue: V = if (newValue == a._2) b._2 else a._2   val reducedPair: (K, V) = (a._1, reducedValue)   reduced += reducedPair   (a._1, newValue)  })  }  (newSeq, reduced.toTraversable) } }}

 

總結

以上就是本文關于SparkGraphx計算指定節點的N度關系節點源碼的全部內容了,希望對大家有所幫助。有什么問題請留言,小編會及時回復大家的。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
91亚洲精品在线观看| 欧美另类高清videos| 国产精品成人va在线观看| 中文字幕亚洲一区二区三区| 清纯唯美亚洲激情| 日韩中文在线观看| 欧美成人一区在线| 欧美国产日本高清在线| 久久久久久久久久久亚洲| 91极品女神在线| 欧美乱大交xxxxx另类电影| 国产精品综合不卡av| 色播久久人人爽人人爽人人片视av| 亚洲人成啪啪网站| 国产精品第100页| 欧美高清一级大片| 亚洲综合中文字幕在线| 国产一区二区三区三区在线观看| 欧美专区日韩视频| 亚洲xxxx妇黄裸体| 欧美在线性视频| 日本aⅴ大伊香蕉精品视频| 亚洲毛片在线免费观看| 精品日韩视频在线观看| 国产午夜精品全部视频播放| 96精品久久久久中文字幕| 欧美激情在线观看视频| 欧美激情视频三区| 久久久成人精品| 91chinesevideo永久地址| 色综合久久久久久中文网| 国产区精品在线观看| 91精品91久久久久久| 国内成人精品一区| 欧美激情一级精品国产| 深夜福利日韩在线看| 日韩欧美高清视频| 日韩一区二区在线视频| 中文字幕国产亚洲2019| 欧美丰满片xxx777| 九九热精品视频在线播放| 久久久国产精彩视频美女艺术照福利| www高清在线视频日韩欧美| 北条麻妃久久精品| 在线观看91久久久久久| 国产成人中文字幕| 欧美日韩免费在线| 精品欧美一区二区三区| 国产精品成熟老女人| 亚洲天堂第二页| 欧美成人在线网站| 久久躁日日躁aaaaxxxx| 色黄久久久久久| 久久99久久99精品免观看粉嫩| 亚洲精品456在线播放狼人| 自拍偷拍亚洲精品| 欧美尺度大的性做爰视频| 性欧美长视频免费观看不卡| 国产精品久久久久久网站| 欧美亚洲另类制服自拍| 色老头一区二区三区在线观看| 欧美电影免费观看| 美女少妇精品视频| 欧美大片免费观看| 欧美人与性动交a欧美精品| 国产精品久久久久久超碰| 亚洲xxxx视频| 国产精品高清网站| 国产成人小视频在线观看| 日韩欧美精品中文字幕| 国产亚洲视频中文字幕视频| 欧美风情在线观看| 麻豆国产精品va在线观看不卡| 国产成人精品久久久| 亚洲午夜精品久久久久久久久久久久| 亚洲人成网7777777国产| 欧美大尺度激情区在线播放| 国产精品久久二区| 久久人人看视频| 亚洲精品乱码久久久久久金桔影视| 欧美精品videosex牲欧美| 国产v综合ⅴ日韩v欧美大片| 亚洲男人天堂九九视频| 欧美在线免费观看| 国产精品十八以下禁看| 亚洲free嫩bbb| 懂色av中文一区二区三区天美| 精品亚洲一区二区三区在线观看| 69影院欧美专区视频| 国产精品盗摄久久久| 欧美精品久久久久久久免费观看| 性色av香蕉一区二区| 亚洲第一精品福利| 日韩中文字幕欧美| 国产精品久久久久久婷婷天堂| 91视频免费网站| 欧美裸体xxxxx| 欧美一级淫片播放口| 91久久国产精品| 成人久久18免费网站图片| 国产成人精品在线视频| 国产精品第10页| 清纯唯美日韩制服另类| 福利一区福利二区微拍刺激| 亚洲精品乱码久久久久久金桔影视| 丝袜亚洲欧美日韩综合| 日韩精品999| 国产精品三级久久久久久电影| 久久综合伊人77777蜜臀| 这里只有精品视频在线| 国产精品手机播放| 欧美在线一级视频| 成人国内精品久久久久一区| 欧美性视频精品| 国产亚洲欧美日韩美女| 亚洲欧美综合图区| 91国内免费在线视频| 色播久久人人爽人人爽人人片视av| 国产精品入口日韩视频大尺度| 亚洲成人教育av| 91av在线网站| 欧美日韩亚洲一区二| 亚洲自拍偷拍一区| 国产精品一区专区欧美日韩| 国产福利精品视频| 日本欧美在线视频| 亚洲第一综合天堂另类专| 日韩成人av网址| www国产亚洲精品久久网站| 欧美性猛交xxxx| 成人免费在线视频网址| 国产精品视频成人| 日韩在线视频线视频免费网站| 91香蕉嫩草神马影院在线观看| 日本韩国欧美精品大片卡二| 国产精品色午夜在线观看| 91深夜福利视频| 97免费在线视频| 久久精品青青大伊人av| 日韩av在线直播| 亚洲天堂免费在线| 精品国产91乱高清在线观看| 91成品人片a无限观看| 亚洲电影免费观看高清| 日韩欧美亚洲一二三区| 国产精品电影观看| 欧美一性一乱一交一视频| 2024亚洲男人天堂| 欧美巨大黑人极品精男| 精品无人区乱码1区2区3区在线| 亚洲欧美日韩久久久久久| 国产视频久久久久久久| 成人a免费视频| 国产成人久久久精品一区| 久久99国产综合精品女同| 亚洲第一视频网| 欧美精品激情在线观看| 中文字幕精品网| 在线观看中文字幕亚洲| 欧美日韩亚洲一区二区| 国产精品情侣自拍| 中日韩美女免费视频网址在线观看| 大胆欧美人体视频| 亚洲欧美日韩在线高清直播|