Spark的广播变量和累加器使用方法代码示例
一、广播变量和累加器
通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。
1.1广播变量:
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
Spark的动作通过一系列的步骤执行,这些步骤由分布式的shuffle操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:
scala>valbroadcastVar=sc.broadcast(Array(1,2,3)) broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0) scala>broadcastVar.value res0:Array[Int]=Array(1,2,3)
在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。
1.2累加器:
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
下面的代码展示了如何把一个数组中的所有元素累加到累加器上:
scala>valaccum=sc.accumulator(0,"MyAccumulator") accum:spark.Accumulator[Int]=0 scala>sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x) ... 10/09/2918:41:08INFOSparkContext:Tasksfinishedin0.317106s scala>accum.value res2:Int=10
尽管上面的例子使用了内置支持的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来创建它们自己的累加器类型。AccumulatorParam接口有两个方法:
zero方法为你的类型提供一个0值。
addInPlace方法将两个值相加。
假设我们有一个代表数学vector的Vector类。我们可以向下面这样实现:
objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{
defzero(initialValue:Vector):Vector={
Vector.zeros(initialValue.size)
}
defaddInPlace(v1:Vector,v2:Vector):Vector={
v1+=v2
}
}
//Then,createanAccumulatorofthistype:
valvecAccum=sc.accumulator(newVector(...))(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元素来创建一个列表)。同时,SparkContext..accumulableCollection方法来累加通用的Scala的集合类型。
累加器仅仅在动作操作内部被更新,Spark保证每个任务在累加器上的更新操作只被执行一次,也就是说,重启任务也不会更新。在转换操作中,用户必须意识到每个任务对累加器的更新操作可能被不只一次执行,如果重新执行了任务和作业的阶段。
累加器并没有改变Spark的惰性求值模型。如果它们被RDD上的操作更新,它们的值只有当RDD因为动作操作被计算时才被更新。因此,当执行一个惰性的转换操作,比如map时,不能保证对累加器值的更新被实际执行了。下面的代码片段演示了此特性:
valaccum=sc.accumulator(0)
data.map{x=>accum+=x;f(x)}
//在这里,accum的值仍然是0,因为没有动作操作引起map被实际的计算.
二.Java和Scala版本的实战演示
2.1Java版本:
/**
*实例:利用广播进行黑名单过滤!
*检查新的数据根据是否在广播变量-黑名单内,从而实现过滤数据。
*/
publicclassBroadcastAccumulator{
/**
*创建一个List的广播变量
*
*/
privatestaticvolatileBroadcast>broadcastList=null;
/**
*计数器!
*/
privatestaticvolatileAccumulatoraccumulator=null;
publicstaticvoidmain(String[]args){
SparkConfconf=newSparkConf().setMaster("local[2]").
setAppName("WordCountOnlineBroadcast");
JavaStreamingContextjsc=newJavaStreamingContext(conf,Durations.seconds(5));
/**
*注意:分发广播需要一个action操作触发。
*注意:广播的是Arrays的asList而非对象的引用。广播Array数组的对象引用会出错。
*使用broadcast广播黑名单到每个Executor中!
*/
broadcastList=jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
/**
*累加器作为全局计数器!用于统计在线过滤了多少个黑名单!
*在这里实例化。
*/
accumulator=jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
JavaReceiverInputDStreamlines=jsc.socketTextStream("Master",9999);
/**
*这里省去flatmap因为名单是一个个的!
*/
JavaPairDStreampairs=lines.mapToPair(newPairFunction(){
@Override
publicTuple2call(Stringword){
returnnewTuple2(word,1);
}
});
JavaPairDStreamwordsCount=pairs.reduceByKey(newFunction2(){
@Override
publicIntegercall(Integerv1,Integerv2){
returnv1+v2;
}
});
/**
*Funtion里面前几个参数是入参。
*后面的出参。
*体现在call方法里面!
*
*/
wordsCount.foreach(newFunction2,Time,Void>(){
@Override
publicVoidcall(JavaPairRDDrdd,Timetime)throwsException{
rdd.filter(newFunction,Boolean>(){
@Override
publicBooleancall(Tuple2wordPair)throwsException{
if(broadcastList.value().contains(wordPair._1)){
/**
*accumulator不仅仅用来计数。
*可以同时写进数据库或者缓存中。
*/
accumulator.add(wordPair._2);
returnfalse;
}else{
returntrue;
}
};
/**
*广播和计数器的执行,需要进行一个action操作!
*/
}).collect();
System.out.println("广播器里面的值"+broadcastList.value());
System.out.println("计时器里面的值"+accumulator.value());
returnnull;
}
});
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
2.2Scala版本
packagecom.Streaming
importjava.util
importorg.apache.spark.streaming.{Duration,StreamingContext}
importorg.apache.spark.{Accumulable,Accumulator,SparkContext,SparkConf}
importorg.apache.spark.broadcast.Broadcast
/**
*Createdbylxhon2016/6/30.
*/
objectBroadcastAccumulatorStreaming{
/**
*声明一个广播和累加器!
*/
privatevarbroadcastList:Broadcast[List[String]]=_
privatevaraccumulator:Accumulator[Int]=_
defmain(args:Array[String]){
valsparkConf=newSparkConf().setMaster("local[4]").setAppName("broadcasttest")
valsc=newSparkContext(sparkConf)
/**
*duration是ms
*/
valssc=newStreamingContext(sc,Duration(2000))
//broadcastList=ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
broadcastList=ssc.sparkContext.broadcast(List("Hadoop","Spark"))
accumulator=ssc.sparkContext.accumulator(0,"broadcasttest")
/**
*获取数据!
*/
vallines=ssc.socketTextStream("localhost",9999)
/**
*1.flatmap把行分割成词。
*2.map把词变成tuple(word,1)
*3.reducebykey累加value
*(4.sortBykey排名)
*4.进行过滤。value是否在累加器中。
*5.打印显示。
*/
valwords=lines.flatMap(line=>line.split(""))
valwordpair=words.map(word=>(word,1))
wordpair.filter(record=>{broadcastList.value.contains(record._1)})
valpair=wordpair.reduceByKey(_+_)
/**
*这个pair是PairDStream
*查看这个id是否在黑名单中,如果是的话,累加器就+1
*/
/*pair.foreachRDD(rdd=>{
rdd.filter(record=>{
if(broadcastList.value.contains(record._1)){
accumulator.add(1)
returntrue
}else{
returnfalse
}
})
})*/
valfiltedpair=pair.filter(record=>{
if(broadcastList.value.contains(record._1)){
accumulator.add(record._2)
true
}else{
false
}
}).print
println("累加器的值"+accumulator.value)
//pair.filter(record=>{broadcastList.value.contains(record._1)})
/*valkeypair=pair.map(pair=>(pair._2,pair._1))*/
/**
*如果DStream自己没有某个算子操作。就通过转化transform!
*/
/*keypair.transform(rdd=>{
rdd.sortByKey(false)//TODO
})*/
pair.print()
ssc.start()
ssc.awaitTermination()
}
}
总结
以上就是本文关于Spark的广播变量和累加器使用方法代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:详解Java编写并运行spark应用程序的方法 、Spark入门简介等,有什么问题可以随时留言,小编会及时回复大家。感谢朋友们对毛票票网站的支持。