Spark GraphX: source code for computing the N-degree neighbor vertices of specified vertices
Straight to the code:
package horizon.graphx.util

import java.security.InvalidParameterException
import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
/**
 * Created by yepei.ye on 2017/1/19.
 * Description: computes, for a set of specified vertices in a graph, their N-degree neighbor
 * vertices, returning for each source vertex the path length (hop count) and the ids of the
 * vertices found at that distance.
 */
object GraphNdegUtil {
  val maxNDegVerticesCount = 10000
  val maxDegree = 1000
  /**
   * Computes the N-degree neighbors of the given vertices.
   *
   * @param edges         the edge list, as (srcId, dstId) tuples
   * @param choosedVertex the vertices for which neighbors are computed
   * @param degree        the maximum number of hops
   * @tparam ED           the edge attribute type (unused by this overload)
   * @return for each chosen vertex, a map from hop count to the set of vertex ids at that distance
   */
  def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
    val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D), StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    aggNdegreedVertices(simpleGraph, choosedVertex, degree)
  }
  def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int, sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
    // Compute the N-degree neighbor ids first, then join back to the graph to replace ids with vertex attributes
    val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)
    val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD])))).persist(StorageLevel.MEMORY_AND_DISK_SER)
    val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices).map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    flated.unpersist(blocking = false)
    ndegs.unpersist(blocking = false)
    val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _).map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
    matched.unpersist(blocking = false)
    VertexRDD(grouped)
  }
  def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                                      choosedVertex: RDD[VertexId],
                                                      degree: Int,
                                                      sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true
                                                     ): VertexRDD[Map[Int, Set[VertexId]]] = {
    if (degree < 1) {
      throw new InvalidParameterException("invalid degree parameter: " + degree)
    }
    val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
      .subgraph(vpred = (_, a) => a._1 <= maxDegree) // drop very high-degree vertices
      .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
        DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) // initialize the vertices that will send messages
      }).mapEdges(_ => 0).cache() // simplify the edge attribute

    choosedVertex.unpersist(blocking = false)

    var i = 0
    var prevG: Graph[DegVertex[VD], Int] = null
    var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null
    while (i < degree) {
      prevG = g
      // round i + 1 of message passing
      newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)
      g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) => if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false)
      newVertexRdd.unpersist(blocking = false)
      i += 1
    }
    newVertexRdd.unpersist(blocking = false)

    val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    initVertex.unpersist()
    g.unpersist(blocking = false)
    VertexRDD(maped)
  }
  private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])

  private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
    // each received (vertexId, distance) pair is one hop further away after passing through this vertex
    val addOne = msg.map(e => (e._1, e._2 + 1))
    val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
    oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
  }

  private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] = degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap

  case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
  private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]], sendFilter: (VD, VD) => Boolean): Unit = {
    try {
      val src = e.srcAttr
      val dst = e.dstAttr
      // a dst only receives messages while it is still in the ready state
      if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init)
        && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {
        if (sendFilter(src.attr, dst.attr)) {
          e.sendToDst(reduceVertexIds(src.degVertices))
        }
        if (sendFilter(dst.attr, src.attr)) {
          e.sendToSrc(reduceVertexIds(dst.degVertices))
        }
      }
    } catch {
      case ex: Exception =>
        println(s"==========error found: exception: ${ex.getMessage}," +
          s" edgeTriplet: (srcId: ${e.srcId}, srcAttr: (${e.srcAttr.attr}, ${e.srcAttr.init}, ${e.srcAttr.degVertices.size}))," +
          s" dstId: ${e.dstId}, dstAttr: (${e.dstAttr.attr}, ${e.dstAttr.init}, ${e.dstAttr.degVertices.size}), attr: ${e.attr}")
        ex.printStackTrace()
        throw ex
    }
  }
  private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] = ArrayBuffer() ++= ids.reduceByKey(Math.min)

  private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean = a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)

  private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
    val aKeys = a.map(e => e._1).toSet
    val bKeys = b.map(e => e._1).toSet
    if (aKeys.size != bKeys.size || aKeys.isEmpty) return false
    aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty
  }
}
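For reference, here is a minimal usage sketch of the utility above. It is not part of the original code: the local SparkContext setup, the sample chain graph 1-2-3-4, the chosen vertex id, and the object name NdegExample are illustrative assumptions.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import horizon.graphx.util.GraphNdegUtil

object NdegExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ndeg-example").setMaster("local[2]"))
    // a tiny chain graph: 1 - 2 - 3 - 4
    val edges = sc.parallelize(Seq[(VertexId, VertexId)]((1L, 2L), (2L, 3L), (3L, 4L)))
    // ask for the neighbors of vertex 1 up to 2 hops away
    val chosen = sc.parallelize(Seq[VertexId](1L))
    val result = GraphNdegUtil.aggNdegreedVertices[Int](edges, chosen, 2)
    // each entry maps a chosen vertex id to {hop count -> vertex ids found at that distance}
    result.collect().foreach { case (vid, byHop) => println(s"$vid -> $byHop") }
    sc.stop()
  }
}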
The sortResult method above calls reduceByKey on a collection of type Traversable[(K, V)]. That method is not part of the standard library; it comes from a hand-rolled helper that must be imported wherever it is used (a brief usage sketch follows the helper code below):
package horizon.graphx.util

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2016/12/21.
 * Description: adds reduceByKey-style methods to collections of type Traversable[(K, V)].
 */
object CollectionUtil {
  /**
   * Adds reduceByKey-related methods to any collection of type Traversable[(K, V)].
   *
   * @param collection the underlying key-value collection
   * @param kt         ClassTag for the key type
   * @param vt         ClassTag for the value type
   * @tparam K         the key type
   * @tparam V         the value type
   */
  implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
    def reduceByKey(f: (V, V) => V): Traversable[(K, V)] =
      collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => (a._1, f(a._2, b._2))) }

    /**
     * Performs reduceByKey and, in addition, returns the elements that were reduced away.
     *
     * @param f the reduce function
     * @return a pair of (reduced collection, elements dropped during the reduce)
     */
    def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
      val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()
      val newSeq = collection.groupBy(_._1).map {
        case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => {
          val newValue: V = f(a._2, b._2)
          val reducedValue: V = if (newValue == a._2) b._2 else a._2
          val reducedPair: (K, V) = (a._1, reducedValue)
          reduced += reducedPair
          (a._1, newValue)
        })
      }
      (newSeq, reduced.toTraversable)
    }
  }
}
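A quick, illustrative sketch of how the implicit class is used on an ordinary Scala collection (the sample pairs are made up); this is the same extension method that sortResult and reduceVertexIds rely on inside GraphNdegUtil:

import horizon.graphx.util.CollectionUtil.CollectionHelper

val pairs = Seq((1L, 3), (1L, 1), (2L, 5))

// groups by key and folds the values with the supplied function:
// with math.min this keeps (1L, 1) and (2L, 5)
val smallest = pairs.reduceByKey(math.min)

// also returns the values that "lost" the reduce, here (1L, 3)
val (kept, dropped) = pairs.reduceByKeyWithReduced(math.min)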
Summary
That is the complete source code for computing the N-degree neighbor vertices of specified vertices with Spark GraphX. I hope it is helpful; if you have any questions, please leave a comment.