- 基本介绍
- 交换机类型
- 公共参数说明
- 消息手动签收
- 消费者和生产者时间依赖关系
- 消费端获取消息模式
- 解决重复消费问题
- 死信队列
- 消息延时推送
基本介绍
- Broker(消息代理):接受客户端的链接,实现AMQP实体服务
- Producer:消息生产者
- Consumer:消息消费者
- Connection(连接):producer 和 consumer 与 broker的tcp连接
- Channel(网络信道):基于 connection 创建,消息读写都是在 channel 中进行。客户端可以建立多个channel,每个channel代表一个会话任务
- VirtualHost(虚拟主机) :一个broker里可以开设多个虚拟主机,用于进行逻辑隔离,最上层的消息路由。类似mysql的database
- Exchange(交换机) :接收消息,根据路由键转单消息到绑定队列
- Queue(消息队列) :是 RabbitMQ 的内部对象,用于存储消息。每个消息都会被投入到一个或多个队列。且多个消费者可以订阅同一个 Queue(这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理)
- Binding(绑定) :Exchange和队列Queue之间的虚拟链接。
- Routing Key(路由键) :消息发送给 Exchange时,消息将拥有一个路由键(默认为空), Exchange根据这个路由键将消息发送到匹配的队列中
- Binding Key(绑定键):将消息路由到所有绑定到该Exchange的Queue,但fanout下bindingkey会失效 图1-1
交换机类型
fanout
消息会发送到所有与该交换机绑定的队列中
图2-1
direct
消息通过RoutingKey精准匹配对应的队列中
图2-2
topic
消息通过RoutingKey模糊匹配到对应的队列中
- RoutingKey用"."分割字符串
- *:匹配一个单词
- #:匹配0个或多个单词 图2-3
headers
不依赖于路由键的匹配规则路由消息,根据发送的消息内容headers属性进行完全匹配(键值对形式)。性能差,基本不使用。
公共参数说明
队列参数
channel.QueueDeclare方法中arguments参数,队列一旦声明,参数将无法更改,添加,删除
参数名称 | 描述 | Features |
---|---|---|
x-message-ttl | 队列中的消息的生存周期,单位毫秒 | TTL |
x-expires | 队列在指定的时间内没有被使用(访问)就会被删除 | Exp |
x-max-length | 设置队列最大长度(先进先丢) | Lim |
x-max-length-bytes | 队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃 | Lim B |
x-overflow | 队列中的消息溢出时,(默认drop-head)丢弃队列头部的消息或(reject-publish)拒绝接收后面生产者发送过来的所有消息 | Ovfl |
x-single-active-consumer | 一次只能有一个消费者消费消息 | SAC |
x-dead-letter-exchange | 设置当前队列的死信交换机 | DLX |
x-dead-letter-routing-key | 设置死信交换机的路由key,死信交换机会根据该值去找到死信消息存放的队列 | DLK |
x-max-priority | 队列中的消息的优先级最大值,不设置的队列就不支持优先级发送消息 | Pri |
x-queue-mode | 懒人模式的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用。当消费者开始消费的时候才加载到内存中。 | Args |
x-queue-master-locator | master queue host 的分配策略:min-masters、client-local和random |
消息参数
参数名称 | 描述 |
---|---|
content_type | 消息内容的类型 |
content_encoding | 消息内容的编码格式 |
priority | 消息的优先级 |
correlation_id | 用于将RPC响应与请求相关联 |
reply_to | 回调队列 |
expiration | 消息过期时间,单位毫秒.该参数值优先级>队列参数设置中的消息生存期 |
message_id | 消息id |
timestamp | 消息的时间戳 |
type | 类型 |
user_id | 用户id |
app_id | 应用程序id |
cluster_id | 集群id |
消息手动签收
- 签收异常,没有调用basic.ack;当前会话处于连接状态时,消息转变为unacked状态,其他消费者消费不到,当前会话断开,unacked的消息会重新变为ready状态,其他消费者才能够重新消费
- 签收正常,成功调用basic.ack,队列中立即删除消息
- basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列
- basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到
- basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己
消费者和生产者时间依赖关系
- 消费者和生产者已知,消费者和生产者之间没有时间依赖关系
- 生产者已知,消费者未知,需要消费者订阅后才能接收消息
消费端获取消息模式
- **推模式:**消息中间件主动将消息推送给消费者,消费者需要设置一个缓冲区缓存消息,效率高,但缓冲区可能会溢出
- **拉模式:**消费者主动从消息中间件拉取消息,网络开销会增加消息延迟,降低系统吞吐量
拉模式适用场景
- 消费者在某个条件成立时才能消费消息
- 需要批量拉取消息进行处理,连续调用
basicGet
方法拉取多条消息,处理完毕一次性返回ACK
解决重复消费问题
- 利用数据库主键去重
- 利用Redis的原子性去实现
redis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)
- 使用全局ID区分消息,解决幂等性
生产者在请求头设置messageId,可以用随机ID或业务逻辑唯一ID
死信队列
- 消息被拒(basicreject or basicnack)并且没有重新入队(requeue=false);
- 当前队列中的消息数量已经超过最大长度
- 消息在队列中过期
配置死信队列
代码语言:javascript复制 public static void SendMessage()
{
var exchangeA = "exchange";
var routeA = "routekey";
var queueA = "queue";
var exchangeD = "dlx.exchange";
var routeD = "dlx.route";
var queueD = "dlx.queue";
var connection = RabbitMQHelper.GetConnection();
{
var channel = connection.CreateModel();
{
// 创建死信交换机
channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false);
// 创建死信队列
channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false);
// 绑定死信交换机和队列
channel.QueueBind(queueD, exchangeD, routeD);
channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false);
channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments:
new Dictionary<string, object> {
{ "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX
{ "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
// { "x-message-ttl",10000}, //设置消息的存活时间,即过期时间
{ "x-max-length",5}//设置队列最大长度
});
channel.QueueBind(queueA, exchangeA, routeA);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//发布消息
channel.BasicPublish(exchange: exchangeA,
routingKey: routeA,
basicProperties: properties,
body: Encoding.UTF8.GetBytes("hello rabbitmq message"));
}
}
}
重试失败特定次数后放入死信队列
代码语言:javascript复制 private static string _exchangeNormal = "Exchange.Normal"; //定义一个用于接收 正常 消息的交换机
private static string _exchangeRetry = "Exchange.Retry"; //定义一个用于接收 重试 消息的交换机
private static string _exchangeFail = "Exchange.Fail"; //定义一个用于接收 失败 消息的交换机
private static string _queueNormal = "Queue.Noraml"; //定义一个用于接收 正常 消息的队列
private static string _queueRetry = "Queue.Retry"; //定义一个用于接收 重试 消息的队列
private static string _queueFail = "Queue.Fail"; //定义一个用于接收 失败 消息的队列
public static void Test()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
//声明交换机
channel.ExchangeDeclare(_exchangeNormal, "topic", true, false, null);
channel.ExchangeDeclare(_exchangeRetry, "topic", true, false, null);
channel.ExchangeDeclare(_exchangeFail, "topic", true, false, null);
//定义队列参数
var queueNormalArgs = new Dictionary<string, object>();
{
queueNormalArgs.Add("x-dead-letter-exchange", _exchangeFail); //指定死信交换机,用于将 Normal 队列中失败的消息投递给 Fail 交换机
}
var queueRetryArgs = new Dictionary<string, object>();
{
queueRetryArgs.Add("x-dead-letter-exchange", _exchangeNormal); //指定死信交换机,用于将 Retry 队列中超时的消息投递给 Normal 交换机
queueRetryArgs.Add("x-message-ttl", 6000); //定义 queueRetry 的消息最大停留时间 (原理是:等消息超时后由 broker 自动投递给当前绑定的死信交换机) //定义最大停留时间为防止一些 待重新投递 的消息、没有定义重试时间而导致内存溢出
}
var queueFailArgs = new Dictionary<string, object>();
{
}
//声明队列
channel.QueueDeclare(queue: _queueNormal, durable: true, exclusive: false, autoDelete: false, arguments: queueNormalArgs);
channel.QueueDeclare(queue: _queueRetry, durable: true, exclusive: false, autoDelete: false, arguments: queueRetryArgs);
channel.QueueDeclare(queue: _queueFail, durable: true, exclusive: false, autoDelete: false, arguments: queueFailArgs);
//为队列绑定交换机
channel.QueueBind(queue: _queueNormal, exchange: _exchangeNormal, routingKey: "#");
channel.QueueBind(queue: _queueRetry, exchange: _exchangeRetry, routingKey: "#");
channel.QueueBind(queue: _queueFail, exchange: _exchangeFail, routingKey: "#");
#region 创建一个普通消息消费者
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received = (sender, e) =>
{
var _sender = (EventingBasicConsumer)sender; //消息传送者
var _channel = _sender.Model; //消息传送通道
var _message = (BasicDeliverEventArgs)e; //消息传送参数
var _headers = _message.BasicProperties.Headers; //消息头
var _content = Encoding.UTF8.GetString(_message.Body.ToArray()); //消息内容
var _death = default(Dictionary<string, object>); //死信参数
if (_headers != null && _headers.ContainsKey("x-death"))
_death = (Dictionary<string, object>)(_headers["x-death"] as List<object>)[0];
try
#region 消息处理
{
Console.WriteLine();
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}t(1.0)消息接收:rnt[deliveryTag={_message.DeliveryTag}]rnt[consumerID={_message.ConsumerTag}]rnt[exchange={_message.Exchange}]rnt[routingKey={_message.RoutingKey}]rnt[content={_content}]");
throw new Exception("模拟消息处理失败效果。");
//处理成功时
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}t(1.1)处理成功:rnt[deliveryTag={_message.DeliveryTag}]");
//消息确认 (销毁当前消息)
_channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
}
#endregion
catch (Exception ex)
#region 消息处理失败时
{
var retryCount = (long)(_death?["count"] ?? default(long)); //查询当前消息被重新投递的次数 (首次则为0)
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}t(1.2)处理失败:rnt[deliveryTag={_message.DeliveryTag}]rnt[retryCount={retryCount}]");
if (retryCount >= 2)
#region 投递第3次还没消费成功时,就转发给 exchangeFail 交换机
{
//消息拒绝(投递给死信交换机,也就是上边定义的 ("x-dead-letter-exchange", _exchangeFail))
_channel.BasicNack(deliveryTag: _message.DeliveryTag, multiple: false, requeue: false);
}
#endregion
else
#region 否则转发给 exchangeRetry 交换机
{
var interval = (retryCount 1) * 10; //定义下一次投递的间隔时间 (单位:秒)
//定义下一次投递的间隔时间 (单位:毫秒)
_message.BasicProperties.Expiration = (interval * 1000).ToString();
//将消息投递给 _exchangeRetry (会自动增加 death 次数)
_channel.BasicPublish(exchange: _exchangeRetry, routingKey: _message.RoutingKey, basicProperties: _message.BasicProperties, body: _message.Body);
//消息确认 (销毁当前消息)
_channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
}
#endregion
}
#endregion
};
channel.BasicConsume(queue: _queueNormal, autoAck: false, consumer: consumer);
}
#endregion
}
消息延时推送
- 过期队列 死信交换机
- RabbitMQ 3.6.x 开始可以使用延迟插件,交换机类型选择x-delayed-message(延迟将数据放入队列)
public static void ConsumerMessage()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
var exchangeArgumets = new Dictionary<string, object>
{
{ "x-delayed-type", "topic" } //延迟交换机的类型
};
channel.ExchangeDeclare("delay_exchange", "x-delayed-message", true, false, exchangeArgumets);
// 创建队列
string queueName1 = "delay_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "delay_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "delay_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机
channel.QueueBind(queue: queueName1, exchange: "delay_exchange", routingKey: "delayed-direct1");
channel.QueueBind(queue: queueName2, exchange: "delay_exchange", routingKey: "delayed-direct2");
channel.QueueBind(queue: queueName3, exchange: "delay_exchange", routingKey: "delayed-direct3");
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 标记消息持久化
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received = (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
var _headers = ea.BasicProperties.Headers; //消息头
int delay = 0;
if (_headers == null)
{
ea.BasicProperties.Headers = new Dictionary<string, object>();
}
else if ( _headers.ContainsKey("x-delay"))
{
delay = Convert.ToInt32(ea.BasicProperties.Headers["x-delay"]);
delay = delay 20000;
}
ea.BasicProperties.Headers["x-delay"] = delay; //消息头设置消息延迟的时间
Console.WriteLine($" {DateTime.Now}=={delay}");
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
channel.BasicPublish(ea.Exchange, ea.RoutingKey, basicProperties: ea.BasicProperties, body);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName3,
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}