Java实现心跳机制的方法
一、心跳机制简介
在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。
发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。
一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。
二、心跳机制实现方式
心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP实现会发送一个数据包给远程的Socket.如果远程Socket没有发回响应,TCP实现就会持续尝试11分钟,直到接收到响应为止。否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。
另一种在应用层自己进行实现,基本步骤如下:
Client使用定时器,不断发送心跳;
Server收到心跳后,回复一个包;
Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。
三、Java实现心跳机制
这里基于Java实现的简单RPC框架实现心跳机制。Java实现代码如下所示:
心跳客户端类:
publicclassHeartbeatClientimplementsRunnable{
privateStringserverIP="127.0.0.1";
privateintserverPort=8089;
privateStringnodeID=UUID.randomUUID().toString();
privatebooleanisRunning=true;
//最近的心跳时间
privatelonglastHeartbeat;
//心跳间隔时间
privatelongheartBeatInterval=10*1000;
publicvoidrun(){
try{
while(isRunning){
HeartbeatHandlerhandler=RPClient.getRemoteProxyObj(HeartbeatHandler.class,newInetSocketAddress(serverIP,serverPort));
longstartTime=System.currentTimeMillis();
//是否达到发送心跳的周期时间
if(startTime-lastHeartbeat>heartBeatInterval){
System.out.println("sendaheartbeat");
lastHeartbeat=startTime;
HeartbeatEntityentity=newHeartbeatEntity();
entity.setTime(startTime);
entity.setNodeID(nodeID);
//向服务器发送心跳,并返回需要执行的命令
Cmdercmds=handler.sendHeartBeat(entity);
if(!processCommand(cmds))
continue;
}
}
}catch(Exceptione){
e.printStackTrace();
}
}
privatebooleanprocessCommand(Cmdercmds){
//...
returntrue;
}
}
心跳包实体类:
publicclassHeartbeatEntityimplementsSerializable{
privatelongtime;
privateStringnodeID;
privateStringerror;
privateMapinfo=newHashMap();
publicStringgetNodeID(){
returnnodeID;
}
publicvoidsetNodeID(StringnodeID){
this.nodeID=nodeID;
}
publicStringgetError(){
returnerror;
}
publicvoidsetError(Stringerror){
this.error=error;
}
publicMapgetInfo(){
returninfo;
}
publicvoidsetInfo(Mapinfo){
this.info=info;
}
publiclonggetTime(){
returntime;
}
publicvoidsetTime(longtime){
this.time=time;
}
}
服务器接受心跳包返回的命令对象类:
publicclassCmderimplementsSerializable{
privateStringnodeID;
privateStringerror;
privateMapinfo=newHashMap();
publicStringgetNodeID(){
returnnodeID;
}
publicvoidsetNodeID(StringnodeID){
this.nodeID=nodeID;
}
publicStringgetError(){
returnerror;
}
publicvoidsetError(Stringerror){
this.error=error;
}
publicMapgetInfo(){
returninfo;
}
publicvoidsetInfo(Mapinfo){
this.info=info;
}
}
RPC服务注册中心:
publicclassServiceCenter{
privateExecutorServiceexecutor=Executors.newFixedThreadPool(20);
privatefinalConcurrentHashMapserviceRegistry=newConcurrentHashMap();
privateAtomicBooleanisRunning=newAtomicBoolean(true);
//服务器监听端口
privateintport=8089;
//心跳监听器
HeartbeatLinstenerlinstener;
//单例模式
privatestaticclassSingleHolder{
privatestaticfinalServiceCenterINSTANCE=newServiceCenter();
}
privateServiceCenter(){
}
publicstaticServiceCentergetInstance(){
returnSingleHolder.INSTANCE;
}
publicvoidregister(ClassserviceInterface,Classimpl){
System.out.println("regeistservice"+serviceInterface.getName());
serviceRegistry.put(serviceInterface.getName(),impl);
}
publicvoidstart()throwsIOException{
ServerSocketserver=newServerSocket();
server.bind(newInetSocketAddress(port));
System.out.println("startserver");
linstener=HeartbeatLinstener.getInstance();
System.out.println("startlistenheartbeat");
try{
while(true){
//1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
executor.execute(newServiceTask(server.accept()));
}
}finally{
server.close();
}
}
publicvoidstop(){
isRunning.set(false);
executor.shutdown();
}
publicbooleanisRunning(){
returnisRunning.get();
}
publicintgetPort(){
returnport;
}
publicvoidsettPort(intport){
this.port=port;
}
publicConcurrentHashMapgetServiceRegistry(){
returnserviceRegistry;
}
privateclassServiceTaskimplementsRunnable{
Socketclent=null;
publicServiceTask(Socketclient){
this.clent=client;
}
publicvoidrun(){
ObjectInputStreaminput=null;
ObjectOutputStreamoutput=null;
try{
//2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
input=newObjectInputStream(clent.getInputStream());
StringserviceName=input.readUTF();
StringmethodName=input.readUTF();
Class>[]parameterTypes=(Class>[])input.readObject();
Object[]arguments=(Object[])input.readObject();
ClassserviceClass=serviceRegistry.get(serviceName);
if(serviceClass==null){
thrownewClassNotFoundException(serviceName+"notfound");
}
Methodmethod=serviceClass.getMethod(methodName,parameterTypes);
Objectresult=method.invoke(serviceClass.newInstance(),arguments);
//3.将执行结果反序列化,通过socket发送给客户端
output=newObjectOutputStream(clent.getOutputStream());
output.writeObject(result);
}catch(Exceptione){
e.printStackTrace();
}finally{
if(output!=null){
try{
output.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
if(input!=null){
try{
input.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
if(clent!=null){
try{
clent.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
}
}
}
心跳监听类:
packagecom.cang.heartbeat;
importjava.io.IOException;
importjava.io.ObjectInputStream;
importjava.io.ObjectOutputStream;
importjava.lang.reflect.Method;
importjava.net.InetSocketAddress;
importjava.net.ServerSocket;
importjava.net.Socket;
importjava.util.Iterator;
importjava.util.Map;
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.atomic.AtomicBoolean;
/**
*心跳监听保存信息
*
*@authorcang
*@create_time2016-09-2811:40
*/
publicclassHeartbeatLinstener{
privateExecutorServiceexecutor=Executors.newFixedThreadPool(20);
privatefinalConcurrentHashMapnodes=newConcurrentHashMap();
privatefinalConcurrentHashMapnodeStatus=newConcurrentHashMap();
privatelongtimeout=10*1000;
//服务器监听端口
privateintport=8089;
//单例模式
privatestaticclassSingleHolder{
privatestaticfinalHeartbeatLinstenerINSTANCE=newHeartbeatLinstener();
}
privateHeartbeatLinstener(){
}
publicstaticHeartbeatLinstenergetInstance(){
returnSingleHolder.INSTANCE;
}
publicConcurrentHashMapgetNodes(){
returnnodes;
}
publicvoidregisterNode(StringnodeId,ObjectnodeInfo){
nodes.put(nodeId,nodeInfo);
nodeStatus.put(nodeId,System.currentTimeMillis());
}
publicvoidremoveNode(StringnodeID){
if(nodes.containsKey(nodeID)){
nodes.remove(nodeID);
}
}
//检测节点是否有效
publicbooleancheckNodeValid(Stringkey){
if(!nodes.containsKey(key)||!nodeStatus.containsKey(key))returnfalse;
if((System.currentTimeMillis()-nodeStatus.get(key))>timeout)returnfalse;
returntrue;
}
//删除所有失效节点
publicvoidremoveInValidNode(){
Iterator>it=nodeStatus.entrySet().iterator();
while(it.hasNext()){
Map.Entrye=it.next();
if((System.currentTimeMillis()-nodeStatus.get(e.getKey()))>timeout){
nodes.remove(e.getKey());
}
}
}
}
心跳处理类接口:
publicinterfaceHeartbeatHandler{
publicCmdersendHeartBeat(HeartbeatEntityinfo);
}
心跳处理实现类:
publicclassHeartbeatHandlerImplimplementsHeartbeatHandler{
publicCmdersendHeartBeat(HeartbeatEntityinfo){
HeartbeatLinstenerlinstener=HeartbeatLinstener.getInstance();
//添加节点
if(!linstener.checkNodeValid(info.getNodeID())){
linstener.registerNode(info.getNodeID(),info);
}
//其他操作
Cmdercmder=newCmder();
cmder.setNodeID(info.getNodeID());
//...
System.out.println("currentallthenodes:");
Mapnodes=linstener.getNodes();
for(Map.Entrye:nodes.entrySet()){
System.out.println(e.getKey()+":"+e.getValue());
}
System.out.println("hadleaheartbeat");
returncmder;
}
}
测试类:
publicclassHeartbeatTest{
publicstaticvoidmain(String[]args){
newThread(newRunnable(){
publicvoidrun(){
try{
ServiceCenterserviceServer=ServiceCenter.getInstance();
serviceServer.register(HeartbeatHandler.class,HeartbeatHandlerImpl.class);
serviceServer.start();
}catch(IOExceptione){
e.printStackTrace();
}
}
}).start();
Threadclient1=newThread(newHeartbeatClient());
client1.start();
Threadclient2=newThread(newHeartbeatClient());
client2.start();
}
}
四、总结
上面的代码还有很多不足的地方,希望有空能进行改善:
- 配置为硬编码;
- 命令类Cmder没有实际实现,返回的Cmder对象没有实际进行处理;
其他小问题就暂时不管了,希望以后能重写上面的代码。
以上就是Java实现心跳机制的方法的详细内容,更多关于Java实现心跳机制的资料请关注毛票票其它相关文章!