RabbitMQ-消息队列
其他主流的MQ还有:ZeroMQ 和 ActiveMQ ,就知道一下好了。
安装RabbitMQ
我是在CentOS7上安装的,直接用yum安装,安装起来就比较简单了。
安装epel源
首先你得有EPEL源,没有的话可以安装一下:
代码语言:javascript复制$ yum epel-release
安装rabbitmq-server
有了源之后就可以用yum一步安装上RabbitMQ了
代码语言:javascript复制$ yum rabbitmq-server
好了,这样就安装完了。 其实,rabbitmq是用erlang语言实现的,这里用yum安装,把有依赖关系的erlang也一起安装好了。
启动rabbitmq
启动rabbitmq服务:
代码语言:javascript复制$ systemctl start rabbitmq-server
如果你还想开启自动启动,就再enable一下。如果就是玩一下的话,每次开机自己手动start一下也行,就不要enable了。
代码语言:javascript复制$ systemctl enable rabbitmq-server
开放防火墙端口
首先会用到5672端口,就先开放一下5672端口,如下
代码语言:javascript复制$ firewall-cmd --permanent --add-port=5672/tcp
$ firewall-cmd --reload
安装第三方模块
最后我们还要在python上安装一个pika模块 好了所有准备工作算是完成了
准备开始学习
上面都是准备工作,现在可以开始了。 这里是官网:http://www.rabbitmq.com/getstarted.html 这个页面里有6个上手的例子和说明,下面就是把这6个例子拿来练练手,熟练使用方法。
简单的队列通信
官网上的 1 "Hello World!" 先写一个send端:
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.3.108')) # 建立连接
channel = connection.channel() # 声明一个管道
channel.queue_declare(queue='hello') # 声明一个队列
# 下面一句就是发消息了
# exchange:暂时不知道
# routing_key:你的队列的名字
# body:你的消息的内容
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print("Send: 'Hello World!'") # 本地打印一下
connection.close() # 关闭,关闭队列就好了,管道不用关闭
# 关闭同时主要还是为了刷新缓冲区,把消息真正的传递给RabbitMQ
然后是recv端:
代码语言:javascript复制import pika
# 建立连接、管道、队列,和send端一样
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.3.108'))
channel = connection.channel()
# 声明队列,只要声明一次就好了,之前send端也声明过了,但是重复声明并没有问题
# 不声明也没有关系,只要你确认这个队列已经声明过了
# 但是,我们并不确定之前是否声明了队列,所以每次都声明一下才是好的做法
channel.queue_declare(queue='hello')
# 准备一个回调函数,下面是一个标准的声明回调函数的格式,带4个参数
def callback(ch, method, properties, body):
print(ch) # 管道,就是channel
print(method) # 一般不用,不过能看到send端的basic_publish里的信息
print(properties) # 暂时不知道,后面会讲
print("Recv: %r" % body) # 注意,收到的是bytes格式
# 声明接收消息的语法,并没有开始收
# 声明回调函数,如果收到消息,就调用这个函数
# 从哪个队列里收消息
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print('等待接收消息,按 CTRL C 退出')
# 这里开始持续接收消息,没有消息就阻塞
channel.start_consuming()
send端是执行一次就发一条消息然后结束。
recv端看最后一句 channel.start_consuming()
,一旦start就会持续接收消息,已有消息就会调用回调函数,这里我们会把消息打印出来。
如果send端没有打开,则recv会一直阻塞等待消息。如果recv端没有打开,每次send的消息都会存在服务器上,直到有recv端接入接收消息。
消息分发轮训
还是上面的send端和recv端,试着开启多个recv端。查看send端发送的消息被哪个recv端接收到了。
每send一次消息,每次只会有一个recv端接收到。这是一个轮训机制,也就是负载均衡,把消息依次分发给recv端。
期间还可以再增加或者减少recv端的数量。
recv端的 channel.basic_consume
下有一个no_ack参数,默认是False。这个参数是控制recv端是否在调用完成回调函数后给send端一个确认的,默认是要开启确认的,之前我们都关掉了。就是执行后不确认,也就是服务端把一个消息分发出去后就不管了。客户端接收之后可能没能正常执行完毕,下面来模拟一下。
把no_ack参数设为False,或者删掉,默认就是False。然后回调函数里一定要加上一句表示确认消息处理完毕的语句 ch.basic_ack(delivery_tag=method.delivery_tag)
。
手动给回调函数加上一个time.sleep,让一条消息需要处理一段时间。我们在recv端开始处理消息但是没处理完之前把这个程序停了,观察其他recv端的情况。新的recv端如下:
import pika, time
# 建立连接、管道、队列,和send端一样
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
# 声明队列,只要声明一次就好了,之前send端也声明过了,但是重复声明并没有问题
# 不声明也没有关系,只要你确认这个队列已经声明过了
# 但是,我们并不确定之前是否声明了队列,所以每次都声明一下才是好的做法
channel.queue_declare(queue='hello')
# 准备一个回调函数,下面是一个标准的声明回调函数的格式,带4个参数
def callback(ch, method, properties, body):
for i in range(10):
print('r%d' % i, end='', flush=True)
time.sleep(1)
print("rTime's up")
print("Recv: %r" % body) # 注意,收到的是bytes格式
# 如果没有确认,服务端就不认为消息处理完成了
# 所以,依然会将这条消息留在队列里,准备分发给别的recv端处理
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息处理完毕
# 声明接收消息的语法,并没有开始收
# 声明回调函数,如果收到消息,就调用这个函数
# 从哪个队列里收消息
channel.basic_consume(callback,
queue='hello',) # no_ack参数默认值是False,就是需要确认
print('等待接收消息,按 CTRL C 退出')
# 这里开始持续接收消息,没有消息就阻塞
channel.start_consuming()
默认的no_ack=False的模式,如果一个消息没有确认,但是连接断了,那么这个消息还会有别的recv端重新处理。只有在recv端确认了之后,才会从服务器的队列中清除。
消息持久化
官网上的 2 Work queues 先查看一下rabbitmq服务器上的信息,使用这个命令可以查看服务器上的队列:
代码语言:javascript复制$ rabbitmqctl list_queues
此时应该可以看到之前的hello这个队列,后面有个数字,表示这个队列里还有多少条消息。因为是0,表示消息都收完了。如果还有消息,可以启动recv端把消息收下来,然后再确认一下服务端队列的情况。还有一种情况是no_ack=False之后,回调函数里没有加确认,那么所有需要确认的消息都会留在队列中,记得在recv端加上确认的语句把消息收完。 服务器中的队列会在服务重启后丢失,包括队列和队列中的消息。重启服务:
代码语言:javascript复制$ systemctl restart rabbitmq-server
此时再启动send端发两条消息,不用收。 然后修改一下send端,修改声明队列的那一句,把队列名字改掉,加上durable参数设为True,后面一句发消息的语句里routing_key记得也把队列名字改了。还是发几条消息出来不收。
代码语言:javascript复制channel.queue_declare(queue='hello2', durable=True)
此时再去确认一下服务器上的两个队列,以及队列中的消息数量,都应该有。再次重启服务后,检查队列信息。现在没有durable的队列彻底没了,durable设为True的队列,只是队列还在,但是消息还是没有,现在只是队列持久化了。还需要在发消息的语句里加上一个参数,把消息也声明为持久化。新的消息持久化的send端如下:
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True) # 声明一个持久化的队列
channel.basic_publish(exchange='',
routing_key='hello2',
body='Hello2 World!',
properties=pika.BasicProperties(delivery_mode=2)
) # 声明这条消息也是要持久化的
print("Send: 'Hello World!'")
connection.close()
此时发出的消息,会在服务器上实现持久化,就是重启服务或者关机,恢复后队列和消息都不会丢失,使用recv端可以继续收消息。 注意:重复声明队列的时候,队列的属性需要与已有的队列一致,否则运行到声明的语句会报错。
消息公平分发
上面已经试过了,消息可以轮训的分发到每一个recv端,实现负载均衡。但是现实环境中可能每个recv端的性能是不一样的,如果只是这种简单的轮训,那么处理快的机器可能是空闲状态,但是处理慢的机器可能会积累很多消息来不及处理。 通过在recv端加如下的一句语句:
代码语言:javascript复制channel.basic_qos(prefetch_count=1)
表示这个recv端一次只处理一条消息,如果1条消息没处理完,就不会在给它发新的消息处理。这个数字还可能调整。
广播(消息发布订阅)
官网上的 3 Publish/Subscribe send端就是消息发布,recv端就是订阅。 之前的例子都是一对一发送消息的,消息只能发送到指定的队列中,用的是默认的exchange设置。通过定义exchange来实现更多的效果,比如广播效果。 Exchange在定义的时候有类型的,以决定到底是哪些Queue符合条件,可以接收消息:
- fanout: 所有bind到此exchange的queue都可以接收消息,就是所有队列都能收(广播)
- direct: 通过routingKey和exchange决定哪个queue可以接收消息,默认就是这个但是还可以再设置
- topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,就是部分队列可以收
- headers: 通过headers 来决定把消息发给哪些queue,这个号称用的少
消息发布的send端:
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
# 定义一个广播模式的exchange,需要名字和类型
channel.exchange_declare(exchange='broadcast',
exchange_type='fanout')
# 不需要声明队列了,因为所有队列都能收
channel.basic_publish(exchange='broadcast',
routing_key='',
body='Hello World!')
print("Send: 'Hello World!'")
connection.close()
订阅的recv端:
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
# 定义广播的exchange,和send端一样,重复声明,确认exchange一定存在的情况下可以不要
channel.exchange_declare(exchange='broadcast',
exchange_type='fanout')
# 下面声明队列的时候,不指定队列名字,服务器会随机分配一个名字
# exclusive设为True,断开后服务器会将这个queue删除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # 获取自动分配的队列名字,下面收消息的时候要名字
channel.queue_bind(exchange='broadcast',
queue=queue_name) # 将队列绑定到exchange转发器
def callback(ch, method, properties, body):
print("Recv: %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
print('等待接收消息,按 CTRL C 退出')
channel.start_consuming()
这里send端比较简单,只需要定义好exchange是广播模式,然后把消息发出就好了。这里其实消息广播出去后就没了,把消息广播到所有的队列。现在是还没队列,那也算广播出去了,就是没了。 recv端,也是先和send端一样声明一个广播模式的exchange。这里依然是重复声明,得保证这个exchange已经存在。 recv接收消息必须要通过队列,这里使用了自动分配的队列名,并且一旦断开队列也会被服务器删除,然后获取到这个队列名保存到变量中之后还要使用。 将队列名和exchange绑定,绑定后这个队列就能收到exchange的消息了。后面接收消息就一样了。
RabbitMQ收发消息的机制
讲到这里,可以先来理解下一RabbitMQ收发消息的机制了。下面是图,左边是生产者客户端就是send端,有边是消费者客户端就是recv端。中间是服务器包括exchange和queue。
我们得保证exchange和queue都得已经存在,尝试连接一个不存在的会报错,一般都是send和recv两边都声明一下,重复声明是正确的做法,声明后服务器上就会建立对应的exchange或queue。 send端负责建exchange,将消息发送到exchange,可以指定多个exchange recv端则是负责建queue,把queue绑定bind到exchange收取消息,可以指定多个queue exchange如何将消息传递给queue,就需要我名来绑定了,也可以多次绑定(不是一对一的)。 第一个例子中,貌似直接把消息发到队列里了。RabbitMQ中的消息是无法直接发送到queue中的,总是要通过一个exchange才能把消息传到queue中。这里用空字符串表示一个默认的exchange,这是一个特殊的exchange,可以精确的消息发送到一个指定的queue中。然后recv端是多个客户端连接一个队列,一个客户端把消息收掉后,其他客户端旧收不到了。 广播的例子中,是正常定义好了exchange,把消息发到exchange里,此时还没有绑定到任何队列,所有的消息都会丢失。然后recv端会建立自己的queue,然后绑定到exchange,此时exchange里的消息就会复制一份到绑定过的queue了。每个recv端连接的是不同的队列,每个队列里有一份消息,所以不会影响别的recv端收消息。
有选择的接收消息
官网上的 4 Routing exchanges中的消息还需要匹配routing_key。只有exchange和routing_key都匹配,才能收到消息。 sent端如下,每一条消息发送的时候,都指定了exchange和routing_key:
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.3.108'))
channel = connection.channel()
# 声明exchange
channel.exchange_declare(exchange='direct_logs') # 默认类型就是'direct'
# 发送3条消息,每条的routing_key不同
severity_list = ['error', 'warning', 'info']
for severity in severity_list:
msg = "%s: %s" % (severity, 'Hello World!')
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=msg)
print("Send: %s" % msg) # 本地打印一下
connection.close() # 关闭,关闭队列就好了,管道不用关闭
recv端,你需要把队列精确的匹配exchang和routing_key进行绑定,可以多次绑定
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.3.108'))
channel = connection.channel()
# 声明exchange
channel.exchange_declare(exchange='direct_logs')
# 声明自动分配的队列名
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 多次绑定多个routing_key
severity_list = ['error', 'warning', 'info'] # 使用这个列表可以收到3条消息
# severity_list = ['error'] # 使用这个列表,只能收到error的一条消息
for severity in severity_list:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
def callback(ch, method, properties, body):
print(method.exchange, method.routing_key) # method里的一些详细信息
print("Recv: %r" % body)
# 声明接收消息的语法
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
print('等待接收消息,按 CTRL C 退出')
channel.start_consuming()
例子中的 severity_list
可以控制收消息的范围。只会收到 severity_list
和 routing_key
匹配的消息。
更细致的消息过滤
官网上的 5 Topics
send端和上面的Routing的模式基本一样,只是exchange的type变成了topic。然后你的routeing_key可以写的更加复杂一点,规则如下:由1个或多个关键字组成,关键字之间以点分隔。
recv端接收的时候还是去匹配exchange和routing_key,只是这次的routing_key不是精确匹配了,而是可以有通配符,通配符就#和*,规则也很简单:
# : 匹配0个或多个关键字
* : 匹配1个关键字
比如,anonymous.info
,可以被 *.info
或 anonymous.*
匹配到,但如果是 a.anonymous.info.b
就无法被匹配到了,因为*号只能匹配一个关键字。
再比如, #.key_word.#
就可以匹配到所有含有 key_word
的消息(包括 routingkey='key_word'
),无论前后是否有别的关键字或者多个关键字,但是 key_word 必须是作为单独的关键字出现。比如 key_words (这里多了个s)就会被认为是另外一个关键字了。总之关键字之间要用点分隔开。
最后,用 #
,就是收所有了,就相当于广播了。
再不清楚试一下就明白了,send端:
import pika
def send(routing_key):
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
# 声明exchange
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic') # 只是类型变了
msg = "%s: %s" % (routing_key, 'Hello World!')
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=msg)
print("Send: %s" % msg)
connection.close()
if __name__ == '__main__':
routing_key = 'anonymous.info' # 修改你的routing_key
send(routing_key)
recv端:
代码语言:javascript复制import pika
def recv(binding_key):
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
# 声明exchange
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
# 声明自动分配的队列名
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
print("Recv: %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
print('等待接收消息,按 CTRL C 退出')
channel.start_consuming()
if __name__ == '__main__':
binding_key = '*.info' # # 修改你的binding_key以匹配routing_key
recv(binding_key)
Remote procedure call (RPC)
官网上的 6 RPC 上面都是单向的消息流,send端只有发,recv端只有收。使用RPC,recv端可以再把消息返回给send端。 这里先准备一个服务端,接收数据,把收到的数据转成数字,计算出一个结果(这里算一个之前用到过的斐波那契数列)。算出结果后还要发回给客户端。要往回发消息,就需要在回调函数里再调用一个发消息的方法。
代码语言:javascript复制import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
# 这个函数不是这里的重点,
# 只是通过客户端发来的数,计算后把结果回复给客户端
def fib(n):
"""计算斐波那契数列"""
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) fib(n-2)
def on_request(ch, method, props, body):
"""回调函数
先把消息转成数字
根据数字计算出结果
把计算结果发回去
回复一个任务已经完成
"""
n = body.isdigit() and int(body) or 0 # body必须是数字,否则按0处理
print(" [.] fib(%s)" % n)
response = fib(n)
# 下面是给吧结果回复给客户端
# 方法就是和send端发送数据的方法一样
# 但是这里不需要生成一个channel了,而是用接收数据已有的ch
# 回复的时候处理有计算结果,还要把收到的id发回去
# 客户端通过correlation_id把id发过来,服务的通过props.correlation_id接收
# 然后回复计算结果是再通过correlation_id把id发回给客户端
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag) # 回复任务已经处理完毕
channel.basic_qos(prefetch_count=1) # 一次只处理一个任务
# 从队列'rpc_queue'接收数据,然后调用回调函数'on_request'处理
channel.basic_consume(on_request,
queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
然后我们准备客户端,向服务端发起请求,获取计算后得到的结果。这里的客户端比较复杂,写成了一个类。 使用前先实例化,然后调用call方法把要发送的数据传入。发送的消息中还要包括用户接收计算结果的队列以及这条消息的id。 发送后,进入一个循环,反复调用一个非阻塞版的start_consuming。这个consume在实例化的时候,就已经生成了。一旦有消息进来会先调用回调函数,检查id和之前发出去的id是否一样,一样就赋值给 self.response 。此时满足了循环退出的条件,跳出循环返回接收到的计算结果。例子如下:
代码语言:javascript复制import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.246.11'))
self.channel = self.connection.channel()
# 自动生成一个队列,这个队列是用于接收计算结果的
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
# 从哪个队列收消息,这个是client,发起一个send,然后结果要通过recv收计算结果
# 队列在上面声明好了,回调函数后面写,不需要确认收到
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
"""接收计算结果的回调函数
比较本地id和接收到消息里的id是否一致
一致就确认是回复的计算结果
"""
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
"""把客户端的请求发送给服务端
就是send一条消息
消息send后是要等待回复的,所以消息中有接收回复的参数
"""
self.response = None # 存放收到的计算结果
self.corr_id = str(uuid.uuid4()) # 生成一个id随消息一同发出
# 下面是发送数据,比之前多了2个参数
# reply_to告知服务端用于回复计算结果的队列
# correlation_id告知服务端这个请求的id,服务端回复的时候会把这个id发出来,确认是这条消息的计算结果
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
# 消息发出后,就等待服务端回复了,下面是一个非阻塞的方法,确认消息是否收到
while self.response is None:
# 因为是非阻塞,所以这里会循环无数次
# 这个connection在实例化的时候就已经生成了接收数据的consume
# 下面这句就是一个非阻塞版的start_consuming
self.connection.process_data_events() # 非阻塞
# 计算结果会通过构造函数中定义的队列发回来
# 收到后会执行回调函数on_response
# 比较和之前的id一致后,确认是请求对应的回复
# 此时self.response会在回调函数中被赋值,下面就返回结果
return int(self.response)
if __name__ == '__main__':
fibonacci_rpc = FibonacciRpcClient() # 生成一个实例
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30) # 调用call方法,就是把消息发出去
print(" [.] Got %r" % response)
作业
基于RabbitMQ rpc实现的主机管理: 可以对指定(多台)机器异步的执行多个命令 例子:
代码语言:javascript复制>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
注意:每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果