Apache Flink CEP: A Detailed Walkthrough of Implementing Timeout State Monitoring
CEP stands for Complex Event Processing.

Typical timeout-monitoring scenarios include:

- An order has been placed, but payment has not been confirmed within a given time.
- A ride-hailing order has been created, but pickup has not been confirmed within a given time.
- A food delivery has passed its promised delivery time by a certain margin, but delivery has not been confirmed.

This article first takes a look at the Apache Flink CEP API and then builds a complete CEPTimeoutEventJob for this kind of monitoring.
A Brief Look at the FlinkCEP Source Code
DataStream and PatternStream
A DataStream is generally made up of events or elements of the same type, and one DataStream can be turned into another DataStream through a series of transformations such as Filter and Map.
PatternStream is the abstraction of a stream undergoing CEP pattern matching: it combines a DataStream with a Pattern and exposes methods such as select and flatSelect. A PatternStream is not itself a DataStream; those methods take the mapping from a matched pattern sequence to its associated events (i.e. Map<pattern name, List<event>>) and emit the results into a SingleOutputStreamOperator, which is a DataStream.
The methods and fields of the CEPOperatorUtils utility class are named after "PatternStream", for example:
public static SingleOutputStreamOperator createPatternStream(...) {...}
public static SingleOutputStreamOperator createTimeoutPatternStream(...) {...}

final SingleOutputStreamOperator patternStream;
SingleOutputStreamOperator:

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {...}
The constructors of PatternStream:

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = null;
}

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = comparator;
}
Pattern, Quantifier and EventComparator
Pattern is the base class for pattern definitions and follows the Builder pattern; a finished pattern definition is used by NFACompiler to generate the NFA.

If you want to implement your own methods in the style of next and followedBy, for example a timeEnd, extending Pattern should be feasible.
public class Pattern<T, F extends T> {
    /** Name of the pattern */
    private final String name;
    /** The previous pattern */
    private final Pattern<T, ? extends T> previous;
    /** The condition an event has to satisfy to be matched by this pattern */
    private IterativeCondition<F> condition;
    /** Length of the time window within which the pattern has to be matched */
    private Time windowTime;
    /** Quantifier of the pattern, i.e. how many events this pattern matches; by default exactly one */
    private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);
    /** The condition an event has to satisfy to stop collecting events into the looping state */
    private IterativeCondition<F> untilCondition;
    /**
     * Applies to {@code times} patterns; maintains how many times in a row events can be matched by this pattern.
     */
    private Times times;
    // Skip strategy applied after a match
    private final AfterMatchSkipStrategy afterMatchSkipStrategy;
    ...
}
Quantifier describes the concrete behaviour of a pattern. There are three main kinds: Single (a single match), Looping (a repeated match) and Times (a fixed number of matches, or any count within a range).

Each pattern, whether single or looping, can be optional and can set a ConsumingStrategy. Looping and Times patterns additionally have an inner ConsumingStrategy that applies between the events accepted into the pattern.
public class Quantifier {
    ...
    /**
     * Five properties that can be combined, though not all combinations are valid.
     */
    public enum QuantifierProperty {
        SINGLE,
        LOOPING,
        TIMES,
        OPTIONAL,
        GREEDY
    }

    /**
     * Describes the strategy for which events are matched in this pattern.
     */
    public enum ConsumingStrategy {
        STRICT,
        SKIP_TILL_NEXT,
        SKIP_TILL_ANY,
        NOT_FOLLOW,
        NOT_NEXT
    }

    /**
     * Describes how many times in a row events can occur in this pattern. For example, a pattern
     * condition is essentially a boolean; events satisfying it that occur "times" consecutive times,
     * or any count within a range such as 2 to 4 (so 2, 3 or 4 occurrences), are matched by this
     * pattern, which means the same event can be matched repeatedly.
     */
    public static class Times {
        private final int from;
        private final int to;

        private Times(int from, int to) {
            Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
            Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
            this.from = from;
            this.to = to;
        }

        public int getFrom() {
            return from;
        }

        public int getTo() {
            return to;
        }

        // a range of times
        public static Times of(int from, int to) {
            return new Times(from, to);
        }

        // an exact number of times
        public static Times of(int times) {
            return new Times(times, times);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Times times = (Times) o;
            return from == times.from && to == times.to;
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }
    ...
}
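As a rough illustration of how the pattern API maps onto these quantifiers, here is a minimal sketch over plain String events; the pattern names and conditions are made up for the example:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class QuantifierSketch {

    public static Pattern<String, ?> build() {
        return Pattern.<String>begin("start")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("a");
                    }
                })
                .times(2, 4)       // Times quantifier: 2, 3 or 4 consecutive matches (Times.of(2, 4))
                .greedy()          // adds the GREEDY property: prefer as many repetitions as possible
                .followedBy("end") // relaxed contiguity between patterns (SKIP_TILL_NEXT)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("b");
                    }
                })
                .optional();       // adds the OPTIONAL property to the single "end" pattern
    }
}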
EventComparator is for custom event comparison; implement the EventComparator interface.

public interface EventComparator<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
}
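For example, a comparator can impose a deterministic order on events that carry the same timestamp. A minimal sketch (the Event type and its fields are invented here only to keep the sketch self-contained; they are not part of FlinkCEP):

import org.apache.flink.cep.EventComparator;

public class SequenceComparator implements EventComparator<SequenceComparator.Event> {

    private static final long serialVersionUID = 1L;

    // hypothetical event type, defined only for this sketch
    public static class Event {
        public long timestamp;
        public long seq;
    }

    @Override
    public int compare(Event a, Event b) {
        int byTime = Long.compare(a.timestamp, b.timestamp);
        // break ties between events with the same timestamp by a sequence number
        return byTime != 0 ? byTime : Long.compare(a.seq, b.seq);
    }
}

Such a comparator is passed as the third argument of CEP.pattern(DataStream, Pattern, EventComparator), shown in the CEP class further below.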
NFACompiler and NFA
NFACompiler provides methods to compile a Pattern into an NFA or into an NFAFactory, which can then be used to create several NFAs.
public class NFACompiler {
    ...
    /**
     * Factory interface for NFAFactory to create an NFA.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    public interface NFAFactory<T> extends Serializable {
        NFA<T> createNFA();
    }

    /**
     * The concrete NFAFactory implementation, NFAFactoryImpl.
     *
     * The implementation takes the input type serializer, the window time and the set of
     * states and their transitions to be able to create an NFA from them.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    private static class NFAFactoryImpl<T> implements NFAFactory<T> {

        private static final long serialVersionUID = 8939783698296714379L;

        private final long windowTime;
        private final Collection<State<T>> states;
        private final boolean timeoutHandling;

        private NFAFactoryImpl(
                long windowTime,
                Collection<State<T>> states,
                boolean timeoutHandling) {
            this.windowTime = windowTime;
            this.states = states;
            this.timeoutHandling = timeoutHandling;
        }

        @Override
        public NFA<T> createNFA() {
            // an NFA consists of the set of states, the length of the time window and whether timeouts are handled
            return new NFA<>(states, windowTime, timeoutHandling);
        }
    }
}
NFA: non-deterministic finite (state) automaton.

For more background, see https://zh.wikipedia.org/wiki/非确定有限状态自动机
public class NFA<T> {
    /**
     * The set of all valid NFA states returned by the NFACompiler.
     * These are directly derived from the user-specified pattern.
     */
    private final Map<String, State<T>> states;

    /**
     * Length of the time window specified via Pattern.within(Time).
     */
    private final long windowTime;

    /**
     * A flag indicating whether timed-out matches are handled.
     */
    private final boolean handleTimeout;
    ...
}
PatternSelectFunction and PatternFlatSelectFunction
PatternSelectFunction's select() method is invoked with a map of the matched events, accessible by pattern name; the pattern names are those assigned when the Pattern was defined. select() returns exactly one result; if you need to return several results, implement PatternFlatSelectFunction instead.
public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
    /**
     * Generates exactly one result from the given map of events. The events are identified
     * by their associated pattern names.
     */
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}
PatternFlatSelectFunction does not return a single OUT; instead it uses a Collector to collect the matched events.
public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {
    /**
     * Generates one or more results.
     */
    void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
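A minimal sketch of both variants over plain String events; the pattern names "init" and "end" follow the demo later in this article, everything else is made up for the example:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.util.Collector;

public class SelectFunctionSketch {

    // exactly one result per matched sequence
    public static class ToSummary implements PatternSelectFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String select(Map<String, List<String>> pattern) {
            return pattern.get("init").get(0) + " -> " + pattern.get("end").get(0);
        }
    }

    // one or more results per matched sequence
    public static class ToAllEvents implements PatternFlatSelectFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatSelect(Map<String, List<String>> pattern, Collector<String> out) {
            for (List<String> events : pattern.values()) {
                for (String event : events) {
                    out.collect(event);
                }
            }
        }
    }
}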
SelectTimeoutCepOperator and PatternTimeoutFunction

SelectTimeoutCepOperator is created when createTimeoutPatternStream() is called in CEPOperatorUtils.

The methods of SelectTimeoutCepOperator that the operator invokes repeatedly are processMatchedSequences() and processTimedOutSequences(). They are template methods, corresponding to the processEvent() and advanceTime() methods of the abstract class AbstractKeyedCEPPatternOperator.

There is also a FlatSelectTimeoutCepOperator with its corresponding PatternFlatTimeoutFunction.
public class SelectTimeoutCepOperator
        extends AbstractKeyedCEPPatternOperator {

    private OutputTag timedOutOutputTag;

    public SelectTimeoutCepOperator(
            TypeSerializer inputSerializer,
            boolean isProcessingTime,
            NFACompiler.NFAFactory nfaFactory,
            final EventComparator comparator,
            AfterMatchSkipStrategy skipStrategy,
            // the "flat..." parameter names are misleading here, as are the member names of the SelectWrapper class...
            PatternSelectFunction flatSelectFunction,
            PatternTimeoutFunction flatTimeoutFunction,
            OutputTag outputTag,
            OutputTag lateDataOutputTag) {
        super(
            inputSerializer,
            isProcessingTime,
            nfaFactory,
            comparator,
            skipStrategy,
            new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
            lateDataOutputTag);
        this.timedOutOutputTag = outputTag;
    }
    ...
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
    void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}
CEP and CEPOperatorUtils

CEP is the utility class for creating a PatternStream; a PatternStream is simply the combination of a DataStream and a Pattern.
public class CEP {

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
        return new PatternStream<>(input, pattern);
    }

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
        return new PatternStream<>(input, pattern, comparator);
    }
}
CEPOperatorUtils creates the SingleOutputStreamOperator (a DataStream) when PatternStream's select() or flatSelect() method is called.
public class CEPOperatorUtils {
    ...
    private static SingleOutputStreamOperator createPatternStream(
            final DataStream inputStream,
            final Pattern pattern,
            final TypeInformation outTypeInfo,
            final boolean timeoutHandling,
            final EventComparator comparator,
            final OperatorBuilder operatorBuilder) {
        final TypeSerializer inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

        // check whether we use processing time
        final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

        // compile our pattern into a NFAFactory to instantiate NFAs later on
        final NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

        final SingleOutputStreamOperator patternStream;

        if (inputStream instanceof KeyedStream) {
            KeyedStream keyedStream = (KeyedStream) inputStream;

            patternStream = keyedStream.transform(
                operatorBuilder.getKeyedOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()));
        } else {
            KeySelector keySelector = new NullByteKeySelector<>();

            patternStream = inputStream.keyBy(keySelector).transform(
                operatorBuilder.getOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()
                )).forceNonParallel();
        }

        return patternStream;
    }
    ...
}
FlinkCEP Implementation Steps

- IN: DataSource -> DataStream -> Transformations -> DataStream
- Pattern: Pattern.begin.where.next.where...times...
- PatternStream: CEP.pattern(DataStream, Pattern)
- DataStream: PatternStream.select(PatternSelectFunction) / PatternStream.flatSelect(PatternFlatSelectFunction)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
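Put together, a minimal sketch of this non-timeout flow might look like the following (over plain String events; the pattern names and conditions are made up for the example):

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

public class BasicCepSketch {

    public static DataStream<String> detect(DataStream<String> input) {
        // Pattern: an "A..." event strictly followed by a "B..." event
        Pattern<String, ?> pattern = Pattern.<String>begin("first")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("A");
                    }
                })
                .next("second")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("B");
                    }
                });

        // PatternStream + select -> DataStream
        return CEP.pattern(input, pattern)
                .select(new PatternSelectFunction<String, String>() {
                    @Override
                    public String select(Map<String, List<String>> match) {
                        return match.get("first").get(0) + " -> " + match.get("second").get(0);
                    }
                });
    }
}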
FlinkCEP Match-Timeout Implementation Steps

The stream used for timeout CEP needs to be keyed (keyBy), i.e. a KeyedStream; if the inputStream is not a KeyedStream, a 0-byte key is created (see the CEPOperatorUtils source above):

KeySelector keySelector = new NullByteKeySelector<>();

The Pattern finally calls within to set the window time. If you key by the primary key, at most one timeout event is matched per key within a time window, so PatternStream.select(...) is sufficient (a sketch of this variant follows the list below).
- IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
- Pattern: Pattern.begin.where.next.where...within(Time windowTime)
- PatternStream: CEP.pattern(KeyedStream, Pattern)
- OutputTag: new OutputTag(...)
- SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
- DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
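For the keyed case mentioned above, where at most one timeout per key and window is expected, the select variant with a timeout side output can be used instead of flatSelect. A minimal sketch, assuming the PatternStream was already built with CEP.pattern(keyedStream, pattern.within(...)) and again using plain String events with pattern names "init" and "end":

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

public class TimeoutSelectSketch {

    public static DataStream<String> timedOutOnly(PatternStream<String> patternStream) {
        OutputTag<String> timedOut = new OutputTag<String>("timedout") {
            private static final long serialVersionUID = 1L;
        };

        SingleOutputStreamOperator<String> matched = patternStream.select(
                timedOut,
                new PatternTimeoutFunction<String, String>() {
                    @Override
                    public String timeout(Map<String, List<String>> pattern, long timeoutTimestamp) {
                        // only the "init" part of the match is present when the window times out
                        return "timed out: " + pattern.get("init");
                    }
                },
                new PatternSelectFunction<String, String>() {
                    @Override
                    public String select(Map<String, List<String>> pattern) {
                        return "completed: " + pattern.get("end");
                    }
                });

        // timed-out partial matches are emitted to the side output
        return matched.getSideOutput(timedOut);
    }
}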
Shortcomings of FlinkCEP Timeouts

As with Flink window aggregation, if you use event time and rely on watermarks derived from the events to advance time, subsequent events have to arrive before the window is evaluated and results are emitted; a key that simply stops receiving events will not fire its timeout until the watermark passes the window end (the demo below sidesteps this by assigning timestamps with IngestionTimeExtractor instead of using the POJO's own event time).
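A common way around this, and essentially what IngestionTimeExtractor does, is to derive timestamps and watermarks from the wall clock so that time keeps advancing even when a key receives no further events. A minimal sketch (the class name is invented for the example):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class WallClockWatermarks<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // stamp each element with the current wall-clock time
        return System.currentTimeMillis();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // periodically emit the current wall-clock time as the watermark
        return new Watermark(System.currentTimeMillis() - 1);
    }
}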
A Complete FlinkCEP Timeout Demo
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// POJO and POJOSchema are application-specific classes (not shown in this article)
public class CEPTimeoutEventJob {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
    private static final String GROUP_TOPIC = GROUP_ID;

    public static void main(String[] args) throws Exception {
        // parameters
        ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

        // do not use the POJO's own time
        final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<>();

        // keep the parallelism in line with the number of partitions of the Kafka topic
        env.setParallelism(3);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", GROUP_ID);

        // consume the messages from Kafka
        FlinkKafkaConsumer011<POJO> consumer =
                new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
        DataStream<POJO> pojoDataStream = env.addSource(consumer)
                .assignTimestampsAndWatermarks(extractor);
        pojoDataStream.print();

        // key by the primary key aid, i.e. run the match detection per POJO entity
        // [different kinds of POJO could use different within() times]
        // 1.
        DataStream<POJO> keyedPojos = pojoDataStream
                .keyBy("aid");

        // from the initial state to the terminal state: a complete POJO event sequence
        // 2.
        Pattern<POJO, POJO> completedPojo =
                Pattern.<POJO>begin("init")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = -6847788055093903603L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "02".equals(pojo.getAstatus());
                            }
                        })
                        .followedBy("end")
                        //.next("end")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = -2655089736460847552L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
                            }
                        });

        // find the aids that have not reached a terminal state within 1 minute [kept short for easier testing]
        // if different kinds need different within() times, e.g. 1 minute for some and 1 hour for others, create several PatternStreams
        // 3.
        PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

        // define the side output "timedout"
        // 4.
        OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
            private static final long serialVersionUID = 773503794597666247L;
        };

        // OutputTag timeoutOutputTag, PatternFlatTimeoutFunction patternFlatTimeoutFunction, PatternFlatSelectFunction patternFlatSelectFunction
        // 5.
        SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
                timedout,
                new POJOTimedOut(),
                new FlatSelectNothing<POJO>());

        // print the timed-out POJOs
        // 6. 7.
        timeoutPojos.getSideOutput(timedout).print();
        timeoutPojos.print();

        env.execute(CEPTimeoutEventJob.class.getSimpleName());
    }

    /**
     * Collects the timed-out events.
     */
    public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
        private static final long serialVersionUID = -4214641891396057732L;

        @Override
        public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
            if (null != map.get("init")) {
                for (POJO pojoInit : map.get("init")) {
                    System.out.println("timeout init: " + pojoInit.getAid());
                    collector.collect(pojoInit);
                }
            }
            // "end" timed out, i.e. no end event was received, so it cannot be retrieved here
            System.out.println("timeout end: " + map.get("end"));
        }
    }

    /**
     * Usually does nothing, but it could also forward all matched events downstream; with relaxed
     * contiguity, the events that were skipped over cannot be selected and forwarded.
     * These are the sequences that completed both init and end within the one-minute window.
     *
     * @param <T>
     */
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        private static final long serialVersionUID = -3029589950677623844L;

        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
            System.out.println("flatSelect: " + pattern);
        }
    }
}
Test results (followedBy):
3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}]}
timeout init: ID000-1
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}]}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
timeout init: ID000-4
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
Summary

That is an overview of the steps for implementing timeout state monitoring with Apache Flink CEP. I hope it helps; if you have any questions, feel free to leave a comment and I will reply as soon as possible.