Go每日一库之112:asynq

2023-09-30 08:35:33 浏览数 (2)

什么是Asynq

Asynq是一个go语言实现的分布式任务队列和异步处理库,基于redis,类似sidekiq和celery,他具有以下特点:

  • 保证至少执行一次任务
  • 持久化
  • 失败重试
  • worker崩溃自动恢复
  • 优先队列
  • 暂停队列
  • 支持中间件
  • 允许唯一任务
  • 周期性任务
  • 支持Redis Cluster实现自动分片
  • 支持Redis Sentinels实现高可用
  • 支持Prometheus metrics
  • 提供web ui管理
  • 提供cli管理

安装

代码语言:javascript复制
# 代码库:
go get -u github.com/hibiken/asynq

# 命令行工具:
go get -u github.com/hibiken/asynq/tools/asynq

使用

1. 准备

先准备一个redis,单点或者集群都ok。

2. worker端(消费端)

main.go:

代码语言:javascript复制
package main

import (
 "context"
 "fmt"
 "log"
 "os"
 "os/signal"
 "time"

 "github.com/hibiken/asynq"
 "golang.org/x/sys/unix"
)

func main() {
    // asynq server
    srv := asynq.NewServer(
        asynq.RedisClientOpt{
            Addr:     ":6379",
            Password: "Your password",
            DB:       0,
        },
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()

    // some middlewares
    mux.Use(func(next asynq.Handler) asynq.Handler {
        return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
            // just record a log
            fmt.Println(fmt.Printf("[%s] log - % v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))

            return next.ProcessTask(ctx, t)
        })
    })

    // some workers
    mux.HandleFunc("msg", HandleMsg)

    // start server
    if err := srv.Start(mux); err != nil {
        log.Fatalf("could not start server: %v", err)
    }

    // Wait for termination signal.
	c := make(chan os.Signal, 1)

	signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
		syscall.SIGQUIT,
		//syscall.SIGUSR1, syscall.SIGUSR2,
	)

	for  {
		s := <- c
		switch s {
		case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
			fmt.Println("Program Exit...", s)
			srv.Shutdown()
			srv.Stop()
			return
		//case syscall.SIGUSR1:
		//	fmt.Println("usr1 signal", s)
		//case syscall.SIGUSR2:
		//	fmt.Println("usr2 signal", s)
		default:
			fmt.Println("other signal", s)
		}
	}
}

worker.go:

代码语言:javascript复制
package main

import (
 "context"
 "fmt"

 "github.com/hibiken/asynq"
)

// HandleMsg 处理msg
func HandleMsg(ctx context.Context, t *asynq.Task)  error {
	//fmt.Println("------HandleMsg start------")

	log.Printf("type: %v, payload: %s", t.Type(), string(t.Payload()))

	return nil
}

创建任务enqueue_test.go:

代码语言:javascript复制
package main

import (
 "fmt"
 "os"
 "testing"
 "time"

 "github.com/hibiken/asynq"
)

var c *asynq.Client

func TestMain(m *testing.M) {
    r := asynq.RedisClientOpt{
        Addr:     ":6379",
        Password: "Your password",
        DB:       0,
    }
    c = asynq.NewClient(r)
    ret := m.Run()
    c.Close()
    os.Exit(ret)
}

// 即时消费
func Test_Enqueue(t *testing.T) {
    payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
    task := asynq.NewTask("msg", payload)
    res, err := c.Enqueue(task)
    if err != nil {
        t.Errorf("could not enqueue task: %v", err)
        t.FailNow()
    }
    fmt.Printf("Enqueued Result: % vn", res)
}

// 延时消费
func Test_EnqueueDelay(t *testing.T) {
    payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
    task := asynq.NewTask("msg", payload)
    res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
    // res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
    if err != nil {
        t.Errorf("could not enqueue task: %v", err)
        t.FailNow()
    }
    fmt.Printf("Enqueued Result: % vn", res)
}

// 超时、重试、过期
func Test_EnqueueOther(t *testing.T) {
    payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
    task := asynq.NewTask("msg", payload)
    // 10秒超时,最多重试3次,20秒后过期
    res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
    if err != nil {
        t.Errorf("could not enqueue task: %v", err)
        t.FailNow()
    }
    fmt.Printf("Enqueued Result: % vn", res)
}

测试: 先启动worker:

代码语言:javascript复制
$ go run main.go worker.go

创建任务:

代码语言:javascript复制
$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1

=== RUN   Test_Enqueue
Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475  0000 UTC ProcessAt:2021-06-11 18:41:49.017778  0800 CST m= 0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00  0800 CST}
--- PASS: Test_Enqueue (0.00s)
PASS
ok   asynq_test 0.009s

监控与管理

命令行工具asynq

https://github.com/hibiken/asynq/tree/master/tools/asynq

代码语言:javascript复制
$ asynq -p Yourpassword stats
Task Count by State
active      pending   scheduled  retry  archived
----------  --------  ---------  -----  ----
0           0         0          0      0

Task Count by Queue
default
-------
0

Daily Stats 2021-06-11 UTC
processed  failed  error rate
---------  ------  ----------
4          0       0.00%

Redis Info
version  uptime  connections  memory usage  peak memory usage
-------  ------  -----------  ------------  -----------------
6.2.0    0 days  5            16.04MB       16.14MB

Web UI

https://github.com/hibiken/asynqmon 启动:

代码语言:javascript复制
./asynqmon --port=3000 --redis-addr=localhost:6380

0 人点赞