一。背景
最近项目中有个需求要实现限制某个用户在2个小时内只能访问3次的效果,所以就简单地用redis实现了一个限流方案。
二。 设计实现
1. 方法定义
代码语言:txt复制// RateLimiter redis的实现限流器
// userId 限流用户id
// funcName 需要被限流方法
// period 窗口的周期,单位为秒
// limit 窗口的大小
func RateLimiter(redisClient *redis.Client, userId string, funcName string, period int64,limit int64) (ok bool)
2. 实现方案
1. SortedSet pipeline实现
使用redis的SortedSet 数据结构实现,实现逻辑如下:
代码语言:txt复制func RateLimiter(redisClient *redis.Client, userId string, funcName string, period int64, limit int64) bool {
pipeline := redisClient.Pipeline()
defer pipeline.Close()
key := "rate_limit:" userId ":" funcName
// step1。获取当前时刻的微妙数
currMs := time.Now().UnixNano() / 10e3
// step2。把微妙数放入redis中
pipeline.ZAdd(key, redis.Z{Score: float64(currMs), Member: currMs})
// step3。 移除窗口外的member
pipeline.ZRemRangeByScore(key, "0", strconv.FormatInt(currMs-10e6*period, 10))
// step4。 设置窗口过期时间
pipeline.Expire(key, time.Duration(period 1)*time.Second)
// step5。 计算窗口内member个数
pipeline.ZCard(key)
// 提交命令
cmder, err := pipeline.Exec()
if err != nil {
fmt.Errorf(err.Error())
return false
}
// 获取当前窗口的member数
currentCount, err := cmder[3].(*redis.IntCmd).Result()
if err != nil {
fmt.Errorf(err.Error())
return false
}
return currentCount <= limit
}
这种方式有个问题就是不管用户请求次数是否达到了limit都会做插入操作,极端场景会导致redis内存过大。主要是因为pipeline无法依赖前面命令返回结果。所以我决定用lua来做一轮改造。
2. 使用SortedSet lua脚本改进
实现逻辑基本和原来一样,只是在插入前多判断了一次窗口是否已经满了,满课的话就直接返回false, 实现逻辑如下:
代码语言:txt复制// 生成lua脚本
func createScript() *redis.Script {
script := redis.NewScript(`
local key = KEYS[1]
local cur_ms = ARGV[1]
local period = tonumber(ARGV[2]) or 0
local limit = tonumber(ARGV[3]) or 0
local t1 = redis.call('ZREMRANGEBYSCORE', key, 0, cur_ms-(10e6*period))
local t2 = redis.call('EXPIRE',key, period 1)
local cur_count = redis.call('ZCARD',key)
if tonumber(cur_count) < limit then
local t3 = redis.call('ZADD',key,cur_ms,cur_ms)
return true
end
return false
`)
return script
}
// RateLimiter redis的实现限流器
// userId 限流用户id
// funcName 限流方法
// period 窗口的周期,单位为秒
// limit 窗口的大小
func RateLimiter2(client *redis.Client, userId string, funcName string, period int64, limit int64) bool {
script := createScript()
sha, err := script.Load(client).Result()
if err != nil {
fmt.Print(err.Error())
}
key := "rate_limit:" userId ":" funcName
// step1。获取当前时刻的微妙数
currMs := time.Now().UnixNano() / 10e3
ret := client.EvalSha(sha, []string{
key,
}, currMs, period, limit)
// step2。获取执行后的结果
result, err := ret.Result()
if err != nil {
fmt.Printf("Execute Redis fail: %v", err.Error())
return false
} else {
fmt.Printf("userid: %s, funcName:%s, result: %d", userId, funcName, result)
}
if result == int64(1) {
return true
}
return false
}
三。测试
使用goconvey对已经写好的模块进行单元测试,测试组件功能是否ok
代码语言:txt复制package examples
import (
"github.com/go-redis/redis"
"time"
// 引入go convey库
. "github.com/smartystreets/goconvey/convey"
"testing"
)
/**
* @Description
* @Author guirongguo
* @Email 3095764372@qq.com
* @Date 2021/8/20 09:35
**/
func TestRateLimiter(t *testing.T) {
Convey("test utils rate_limiter", t, func() {
redisClient := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
IdleTimeout: 1 * time.Minute,
IdleCheckFrequency: 30 * time.Second,
PoolSize: 30, // 连接池最大socket连接数,默认为4倍CPU数, 4 * runtime.NumCPU
MinIdleConns: 10,
})
type Args struct {
userId, funcName string
period, limit int64
want bool
}
args := []Args{
{"ggr", "test1", 30, 1, true},
{"ggr", "test1", 30, 1, false},
{"ggr", "test2", 15, 2, true},
{"ggr", "test2", 15, 2, true},
{"ggr", "test2", 15, 2, false},
{"ggr", "test21", 3600, 1, true},
{"ggr", "test21", 3600, 1, false},
{"ggr", "test31", 3600, 2, true},
{"ggr", "test31", 3600, 2, true},
{"ggr", "test31", 3600, 2, false},
}
for i := 0; i < len(args); i {
/*ok1 := RateLimiter(redisClient, args[i].userId, args[i].funcName, args[i].period, args[i].limit)
So(ok1, ShouldEqual, args[i].want)*/
ok2 := RateLimiter2(redisClient, args[i].userId, args[i].funcName, args[i].period, args[i].limit)
So(ok2, ShouldEqual, args[i].want)
}
})
}