Spark处理数据排序问题如何避免OOM
错误思想
举个列子,当我们想要比较一个类型为RDD[(Long,(String,Int))]的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable转换为list,然后sortby,但是这样却有一个致命的缺点,就是Iterable在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable含有元素过多,那么极易引起OOM
valcidAndSidCountGrouped:RDD[(Long,Iterable[(String,Int)])]=cidAndSidCount.groupByKey()
//4.排序,取top10
valresult:RDD[(Long,List[(String,Int)])]=cidAndSidCountGrouped.map{
case(cid,sidCountIt)=>
//sidCountIt排序,取前10
//Iterable转成容器式集合的时候,如果数据量过大,极有可能导致oom
(cid,sidCountIt.toList.sortBy(-_._2).take(5))
}
首先,我们要知道,RDD的排序需要shuffle,是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序
方法一利用RDD排序特点
//把long(即key值)提取出来
valcids:List[Long]=categoryCountList.map(_.cid.toLong)
valbuffer:ListBuffer[(Long,List[(String,Int)])]=ListBuffer[(Long,List[(String,Int)])]()
//根据每个key来过滤RDD
for(cid<-cids){
/*
List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)),(15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)),(15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)),(15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)),(15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8)))
目标:
(9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9),(329b966c-d61b-46ad-949a-7e37142d384a,8),(5e3545a0-1521-4ad6-91fe-e792c20c46da,8),(e306c00b-a6c5-44c2-9c77-15e919340324,7),(bed60a57-3f81-4616-9e8b-067445695a77,7)))
*/
valarr:Array[(String,Int)]=cidAndSidCount.filter(cid==_._1)
.sortBy(-_._2._2)
.take(5)
.map(_._2)
buffer+=((cid,arr.toList))
}
buffer.foreach(println)
这样做也有缺点:即有多少个key,就有多少个Job,占用资源
方法二利用TreeSet自动排序特性
defstatCategoryTop10Session_3(sc:SparkContext,
categoryCountList:List[CategroyCount],
userVisitActionRDD:RDD[UserVisitAction])={
//1.过滤出来top10品类的所有点击记录
//1.1先map出来top10的品类id
valcids=categoryCountList.map(_.cid.toLong)
valtopCategoryActionRDD:RDD[UserVisitAction]=userVisitActionRDD.filter(action=>cids.contains(action.click_category_id))
//2.计算每个品类下的每个session的点击量rdd((cid,sid),1)
valcidAndSidCount:RDD[(Long,(String,Int))]=topCategoryActionRDD
.map(action=>((action.click_category_id,action.session_id),1))
//使用自定义分区器重点理解分区器的原理
.reduceByKey(newCategoryPartitioner(cids),_+_)
.map{
case((cid,sid),count)=>(cid,(sid,count))
}
//3.排序取top10
//因为已经按key分好了区,所以用Mappartitions,在每个分区中新建一个TreeSet即可
valresult:RDD[(Long,List[SessionInfo])]=cidAndSidCount.mapPartitions((it:Iterator[(Long,(String,Int))])=>{
//new一个TreeSet,并同时指定排序规则
vartreeSet:mutable.TreeSet[CategorySession]=newmutable.TreeSet[CategorySession]()(newOrdering[CategorySession]{
overridedefcompare(x:CategorySession,y:CategorySession):Int={
if(x.clickCount>=y.clickCount)-1else1
}
})
varid=0l
iter.foreach({
case(l,session)=>{
id=l
treeSet.add(session)
if(treeSet.size>10)treeSet=treeSet.take(10)
}
})
Iterator(id,treeSet)
})
result.collect.foreach(println)
Thread.sleep(1000000)
}
}
/*
根据传入的key值来决定分区号,让相同key进入相同的分区,能够避免多次shuffle
*/
classCategoryPartitioner(cids:List[Long])extendsPartitioner{
//用cid索引,作为将来他的分区索引.
privatevalcidWithIndex:Map[Long,Int]=cids.zipWithIndex.toMap
//返回集合的长度
overridedefnumPartitions:Int=cids.length
//根据key返回分区的索引
overridedefgetPartition(key:Any):Int={
keymatch{
//根据品类id返回分区的索引!0-9
case(cid:Long,_)=>
cidWithIndex(cid)
}
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。