上一篇基于 Redis 的 list 实现了一个简单的消息队列: 玩转 Redis - 简单消息队列
源码地址 使用 demo
产品经理经常说的一句话, 我们不光要有 X 功能, 还要 Y 功能, 这样客户才能更满意. 同样的, 只有简单消息队列是不够的, 还要有延时消息队列才能算是一个完整的消息队列.
看看 Redis 的命令, 放眼望去, 的有序集合 (sorted set) 就是一个很好用的命令, 完全可以用他做一个延时消息队列
Redis 有序集合(sorted set)
Redis 有序集合, 每个元素都会关联一个 double 类型的分数. Redis 正是通过分数来为集合中的成员进行从小到大的排序.
有序集合的成员是唯一的, 但分数 (score) 却可以重复.
简单操作
添加数据
- 127.0.0.1:6379> ZADD testSet1 5 a
- (integer) 1
- 127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
- (integer) 3
读取
- 127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
- 1) "b"
- 127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
- 1) "b"
- 2) "a"
也可以把 score 打出来
- 127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
- 1) "b"
- 2) "1"
- 3) "a"
- 4) "5"
查出所有的数据
- 127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
- 1) "b"
- 2) "a"
- 3) "d"
- 4) "c"
删除数据
ZREMRANGEBYSCORE testSet1 0 2
延时队列的实现思路
总体的思路很简单, 就是每一个 value 的 score 保存的是时间, 也就是说, 在添加一个元素时他的 score 是当前时间 + 延时的时间. 轮循获取数据时, 查找小于或等于当前时间的数据项, 就是具体的延时消息.
还有一个问题, 就是 ZRANGEBYSCORE 和 list 的 pop 不同, pop 是取出元素并且会把元素在 list 中删除. ZRANGEBYSCORE 只会取出数据不会把数据从 sorted set 中删除. 解决方法 1, 利用 Redis 的事务, 先 ZRANGEBYSCORE 取出数据, 然后再用 ZREMRANGEBYSCORE 把数据删除.
具体实现 - code
添加延时消息, 参数 delay 就是我们要延时多久:
- func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
- if delay <= 0 {
- return errors.New("delay need great than zero")
- }
- tm := time.Now().Add(delay)
- msg := NewMessage("", body)
- msg.DelayTime = tm.Unix()
- sendData, _ := JSON.Marshal(msg)
- return p.redisCmd.ZAdd(topicName+zsetSuffix, Redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
- }
使用, 比如我们想过 1 秒再处理
producer.PublishDelayMsg(topicName, body, time.Second)
读取消息并处理
这就比较简单了, 就是在一个 ticker 里循环读取小于或等于当前时间的数据:
- func (s *consumer) startGetDelayMessage() {
- go func() {
- ticker := time.NewTicker(s.options.RateLimitPeriod)
- defer func() {
- log.Println("stop get delay message.")
- ticker.Stop()
- }()
- topicName := s.topicName + zsetSuffix
- for {
- currentTime := time.Now().Unix()
- select {
- case <-s.ctx.Done():
- log.Printf("context Done msg: %#v \n", s.ctx.Err())
- return
- case <-ticker.C:
- var valuesCmd *Redis.ZSliceCmd
- _, err := s.redisCmd.TxPipelined(func(pip Redis.Pipeliner) error {
- valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
- pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
- return nil
- })
- if err != nil {
- log.Printf("zset pip error: %#v \n", err)
- continue
- }
- rev := valuesCmd.Val()
- for _, revBody := range rev {
- msg := &Message{}
- JSON.Unmarshal([]byte(revBody.Member.(string)), msg)
- if s.handler != nil {
- s.handler.HandleMessage(msg)
- }
- }
- }
- }
- }()
- }
来源: https://www.cnblogs.com/li-peng/p/12697110.html