Java利用Redis实现消息队列的示例代码
本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:
应用场景
为什么要用redis?
二进制存储、java序列化传输、IO连接数高、连接频繁
一、序列化
这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象;主要是用到了ByteArrayOutputStream和ByteArrayInputStream;注意:每个需要序列化的对象都要实现Serializable接口;
其代码如下:
package Utils;

import java.io.*;

/**
 * Converts between objects and byte arrays using standard Java serialization.
 * Every object passed in must implement {@link Serializable}.
 *
 * Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {
    /**
     * Serializes an object into a byte array.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws IOException if the object (or something it references) cannot be serialized
     */
    public static byte[] object2Bytes(Object obj) throws IOException {
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = new ObjectOutputStream(bo);
        try {
            oo.writeObject(obj);
            // Flush explicitly so the snapshot below never depends on the
            // object stream's internal buffering.
            oo.flush();
            return bo.toByteArray();
        } finally {
            // Closing the object stream also closes the underlying byte stream.
            oo.close();
        }
    }

    /**
     * Deserializes a byte array back into an object.
     *
     * NOTE(review): Java native deserialization must only be used on data
     * written by trusted producers; do not feed it bytes from untrusted sources.
     *
     * @param bytes the serialized bytes; may be {@code null}
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     * @throws Exception if the bytes are not a valid serialized object or its class is missing
     */
    public static Object bytes2Object(byte[] bytes) throws Exception {
        // A null array means "no data" (e.g. popping from an empty queue).
        if (bytes == null) {
            return null;
        }
        ObjectInputStream sIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return sIn.readObject();
        } finally {
            sIn.close();
        }
    }
}
二、消息类(实现Serializable接口)
package Model;

import java.io.Serializable;

/**
 * Payload carried through the queue: a numeric id plus a text body.
 * Implements {@link Serializable} so it can be turned into bytes.
 *
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {
    private static final long serialVersionUID = -389326121047047723L;

    /** Identifier of the message. */
    private int id;

    /** Text content of the message. */
    private String content;

    /**
     * Creates a message.
     *
     * @param id      identifier of the message
     * @param content text content of the message
     */
    public Message(int id, String content) {
        this.id = id;
        this.content = content;
    }

    /** @return the identifier of this message */
    public int getId() {
        return id;
    }

    /** @param id the new identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the text content of this message */
    public String getContent() {
        return content;
    }

    /** @param content the new text content */
    public void setContent(String content) {
        this.content = content;
    }
}
三、Redis的操作
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:
只允许在一端插入新元素,且只能在队列的尾部插入;FIFO:先进先出原则。Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list要push或pop的对象仅需要转换成byte[]即可。
java采用Jedis进行Redis的存储和Redis的连接池设置
上代码:
package Utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Static helpers that run single Redis commands on connections borrowed from
 * a shared {@link JedisPool}.
 *
 * Every public method borrows a connection, runs one command and gives the
 * connection back. Failures are printed with {@code printStackTrace()} and
 * reported to the caller as {@code null} (or {@code 0} for {@link #llen}).
 *
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
    private static String JEDIS_IP;
    private static int JEDIS_PORT;
    private static String JEDIS_PASSWORD;
    private static JedisPool jedisPool;

    static {
        // Configuration is a hand-written config parser that extends Properties.
        Configuration conf = Configuration.getInstance();
        JEDIS_IP = conf.getString("jedis.ip", "127.0.0.1");
        JEDIS_PORT = conf.getInt("jedis.port", 6379);
        JEDIS_PASSWORD = conf.getString("jedis.password", null);
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(5000);
        config.setMaxIdle(256);
        config.setMaxWait(5000L);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        config.setMinEvictableIdleTimeMillis(60000L);
        config.setTimeBetweenEvictionRunsMillis(3000L);
        config.setNumTestsPerEvictionRun(-1);
        // Hand the configured password to the pool; it was previously read but
        // never used, so password-protected servers could not be reached.
        jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, 60000, JEDIS_PASSWORD);
    }

    /** One Redis command to be run on a borrowed connection. */
    private interface RedisAction<T> {
        T run(Jedis jedis) throws Exception;
    }

    /**
     * Borrows a connection, runs the action and always gives the connection
     * back exactly once (as broken if the action failed).
     *
     * @return the action's result, or {@code null} if anything failed
     */
    private static <T> T execute(RedisAction<T> action) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            return action.run(jedis);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
            return null;
        } finally {
            release(jedis, broken);
        }
    }

    /**
     * Gives a connection back to the pool: healthy ones are returned, failed
     * ones are invalidated. Does nothing when no connection was obtained.
     */
    private static void release(Jedis jedis, boolean broken) {
        if (jedis == null) {
            return;
        }
        try {
            if (broken) {
                jedisPool.returnBrokenResource(jedis);
            } else {
                jedisPool.returnResource(jedis);
            }
        } catch (Exception e) {
            // The pool refused the connection; close it ourselves, best effort.
            try {
                if (jedis.isConnected()) {
                    jedis.quit();
                    jedis.disconnect();
                }
            } catch (Exception ignored) {
                // Nothing more can be done for this connection.
            }
        }
    }

    /**
     * Reads a string value.
     *
     * @param key the key
     * @return the value, or {@code null} if absent or on failure
     */
    public static String get(final String key) {
        return execute(new RedisAction<String>() {
            public String run(Jedis jedis) {
                return jedis.get(key);
            }
        });
    }

    /**
     * Reads a binary value.
     *
     * @param key the key
     * @return the value, or {@code null} if absent or on failure
     */
    public static byte[] get(final byte[] key) {
        return execute(new RedisAction<byte[]>() {
            public byte[] run(Jedis jedis) {
                return jedis.get(key);
            }
        });
    }

    /**
     * Stores a binary value.
     *
     * @param key   the key
     * @param value the value
     */
    public static void set(final byte[] key, final byte[] value) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.set(key, value);
                return null;
            }
        });
    }

    /**
     * Stores a binary value and then sets its time to live.
     *
     * @param key   the key
     * @param value the value
     * @param time  time to live in seconds
     */
    public static void set(final byte[] key, final byte[] value, final int time) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.set(key, value);
                jedis.expire(key, time);
                return null;
            }
        });
    }

    /**
     * Sets a binary field of a hash.
     *
     * @param key   the hash key
     * @param field the field name
     * @param value the field value
     */
    public static void hset(final byte[] key, final byte[] field, final byte[] value) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.hset(key, field, value);
                return null;
            }
        });
    }

    /**
     * Sets a string field of a hash.
     *
     * @param key   the hash key
     * @param field the field name
     * @param value the field value
     */
    public static void hset(final String key, final String field, final String value) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.hset(key, field, value);
                return null;
            }
        });
    }

    /**
     * Reads a string field of a hash.
     *
     * @param key   the hash key
     * @param field the field name
     * @return the field value, or {@code null} if absent or on failure
     */
    public static String hget(final String key, final String field) {
        return execute(new RedisAction<String>() {
            public String run(Jedis jedis) {
                return jedis.hget(key, field);
            }
        });
    }

    /**
     * Reads a binary field of a hash.
     *
     * @param key   the hash key
     * @param field the field name
     * @return the field value, or {@code null} if absent or on failure
     */
    public static byte[] hget(final byte[] key, final byte[] field) {
        return execute(new RedisAction<byte[]>() {
            public byte[] run(Jedis jedis) {
                return jedis.hget(key, field);
            }
        });
    }

    /**
     * Deletes a field from a hash.
     *
     * @param key   the hash key
     * @param field the field name
     */
    public static void hdel(final byte[] key, final byte[] field) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.hdel(key, field);
                return null;
            }
        });
    }

    /**
     * Pushes a value onto the head of a list (LPUSH).
     *
     * @param key   the list key
     * @param value the value to push
     */
    public static void lpush(final byte[] key, final byte[] value) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.lpush(key, value);
                return null;
            }
        });
    }

    /**
     * Pushes a value onto the tail of a list (RPUSH).
     *
     * @param key   the list key
     * @param value the value to push
     */
    public static void rpush(final byte[] key, final byte[] value) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.rpush(key, value);
                return null;
            }
        });
    }

    /**
     * Pops the tail element of list {@code key} and pushes it onto the head of
     * list {@code destination} (RPOPLPUSH). The moved element is not handed
     * back to the caller by this wrapper.
     *
     * @param key         the source list key
     * @param destination the destination list key
     */
    public static void rpoplpush(final byte[] key, final byte[] destination) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.rpoplpush(key, destination);
                return null;
            }
        });
    }

    /**
     * Returns every element of a list (LRANGE 0 -1). Despite the name, nothing
     * is removed from the list.
     *
     * @param key the list key
     * @return all elements, or {@code null} on failure
     */
    public static List<byte[]> lpopList(final byte[] key) {
        return execute(new RedisAction<List<byte[]>>() {
            public List<byte[]> run(Jedis jedis) {
                return jedis.lrange(key, 0, -1);
            }
        });
    }

    /**
     * Removes and returns the tail element of a list (RPOP).
     *
     * @param key the list key
     * @return the popped element, or {@code null} if the list is empty or on failure
     */
    public static byte[] rpop(final byte[] key) {
        return execute(new RedisAction<byte[]>() {
            public byte[] run(Jedis jedis) {
                return jedis.rpop(key);
            }
        });
    }

    /**
     * Sets several fields of a hash at once.
     *
     * @param key  the hash key; its {@code toString()} is used
     * @param hash field/value pairs to store
     */
    public static void hmset(final Object key, final Map<String, String> hash) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.hmset(key.toString(), hash);
                return null;
            }
        });
    }

    /**
     * Sets several fields of a hash at once and then sets the hash's time to live.
     *
     * @param key  the hash key; its {@code toString()} is used
     * @param hash field/value pairs to store
     * @param time time to live in seconds
     */
    public static void hmset(final Object key, final Map<String, String> hash, final int time) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.hmset(key.toString(), hash);
                jedis.expire(key.toString(), time);
                return null;
            }
        });
    }

    /**
     * Reads several fields of a hash.
     *
     * @param key    the hash key; its {@code toString()} is used
     * @param fields the field names
     * @return the values in the order of {@code fields}, or {@code null} on failure
     */
    public static List<String> hmget(final Object key, final String... fields) {
        return execute(new RedisAction<List<String>>() {
            public List<String> run(Jedis jedis) {
                return jedis.hmget(key.toString(), fields);
            }
        });
    }

    /**
     * Lists the field names of a hash.
     *
     * @param key the hash key
     * @return the field names, or {@code null} on failure
     */
    public static Set<String> hkeys(final String key) {
        return execute(new RedisAction<Set<String>>() {
            public Set<String> run(Jedis jedis) {
                return jedis.hkeys(key);
            }
        });
    }

    /**
     * Returns a range of a list (LRANGE).
     *
     * @param key  the list key
     * @param from start index
     * @param to   end index
     * @return the elements in the range, or {@code null} on failure
     */
    public static List<byte[]> lrange(final byte[] key, final int from, final int to) {
        return execute(new RedisAction<List<byte[]>>() {
            public List<byte[]> run(Jedis jedis) {
                return jedis.lrange(key, from, to);
            }
        });
    }

    /**
     * Reads all fields and values of a hash.
     *
     * @param key the hash key
     * @return the field/value pairs, or {@code null} on failure
     */
    public static Map<byte[], byte[]> hgetAll(final byte[] key) {
        return execute(new RedisAction<Map<byte[], byte[]>>() {
            public Map<byte[], byte[]> run(Jedis jedis) {
                return jedis.hgetAll(key);
            }
        });
    }

    /**
     * Deletes a key.
     *
     * @param key the key
     */
    public static void del(final byte[] key) {
        execute(new RedisAction<Void>() {
            public Void run(Jedis jedis) {
                jedis.del(key);
                return null;
            }
        });
    }

    /**
     * Returns the length of a list (LLEN).
     *
     * @param key the list key
     * @return the number of elements, or {@code 0} on failure
     */
    public static long llen(final byte[] key) {
        Long len = execute(new RedisAction<Long>() {
            public Long run(Jedis jedis) {
                // The result used to be discarded, so the method always returned 0.
                return jedis.llen(key);
            }
        });
        return len == null ? 0L : len.longValue();
    }
}
四、Configuration主要用于读取Redis的配置信息
package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Properties-based configuration loaded from the XML resource
 * {@code config.xml}, with typed getters that fall back to a default when a
 * property is missing or empty.
 *
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {
    private static final long serialVersionUID = -2296275030489943706L;
    private static Configuration instance = null;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the shared configuration
     */
    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }

    /**
     * Looks up a property, treating an empty value like a missing one.
     *
     * @param key          the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property value or {@code defaultValue}
     */
    @Override
    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    /**
     * @param name         the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property value or {@code defaultValue}
     */
    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    /**
     * @param name         the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property parsed as an int, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid int
     */
    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property parsed as a long, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid long
     */
    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        // Parse as long; Integer.parseInt rejected values beyond the int range.
        return (val == null || val.isEmpty()) ? defaultValue : Long.parseLong(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property parsed as a float, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid float
     */
    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property parsed as a double, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid double
     */
    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue value returned when the property is missing or empty
     * @return the property parsed as a byte, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a valid byte
     */
    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    }

    /**
     * Loads {@code config.xml} through the system class loader. When the
     * resource is missing or unreadable the configuration stays empty, so
     * every getter returns its default value.
     */
    public Configuration() {
        InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
        if (in == null) {
            // Without this guard loadFromXML(null) throws NullPointerException.
            System.err.println("config.xml not found on the classpath; using default values");
            return;
        }
        try {
            this.loadFromXML(in);
        } catch (IOException ioe) {
            System.err.println("Failed to load config.xml; using default values");
            ioe.printStackTrace();
        } finally {
            try {
                in.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing fails.
            }
        }
    }
}
五、测试
importModel.Message; importUtils.JedisUtil; importUtils.ObjectUtil; importredis.clients.jedis.Jedis; importjava.io.IOException; /** *CreatedbyKinglfon2016/10/17. */ publicclassTestRedisQueue{ publicstaticbyte[]redisKey="key".getBytes(); static{ try{ init(); }catch(IOExceptione){ e.printStackTrace(); } } privatestaticvoidinit()throwsIOException{ for(inti=0;i<1000000;i++){ Messagemessage=newMessage(i,"这是第"+i+"个内容"); JedisUtil.lpush(redisKey,ObjectUtil.object2Bytes(message)); } } publicstaticvoidmain(String[]args){ try{ pop(); }catch(Exceptione){ e.printStackTrace(); } } privatestaticvoidpop()throwsException{ byte[]bytes=JedisUtil.rpop(redisKey); Messagemsg=(Message)ObjectUtil.bytes2Object(bytes); if(msg!=null){ System.out.println(msg.getId()+"----"+msg.getContent()); } } }
每执行一次pop()方法,结果如下:
1----这是第1个内容
2----这是第2个内容
3----这是第3个内容
4----这是第4个内容
总结
至此,整个Redis消息队列的生产者和消费者代码已经完成
1.Message:需要传送的实体类(需实现Serializable接口)
2.Configuration:Redis的配置读取类,继承自Properties
3.ObjectUtil:将对象和byte数组双向转换的工具类
4.JedisUtil:通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。