golang基于redis lua封装的优先级去重队列

前言:

 前两天由于某几个厂商的api出问题,导致后台任务大量堆积,又因为我这边任务流系统会重试超时任务,所以导致队列中有大量的重复任务。这时候我们要临时解决两个事情,一件事情,让一些高质量的任务优先执行; 另一件事情, 要有去重。 rabbitmq不能很好的针对这类情况去重、分优先级。  

 这时候我又想到了我最爱的redis…   去重?  list + set 就可以解决, 优先级,zset + zrange + zrem 也可以解决…  但问题这几个命令非原子,那么怎么让他们原子?   写模块 or redis lua script .   首先在 redis 4.x 写了个简单的module,但写完了发现一件颇为重要的事情,我们线上的是3.2 ….  然后又花了点时间改成redis lua的版本。项目本身的功能实现很简单,复杂的是创意 !!! 

项目名: redis_unique_queue, 项目地址,https://github.com/rfyiamcool/redis_unique_queue

主要功能介绍:

使用redis lua script 封装的去重及优先级队列方法, 达到了组合命令的原子性和节省来往的io请求的目的.

去重队列:

不仅能保证FIFO, 而且去重.

优先级去重队列:

按照优先级获取任务, 并且去重.

使用方法:

# xiaorui.cc

PriorityQueue

NewPriorityQueue(priority int, unique bool, r *redis.Pool)

Push(q string, body string, pri int) (int, error)

Pop(q string) (resp string, err error)
UniqueQueue

NewUniqueQueue(r *redis.Pool) *UniqueQueue

UniquePush(q string, body string) (int, error)

UniquePop(q string) (resp string, err error)

more..

下面是优先级去重队列的例子:

package main

// xiaorui.cc

import (
    "fmt"

    "github.com/rfyiamcool/redis_unique_queue"
)

func main() {
    fmt.Println("start")
    redis_client_config := unique_queue.RedisConfType{
        RedisPw:          "",
        RedisHost:        "127.0.0.1:6379",
        RedisDb:          0,
        RedisMaxActive:   100,
        RedisMaxIdle:     100,
        RedisIdleTimeOut: 1000,
    }
    redis_client := unique_queue.NewRedisPool(redis_client_config)

    qname := "xiaorui.cc"
    body := "message from xiaorui.cc"

    u := unique_queue.NewPriorityQueue(3, true, redis_client)
    // 3: 3个优先级,从1-3级
    // true: 开启unique set

    u.Push(qname, body, 2)
    // 2, 优先级

    fmt.Println(u.Pop(qname))
}

单单使用 去重队列的例子:

package main

import (
    "fmt"

    "github.com/rfyiamcool/redis_unique_queue"
)

func main() {
    fmt.Println("start")
    redis_client_config := unique_queue.RedisConfType{
        RedisPw:          "",
        RedisHost:        "127.0.0.1:6379",
        RedisDb:          0,
        RedisMaxActive:   100,
        RedisMaxIdle:     100,
        RedisIdleTimeOut: 1000,
    }
    redis_client := unique_queue.NewRedisPool(redis_client_config)


    qname := "xiaorui.cc"
    u := unique_queue.NewUniqueQueue(redis_client)
    for i := 0; i < 100; i++ {
        u.UniquePush(qname, "body...")
    }

    fmt.Println(u.Length(qname))

    for i := 0; i < 100; i++ {
        u.UniquePop(qname)
    }

    fmt.Println(u.Length(qname))


    fmt.Println("end")
}

需要改进地址也是很多, 比如 加入批量操作, 对于redis连接池引入方法改进等.

END.

标签:LUAGolangRedis 发布于:2019-11-04 20:55:42