Go之RabbitMQ(三)优先级队列

2020-06-03 09:23:19 浏览数 (1)

1. RabbitMQ优先级队列介绍:

RabbitMQ3.5.0之后官方版本已经实现了优先级队列。数值越大则优先级越高。

创建优先级队列,需要增加x-max-priority参数,指定一个优先级的数值大小,这里最好是0~10之间,用来表示这个queue的最大优先级。(备注:因为生产者和消费者都需要对queue进行声明,所以它们都需要设置这个参数)

生产者在发送消息的时候,需要设置priority属性,最好不要超过上面指定的最大的优先级,一旦超过了这个优先级,发送设置的优先级就不再生效了。在这个范围内的优先级,数字越大,优先级越高。

优先级队列处理的场景,是针对的生产者生产快,消费者消费慢,反之没有意义,毕竟只有queue中有消息堆积的时候,才会需要根据优先级策略进行调度。

创建了rabbitmq的优先级队列之后,界面查看的变化:

2. RabbitMQ优先级队列实现例子

下面是优先级队列实现的代码,生产者一次性创建6个消息,其中奇数优先级为2,偶数优先级为1,并阻塞到RabbitMQ上面。

消费者启动之后,会先消费优先级为2的那些消息,然后才消费优先级是1的那些消息,对于同等优先级的消息,则是按照先进先出的顺序进行消费。

1)生产者代码:

代码语言:javascript复制
package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "strconv"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string){
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hoge"
    } else {
        s = strings.Join(args[1:], " ")
    }

    return s
}

func main() {
    // step1: 作为生产者与amqp server 建立一个连接,rabbitmq的提供的默认端口是5672
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // step2: 在这个连接上面创建channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // step3: 创建一个名字叫做hello的queue
    var args amqp.Table
    args = amqp.Table{"x-max-priority":int32(10)} // 设置优先级列表的最初优先级
    q, err := ch.QueueDeclare(
    "priqueue", //name
    true,  //durable
    false,  //delete when unused
    false,  //exclusive
    false,  //no wait
    args,    //arguments
    )
    failOnError(err, "Failed to declare q queue")

    err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
    )
    failOnError(err, "Failed to set QoS")
    // step4: 向rabbitmq server发送"Hello"消息
    for i:=0;i<6;i   {
        body := bodyFrom(os.Args)
        body  = strconv.Itoa(i)
        pri := i%2   1
        err = ch.Publish(
            "",     //exchange
            q.Name,     // routing key
            false,  //mandatory
            false, //immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Priority: uint8(pri),
                Body :      []byte(body),
            })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
    }

    return
}

执行结果:

代码语言:javascript复制
$ ./send_priority world
2020/05/29 16:21:07  [x] Sent world0
2020/05/29 16:21:07  [x] Sent world1
2020/05/29 16:21:07  [x] Sent world2
2020/05/29 16:21:07  [x] Sent world3
2020/05/29 16:21:07  [x] Sent world4
2020/05/29 16:21:07  [x] Sent world5

2)消费者代码:

代码语言:javascript复制
package main

import (
    "fmt"
    "log"
    "time"
    "bytes"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string){
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}

func main() {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    failOnError(err, "Failed to connect to server")
    defer conn.Close();

    ch, err := conn.Channel()
    failOnError(err, "Failed to connect to channel")
    defer ch.Close()
    var args amqp.Table
    args = amqp.Table{"x-max-priority":int32(10)} // 设置优先级列表的最初优先级
    q, err := ch.QueueDeclare(
        "priqueue",    //name
        true,      //durable
        false,      //delete when usused
        false,      // exclusive
        false,      //no-wait
        args,        // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name,     // queue
        "",         // consumer
        false,       // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // arguments
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)
    go func(){
        for d:= range msgs{
            log.Printf("Received a message : %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t* time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages, To exit press CTRL C")
    <-forever
    return
}

执行结果:

代码语言:javascript复制
$ ./receive2
2020/05/29 16:21:12  [*] Waiting for messages, To exit press CTRL C
2020/05/29 16:21:12 Received a message : world1
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world3
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world5
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world0
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world2
2020/05/29 16:21:12 Done
2020/05/29 16:21:12 Received a message : world4
2020/05/29 16:21:12 Done

参考文档:

https://www.jianshu.com/p/2a439db39687

https://www.rabbitmq.com/extensions.html

0 人点赞