Python之Rabbitmq处理消息

2022-07-04 16:54:47 浏览数 (1)

1

概念说明


Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。是Rabbitmq的内部对象,用于存储消息 Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来。 Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。 Vhost:虚拟主机,一个Broker里可以开设多个Vhost,用作不同用户的权限分离。 Producer:消息生产者,就是投递消息的程序。 Consumer:消息消费者,就是接受消息的程序。 Channel:消息通道,在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务。

2

看看Rabbitmq里面的消息长什么样子


如下截图所示:

Mesages=2 表示展示出两条数据。

3

Rabbitmq处理消息简单模式


大致五个步骤: step1:获取Rabbitmq服务的连接 step2:创建一个信道 step3:声明一个队列(与发消息程序的声明保持一致) step4:定义一个回调函数,用于接收和处理队列中的消息 step5:队列与回归函数绑定 step6:开始消费消息

代码语言:javascript复制
import pika

#接收消息,并写入文件,这也算是持久化了
def write_file(message):
    with open("msg.txt","a ") as f:
        f.write(message)


def consumer():#消息消费者
    # 获取与rabbitmq 服务的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672,credentials=pika.PlainCredentials('guest', 'guest')))
    # 创建一个 AMQP 信道(Channel)
    channel = connection.channel()
    # 声明消息队列tester,durable=False 表示不持久化
    channel.queue_declare(queue='tester', durable=False)
    # 定义一个回调函数来处理消息队列中的消息,这里是将消息写入文件,你也可以入库。
    def callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag=method.delivery_tag) # 告诉生成者,消息处理完成
        write_file(body.decode())
    #告诉rabbitmq在tester列表里面收消息,收到就调用callback函数
    channel.basic_consume('tester', callback)
    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()

if __name__=="__main__":
    consumer()

Tips: callback回调函数将消息直接写入文件

如下图所示:

4

查看Rabbitmq界面消息是否处理完成


如下截图所示:

友情提示:“无量测试之道”原创著作,欢迎关注交流,禁止第三方不显示文章来源时转载。

0 人点赞