golang实现redis的延时消息队列功能示例
前言
在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了redis中zset中的zadd,zrangebyscore和zdel来实现一个小demo。
提前准备安装redis,redis-go
因为用的是macOS,直接
$brewinstallredis $gogetgithub.com/garyburd/redigo/redis
又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectId,所以:
$gogetgopkg.in/mgo.v2/bson
唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。
生产者
通过一个for循环生成10w个任务,每一个任务有不同的时间
funcproducer(){ count:=0 //生成100000个任务 forcount<100000{ count++ dealTime:=int64(rand.Intn(5))+time.Now().Unix() uuid:=bson.NewObjectId().Hex() redis.Client.AddJob(&job.JobMessage{ Id:uuid, DealTime:dealTime, },+int64(dealTime)) } }
其中AddJob函数在另一个包中,将上一个函数中随机生成的时间作为需要处理的时间戳.
//添加任务 func(client*RedisClient)AddJob(msg*job.JobMessage,dealTimeint64){ conn:=client.Get() deferconn.Close() key:="JOB_MESSAGE_QUEUE" conn.Do("zadd",key,dealTime,util.JsonEncode(msg)) }
消费者
消费者处理流程分为两个步骤:
- 获取小于等于当前时间戳的任务
- 通过删除当前任务来判断谁获得了当前任务
因为在获取小于等于当前时间戳的任务时,可能有多个goroutine同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过redis的删除操作来获取,因为删除指定value时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个goroutine拿到了当前的任务。
下面是代码:
//消费者 funcconsumer(){ //启动10个goroutine一起去拿 count:=0 forcount<10{ gofunc(){ for{ jobs:=redis.Client.GetJob() iflen(jobs)<=0{ time.Sleep(time.Second*1) continue } currentJob:=jobs[0] //如果当前抢redis队列成功, ifredis.Client.DelJob(currentJob)>0{ varjobMessagejob.JobMessage util.JsonDecode(currentJob,&jobMessage)//自定义的json解析函数 handleMessage(&jobMessage) } } }() count++ } } //处理任务用函数 funchandleMessage(msg*job.JobMessage){ fmt.Printf("dealjob:%s,requiretime:%d\n",msg.Id,msg.DealTime) gofunc(){ countChan<-true }() }
redis部分的代码,获取任务和删除任务
//获取任务 func(client*RedisClient)GetJob()[]string{ conn:=client.Get() deferconn.Close() key:="JOB_MESSAGE_QUEUE" timeNow:=time.Now().Unix() ret,err:=redis.Strings(conn.Do("zrangebyscore",key,0,timeNow,"limit",0,1)) iferr!=nil{ panic(err) } returnret } //删除当前任务,用来判断是否抢到了当前任务 func(client*RedisClient)DelJob(valuestring)int{ conn:=client.Get() deferconn.Close() key:="JOB_MESSAGE_QUEUE" ret,err:=redis.Int(conn.Do("zrem",key,value)) iferr!=nil{ panic(err) } returnret }
代码大抵如此。最后跑起来之后,大概每3-4秒钟能够处理掉1w个任务,速度上确实是...
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。