java解析JT808协议的实现代码
本篇文章将介绍JT808协议的解析思路。
另请大神绕路,不喜勿喷!
先写个大致的思路,有疑问可以联系本人,联系方式:
email:hylexus@163.com
1JT808协议扫盲
1.1数据类型
| 数据类型 | 描述及要求 |
|---|---|
| BYTE | 无符号单字节整型(字节,8位) |
| WORD | 无符号双字节整型(字,16位) |
| DWORD | 无符号四字节整型(双字,32位) |
| BYTE[n] | n字节 |
| BCD[n] | 8421码,n字节 |
| STRING | GBK编码,若无数据,置空 |
1.2消息结构
| 标识位 | 消息头 | 消息体 | 校验码 | 标识位 |
|---|---|---|---|---|
| 1byte(0x7e) | 16byte | 变长(长度由消息体属性的bit[0-9]给出) | 1byte | 1byte(0x7e) |
1.3消息头
消息ID(0-1) 消息体属性(2-3) 终端手机号(4-9) 消息流水号(10-11) 消息包封装项(12-15)
byte[0-1] 消息IDword(16)
byte[2-3] 消息体属性word(16)
bit[0-9] 消息体长度
bit[10-12] 数据加密方式
此三位都为0,表示消息体不加密
第10位为1,表示消息体经过RSA算法加密
其它保留
bit[13] 分包
1:消息体为长消息,进行分包发送处理,具体分包信息由消息包封装项决定
0:则消息头中无消息包封装项字段
bit[14-15] 保留
byte[4-9] 终端手机号或设备IDbcd[6]
根据安装后终端自身的手机号转换
手机号不足12位,则在前面补0
byte[10-11] 消息流水号word(16)
按发送顺序从0开始循环累加
byte[12-15] 消息包封装项
byte[0-1] 消息包总数(word(16))
该消息分包后的总包数
byte[2-3] 包序号(word(16))
从1开始
如果消息体属性中相关标识位确定消息分包处理,则该项有内容
否则无该项
2解析
整个消息体结构中最复杂的就是消息头了。
2.1消息体实体类
以下是对整个消息体抽象出来的一个java实体类。
importjava.nio.channels.Channel;
/**
 * Value object for one JT808 packet: header, raw body bytes, check code and
 * the channel the packet belongs to.
 */
public class PackageData {

    /** Message header (16 bytes). */
    protected MsgHeader msgHeader;

    /** Raw bytes of the message body. */
    protected byte[] msgBodyBytes;

    /** Check code (1 byte). */
    protected int checkSum;

    /** Channel of the client, recorded so that data can be sent down to it. */
    protected Channel channel;

    /**
     * @return the message header, or {@code null} if none has been set
     */
    public MsgHeader getMsgHeader() {
        return this.msgHeader;
    }

    // TODO: the remaining setters and getters are omitted here

    /** Message header. */
    public static class MsgHeader {

        /** Message ID. */
        protected int msgId;

        // ---------- message body properties ----------

        /** Raw message body properties word, header byte[2-3]. */
        protected int msgBodyPropsField;

        /** Message body length. */
        protected int msgBodyLength;

        /** Data encryption type. */
        protected int encryptionType;

        /** Whether the message is split; {@code true} means the sub-package item is present. */
        protected boolean hasSubPackage;

        /** Reserved bits [14-15]. */
        protected String reservedBit;

        // ---------- end of message body properties ----------

        /** Terminal phone number. */
        protected String terminalPhone;

        /** Message serial (flow) number. */
        protected int flowId;

        // ---------- sub-package item ----------

        /** Raw sub-package item, header byte[12-15]. */
        protected int packageInfoField;

        /** Total number of sub-packages (word(16)). */
        protected long totalSubPackage;

        /** Index of this sub-package within the split message (word(16)), starting from 1. */
        protected long subPackageSeq;

        // ---------- end of sub-package item ----------

        // TODO: setters and getters are omitted here
    }
}
2.2字节数组到消息体实体类的转换
2.2.1消息转换器
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importcn.hylexus.jt808.util.BCD8421Operater;
importcn.hylexus.jt808.util.BitOperator;
importcn.hylexus.jt808.vo.PackageData;
importcn.hylexus.jt808.vo.PackageData.MsgHeader;
publicclassMsgDecoder{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(MsgDecoder.class);
privateBitOperatorbitOperator;
privateBCD8421Operaterbcd8421Operater;
publicMsgDecoder(){
this.bitOperator=newBitOperator();
this.bcd8421Operater=newBCD8421Operater();
}
//字节数组到消息体实体类
publicPackageDataqueueElement2PackageData(byte[]data){
PackageDataret=newPackageData();
//1.16byte或12byte消息头
MsgHeadermsgHeader=this.parseMsgHeaderFromBytes(data);
ret.setMsgHeader(msgHeader);
intmsgBodyByteStartIndex=12;
//2.消息体
//有子包信息,消息体起始字节后移四个字节:消息包总数(word(16))+包序号(word(16))
if(msgHeader.isHasSubPackage()){
msgBodyByteStartIndex=16;
}
byte[]tmp=newbyte[msgHeader.getMsgBodyLength()];
System.arraycopy(data,msgBodyByteStartIndex,tmp,0,tmp.length);
ret.setMsgBodyBytes(tmp);
//3.去掉分隔符之后,最后一位就是校验码
//intcheckSumInPkg=
//this.bitOperator.oneByteToInteger(data[data.length-1]);
intcheckSumInPkg=data[data.length-1];
intcalculatedCheckSum=this.bitOperator.getCheckSum4JT808(data,0,data.length-1);
ret.setCheckSum(checkSumInPkg);
if(checkSumInPkg!=calculatedCheckSum){
log.warn("检验码不一致,msgid:{},pkg:{},calculated:{}",msgHeader.getMsgId(),checkSumInPkg,calculatedCheckSum);
}
returnret;
}
privateMsgHeaderparseMsgHeaderFromBytes(byte[]data){
MsgHeadermsgHeader=newMsgHeader();
//1.消息IDword(16)
//byte[]tmp=newbyte[2];
//System.arraycopy(data,0,tmp,0,2);
//msgHeader.setMsgId(this.bitOperator.twoBytesToInteger(tmp));
msgHeader.setMsgId(this.parseIntFromBytes(data,0,2));
//2.消息体属性word(16)=================>
//System.arraycopy(data,2,tmp,0,2);
//intmsgBodyProps=this.bitOperator.twoBytesToInteger(tmp);
intmsgBodyProps=this.parseIntFromBytes(data,2,2);
msgHeader.setMsgBodyPropsField(msgBodyProps);
//[0-9]0000,0011,1111,1111(3FF)(消息体长度)
msgHeader.setMsgBodyLength(msgBodyProps&0x1ff);
//[10-12]0001,1100,0000,0000(1C00)(加密类型)
msgHeader.setEncryptionType((msgBodyProps&0xe00)>>10);
//[13_]0010,0000,0000,0000(2000)(是否有子包)
msgHeader.setHasSubPackage(((msgBodyProps&0x2000)>>13)==1);
//[14-15]1100,0000,0000,0000(C000)(保留位)
msgHeader.setReservedBit(((msgBodyProps&0xc000)>>14)+"");
//消息体属性word(16)<=================
//3.终端手机号bcd[6]
//tmp=newbyte[6];
//System.arraycopy(data,4,tmp,0,6);
//msgHeader.setTerminalPhone(this.bcd8421Operater.bcd2String(tmp));
msgHeader.setTerminalPhone(this.parseBcdStringFromBytes(data,4,6));
//4.消息流水号word(16)按发送顺序从0开始循环累加
//tmp=newbyte[2];
//System.arraycopy(data,10,tmp,0,2);
//msgHeader.setFlowId(this.bitOperator.twoBytesToInteger(tmp));
msgHeader.setFlowId(this.parseIntFromBytes(data,10,2));
//5.消息包封装项
//有子包信息
if(msgHeader.isHasSubPackage()){
//消息包封装项字段
msgHeader.setPackageInfoField(this.parseIntFromBytes(data,12,4));
//byte[0-1]消息包总数(word(16))
//tmp=newbyte[2];
//System.arraycopy(data,12,tmp,0,2);
//msgHeader.setTotalSubPackage(this.bitOperator.twoBytesToInteger(tmp));
msgHeader.setTotalSubPackage(this.parseIntFromBytes(data,12,2));
//byte[2-3]包序号(word(16))从1开始
//tmp=newbyte[2];
//System.arraycopy(data,14,tmp,0,2);
//msgHeader.setSubPackageSeq(this.bitOperator.twoBytesToInteger(tmp));
msgHeader.setSubPackageSeq(this.parseIntFromBytes(data,12,2));
}
returnmsgHeader;
}
protectedStringparseStringFromBytes(byte[]data,intstartIndex,intlenth){
returnthis.parseStringFromBytes(data,startIndex,lenth,null);
}
privateStringparseStringFromBytes(byte[]data,intstartIndex,intlenth,StringdefaultVal){
try{
byte[]tmp=newbyte[lenth];
System.arraycopy(data,startIndex,tmp,0,lenth);
returnnewString(tmp,"UTF-8");
}catch(Exceptione){
log.error("解析字符串出错:{}",e.getMessage());
e.printStackTrace();
returndefaultVal;
}
}
privateStringparseBcdStringFromBytes(byte[]data,intstartIndex,intlenth){
returnthis.parseBcdStringFromBytes(data,startIndex,lenth,null);
}
privateStringparseBcdStringFromBytes(byte[]data,intstartIndex,intlenth,StringdefaultVal){
try{
byte[]tmp=newbyte[lenth];
System.arraycopy(data,startIndex,tmp,0,lenth);
returnthis.bcd8421Operater.bcd2String(tmp);
}catch(Exceptione){
log.error("解析BCD(8421码)出错:{}",e.getMessage());
e.printStackTrace();
returndefaultVal;
}
}
privateintparseIntFromBytes(byte[]data,intstartIndex,intlength){
returnthis.parseIntFromBytes(data,startIndex,length,0);
}
privateintparseIntFromBytes(byte[]data,intstartIndex,intlength,intdefaultVal){
try{
//字节数大于4,从起始索引开始向后处理4个字节,其余超出部分丢弃
finalintlen=length>4?4:length;
byte[]tmp=newbyte[len];
System.arraycopy(data,startIndex,tmp,0,len);
returnbitOperator.byteToInteger(tmp);
}catch(Exceptione){
log.error("解析整数出错:{}",e.getMessage());
e.printStackTrace();
returndefaultVal;
}
}
}
2.2.2用到的工具类
2.2.2.1BCD操作工具类
packagecn.hylexus.jt808.util;
publicclassBCD8421Operater{
/**
*BCD字节数组===>String
*
*@parambytes
*@return十进制字符串
*/
publicStringbcd2String(byte[]bytes){
StringBuildertemp=newStringBuilder(bytes.length*2);
for(inti=0;i>>4);
//低四位
temp.append(bytes[i]&0x0f);
}
returntemp.toString().substring(0,1).equalsIgnoreCase("0")?temp.toString().substring(1):temp.toString();
}
/**
*字符串==>BCD字节数组
*
*@paramstr
*@returnBCD字节数组
*/
publicbyte[]string2Bcd(Stringstr){
//奇数,前补零
if((str.length()&0x1)==1){
str="0"+str;
}
byteret[]=newbyte[str.length()/2];
bytebs[]=str.getBytes();
for(inti=0;i='0')&&(asc<='9'))
return(byte)(asc-'0');
elseif((asc>='A')&&(asc<='F'))
return(byte)(asc-'A'+10);
elseif((asc>='a')&&(asc<='f'))
return(byte)(asc-'a'+10);
else
return(byte)(asc-48);
}
}
2.2.2.2位操作工具类
packagecn.hylexus.jt808.util;
importjava.util.Arrays;
importjava.util.List;
publicclassBitOperator{
/**
*把一个整形该为byte
*
*@paramvalue
*@return
*@throwsException
*/
publicbyteintegerTo1Byte(intvalue){
return(byte)(value&0xFF);
}
/**
*把一个整形该为1位的byte数组
*
*@paramvalue
*@return
*@throwsException
*/
publicbyte[]integerTo1Bytes(intvalue){
byte[]result=newbyte[1];
result[0]=(byte)(value&0xFF);
returnresult;
}
/**
*把一个整形改为2位的byte数组
*
*@paramvalue
*@return
*@throwsException
*/
publicbyte[]integerTo2Bytes(intvalue){
byte[]result=newbyte[2];
result[0]=(byte)((value>>>8)&0xFF);
result[1]=(byte)(value&0xFF);
returnresult;
}
/**
*把一个整形改为3位的byte数组
*
*@paramvalue
*@return
*@throwsException
*/
publicbyte[]integerTo3Bytes(intvalue){
byte[]result=newbyte[3];
result[0]=(byte)((value>>>16)&0xFF);
result[1]=(byte)((value>>>8)&0xFF);
result[2]=(byte)(value&0xFF);
returnresult;
}
/**
*把一个整形改为4位的byte数组
*
*@paramvalue
*@return
*@throwsException
*/
publicbyte[]integerTo4Bytes(intvalue){
byte[]result=newbyte[4];
result[0]=(byte)((value>>>24)&0xFF);
result[1]=(byte)((value>>>16)&0xFF);
result[2]=(byte)((value>>>8)&0xFF);
result[3]=(byte)(value&0xFF);
returnresult;
}
/**
*把byte[]转化位整形,通常为指令用
*
*@paramvalue
*@return
*@throwsException
*/
publicintbyteToInteger(byte[]value){
intresult;
if(value.length==1){
result=oneByteToInteger(value[0]);
}elseif(value.length==2){
result=twoBytesToInteger(value);
}elseif(value.length==3){
result=threeBytesToInteger(value);
}elseif(value.length==4){
result=fourBytesToInteger(value);
}else{
result=fourBytesToInteger(value);
}
returnresult;
}
/**
*把一个byte转化位整形,通常为指令用
*
*@paramvalue
*@return
*@throwsException
*/
publicintoneByteToInteger(bytevalue){
return(int)value&0xFF;
}
/**
*把一个2位的数组转化位整形
*
*@paramvalue
*@return
*@throwsException
*/
publicinttwoBytesToInteger(byte[]value){
//if(value.length<2){
//thrownewException("Bytearraytooshort!");
//}
inttemp0=value[0]&0xFF;
inttemp1=value[1]&0xFF;
return((temp0<<8)+temp1);
}
/**
*把一个3位的数组转化位整形
*
*@paramvalue
*@return
*@throwsException
*/
publicintthreeBytesToInteger(byte[]value){
inttemp0=value[0]&0xFF;
inttemp1=value[1]&0xFF;
inttemp2=value[2]&0xFF;
return((temp0<<16)+(temp1<<8)+temp2);
}
/**
*把一个4位的数组转化位整形,通常为指令用
*
*@paramvalue
*@return
*@throwsException
*/
publicintfourBytesToInteger(byte[]value){
//if(value.length<4){
//thrownewException("Bytearraytooshort!");
//}
inttemp0=value[0]&0xFF;
inttemp1=value[1]&0xFF;
inttemp2=value[2]&0xFF;
inttemp3=value[3]&0xFF;
return((temp0<<24)+(temp1<<16)+(temp2<<8)+temp3);
}
/**
*把一个4位的数组转化位整形
*
*@paramvalue
*@return
*@throwsException
*/
publiclongfourBytesToLong(byte[]value)throwsException{
//if(value.length<4){
//thrownewException("Bytearraytooshort!");
//}
inttemp0=value[0]&0xFF;
inttemp1=value[1]&0xFF;
inttemp2=value[2]&0xFF;
inttemp3=value[3]&0xFF;
return(((long)temp0<<24)+(temp1<<16)+(temp2<<8)+temp3);
}
/**
*把一个数组转化长整形
*
*@paramvalue
*@return
*@throwsException
*/
publiclongbytes2Long(byte[]value){
longresult=0;
intlen=value.length;
inttemp;
for(inti=0;i>>temp)&0x0ff;
}
}
returnresult;
}
/**
*得到一个消息ID
*
*@return
*@throwsException
*/
publicbyte[]generateTransactionID()throwsException{
byte[]id=newbyte[16];
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,0,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,2,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,4,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,6,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,8,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,10,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,12,2);
System.arraycopy(integerTo2Bytes((int)(Math.random()*65536)),0,id,14,2);
returnid;
}
/**
*把IP拆分位int数组
*
*@paramip
*@return
*@throwsException
*/
publicint[]getIntIPValue(Stringip)throwsException{
String[]sip=ip.split("[.]");
//if(sip.length!=4){
//thrownewException("errorIPAddress");
//}
int[]intIP={Integer.parseInt(sip[0]),Integer.parseInt(sip[1]),Integer.parseInt(sip[2]),
Integer.parseInt(sip[3])};
returnintIP;
}
/**
*把byte类型IP地址转化位字符串
*
*@paramaddress
*@return
*@throwsException
*/
publicStringgetStringIPValue(byte[]address)throwsException{
intfirst=this.oneByteToInteger(address[0]);
intsecond=this.oneByteToInteger(address[1]);
intthird=this.oneByteToInteger(address[2]);
intfourth=this.oneByteToInteger(address[3]);
returnfirst+"."+second+"."+third+"."+fourth;
}
/**
*合并字节数组
*
*@paramfirst
*@paramrest
*@return
*/
publicbyte[]concatAll(byte[]first,byte[]...rest){
inttotalLength=first.length;
for(byte[]array:rest){
if(array!=null){
totalLength+=array.length;
}
}
byte[]result=Arrays.copyOf(first,totalLength);
intoffset=first.length;
for(byte[]array:rest){
if(array!=null){
System.arraycopy(array,0,result,offset,array.length);
offset+=array.length;
}
}
returnresult;
}
/**
*合并字节数组
*
*@paramrest
*@return
*/
publicbyte[]concatAll(Listrest){
inttotalLength=0;
for(byte[]array:rest){
if(array!=null){
totalLength+=array.length;
}
}
byte[]result=newbyte[totalLength];
intoffset=0;
for(byte[]array:rest){
if(array!=null){
System.arraycopy(array,0,result,offset,array.length);
offset+=array.length;
}
}
returnresult;
}
publicfloatbyte2Float(byte[]bs){
returnFloat.intBitsToFloat(
(((bs[3]&0xFF)<<24)+((bs[2]&0xFF)<<16)+((bs[1]&0xFF)<<8)+(bs[0]&0xFF)));
}
publicfloatbyteBE2Float(byte[]bytes){
intl;
l=bytes[0];
l&=0xff;
l|=((long)bytes[1]<<8);
l&=0xffff;
l|=((long)bytes[2]<<16);
l&=0xffffff;
l|=((long)bytes[3]<<24);
returnFloat.intBitsToFloat(l);
}
publicintgetCheckSum4JT808(byte[]bs,intstart,intend){
if(start<0||end>bs.length)
thrownewArrayIndexOutOfBoundsException("getCheckSum4JT808error:indexoutofbounds(start="+start
+",end="+end+",byteslength="+bs.length+")");
intcs=0;
for(inti=start;i=Integer.SIZE)
thrownewIndexOutOfBoundsException("maxindexis"+(Integer.SIZE-1)+",butend="+end);
return(number<>>Integer.SIZE-(end-start+1);
}
publicintgetBitAt(intnumber,intindex){
if(index<0)
thrownewIndexOutOfBoundsException("minindexis0,but"+index);
if(index>=Integer.SIZE)
thrownewIndexOutOfBoundsException("maxindexis"+(Integer.SIZE-1)+",but"+index);
return((1<>index;
}
publicintgetBitAtS(intnumber,intindex){
Strings=Integer.toBinaryString(number);
returnInteger.parseInt(s.charAt(index)+"");
}
@Deprecated
publicintgetBitRangeS(intnumber,intstart,intend){
Strings=Integer.toBinaryString(number);
StringBuildersb=newStringBuilder(s);
while(sb.length()
2.3和netty结合
2.3.1netty处理器链
importjava.util.concurrent.TimeUnit;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importcn.kkbc.tpms.tcp.service.TCPServerHandler;
importio.netty.bootstrap.ServerBootstrap;
importio.netty.buffer.Unpooled;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioServerSocketChannel;
importio.netty.handler.codec.DelimiterBasedFrameDecoder;
importio.netty.handler.timeout.IdleStateHandler;
importio.netty.util.concurrent.Future;
publicclassTCPServer2{
privateLoggerlog=LoggerFactory.getLogger(getClass());
privatevolatilebooleanisRunning=false;
privateEventLoopGroupbossGroup=null;
privateEventLoopGroupworkerGroup=null;
privateintport;
publicTCPServer2(){
}
publicTCPServer2(intport){
this();
this.port=port;
}
privatevoidbind()throwsException{
this.bossGroup=newNioEventLoopGroup();
this.workerGroup=newNioEventLoopGroup();
ServerBootstrapserverBootstrap=newServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)//
.channel(NioServerSocketChannel.class)//
.childHandler(newChannelInitializer(){//
@Override
publicvoidinitChannel(SocketChannelch)throwsException{
//超过15分钟未收到客户端消息则自动断开客户端连接
ch.pipeline().addLast("idleStateHandler",
newIdleStateHandler(15,0,0,TimeUnit.MINUTES));
//ch.pipeline().addLast(newDecoder4LoggingOnly());
//1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
ch.pipeline().addLast(
newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(newbyte[]{0x7e}),
Unpooled.copiedBuffer(newbyte[]{0x7e,0x7e})));
ch.pipeline().addLast(newTCPServerHandler());
}
}).option(ChannelOption.SO_BACKLOG,128)//
.childOption(ChannelOption.SO_KEEPALIVE,true);
this.log.info("TCP服务启动完毕,port={}",this.port);
ChannelFuturechannelFuture=serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}
publicsynchronizedvoidstartServer(){
if(this.isRunning){
thrownewIllegalStateException(this.getName()+"isalreadystarted.");
}
this.isRunning=true;
newThread(()->{
try{
this.bind();
}catch(Exceptione){
this.log.info("TCP服务启动出错:{}",e.getMessage());
e.printStackTrace();
}
},this.getName()).start();
}
publicsynchronizedvoidstopServer(){
if(!this.isRunning){
thrownewIllegalStateException(this.getName()+"isnotyetstarted.");
}
this.isRunning=false;
try{
Future>future=this.workerGroup.shutdownGracefully().await();
if(!future.isSuccess()){
log.error("workerGroup无法正常停止:{}",future.cause());
}
future=this.bossGroup.shutdownGracefully().await();
if(!future.isSuccess()){
log.error("bossGroup无法正常停止:{}",future.cause());
}
}catch(InterruptedExceptione){
e.printStackTrace();
}
this.log.info("TCP服务已经停止...");
}
privateStringgetName(){
return"TCP-Server";
}
publicstaticvoidmain(String[]args)throwsException{
TCPServer2server=newTCPServer2(20048);
server.startServer();
//Thread.sleep(3000);
//server.stopServer();
}
}
2.3.2netty针对于JT808的消息处理器
packagecn.hylexus.jt808.service;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importcn.hylexus.jt808.server.SessionManager;
importcn.hylexus.jt808.service.codec.MsgDecoder;
importcn.hylexus.jt808.vo.PackageData;
importcn.hylexus.jt808.vo.Session;
importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importio.netty.handler.timeout.IdleState;
importio.netty.handler.timeout.IdleStateEvent;
importio.netty.util.ReferenceCountUtil;
publicclassTCPServerHandlerextendsChannelInboundHandlerAdapter{//(1)
privatefinalLoggerlogger=LoggerFactory.getLogger(getClass());
//一个维护客户端连接的类
privatefinalSessionManagersessionManager;
privateMsgDecoderdecoder=newMsgDecoder();
publicTCPServerHandler(){
this.sessionManager=SessionManager.getInstance();
}
@Override
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsInterruptedException{//(2)
try{
ByteBufbuf=(ByteBuf)msg;
if(buf.readableBytes()<=0){
//ReferenceCountUtil.safeRelease(msg);
return;
}
byte[]bs=newbyte[buf.readableBytes()];
buf.readBytes(bs);
PackageDatajt808Msg=this.decoder.queueElement2PackageData(bs);
//处理客户端消息
this.processClientMsg(jt808Msg);
}finally{
release(msg);
}
}
privatevoidprocessClientMsg(PackageDatajt808Msg){
//TODO更加消息ID的不同,分别实现自己的业务逻辑
if(jt808Msg.getMsgHeader().getMsgId()==0x900){
//TODO...
}elseif(jt808Msg.getMsgHeader().getMsgId()==0x9001){
//TODO...
}
//elseif(){}
//elseif(){}
//elseif(){}
//elseif(){}
//...
else{
logger.error("位置消息,消息ID={}",jt808Msg.getMsgHeader().getMsgId());
}
}
@Override
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause){//(4)
logger.error("发生异常:{}",cause.getMessage());
cause.printStackTrace();
}
@Override
publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
Sessionsession=Session.buildSession(ctx.channel());
sessionManager.put(session.getId(),session);
logger.debug("终端连接:{}",session);
}
@Override
publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{
finalStringsessionId=ctx.channel().id().asLongText();
Sessionsession=sessionManager.findBySessionId(sessionId);
this.sessionManager.removeBySessionId(sessionId);
logger.debug("终端断开连接:{}",session);
ctx.channel().close();
//ctx.close();
}
@Override
publicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{
if(IdleStateEvent.class.isAssignableFrom(evt.getClass())){
IdleStateEventevent=(IdleStateEvent)evt;
if(event.state()==IdleState.READER_IDLE){
Sessionsession=this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
logger.error("服务器主动断开连接:{}",session);
ctx.close();
}
}
}
privatevoidrelease(Objectmsg){
try{
ReferenceCountUtil.release(msg);
}catch(Exceptione){
e.printStackTrace();
}
}
}
2.3.3用到的其他类
packagecn.hylexus.jt808.server;
importjava.util.List;
importjava.util.Map;
importjava.util.Map.Entry;
importjava.util.Set;
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.function.BiConsumer;
importjava.util.stream.Collectors;
importcn.hylexus.jt808.vo.Session;
publicclassSessionManager{
privatestaticvolatileSessionManagerinstance=null;
//netty生成的sessionID和Session的对应关系
privateMapsessionIdMap;
//终端手机号和netty生成的sessionID的对应关系
privateMapphoneMap;
publicstaticSessionManagergetInstance(){
if(instance==null){
synchronized(SessionManager.class){
if(instance==null){
instance=newSessionManager();
}
}
}
returninstance;
}
publicSessionManager(){
this.sessionIdMap=newConcurrentHashMap<>();
this.phoneMap=newConcurrentHashMap<>();
}
publicbooleancontainsKey(StringsessionId){
returnsessionIdMap.containsKey(sessionId);
}
publicbooleancontainsSession(Sessionsession){
returnsessionIdMap.containsValue(session);
}
publicSessionfindBySessionId(Stringid){
returnsessionIdMap.get(id);
}
publicSessionfindByTerminalPhone(Stringphone){
StringsessionId=this.phoneMap.get(phone);
if(sessionId==null)
returnnull;
returnthis.findBySessionId(sessionId);
}
publicsynchronizedSessionput(Stringkey,Sessionvalue){
if(value.getTerminalPhone()!=null&&!"".equals(value.getTerminalPhone().trim())){
this.phoneMap.put(value.getTerminalPhone(),value.getId());
}
returnsessionIdMap.put(key,value);
}
publicsynchronizedSessionremoveBySessionId(StringsessionId){
if(sessionId==null)
returnnull;
Sessionsession=sessionIdMap.remove(sessionId);
if(session==null)
returnnull;
if(session.getTerminalPhone()!=null)
this.phoneMap.remove(session.getTerminalPhone());
returnsession;
}
publicSetkeySet(){
returnsessionIdMap.keySet();
}
publicvoidforEach(BiConsumeraction){
sessionIdMap.forEach(action);
}
publicSet>entrySet(){
returnsessionIdMap.entrySet();
}
publicListtoList(){
returnthis.sessionIdMap.entrySet().stream().map(e->e.getValue()).collect(Collectors.toList());
}
}
3demo级别java示例
请移步:https://github.com/hylexus/jt-808-protocol
另请不要吝啬,在GitHub给个star让小可装装逼…………^_^
注
急急忙忙写的博客,先写个大致的思路,有疑问可以联系本人,联系方式:
email:hylexus@163.com
到此这篇关于java解析JT808协议的实现代码的文章就介绍到这了,更多相关java解析JT808内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。