1
概念说明
Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。是Rabbitmq的内部对象,用于存储消息 Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来。 Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。 Vhost:虚拟主机,一个Broker里可以开设多个Vhost,用作不同用户的权限分离。 Producer:消息生产者,就是投递消息的程序。 Consumer:消息消费者,就是接受消息的程序。 Channel:消息通道,在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务。
2
看看Rabbitmq里面的消息长什么样子
如下截图所示:
Mesages=2 表示展示出两条数据。
3
Rabbitmq处理消息简单模式
大致五个步骤: step1:获取Rabbitmq服务的连接 step2:创建一个信道 step3:声明一个队列(与发消息程序的声明保持一致) step4:定义一个回调函数,用于接收和处理队列中的消息 step5:队列与回归函数绑定 step6:开始消费消息
代码语言:javascript复制import pika
#接收消息,并写入文件,这也算是持久化了
def write_file(message):
with open("msg.txt","a ") as f:
f.write(message)
def consumer():#消息消费者
# 获取与rabbitmq 服务的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672,credentials=pika.PlainCredentials('guest', 'guest')))
# 创建一个 AMQP 信道(Channel)
channel = connection.channel()
# 声明消息队列tester,durable=False 表示不持久化
channel.queue_declare(queue='tester', durable=False)
# 定义一个回调函数来处理消息队列中的消息,这里是将消息写入文件,你也可以入库。
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag) # 告诉生成者,消息处理完成
write_file(body.decode())
#告诉rabbitmq在tester列表里面收消息,收到就调用callback函数
channel.basic_consume('tester', callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
if __name__=="__main__":
consumer()
Tips: callback回调函数将消息直接写入文件
如下图所示:
4
查看Rabbitmq界面消息是否处理完成
如下截图所示:
友情提示:“无量测试之道”原创著作,欢迎关注交流,禁止第三方不显示文章来源时转载。