消息中间件工作队列 — RabbitMQ

2020-11-25 12:00:13 浏览数 (1)

工作队列

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

RabbitMQ分发策略:轮询和公平分发。

轮询分发:

如果现在有两个消费者,生产者产生的消息会轮流分发给两个消费者。

公平分发:

比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

代码语言:javascript复制
channel.basic_qos(prefetch_count=1)

关于队列大小

如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

点对点进行发布:

生产者代码:

代码语言:javascript复制
import pika
import json

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'mq-test', durable=True)

for i in range(10):
    message=json.dumps({'CID':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'mq-test',body = message)
    print(message)
connection.close()

消费者代码:

代码语言:javascript复制
import pika

credentials = pika.PlainCredentials('xuan', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'mq-test', durable = True)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('mq-test',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

一对多进行发布:

生产者代码:

代码语言:javascript复制
#!/usr/bin/env python
import pika
import sys

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

消费者代码:

代码语言:javascript复制
#!/usr/bin/env python
import pika
import time

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL C')

# 当工作者(worker)完成了任务,就发送一个响应。
#下面的代码,我们发现即使使用CTRL C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

#可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

0 人点赞