详解redis是如何实现队列消息的ack
前言
由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。
原生的redis中通过L/RPUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整。
大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉;每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从list中POP出来。
以下脚本使用lua实现,只需要在执行前加载到redis中即可。
消息本身需要包含id属性
push没什么问题,原生即可(此处以LPUSH为例)
pop时脚本
localnot_empty=function(x) return(type(x)=="table")and(notx.err)and(#x~=0) end localqName=ARGV[1]--队列名称 localcurrentTime=ARGV[2]--当前时间,这个需要从外部传入,不能使用redis自身时间,如果使用自身时间可能导致redis本身的backup在重放请求时出现不一致性 localconsiderAsFailMaxTimeSpan=ARGV[3]--超时时间设定,当消息超过一定时间还没有ack则认为此消息需要再次入队 localzsetName=qName..'BACKUP' localhashName=qName..'CONTEXT' localtmp=redis.call('ZRANGEBYSCORE',zsetName,'-INF',tonumber(currentTime)-tonumber(considerAsFailMaxTimeSpan),'LIMIT',0,1) if(not_empty(tmp))then redis.call('ZREM',zsetName,tmp[1])--此处拿出的为消息的唯一id redis.call('LPUSH',qName,redis.call('HGET',hashName,tmp[1])) end tmp=redis.call('RPOP',qName) if(tmp)then localmsg=cjson.decode(tmp) localid=msg['id'] redis.call('ZADD',zsetName,tonumber(currentTime),id) redis.call('HSET',hashName,id,tmp) end returntmp
ack时候比较简单,只需要将指定id从set和hash中删除即可
localkey=ARGV[1] localqName=ARGV[2] redis.call('ZREM',qName..'BACKUP',key) redis.call('HDEL',qName..'CONTEXT',key)
在程序中使用前需要显示load这两个脚本,后面直接调用这两个脚本的sha值即可执行。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。