1. RabbitMQ优先级队列介绍:
RabbitMQ3.5.0之后官方版本已经实现了优先级队列。数值越大则优先级越高。
创建优先级队列,需要增加x-max-priority参数,指定一个优先级的数值大小,这里最好是0~10之间,用来表示这个queue的最大优先级。(备注:因为生产者和消费者都需要对queue进行声明,所以它们都需要设置这个参数)
生产者在发送消息的时候,需要设置priority属性,最好不要超过上面指定的最大的优先级,一旦超过了这个优先级,发送设置的优先级就不再生效了。在这个范围内的优先级,数字越大,优先级越高。
优先级队列处理的场景,是针对的生产者生产快,消费者消费慢,反之没有意义,毕竟只有queue中有消息堆积的时候,才会需要根据优先级策略进行调度。
创建了rabbitmq的优先级队列之后,界面查看的变化:
2. RabbitMQ优先级队列实现例子
下面是优先级队列实现的代码,生产者一次性创建6个消息,其中奇数优先级为2,偶数优先级为1,并阻塞到RabbitMQ上面。
消费者启动之后,会先消费优先级为2的那些消息,然后才消费优先级是1的那些消息,对于同等优先级的消息,则是按照先进先出的顺序进行消费。
1)生产者代码:
代码语言:javascript复制package main
import (
"fmt"
"log"
"os"
"strings"
"strconv"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hoge"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// step1: 作为生产者与amqp server 建立一个连接,rabbitmq的提供的默认端口是5672
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// step2: 在这个连接上面创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// step3: 创建一个名字叫做hello的queue
var args amqp.Table
args = amqp.Table{"x-max-priority":int32(10)} // 设置优先级列表的最初优先级
q, err := ch.QueueDeclare(
"priqueue", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no wait
args, //arguments
)
failOnError(err, "Failed to declare q queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
// step4: 向rabbitmq server发送"Hello"消息
for i:=0;i<6;i {
body := bodyFrom(os.Args)
body = strconv.Itoa(i)
pri := i%2 1
err = ch.Publish(
"", //exchange
q.Name, // routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
Priority: uint8(pri),
Body : []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
return
}
执行结果:
代码语言:javascript复制$ ./send_priority world
2020/05/29 16:21:07 [x] Sent world0
2020/05/29 16:21:07 [x] Sent world1
2020/05/29 16:21:07 [x] Sent world2
2020/05/29 16:21:07 [x] Sent world3
2020/05/29 16:21:07 [x] Sent world4
2020/05/29 16:21:07 [x] Sent world5
2)消费者代码:
代码语言:javascript复制package main
import (
"fmt"
"log"
"time"
"bytes"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to server")
defer conn.Close();
ch, err := conn.Channel()
failOnError(err, "Failed to connect to channel")
defer ch.Close()
var args amqp.Table
args = amqp.Table{"x-max-priority":int32(10)} // 设置优先级列表的最初优先级
q, err := ch.QueueDeclare(
"priqueue", //name
true, //durable
false, //delete when usused
false, // exclusive
false, //no-wait
args, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf("Received a message : %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t* time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages, To exit press CTRL C")
<-forever
return
}
执行结果:
代码语言:javascript复制$ ./receive2
2020/05/29 16:21:12 [*] Waiting for messages, To exit press CTRL C
2020/05/29 16:21:12 Received a message : world1
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world3
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world5
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world0
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world2
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world4
2020/05/29 16:21:12 Done
参考文档:
https://www.jianshu.com/p/2a439db39687
https://www.rabbitmq.com/extensions.html