一 、说明
使用Python操作RabbitMQ的书籍以及例子,少之又少。翻遍了网上所有的例子,发现十个有9个半不能运行的,这半个你还得修改。 原因很简单,要么例子的Python版本太低了,要么例子的RabbitMQ的版本太低了。所以造成了一系列文字。 让我很痛苦,决定下笔写一篇关于这个的文章。
Python3.x RabbitMQ Docker Centos
二、安装RabbitMQ
为了此篇文章只突出Python RabbitMQ,就单独写了一篇文章给大家: Centos7.x Docker部署RabbitMQ
三、编写操作的代码
6种模式
这里我们使用pika来操作RabbitMQ pip install pika
(一)、简单的RabbitMQ消息队列(不安全,不能持久化)
发送端 send.py
代码语言:javascript复制import pika
#你的RabbitMQ的地址
host = "替换成自己的RabbitMQ服务器的IP"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
#指定队列的名字
queueName="hello"
#说明使用的队列,如果没有会自动创建
channel.queue_declare(queueName)
#发送的msg消息
msg = "Hello TrueDei"
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='', routing_key=queueName, body=msg)
print(" [x] Sent 'Hello TrueDei'")
connection.close()
发送结果:
可以再web上看到,也收到了
接收端 resv.py
代码语言:javascript复制import pika
#你的RabbitMQ的地址
host = "替换成自己的IP"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
#指定队列的名字
queueName="hello"
#说明使用的队列,如果没有会自动创建
channel.queue_declare(queueName)
#将ReceivedMessage添加到队列中,同时替换通道实现。
#返回的结果,会返回到这里面,如果有兴趣可以点开basic_consume方法去看看源代码
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
#从服务器队列消费。
# no_ack=True ,是需要是否确定消息的处理了,告诉服务端
# no_ack=False ,默认是False,可以不写
channel.basic_consume(queueName,callback,False)
print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()
Python收消息: 注意:接收到处于死循环,一直在等待接收,发送一个数据,就收到一个数据
(二)、深入理解消息队列
1、当有1个生产者,n个消费者时
代码语言:javascript复制基于上面的代码不做任何修改
把上面的消费者开N个就是想要的结果。
如下:
运行3个消费者,生产者生成的消息队列依次被接收者接收
2、处理消息安全问题(缺持久化)
代码语言:javascript复制基于上面代码,如果消费者出问题了,消息发送将无人接收。
即便再次启动消费者,之前发生的消息将一直存在队列中
生产者 send_msg_safe.py
代码语言:javascript复制import pika
import time
#你的RabbitMQ的地址
host = "替换成自己的IP"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
message = "Hello World! %s" % time.time()
print("产生的消息:" ,message)
channel.basic_publish(exchange='',
routing_key='task_queue', #
body=message,
# properties=pika.BasicProperties(
# delivery_mode=2, # make message persistent
# )
)
print(" [x] Sent %r" % message)
connection.close()
消费者 recv_msg_safe.py
代码语言:javascript复制import pika, time
#你的RabbitMQ的地址
host = "替换成自己的IP"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(20)
print(" [x] Done")
print("method.delivery_tag",method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag) # 再进行手动确认
channel.basic_consume('task_queue',callback,False)
print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()
存在问题:
代码语言:javascript复制在于消费者:消费者处理好的消息,需要给服务端回信息
# no_ack=True ,是需要是否确定消息的处理了,告诉服务端
# no_ack=False ,默认是False,可以不写
# callback 函数后面需要添加 ch.basic_ack(delivery_tag=method.delivery_tag) # 再进行手动确认
3、处理消息安全且持久化
代码语言:javascript复制基于上面的代码,如果重启了rabbitmq,则存在的消息就消失。需要做到消息持久化。(消息安全且持久化)
生产者 send_msg.py
代码语言:javascript复制import pika
#你的RabbitMQ的地址
host = "替换成自己的IP"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
# durable=True:在代理重新启动后仍然存在
channel.queue_declare(queue='hello10',durable=True)
channel.basic_publish(exchange='',
routing_key='hello10',
body='Hello World!',
properties = pika.BasicProperties( #消息持久化
delivery_mode = 2,)
)
print(" [x] Sent 'Hello World!'")
# 关闭队列
connection.close()
消费者 recv_msg.py
代码语言:javascript复制import pika,time
#你的RabbitMQ的地址
host = "替换成自己的IP"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
channel.queue_declare(queue='hello10',durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(10)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume('hello10',callback,False)
print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()
存在问题:
代码语言:javascript复制问题再于生产者的消息需要被持久化
durable=True:功能是,告诉服务,重启后消息依然存在
channel.queue_declare(queue='hello10',durable=True)
properties = pika.BasicProperties(
delivery_mode = 2,)
)
(三)、消息的发布、订阅以及广播模式
代码语言:javascript复制之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,
但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
三种最常用的交换机
代码语言:javascript复制direct:“直接连接交换机”
topic:“主题路由匹配交换机”
fanout:“无路由交换机”
1、fanout交换类型
代码语言:javascript复制fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
代码语言:javascript复制上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,
并最终被两个消费者(C1与C2)消费。
2、direct交换类型
代码语言:javascript复制direct类型的Exchange路由规则也很简单,
它会把消息路由到那些binding key与routing key完全匹配的Queue中。
代码语言:javascript复制以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
3、topic交换类型
代码语言:javascript复制前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,
但这种严格的匹配方式在很多情况下不能满足实际业务需求。
topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,
也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
1、广播模式(fanout,直接连接交换机),发送一个消息,无论有多少接收端,只要在,就能收到,不在就不能收到
生产者 send.py
代码语言:javascript复制import pika
#你的RabbitMQ的地址
host = "你的RabbitMQ的地址"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
#设置交换器的名字和要使用的交换类型
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#设置消息
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='', #不指定,留空
body=message)
print(" [x] Sent %r" % message)
connection.close()
消费者 recv.py
代码语言:javascript复制import pika
#你的RabbitMQ的地址
host = "你的RabbitMQ的地址"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
#设置交换器的名字和要使用的交换类型
channel.exchange_declare(exchange='logs',exchange_type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(queue='',exclusive=True)
#获取生成的队列名字
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue_name,callback, True)
channel.start_consuming()
2、组播模式(direct,直接连接交换机)。
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。 send端根据关键字指定发送内容 可发送info,warning,error
recv端根据关键字指定接收内容 可接收info,warning,error
生产者 send.py
代码语言:javascript复制import pika
import sys #sys模块再运行的时候,可以接受用户输入的值
#你的RabbitMQ的地址
host = "你的RabbitMQ的地址"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#如果运行(python test.py hello) 的话,sys.argv[1]就可以拿到这个hello这个词
severity = sys.argv[1] if len(sys.argv) > 1 else 'info' #如果运行的时候,后面跟了数据,就替换info这个值,否则默认就是info
# severity = 'info' #如果运行的时候,后面跟了数据,就替换info这个值,否则默认就是info
#消息体
message = ' '.join(sys.argv[2:]) or 'Hello World!' #拿到后面跟的第二个值,默认是Hello World!
#绑定交换的类型,绑定队列,绑定发送的消息
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
消费者 recv.py
代码语言:javascript复制import pika
import sys
#你的RabbitMQ的地址
host = "你的RabbitMQ的地址"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind( queue_name, 'direct_logs', routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume( queue_name, callback, True)
channel.start_consuming()
结果:建议放大看
说明: 在上图中可以看到运行的命令:
代码语言:javascript复制send.py
执行示例,需说明发送的内容级别
根据Python代码可以给出公式:python send.py 模式 发送的消息
python send.py info TestInfo1
python send.py warning TestWarning1
python send.py error TestError1
recv.py
执行示例,需说明接收代码内容
执行示例,需说明发送的内容级别
根据Python代码可以给出公式:python recv.py 模式
python recv.py info
python recv.py warning
python recv.py error
3、组播模式之基于组播模式(topic,主题路由匹配交换机),实现更细致的划分不同种类的信息。
形象点可以如下图:
生产者 topic_send.py
代码语言:javascript复制import pika
import sys
#你的RabbitMQ的地址
host = "你的RabbitMQ的地址"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
#指定使用的交换类型和交换器
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#如果有输入,就拿到输入的第一个数据为队列,否则默认为:anonymous.info(匿名的,当然了,可以随便修改哦)
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
#如果输入的数据,存在第二个,那么就把第二个当作消息,发送出去。否则默认消息就是:Hello World!
message = ' '.join(sys.argv[2:]) or 'Hello World!'
#绑定交换的类型,绑定队列,绑定发送的消息
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
#提示
print(" [x] Sent %r:%r" % (routing_key, message))
#关闭
connection.close()
# python topic_send.py python.error test 发送了一条python的错误信息,错误内容为test
# python topic_send.py mysql.info hello 发送了一条mysql的信息,信息内容为hello
消费者 topic_recv.py
代码语言:javascript复制import pika
import sys
#你的RabbitMQ的地址
host = "你的RabbitMQ的地址"
#RabbitMQ端口号
post = 5672
#创建的账号,当然了也可以使用默认的guest账号,密码也是guest
username = "admin"
#账号的密码
password = "123456"
# 创建一个有凭证的新实例
credentials = pika.PlainCredentials(username, password)
# 使用凭证连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host,post,credentials=credentials))
#声明一个管道
channel = connection.channel()
#指定使用的交换类型和交换器
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#自动产生一个队列,exclusive=True:自动销毁
result = channel.queue_declare(queue='',exclusive=True)
#获取自己产生的队列名字
queue_name = result.method.queue
#拿到输入的第一个数为key,就在这个上面监听
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume( queue_name,callback,True)
channel.start_consuming()
# python topic_recvive.py # 使用" # "号收所有
# python topic_recvive.py mysql.* 使用"mysql.* "号收来自mysql的信息
# python topic_recvive.py mysql.error.* 使用"mysql.error.* "号收来自mysql的错误信息
# python topic_recvive.py *.django.* 使用"*.django.* "号收来自所有Django的信息
测试结果1
测试结果2
还有监听全部的可以收到
测试结果3
总结: 发送端1
代码语言:javascript复制python topic_send.py mysql.info ThisMysqlInfoMsg
[x] Sent 'mysql.info':'ThisMysqlInfoMsg'
python topic_send.py mysql.error.Insert ThisMysqlErrorInsertMsg
[x] Sent 'mysql.error.Insert':'ThisMysqlErrorInsertMsg'
python topic_send.py python.data Pythonaaaa
[x] Sent 'python.data':'Pythonaaaa'
接收端1
代码语言:javascript复制python topic_recv.py #
[*] Waiting for logs. To exit press CTRL C
[x] 'mysql.info':b'ThisMysqlInfoMsg'
[x] 'mysql.error.Insert':b'ThisMysqlErrorInsertMsg'
[x] 'python.data':b'Pythonaaaa'
接收端2
代码语言:javascript复制python topic_recv.py mysql.*
[*] Waiting for logs. To exit press CTRL C
[x] 'mysql.info':b'ThisMysqlInfoMsg'
接收端3
代码语言:javascript复制python topic_recv.py mysql.error.*
[*] Waiting for logs. To exit press CTRL C
[x] 'mysql.error.Insert':b'ThisMysqlErrorInsertMsg'
接收端4
代码语言:javascript复制python topic_recv.py python.*
[*] Waiting for logs. To exit press CTRL C
[x] 'python.data':b'Pythonaaaa'
四、问题集整理以及常见的错误
1、错误码403
账号密码错误
2、错误码404
出现404,大多数就是连接的地址有问题,或者断网了也会造成
3、错误码405
出现这个405,肯定是有已经在运行的程序了,被占用了。要先结束掉,才可以运行这个
4、新版与老版本的常见问题
第一处:关于callback与queue_name的位置
老版本:callback与queue_name的位置换了
代码语言:javascript复制channel.basic_consume( callback,queue_name ,True)
新版本:callback与queue_name的位置换了
代码语言:javascript复制channel.basic_consume( queue_name,callback,True)
第二处:关于队列名
老版本:可以不指定队列,就会自动生成
代码语言:javascript复制result = channel.queue_declare(exclusive=True)
新版本:必须指定一个空的就行
代码语言:javascript复制result = channel.queue_declare(queue='',exclusive=True)
参考文章
https://blog.csdn.net/fwk19840301/article/details/92986072 https://blog.csdn.net/banzhi8397/article/details/101392965 https://blog.csdn.net/Da___Vinci/article/details/100105451 https://www.cnblogs.com/shenyixin/p/9084249.html