JAVA流控及超流控后的延迟处理实例
本文实例讲述了JAVA流控及超流控后的延迟处理方法。分享给大家供大家参考。具体实现方法如下:
流控检查(每半秒累计一次,因此最小流控阈值只能做到每秒2条):
importjava.text.SimpleDateFormat;
importjava.util.Date;
importjava.lang.Thread;
/**
 * Flow control for a single link.
 *
 * <p>Sent messages are counted in windows that last at least 500 ms. Each
 * window may carry about half of the configured per-second limit, so the
 * smallest limit that can be enforced is 2 messages per second.
 *
 * <p>{@link #isOverflow(int)} is synchronized on this instance.
 *
 * @author chenx
 */
public class OverflowController {

    /** Minimum length of one counting window, in nanoseconds (500 ms). */
    private static final long WINDOW_NANOS = 500L * 1000L * 1000L;

    /** Configured per-second limit for this link (never below 2). */
    private final int maxSendCountPerSecend;

    /** {@link System#nanoTime()} reading taken when the current window started. */
    private long windowStartNanos = System.nanoTime();

    /** Number of messages admitted in the current window. */
    private int sendCount = 0;

    /**
     * Creates a controller.
     *
     * @param maxSendCountPerSecend per-second limit; values below 2 are raised to 2
     */
    public OverflowController(int maxSendCountPerSecend) {
        if (maxSendCountPerSecend < 2) {
            maxSendCountPerSecend = 2;
        }
        this.maxSendCountPerSecend = maxSendCountPerSecend;
    }

    /**
     * Returns the limit that applies to one half-second window right now.
     *
     * <p>During the second half of the current wall-clock second the result is
     * {@code limit / 2}; during the first half it is the remainder
     * {@code limit - limit / 2}, so the two halves add up to the configured limit.
     *
     * @return the per-window limit
     */
    public int getMaxSendCountPerSecend() {
        if (getMilliseconds(new Date()) >= 500) {
            return maxSendCountPerSecend / 2;
        }
        return maxSendCountPerSecend - (maxSendCountPerSecend / 2);
    }

    /**
     * Checks whether sending {@code sendNum} more messages would exceed the
     * current window's limit. If it would not, the messages are counted as sent.
     *
     * @param sendNum number of messages the caller wants to send
     * @return {@code true} if the send must be refused (nothing is counted),
     *         {@code false} if it was admitted and counted
     */
    public synchronized boolean isOverflow(int sendNum) {
        // nanoTime is monotonic, so stepping the system clock cannot stall the window.
        long now = System.nanoTime();
        if (now - windowStartNanos >= WINDOW_NANOS) {
            windowStartNanos = now;
            sendCount = 0;
        }
        // The first batch of a new window is checked like any other, and the sum is
        // formed in long so that a very large sendNum cannot wrap around to a
        // negative value and slip past the comparison.
        if ((long) sendCount + sendNum > getMaxSendCountPerSecend()) {
            return true;
        }
        sendCount += sendNum;
        return false;
    }

    /**
     * Returns the millisecond-of-second (0-999) of the given instant.
     */
    private int getMilliseconds(Date date) {
        // Plain arithmetic instead of building a SimpleDateFormat and parsing its
        // output on every call; the double modulo keeps the result non-negative.
        return (int) (((date.getTime() % 1000L) + 1000L) % 1000L);
    }

    /**
     * Demo: tries to send one message every 10 ms against a limit of 50 per
     * second and prints whether each attempt was admitted.
     */
    public static void main(String[] args) throws InterruptedException {
        OverflowController oc = new OverflowController(50);
        // The space between date and time had been lost from the pattern.
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        for (int i = 0; i <= 100; i++) {
            if (oc.isOverflow(1)) {
                System.out.println(i + "-isOverflow-" + df.format(new Date()));
            } else {
                System.out.println(i + "-sendOk-" + df.format(new Date()));
            }
            Thread.sleep(10);
        }
    }
}
// Delayed handling after the flow limit is exceeded. Java has no direct
// counterpart to .NET's "delayed delegate":
ThreadPool.RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce
)
Java下需实现一个简单的延迟队列:
importjava.util.concurrent.Delayed;
importjava.util.concurrent.TimeUnit;
import java.util.concurrent.DelayQueue; // used by DelayService, which follows this class

/**
 * Queue element that becomes available a fixed delay after it was created.
 * It carries an integer payload ({@code count}).
 */
public class DelayEntry implements Delayed {

    private int count;

    /** Wall-clock time, in epoch milliseconds, at which this entry may leave the queue. */
    private final long dequeuedTimeMillis;

    /**
     * Creates an entry that expires {@code delayMillis} milliseconds from now.
     *
     * @param delayMillis delay in milliseconds; zero or negative means already expired
     */
    public DelayEntry(long delayMillis) {
        long now = System.currentTimeMillis();
        long due = now + delayMillis;
        // Saturate instead of wrapping to a negative (already expired) time.
        if (delayMillis > 0 && due < now) {
            due = Long.MAX_VALUE;
        }
        dequeuedTimeMillis = due;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    /**
     * @return the epoch-millisecond time at which this entry expires
     */
    public long getDequeuedTimeMillis() {
        return dequeuedTimeMillis;
    }

    /**
     * Orders entries by expiry time, earliest first.
     *
     * @param o the other delayed element
     * @return -1, 0 or 1 if this entry expires before, together with, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        long mine;
        long theirs;
        if (o instanceof DelayEntry) {
            mine = dequeuedTimeMillis;
            theirs = ((DelayEntry) o).dequeuedTimeMillis;
        } else {
            // Other Delayed implementations are compared by remaining delay
            // rather than failing with a ClassCastException.
            mine = getDelay(TimeUnit.MILLISECONDS);
            theirs = o.getDelay(TimeUnit.MILLISECONDS);
        }
        // Direct comparison; a subtraction could overflow for extreme values.
        return mine < theirs ? -1 : (mine > theirs ? 1 : 0);
    }

    /**
     * Returns the remaining delay expressed in the requested unit.
     *
     * @param unit the unit the caller wants the result in
     * @return the remaining delay; zero or negative once the entry has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = dequeuedTimeMillis - System.currentTimeMillis();
        // The remaining time is tracked in milliseconds and must be converted:
        // callers such as DelayQueue ask for other units.
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }
}
publicclassDelayService{
publicvoidrun(){
DelayQueue<DelayEntry>queue=newDelayQueue<DelayEntry>();
DelayConsumerdelayConsumer=newDelayConsumer(queue);
delayConsumer.start();
for(inti=0;i<100;i++){
DelayEntryde=newDelayEntry(5000);
de.setCount(i);
System.out.println(System.currentTimeMillis()+"--------"+de.getCount());
queue.add(de);
}
}
classDelayConsumerextendsThread{
DelayQueue<DelayEntry>queue;
publicDelayConsumer(DelayQueue<DelayEntry>queue){
this.queue=queue;
}
publicvoidrun(){
while(true){
try{
DelayEntryde=queue.take();
System.out.println("queuesize="+queue.size());
System.out.println(de.getCount());
System.out.println(System.currentTimeMillis());
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
publicstaticvoidmain(String[]args){
DelayServiceds=newDelayService();
ds.run();
}
}
希望本文所述对大家的Java程序设计有所帮助。