Implementing Flink WordCount with Java Lambda Expressions
This article walks through implementing Flink's WordCount with Java lambda expressions. The example code is explained in detail and should be a useful reference for study or work.
In this article we implement Flink word counting in Java.
Code Development
Environment Setup
Import the Flink 1.9 POM dependencies:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
</dependency>
```
Build the Flink stream processing environment:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
Custom source
Generate one line of text per second:
```java
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
    private volatile boolean isCancel = false;
    private final String[] words = {
            "important oracle jdk license update",
            "the oracle jdk license has changed for releases starting april 16 2019",
            "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as",
            "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before",
            "downloading and using this product an faq is available here",
            "commercial license and support is available with a low cost java se subscription",
            "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
    };

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit one line of text per second
        while (!isCancel) {
            int randomIndex = RandomUtils.nextInt(0, words.length);
            ctx.collect(words[randomIndex]);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isCancel = true;
    }
});
```
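For reference, `RandomUtils.nextInt(0, words.length)` from commons-lang3 returns a uniformly distributed index in `[0, words.length)`. If you prefer to avoid the extra dependency, a JDK-only equivalent can be sketched as follows (the `RandomLine` class name is purely illustrative):

```java
import java.util.Random;

public class RandomLine {
    private static final Random RANDOM = new Random();

    // JDK-only stand-in for commons-lang3 RandomUtils.nextInt(0, bound):
    // returns a uniformly distributed index in [0, bound)
    public static int nextIndex(int bound) {
        return RANDOM.nextInt(bound);
    }

    public static void main(String[] args) {
        String[] words = {"alpha", "beta", "gamma"};
        // Pick a random line, as the custom source does once per second
        System.out.println(words[nextIndex(words.length)]);
    }
}
```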
Word counting
```java
// 3. Word count
// 3.1 Split each line of text into individual words
SingleOutputStreamOperator<String> wordsDS = wordLineDS
        .flatMap((String line, Collector<String> ctx) -> {
            // Split into words
            Arrays.stream(line.split(" ")).forEach(ctx::collect);
        }).returns(Types.STRING);

// 3.2 Convert each word into a (word, 1) tuple
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

// 3.3 Group by word
KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

// 3.4 Sum the counts within each group
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

resultDS.print();
```
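The pipeline above is hard to eyeball without running a cluster. To sanity-check the split-group-sum logic itself, the same computation on a single line of text can be sketched with plain Java streams (the `PlainWordCount` class is a hypothetical helper, not part of the Flink job):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class PlainWordCount {

    // Mirrors the Flink pipeline on one line of text:
    // split on spaces (flatMap), group by word (keyBy), count per group (reduce)
    public static Map<String, Long> count(String line) {
        return Arrays.stream(line.split(" "))
                .collect(Collectors.groupingBy(word -> word, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(count("important oracle jdk license update oracle"));
    }
}
```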
Complete reference code
```java
import java.util.Arrays;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // 1. Build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Custom source: emit one line of text per second
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private volatile boolean isCancel = false;
            private final String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before",
                    "downloading and using this product an faq is available here",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit one line of text per second
                while (!isCancel) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCancel = true;
            }
        });

        // 3. Word count
        // 3.1 Split each line of text into individual words
        SingleOutputStreamOperator<String> wordsDS = wordLineDS
                .flatMap((String line, Collector<String> ctx) -> {
                    // Split into words
                    Arrays.stream(line.split(" ")).forEach(ctx::collect);
                }).returns(Types.STRING);

        // 3.2 Convert each word into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 Sum the counts within each group
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}
```
Flink's Support for Java Lambda Expressions
Flink supports lambda expressions for all operators of the Java API. However, whenever a lambda expression involves Java generics, the type information must be declared explicitly.
Let's look again at this piece of code from above:
```java
SingleOutputStreamOperator<String> wordsDS = wordLineDS
        .flatMap((String line, Collector<String> ctx) -> {
            // Split into words
            Arrays.stream(line.split(" ")).forEach(ctx::collect);
        }).returns(Types.STRING);
```
The reason all of the type information is spelled out here is that Flink cannot automatically infer the generic type carried by the Collector. Let's look at the source code of FlatMapFunction:
```java
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
```
We can see that the second parameter of flatMap is a Collector with a generic parameter:
```java
void flatMap(T value, Collector<O> out) throws Exception;
```
In this situation Flink cannot infer the type information automatically. If we do not provide it explicitly, the following error occurs:
```
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information.
```
In such cases the type information must be specified explicitly; otherwise the output is treated as type Object, which prevents Flink from serializing it correctly.
So we need to explicitly declare the parameter types of the lambda expression, and explicitly declare its output type via the returns method.
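The erasure behavior behind this error can be observed with plain JDK reflection: a lambda compiles to a class that implements only the raw functional interface, whereas an anonymous class records its parameterized supertype in the class file, which is exactly why the error message suggests the anonymous-class workaround. A minimal demonstration, with illustrative class and method names:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

public class ErasureDemo {

    public static void main(String[] args) {
        // Lambda: the generated class implements the raw interface,
        // so the String type argument is not recoverable by reflection.
        Consumer<String> lambda = s -> {};

        // Anonymous class: the parameterized supertype Consumer<String>
        // survives in the class file and is visible to reflection.
        Consumer<String> anon = new Consumer<String>() {
            @Override
            public void accept(String s) {}
        };

        System.out.println(describe(lambda));
        System.out.println(describe(anon));
    }

    // Try to recover the interface's type argument, much like a type extractor would
    static String describe(Consumer<?> c) {
        for (Type t : c.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return ((ParameterizedType) t).getActualTypeArguments()[0].getTypeName();
            }
        }
        return "type argument not recoverable";
    }
}
```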
Let's look at another piece of code:
```java
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));
```
Why does the map call also need its type specified?
Because map here returns a Tuple2, and Tuple2 carries generic parameters that are erased at compile time, Flink cannot infer the output type correctly.
For more on Flink's support for Java lambda expressions, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html
That's all for this article; I hope it helps with your learning.