RabbitMQ 是一个开源的消息代理和队列服务器,它允许应用程序通过共享服务或消息队列进行异步通信。在这篇文章中,我们将探讨如何在 Go 应用程序中使用 RabbitMQ 来实现分布式事务,着重讲解如何进行连接配置。
1. 安装 RabbitMQ 客户端
Go 的 RabbitMQ 客户端库是 amqp
,你可以使用 go get
命令来安装:
go get github.com/streadway/amqp
2. 连接到 RabbitMQ 服务器
要连接到 RabbitMQ 服务器,我们需要创建一个 amqp.Connection
对象。在创建这个对象时,需要提供一个连接字符串(URL),它包含了 RabbitMQ 服务器的地址、端口、用户名和密码。
下面是一个创建连接的示例:
代码语言:javascript复制conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
在这个示例中,我们使用的是默认的用户名(guest
)和密码(guest
),并假设 RabbitMQ 服务器运行在本机的默认端口(5672
)上。在实际应用中,需要根据实际环境进行修改。
3. 创建一个 Channel
在 RabbitMQ 中,所有的操作都是在 Channel(信道)中进行的。因此,在发送或接收消息前,我们需要先创建一个 Channel:
代码语言:javascript复制ch, err := conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %v", err)
}
defer ch.Close()
4. 声明一个 Queue
在 RabbitMQ 中,消息是存储在 Queue(队列)中的。因此,我们需要声明一个 Queue,以便生产者可以发送消息到这个 Queue,消费者可以从这个 Queue 中接收消息:
代码语言:javascript复制q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("failed to declare a queue: %v", err)
}
5. 发送和接收消息
下面是一个发送消息的示例:
代码语言:javascript复制body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("failed to publish a message: %v", err)
}
下面是一个接收消息的示例:
代码语言:javascript复制msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("failed to register a consumer: %v", err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL C")
<-forever
完整代码
下面是使用Go和RabbitMQ实现分布式事务的示例代码:
- 服务器端(Producer)
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
2. 客户端(Consumer)
代码语言:javascript复制package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(2 * time.Second) //模拟耗时操作
d.Ack(false)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL C")
<-forever
}
RabbitMQ 的 Quality of Service(服务质量)
ch.Qos
方法是用来设置 RabbitMQ 的 Quality of Service(服务质量)参数的。该函数的原型如下:
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
prefetchCount
:这是一个消息预取数设置。当你设置为1时,意味着在一个消费者处理完一个消息并且对该消息进行了确认前,不会分派新的消息给消费者。也就是说,消费者在同一时间只会处理一条消息。这样可以实现更公平的消息分发,防止某些消费者一直忙于处理消息,而其他消费者则什么也没做。prefetchSize
:这是预取大小设置,单位为字节。如果设置为非零值,服务器将会试图保证在为消费者分派新消息之前,至少会有这么多字节的消息已经在消费者的网络缓冲区中。然而,这个设置在 RabbitMQ 的当前实现中并没有实际效果,因为它并没有实现对这个参数的支持。所以,通常我们将它设置为0。global
:这是一个标志位,用来指明上述设置是只对当前的 Channel 有效(如果设置为false
),还是对整个 Connection 有效(如果设置为true
)。
例如,以下代码设置了预取计数为1,这样在同一时间,每个消费者最多只会处理一条消息:
代码语言:javascript复制err := ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
log.Fatal(err)
}
总结
在这篇文章中,我们了解了如何在 Go 程序中使用 RabbitMQ 来实现分布式事务,包括如何安装 RabbitMQ 客户端库、如何连接到 RabbitMQ 服务器、如何创建 Channel 和 Queue,以及如何发送和接收消息。我们还详细讲解了如何进行连接配置。希望这篇文章对你有所帮助!