Spark GraphX: source code for computing the N-degree neighbors of specified vertices
Let's go straight to the code:
package horizon.graphx.util

import java.security.InvalidParameterException

import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2017/1/19.
 * Description: for the specified vertices of a graph, compute their N-degree neighbors and output,
 * for each source vertex, the path length (number of hops) and the ids of the vertices reached.
 */
object GraphNdegUtil {
  val maxNDegVerticesCount = 10000
  val maxDegree = 1000

  /**
   * Compute the N-degree neighbors of the chosen vertices.
   *
   * @param edges         the edge list as (srcId, dstId) tuples
   * @param choosedVertex the ids of the source vertices
   * @param degree        the maximum number of hops N
   * @tparam ED edge attribute type
   * @return for each chosen vertex, a map from hop count to the set of vertex ids reached at that hop
   */
  def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
    val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D), StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    aggNdegreedVertices(simpleGraph, choosedVertex, degree)
  }

  def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int, sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
    val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)
    val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD])))).persist(StorageLevel.MEMORY_AND_DISK_SER)
    val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices).map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    flated.unpersist(blocking = false)
    ndegs.unpersist(blocking = false)
    val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _).map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
    matched.unpersist(blocking = false)
    VertexRDD(grouped)
  }

  def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                                      choosedVertex: RDD[VertexId],
                                                      degree: Int,
                                                      sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true
                                                     ): VertexRDD[Map[Int, Set[VertexId]]] = {
    if (degree < 1) {
      throw new InvalidParameterException("invalid degree parameter: " + degree)
    }
    val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
      .subgraph(vpred = (_, a) => a._1 <= maxDegree) // drop very-high-degree vertices
      .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
        DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) // mark the vertices that will start sending messages
      }).mapEdges(_ => 0).cache() // simplify the edge attributes

    choosedVertex.unpersist(blocking = false)

    var i = 0
    var prevG: Graph[DegVertex[VD], Int] = null
    var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null
    while (i < degree) {
      prevG = g
      // round i + 1: propagate the (vertexId, hop) pairs collected so far
      // (note: the body of this loop was truncated in the original listing; the aggregateMessages
      // call below is reconstructed from the surrounding sendMsg/reduceVertexIds definitions)
      newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)
      g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) => if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false)
      newVertexRdd.unpersist(blocking = false)
      i += 1
    }
    newVertexRdd.unpersist(blocking = false)

    val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    initVertex.unpersist()
    g.unpersist(blocking = false)
    VertexRDD(maped)
  }

  private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])

  private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
    val addOne = msg.map(e => (e._1, e._2 + 1))
    val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
    oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
  }
  private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] =
    degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap

  case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]], sendFilter: (VD, VD) => Boolean): Unit = {
    try {
      val src = e.srcAttr
      val dst = e.dstAttr
      // only vertices in the ready state receive messages
      // (note: the guard and the send calls were truncated in the original listing; the condition below
      // is an assumption reconstructed from maxNDegVerticesCount, isAttrSame and sendFilter)
      if (src.degVertices.size < maxNDegVerticesCount && dst.degVertices.size < maxNDegVerticesCount
        && (src.init || dst.init) && !isAttrSame(src, dst)) {
        if (src.init && sendFilter(src.attr, dst.attr)) {
          e.sendToDst(src.degVertices)
        }
        if (dst.init && sendFilter(dst.attr, src.attr)) {
          e.sendToSrc(dst.degVertices)
        }
      }
    } catch {
      case ex: Exception =>
        println(s"========== error found: exception: ${ex.getMessage}, " +
          s"edgeTriplet: (srcId: ${e.srcId}, srcAttr: (${e.srcAttr.attr}, ${e.srcAttr.init}, ${e.srcAttr.degVertices.size})), " +
          s"dstId: ${e.dstId}, dstAttr: (${e.dstAttr.attr}, ${e.dstAttr.init}, ${e.dstAttr.degVertices.size}), attr: ${e.attr}")
        ex.printStackTrace()
        throw ex
    }
  }

  private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] =
    ArrayBuffer() ++= ids.reduceByKey(Math.min)

  private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean =
    a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)

  private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
    val aKeys = a.map(e => e._1).toSet
    val bKeys = b.map(e => e._1).toSet
    if (aKeys.size != bKeys.size || aKeys.isEmpty) return false
    aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty
  }
}
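For reference, here is a minimal sketch of how the utility might be driven from a standalone Spark application. The SparkContext setup, the toy edge list, and the chosen vertex id are illustrative assumptions, not part of the original code; only the call to GraphNdegUtil.aggNdegreedVertices comes from the listing above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD

object GraphNdegExample {
  def main(args: Array[String]): Unit = {
    // hypothetical local setup for trying out the utility
    val sc = new SparkContext(new SparkConf().setAppName("n-deg-example").setMaster("local[2]"))

    // a tiny sample graph: edges 1-2, 2-3, 3-4, 1-5 (ids are arbitrary)
    val edges: RDD[(VertexId, VertexId)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (1L, 5L)))
    // compute up to 2-degree neighbors for vertex 1
    val chosen: RDD[VertexId] = sc.parallelize(Seq(1L))

    val result = GraphNdegUtil.aggNdegreedVertices(edges, chosen, 2)
    // each entry: (sourceId, Map(hopCount -> Set of vertex ids reached at that hop))
    result.collect().foreach { case (id, hops) => println(s"$id -> $hops") }

    sc.stop()
  }
}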
Note that the sortResult method calls reduceByKey on a collection of type Traversable[(K, V)]. This reduceByKey is not Spark's RDD method but a custom helper, and it must be imported before use. Its code is as follows:
package horizon.graphx.util

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2016/12/21.
 * Description:
 */
object CollectionUtil {
  /**
   * Adds reduceByKey-style methods to any collection of type Traversable[(K, V)].
   *
   * @param collection the underlying key-value collection
   * @param kt         ClassTag for the key type
   * @param vt         ClassTag for the value type
   * @tparam K key type
   * @tparam V value type
   */
  implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
    def reduceByKey(f: (V, V) => V): Traversable[(K, V)] =
      collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => (a._1, f(a._2, b._2))) }

    /**
     * Like reduceByKey, but also returns the elements that were reduced away.
     *
     * @param f the reduce function
     * @return (the reduced collection, the elements that were merged away)
     */
    def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
      val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()
      val newSeq = collection.groupBy(_._1).map {
        case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => {
          val newValue: V = f(a._2, b._2)
          val reducedValue: V = if (newValue == a._2) b._2 else a._2
          val reducedPair: (K, V) = (a._1, reducedValue)
          reduced += reducedPair
          (a._1, newValue)
        })
      }
      (newSeq, reduced.toTraversable)
    }
  }
}
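As a quick illustration of the implicit helper (a small sketch with made-up sample data), importing CollectionHelper lets any local Traversable of pairs be reduced by key, which is exactly how sortResult collapses (vertexId, hop) pairs into a hop -> Set(vertexId) map:

import horizon.graphx.util.CollectionUtil.CollectionHelper

object CollectionUtilExample {
  def main(args: Array[String]): Unit = {
    // sample (word, count) pairs; reduceByKey sums the counts per key
    val pairs: Traversable[(String, Int)] = Seq(("a", 1), ("b", 2), ("a", 3))
    println(pairs.reduceByKey(_ + _).toMap) // Map(a -> 4, b -> 2)

    // the same idea as sortResult: (hop, Set(vertexId)) pairs merged per hop
    val hops: Traversable[(Int, Set[Long])] = Seq((1, Set(2L)), (1, Set(5L)), (2, Set(3L)))
    println(hops.reduceByKey(_ ++ _).toMap) // Map(1 -> Set(2, 5), 2 -> Set(3))
  }
}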
Summary
That is all of this article's source code for computing the N-degree neighbors of specified vertices with Spark GraphX; hopefully it is helpful. Interested readers can also refer to: "A Brief Look at Seven Common Hadoop and Spark Project Patterns", "Code Examples for Using Spark Broadcast Variables and Accumulators", and "An Introduction to Spark". If you have any questions, please leave a comment.