go使用消息队列优化接口

2024-07-22 20:33:31 浏览数 (3)

前言

在我们编写后端接口时,通常有些接口对于实时性的要求并不是那么高,但其中有些函数却相当占用接口调用时间,如调用第三方接口、发送短信、发送邮件等等。为了提升用户的体验感、系统的稳定性,此时我们就可以使用消息队列对于接口进行优化,对于实时性要求不高的接口使用消息队列来进行处理,提高api响应速度,优化用户体验。本文将以go语言使用rabbitMQ来演示如何对于一个接口进行优化。

RabbitMQ安装

此处给出docker-compose文件方便各位安装rabbtMQ环境,当然也可以选择自行安装

代码语言:yaml复制
services:
  rabbitMQ:
    container_name: learn-rabbitMQ
    image: rabbitmq:3.13-management
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    restart: always

定义好docker-compose.yaml文件填入以上代码,然后使用docker-compose命令快速启动 rabbitMQ

代码语言:shell复制
docker-compose up -d

Go环境安装

使用到的Go依赖包

Gin: 本文将使用Gin来进行api注册官方教程

RabbitMQ: Rabbit官方提供的Go依赖包官方教程

代码语言:go复制
go get github.com/rabbitmq/amqp091-go
go get -u github.com/gin-gonic/gin

前置代码

为方便演示,我们先使用Gin实现一个速度较慢的接口

代码语言:go复制
package main

import (
	"fmt"
	"time"

	"github.com/gin-gonic/gin"
)

func SendMessage(c *gin.Context) {
	message := c.Param("message")
    // 假设此为必要操作需要700ms
	time.Sleep(700 * time.Millisecond)

	fmt.Println("hello,", message)
	c.JSON(200, "success")
}

func main() {
	server := gin.Default()

	server.POST("/send/:message", SendMessage)

	server.Run(":8080")
}

通过以上代码我们实现了一个至少耗时700ms,且返回参数固定的一个接口。在实际情况中可能由于某些各种各样的原因导致接口较慢,此处为方便演示直接使用了sleep函数。

让我们使用Postman调用一下接口

耗时700ms ,确实很慢

RabbitMQ函数

接下来我们先来写出使用rabbitMQ进行收发消息的函数

RabbitMQ连接函数

代码语言:go复制
package main

import (
	"fmt"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

var RabbitMQ *amqp.Connection

// 根据自己的RabbitMQ环境进行填写
var (
	RabbitMQConnection = "amqp"
	RabbitMQUser       = "admin"
	RabbitMQPassword   = "123456"
	RabbitMQHost       = "127.0.0.1"
	RabbitMQPort       = "5672"
)
// 需要发送的队列名称
var RabbitMQSendMessageQueue = "test-send-message-queue"

func InitRabbitMQ() {
	connString := strings.Join([]string{RabbitMQConnection, "://", RabbitMQUser, ":", RabbitMQPassword, "@", RabbitMQHost, ":", RabbitMQPort, "/"}, "")
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(fmt.Sprintf("Failed to connect to RabbitMQ: %s", err))
	}
	RabbitMQ = conn
}

RabbitMQ发送消息函数

代码语言:go复制
func SendMessageToMQ(ctx context.Context, queueName string, body []byte) (err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return
	}
    // 设置为消息持久化,方便重启时消息不丢失
	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	err = ch.PublishWithContext(ctx, "", q.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	})
	if err != nil {
		return
	}
	return
}

RabbitMQ接收消息函数

代码语言:go复制
func ConsumerMessage(ctx context.Context, queueName string) (msgs <-chan amqp.Delivery, err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return nil, err
	}
	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	// RabbitMQ负载均衡
	err = ch.Qos(1, 0, false)
	if err != nil {
		return nil, err
	}
	return ch.Consume(q.Name, "", false, false, false, false, nil)
}

具体优化实现

首先我们先将业务逻辑过慢的代码抽离,并通过发送消息队列的方法,发送给指定函数进行具体业务异步处理。

代码语言:go复制
type SendMessageReq struct {
	Message string `json:"message"`
}

func SendMessage(c *gin.Context) {
	message := c.Param("message")

	err := sendMessageToMQ(message)
	if err != nil {
		c.JSON(200, "error")
		return
	}

	c.JSON(200, "success")
}
// 发送消息给对应的消息队列
func sendMessageToMQ(message string) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()
	sendMessageReq := SendMessageReq{
		Message: message,
	}

	body, err := json.Marshal(sendMessageReq)
	if err != nil {
		return err
	}
	err = SendMessageToMQ(ctx, RabbitMQSendMessageQueue, body)
	if err != nil {
		return err
	}
	return nil
}

定义一个新函数从消息队列中读取消息并进行具体业务处理

代码语言:go复制
func SendConfirmEmailSync(ctx context.Context) {
	err := RunSendConfirmEmail(ctx)
	if err != nil {
		log.Println(err)
	}
}

func RunSendConfirmEmail(ctx context.Context) error {
    // 获取消息队列,并开始监听消费
	msgs, err := ConsumerMessage(ctx, RabbitMQSendMessageQueue)
	if err != nil {
		return err
	}
	var forever chan struct{}

	go func() {
		for msg := range msgs {
			sendMessageReq := SendMessageReq{}
			err = json.Unmarshal(msg.Body, &sendMessageReq)

            // 假设此为具体业务逻辑
			time.Sleep(700 * time.Millisecond)
			fmt.Println("hello,", sendMessageReq.Message)

			msg.Ack(false)
		}
	}()

	<-forever
	return nil
}

初始化

最后,让我们将具体函数使用Goroutine运行起来

代码语言:go复制
func loadScript() {
	ctx := context.Background()
	go SendConfirmEmailSync(ctx)
}

func main() {
	InitRabbitMQ()
	loadScript()

	server := gin.Default()

	server.POST("/send/:message", SendMessage)

	server.Run(":8080")
}

调用接口

让我们使用postman再次调用一下我们优化完成的接口

我们可以发现,现在调用接口仅需2ms!!!

我们成功了!!!

结尾&完整代码示例

虽然使用消息队列可以大幅度优化接口响应时间,但是我们还是需要根据具体业务需求、逻辑进行相对应的优化,以免变成了负面优化,写出了屎山代码。

如果各位想尝试一下接口优化,可以试试优化我的邮箱API接口,如果想知道我是如何进行邮件接口优化的,可以来学习或者参与进我的开源项目。

愿这篇文章能帮助到你!!!

完整代码

代码语言:go复制
// main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/gin-gonic/gin"
)

type SendMessageReq struct {
	Message string `json:"message"`
}

func SendMessage(c *gin.Context) {
	message := c.Param("message")

	err := sendMessageToMQ(message)
	if err != nil {
		c.JSON(200, "error")
		return
	}

	c.JSON(200, "success")
}

func sendMessageToMQ(message string) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()
	sendMessageReq := SendMessageReq{
		Message: message,
	}

	body, err := json.Marshal(sendMessageReq)
	if err != nil {
		return err
	}
	err = SendMessageToMQ(ctx, RabbitMQSendMessageQueue, body)
	if err != nil {
		return err
	}
	return nil
}

func SendConfirmEmailSync(ctx context.Context) {
	err := RunSendConfirmEmail(ctx)
	if err != nil {
		log.Println(err)
	}
}

func RunSendConfirmEmail(ctx context.Context) error {
	msgs, err := ConsumerMessage(ctx, RabbitMQSendMessageQueue)
	if err != nil {
		return err
	}
	var forever chan struct{}

	go func() {
		for msg := range msgs {
			sendMessageReq := SendMessageReq{}
			err = json.Unmarshal(msg.Body, &sendMessageReq)

			time.Sleep(700 * time.Millisecond)
			fmt.Println("hello,", sendMessageReq.Message)

			msg.Ack(false)
		}
	}()

	<-forever
	return nil
}

func loadScript() {
	ctx := context.Background()
	go SendConfirmEmailSync(ctx)
}

func main() {
	InitRabbitMQ()
	loadScript()

	server := gin.Default()

	server.POST("/send/:message", SendMessage)

	server.Run(":8080")
}
代码语言:go复制
// rabbtmq.go
package main

import (
	"context"
	"fmt"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

var RabbitMQ *amqp.Connection

// 根据自己的RabbitMQ环境进行填写
var (
	RabbitMQConnection = "amqp"
	RabbitMQUser       = "admin"
	RabbitMQPassword   = "123456"
	RabbitMQHost       = "127.0.0.1"
	RabbitMQPort       = "5672"
)

var RabbitMQSendMessageQueue = "test-send-message-queue"

func InitRabbitMQ() {
	connString := strings.Join([]string{RabbitMQConnection, "://", RabbitMQUser, ":", RabbitMQPassword, "@", RabbitMQHost, ":", RabbitMQPort, "/"}, "")
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(fmt.Sprintf("Failed to connect to RabbitMQ: %s", err))
	}
	RabbitMQ = conn
}

func SendMessageToMQ(ctx context.Context, queueName string, body []byte) (err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return
	}

	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	err = ch.PublishWithContext(ctx, "", q.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	})
	if err != nil {
		return
	}
	return
}

func ConsumerMessage(ctx context.Context, queueName string) (msgs <-chan amqp.Delivery, err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return nil, err
	}
	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	// RabbitMQ负载均衡
	err = ch.Qos(1, 0, false)
	if err != nil {
		return nil, err
	}
	return ch.Consume(q.Name, "", false, false, false, false, nil)
}

0 人点赞