Java实现心跳机制的方法
一、心跳机制简介
在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。
发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。
一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。
二、心跳机制实现方式
心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP实现会发送一个数据包给远程的Socket.如果远程Socket没有发回响应,TCP实现就会持续尝试11分钟,直到接收到响应为止。否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。
另一种在应用层自己进行实现,基本步骤如下:
Client使用定时器,不断发送心跳;
Server收到心跳后,回复一个包;
Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。
三、Java实现心跳机制
这里基于Java实现的简单RPC框架实现心跳机制。Java实现代码如下所示:
心跳客户端类:
publicclassHeartbeatClientimplementsRunnable{ privateStringserverIP="127.0.0.1"; privateintserverPort=8089; privateStringnodeID=UUID.randomUUID().toString(); privatebooleanisRunning=true; //最近的心跳时间 privatelonglastHeartbeat; //心跳间隔时间 privatelongheartBeatInterval=10*1000; publicvoidrun(){ try{ while(isRunning){ HeartbeatHandlerhandler=RPClient.getRemoteProxyObj(HeartbeatHandler.class,newInetSocketAddress(serverIP,serverPort)); longstartTime=System.currentTimeMillis(); //是否达到发送心跳的周期时间 if(startTime-lastHeartbeat>heartBeatInterval){ System.out.println("sendaheartbeat"); lastHeartbeat=startTime; HeartbeatEntityentity=newHeartbeatEntity(); entity.setTime(startTime); entity.setNodeID(nodeID); //向服务器发送心跳,并返回需要执行的命令 Cmdercmds=handler.sendHeartBeat(entity); if(!processCommand(cmds)) continue; } } }catch(Exceptione){ e.printStackTrace(); } } privatebooleanprocessCommand(Cmdercmds){ //... returntrue; } }
心跳包实体类:
publicclassHeartbeatEntityimplementsSerializable{ privatelongtime; privateStringnodeID; privateStringerror; privateMapinfo=newHashMap (); publicStringgetNodeID(){ returnnodeID; } publicvoidsetNodeID(StringnodeID){ this.nodeID=nodeID; } publicStringgetError(){ returnerror; } publicvoidsetError(Stringerror){ this.error=error; } publicMap getInfo(){ returninfo; } publicvoidsetInfo(Map info){ this.info=info; } publiclonggetTime(){ returntime; } publicvoidsetTime(longtime){ this.time=time; } }
服务器接受心跳包返回的命令对象类:
publicclassCmderimplementsSerializable{ privateStringnodeID; privateStringerror; privateMapinfo=newHashMap (); publicStringgetNodeID(){ returnnodeID; } publicvoidsetNodeID(StringnodeID){ this.nodeID=nodeID; } publicStringgetError(){ returnerror; } publicvoidsetError(Stringerror){ this.error=error; } publicMap getInfo(){ returninfo; } publicvoidsetInfo(Map info){ this.info=info; } }
RPC服务注册中心:
publicclassServiceCenter{ privateExecutorServiceexecutor=Executors.newFixedThreadPool(20); privatefinalConcurrentHashMapserviceRegistry=newConcurrentHashMap (); privateAtomicBooleanisRunning=newAtomicBoolean(true); //服务器监听端口 privateintport=8089; //心跳监听器 HeartbeatLinstenerlinstener; //单例模式 privatestaticclassSingleHolder{ privatestaticfinalServiceCenterINSTANCE=newServiceCenter(); } privateServiceCenter(){ } publicstaticServiceCentergetInstance(){ returnSingleHolder.INSTANCE; } publicvoidregister(ClassserviceInterface,Classimpl){ System.out.println("regeistservice"+serviceInterface.getName()); serviceRegistry.put(serviceInterface.getName(),impl); } publicvoidstart()throwsIOException{ ServerSocketserver=newServerSocket(); server.bind(newInetSocketAddress(port)); System.out.println("startserver"); linstener=HeartbeatLinstener.getInstance(); System.out.println("startlistenheartbeat"); try{ while(true){ //1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行 executor.execute(newServiceTask(server.accept())); } }finally{ server.close(); } } publicvoidstop(){ isRunning.set(false); executor.shutdown(); } publicbooleanisRunning(){ returnisRunning.get(); } publicintgetPort(){ returnport; } publicvoidsettPort(intport){ this.port=port; } publicConcurrentHashMap getServiceRegistry(){ returnserviceRegistry; } privateclassServiceTaskimplementsRunnable{ Socketclent=null; publicServiceTask(Socketclient){ this.clent=client; } publicvoidrun(){ ObjectInputStreaminput=null; ObjectOutputStreamoutput=null; try{ //2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果 input=newObjectInputStream(clent.getInputStream()); StringserviceName=input.readUTF(); StringmethodName=input.readUTF(); Class>[]parameterTypes=(Class>[])input.readObject(); Object[]arguments=(Object[])input.readObject(); ClassserviceClass=serviceRegistry.get(serviceName); if(serviceClass==null){ thrownewClassNotFoundException(serviceName+"notfound"); } Methodmethod=serviceClass.getMethod(methodName,parameterTypes); Objectresult=method.invoke(serviceClass.newInstance(),arguments); //3.将执行结果反序列化,通过socket发送给客户端 output=newObjectOutputStream(clent.getOutputStream()); output.writeObject(result); }catch(Exceptione){ e.printStackTrace(); }finally{ if(output!=null){ try{ output.close(); }catch(IOExceptione){ e.printStackTrace(); } } if(input!=null){ try{ input.close(); }catch(IOExceptione){ e.printStackTrace(); } } if(clent!=null){ try{ clent.close(); }catch(IOExceptione){ e.printStackTrace(); } } } } } }
心跳监听类:
packagecom.cang.heartbeat; importjava.io.IOException; importjava.io.ObjectInputStream; importjava.io.ObjectOutputStream; importjava.lang.reflect.Method; importjava.net.InetSocketAddress; importjava.net.ServerSocket; importjava.net.Socket; importjava.util.Iterator; importjava.util.Map; importjava.util.concurrent.ConcurrentHashMap; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.atomic.AtomicBoolean; /** *心跳监听保存信息 * *@authorcang *@create_time2016-09-2811:40 */ publicclassHeartbeatLinstener{ privateExecutorServiceexecutor=Executors.newFixedThreadPool(20); privatefinalConcurrentHashMapnodes=newConcurrentHashMap (); privatefinalConcurrentHashMap nodeStatus=newConcurrentHashMap (); privatelongtimeout=10*1000; //服务器监听端口 privateintport=8089; //单例模式 privatestaticclassSingleHolder{ privatestaticfinalHeartbeatLinstenerINSTANCE=newHeartbeatLinstener(); } privateHeartbeatLinstener(){ } publicstaticHeartbeatLinstenergetInstance(){ returnSingleHolder.INSTANCE; } publicConcurrentHashMap getNodes(){ returnnodes; } publicvoidregisterNode(StringnodeId,ObjectnodeInfo){ nodes.put(nodeId,nodeInfo); nodeStatus.put(nodeId,System.currentTimeMillis()); } publicvoidremoveNode(StringnodeID){ if(nodes.containsKey(nodeID)){ nodes.remove(nodeID); } } //检测节点是否有效 publicbooleancheckNodeValid(Stringkey){ if(!nodes.containsKey(key)||!nodeStatus.containsKey(key))returnfalse; if((System.currentTimeMillis()-nodeStatus.get(key))>timeout)returnfalse; returntrue; } //删除所有失效节点 publicvoidremoveInValidNode(){ Iterator >it=nodeStatus.entrySet().iterator(); while(it.hasNext()){ Map.Entry e=it.next(); if((System.currentTimeMillis()-nodeStatus.get(e.getKey()))>timeout){ nodes.remove(e.getKey()); } } } }
心跳处理类接口:
publicinterfaceHeartbeatHandler{ publicCmdersendHeartBeat(HeartbeatEntityinfo); }
心跳处理实现类:
publicclassHeartbeatHandlerImplimplementsHeartbeatHandler{ publicCmdersendHeartBeat(HeartbeatEntityinfo){ HeartbeatLinstenerlinstener=HeartbeatLinstener.getInstance(); //添加节点 if(!linstener.checkNodeValid(info.getNodeID())){ linstener.registerNode(info.getNodeID(),info); } //其他操作 Cmdercmder=newCmder(); cmder.setNodeID(info.getNodeID()); //... System.out.println("currentallthenodes:"); Mapnodes=linstener.getNodes(); for(Map.Entrye:nodes.entrySet()){ System.out.println(e.getKey()+":"+e.getValue()); } System.out.println("hadleaheartbeat"); returncmder; } }
测试类:
publicclassHeartbeatTest{ publicstaticvoidmain(String[]args){ newThread(newRunnable(){ publicvoidrun(){ try{ ServiceCenterserviceServer=ServiceCenter.getInstance(); serviceServer.register(HeartbeatHandler.class,HeartbeatHandlerImpl.class); serviceServer.start(); }catch(IOExceptione){ e.printStackTrace(); } } }).start(); Threadclient1=newThread(newHeartbeatClient()); client1.start(); Threadclient2=newThread(newHeartbeatClient()); client2.start(); } }
四、总结
上面的代码还有很多不足的地方,希望有空能进行改善:
- 配置为硬编码;
- 命令类Cmder没有实际实现,返回的Cmder对象没有实际进行处理;
其他小问题就暂时不管了,希望以后能重写上面的代码。
以上就是Java实现心跳机制的方法的详细内容,更多关于Java实现心跳机制的资料请关注毛票票其它相关文章!