前言
在我们编写后端接口时,通常有些接口对于实时性的要求并不是那么高,但其中有些函数却相当占用接口调用时间,如调用第三方接口、发送短信、发送邮件等等。为了提升用户的体验感、系统的稳定性,此时我们就可以使用消息队列对于接口进行优化,对于实时性要求不高的接口使用消息队列来进行处理,提高api响应速度,优化用户体验。本文将以go语言使用rabbitMQ来演示如何对于一个接口进行优化。
RabbitMQ安装
此处给出docker-compose文件方便各位安装rabbtMQ环境,当然也可以选择自行安装
代码语言:yaml复制services:
rabbitMQ:
container_name: learn-rabbitMQ
image: rabbitmq:3.13-management
ports:
- "15672:15672"
- "5672:5672"
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=123456
restart: always
定义好docker-compose.yaml文件填入以上代码,然后使用docker-compose命令快速启动 rabbitMQ
代码语言:shell复制docker-compose up -d
Go环境安装
使用到的Go依赖包
Gin: 本文将使用Gin来进行api注册官方教程
RabbitMQ: Rabbit官方提供的Go依赖包官方教程
代码语言:go复制go get github.com/rabbitmq/amqp091-go
go get -u github.com/gin-gonic/gin
前置代码
为方便演示,我们先使用Gin实现一个速度较慢的接口
代码语言:go复制package main
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
)
func SendMessage(c *gin.Context) {
message := c.Param("message")
// 假设此为必要操作需要700ms
time.Sleep(700 * time.Millisecond)
fmt.Println("hello,", message)
c.JSON(200, "success")
}
func main() {
server := gin.Default()
server.POST("/send/:message", SendMessage)
server.Run(":8080")
}
通过以上代码我们实现了一个至少耗时700ms,且返回参数固定的一个接口。在实际情况中可能由于某些各种各样的原因导致接口较慢,此处为方便演示直接使用了sleep函数。
让我们使用Postman调用一下接口
耗时700ms ,确实很慢
RabbitMQ函数
接下来我们先来写出使用rabbitMQ进行收发消息的函数
RabbitMQ连接函数
代码语言:go复制package main
import (
"fmt"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
var RabbitMQ *amqp.Connection
// 根据自己的RabbitMQ环境进行填写
var (
RabbitMQConnection = "amqp"
RabbitMQUser = "admin"
RabbitMQPassword = "123456"
RabbitMQHost = "127.0.0.1"
RabbitMQPort = "5672"
)
// 需要发送的队列名称
var RabbitMQSendMessageQueue = "test-send-message-queue"
func InitRabbitMQ() {
connString := strings.Join([]string{RabbitMQConnection, "://", RabbitMQUser, ":", RabbitMQPassword, "@", RabbitMQHost, ":", RabbitMQPort, "/"}, "")
conn, err := amqp.Dial(connString)
if err != nil {
panic(fmt.Sprintf("Failed to connect to RabbitMQ: %s", err))
}
RabbitMQ = conn
}
RabbitMQ发送消息函数
代码语言:go复制func SendMessageToMQ(ctx context.Context, queueName string, body []byte) (err error) {
ch, err := RabbitMQ.Channel()
if err != nil {
return
}
// 设置为消息持久化,方便重启时消息不丢失
q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
err = ch.PublishWithContext(ctx, "", q.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
})
if err != nil {
return
}
return
}
RabbitMQ接收消息函数
代码语言:go复制func ConsumerMessage(ctx context.Context, queueName string) (msgs <-chan amqp.Delivery, err error) {
ch, err := RabbitMQ.Channel()
if err != nil {
return nil, err
}
q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
// RabbitMQ负载均衡
err = ch.Qos(1, 0, false)
if err != nil {
return nil, err
}
return ch.Consume(q.Name, "", false, false, false, false, nil)
}
具体优化实现
首先我们先将业务逻辑过慢的代码抽离,并通过发送消息队列的方法,发送给指定函数进行具体业务异步处理。
代码语言:go复制type SendMessageReq struct {
Message string `json:"message"`
}
func SendMessage(c *gin.Context) {
message := c.Param("message")
err := sendMessageToMQ(message)
if err != nil {
c.JSON(200, "error")
return
}
c.JSON(200, "success")
}
// 发送消息给对应的消息队列
func sendMessageToMQ(message string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
sendMessageReq := SendMessageReq{
Message: message,
}
body, err := json.Marshal(sendMessageReq)
if err != nil {
return err
}
err = SendMessageToMQ(ctx, RabbitMQSendMessageQueue, body)
if err != nil {
return err
}
return nil
}
定义一个新函数从消息队列中读取消息并进行具体业务处理
代码语言:go复制func SendConfirmEmailSync(ctx context.Context) {
err := RunSendConfirmEmail(ctx)
if err != nil {
log.Println(err)
}
}
func RunSendConfirmEmail(ctx context.Context) error {
// 获取消息队列,并开始监听消费
msgs, err := ConsumerMessage(ctx, RabbitMQSendMessageQueue)
if err != nil {
return err
}
var forever chan struct{}
go func() {
for msg := range msgs {
sendMessageReq := SendMessageReq{}
err = json.Unmarshal(msg.Body, &sendMessageReq)
// 假设此为具体业务逻辑
time.Sleep(700 * time.Millisecond)
fmt.Println("hello,", sendMessageReq.Message)
msg.Ack(false)
}
}()
<-forever
return nil
}
初始化
最后,让我们将具体函数使用Goroutine运行起来
代码语言:go复制func loadScript() {
ctx := context.Background()
go SendConfirmEmailSync(ctx)
}
func main() {
InitRabbitMQ()
loadScript()
server := gin.Default()
server.POST("/send/:message", SendMessage)
server.Run(":8080")
}
调用接口
让我们使用postman再次调用一下我们优化完成的接口
我们可以发现,现在调用接口仅需2ms!!!
我们成功了!!!
结尾&完整代码示例
虽然使用消息队列可以大幅度优化接口响应时间,但是我们还是需要根据具体业务需求、逻辑进行相对应的优化,以免变成了负面优化,写出了屎山代码。
如果各位想尝试一下接口优化,可以试试优化我的邮箱API接口,如果想知道我是如何进行邮件接口优化的,可以来学习或者参与进我的开源项目。
愿这篇文章能帮助到你!!!
完整代码
代码语言:go复制// main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/gin-gonic/gin"
)
type SendMessageReq struct {
Message string `json:"message"`
}
func SendMessage(c *gin.Context) {
message := c.Param("message")
err := sendMessageToMQ(message)
if err != nil {
c.JSON(200, "error")
return
}
c.JSON(200, "success")
}
func sendMessageToMQ(message string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
sendMessageReq := SendMessageReq{
Message: message,
}
body, err := json.Marshal(sendMessageReq)
if err != nil {
return err
}
err = SendMessageToMQ(ctx, RabbitMQSendMessageQueue, body)
if err != nil {
return err
}
return nil
}
func SendConfirmEmailSync(ctx context.Context) {
err := RunSendConfirmEmail(ctx)
if err != nil {
log.Println(err)
}
}
func RunSendConfirmEmail(ctx context.Context) error {
msgs, err := ConsumerMessage(ctx, RabbitMQSendMessageQueue)
if err != nil {
return err
}
var forever chan struct{}
go func() {
for msg := range msgs {
sendMessageReq := SendMessageReq{}
err = json.Unmarshal(msg.Body, &sendMessageReq)
time.Sleep(700 * time.Millisecond)
fmt.Println("hello,", sendMessageReq.Message)
msg.Ack(false)
}
}()
<-forever
return nil
}
func loadScript() {
ctx := context.Background()
go SendConfirmEmailSync(ctx)
}
func main() {
InitRabbitMQ()
loadScript()
server := gin.Default()
server.POST("/send/:message", SendMessage)
server.Run(":8080")
}
代码语言:go复制// rabbtmq.go
package main
import (
"context"
"fmt"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
var RabbitMQ *amqp.Connection
// 根据自己的RabbitMQ环境进行填写
var (
RabbitMQConnection = "amqp"
RabbitMQUser = "admin"
RabbitMQPassword = "123456"
RabbitMQHost = "127.0.0.1"
RabbitMQPort = "5672"
)
var RabbitMQSendMessageQueue = "test-send-message-queue"
func InitRabbitMQ() {
connString := strings.Join([]string{RabbitMQConnection, "://", RabbitMQUser, ":", RabbitMQPassword, "@", RabbitMQHost, ":", RabbitMQPort, "/"}, "")
conn, err := amqp.Dial(connString)
if err != nil {
panic(fmt.Sprintf("Failed to connect to RabbitMQ: %s", err))
}
RabbitMQ = conn
}
func SendMessageToMQ(ctx context.Context, queueName string, body []byte) (err error) {
ch, err := RabbitMQ.Channel()
if err != nil {
return
}
q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
err = ch.PublishWithContext(ctx, "", q.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
})
if err != nil {
return
}
return
}
func ConsumerMessage(ctx context.Context, queueName string) (msgs <-chan amqp.Delivery, err error) {
ch, err := RabbitMQ.Channel()
if err != nil {
return nil, err
}
q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
// RabbitMQ负载均衡
err = ch.Qos(1, 0, false)
if err != nil {
return nil, err
}
return ch.Consume(q.Name, "", false, false, false, false, nil)
}