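What is Asynq
Asynq is a distributed task queue and asynchronous processing library written in Go and backed by Redis, similar to Sidekiq and Celery. Its features include:
- Guaranteed at-least-once execution of tasks
- Persistence
- Retries of failed tasks
- Automatic recovery of tasks when a worker crashes
- Priority queues
- Pausable queues
- Middleware support
- Unique tasks
- Periodic tasks
- Redis Cluster support for automatic sharding
- Redis Sentinel support for high availability
- Prometheus metrics
- A web UI for management
- A CLI for management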
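Installation
# Library:
go get -u github.com/hibiken/asynq
# Command-line tool (on Go 1.17 or later, use "go install github.com/hibiken/asynq/tools/asynq@latest" instead):
go get -u github.com/hibiken/asynq/tools/asynq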
Usage
1. Preparation
Start a Redis instance first; either a single node or a cluster works.
2. Worker (consumer) side
main.go:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	// asynq server (this code assumes asynq v0.18 or later)
	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr:     ":6379",
			Password: "Your password",
			DB:       0,
		},
		asynq.Config{Concurrency: 20},
	)

	mux := asynq.NewServeMux()

	// Middleware: log every task before passing it to the next handler.
	mux.Use(func(next asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
			fmt.Printf("[%s] log - %s\n", time.Now().Format("2006-01-02 15:04:05"), t.Payload())
			return next.ProcessTask(ctx, t)
		})
	})

	// Register a handler for each task type.
	mux.HandleFunc("msg", HandleMsg)

	// Start the server; Start does not block.
	if err := srv.Start(mux); err != nil {
		log.Fatalf("could not start server: %v", err)
	}

	// Wait for a termination signal.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
		syscall.SIGQUIT,
		//syscall.SIGUSR1, syscall.SIGUSR2,
	)
	for {
		s := <-c
		switch s {
		case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
			fmt.Println("Program Exit...", s)
			srv.Stop()     // stop pulling new tasks off the queues
			srv.Shutdown() // wait for active tasks, then shut down
			return
		//case syscall.SIGUSR1:
		//	fmt.Println("usr1 signal", s)
		//case syscall.SIGUSR2:
		//	fmt.Println("usr2 signal", s)
		default:
			fmt.Println("other signal", s)
		}
	}
}
worker.go:
package main

import (
	"context"
	"log"

	"github.com/hibiken/asynq"
)

// HandleMsg handles tasks of type "msg".
func HandleMsg(ctx context.Context, t *asynq.Task) error {
	//log.Println("------HandleMsg start------")
	log.Printf("type: %v, payload: %s", t.Type(), string(t.Payload()))
	return nil
}
Enqueuing tasks, enqueue_test.go:
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/hibiken/asynq"
)

var c *asynq.Client

func TestMain(m *testing.M) {
	r := asynq.RedisClientOpt{
		Addr:     ":6379",
		Password: "Your password",
		DB:       0,
	}
	c = asynq.NewClient(r)
	ret := m.Run()
	c.Close()
	os.Exit(ret)
}

// newMsgTask builds a "msg" task with a JSON-encoded payload.
// asynq v0.18 and later take the payload as []byte.
func newMsgTask(t *testing.T, message string) *asynq.Task {
	t.Helper()
	payload, err := json.Marshal(map[string]interface{}{"user_id": 1, "message": message})
	if err != nil {
		t.Fatalf("could not marshal payload: %v", err)
	}
	return asynq.NewTask("msg", payload)
}

// Immediate processing
func Test_Enqueue(t *testing.T) {
	task := newMsgTask(t, "i'm immediately message")
	res, err := c.Enqueue(task)
	if err != nil {
		t.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

// Delayed processing
func Test_EnqueueDelay(t *testing.T) {
	task := newMsgTask(t, "i'm delay 5 seconds message")
	res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
	// res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
	if err != nil {
		t.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

// Timeout, retry, and deadline
func Test_EnqueueOther(t *testing.T) {
	task := newMsgTask(t, "i'm delay 5 seconds message")
	// 10-second timeout, at most 3 retries, deadline 20 seconds from now
	res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
	if err != nil {
		t.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}
Testing:
Start the worker first:
$ go run main.go worker.go
Enqueue a task (the fields of the printed result depend on the asynq version):
$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1
=== RUN Test_Enqueue
Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
--- PASS: Test_Enqueue (0.00s)
PASS
ok asynq_test 0.009s
Monitoring and management
Command-line tool asynq
https://github.com/hibiken/asynq/tree/master/tools/asynq
$ asynq -p Yourpassword stats
Task Count by State
active      pending   scheduled  retry  archived
----------  --------  ---------  -----  ----
0           0         0          0      0

Task Count by Queue
default
-------
0

Daily Stats 2021-06-11 UTC
processed  failed  error rate
---------  ------  ----------
4          0       0.00%

Redis Info
version  uptime  connections  memory usage  peak memory usage
-------  ------  -----------  ------------  -----------------
6.2.0    0 days  5            16.04MB       16.14MB
Web UI
https://github.com/hibiken/asynqmon
Start it, pointing --redis-addr at your Redis instance:
./asynqmon --port=3000 --redis-addr=localhost:6380