我写了一点逗比代码,让在每次使用 NewLife 的 RocketMQ 发送消息时,都创建一个新的 Producer 生产者。此时我发现了在我的消费者里面,无论开多少个消费者实例进程,每次都只有一个消费者进行消费
本文记录的问题,和 NewLife 的 RocketMQ 库的设计毫无相关,仅仅只是我的逗比问题。还请大家放心使用 NewLife 的 RocketMQ 库
我在对 NewLife 的 RocketMQ 的 Producer 的逗比封装,让每次发送的时候,都不得不新建一个 Producer 实例。而有经过一些玄学的原因,如果每次的 Producer 都是新建出来的,将会导致只能有一个消费者实例去消费此消息内容
本文不去讨论玄学的原因,咱回到我的逗比代码
以下是我对 NewLife 的 RocketMQ 的 Producer 的逗比封装
代码语言:javascript复制 public class RocketProducer
{
public string Group { get; }
public string Topic { get; }
public string NameServerAddress { get; }
private NewLife.RocketMQ.Producer Producer { get; }
public RocketProducer(RocketMQConfiguration configuration) : this(configuration.Group, configuration.Topic,
configuration.NameServerAddress)
{
}
private RocketProducer(string @group, string topic, string nameServerAddress)
{
Group = @group;
Topic = topic;
NameServerAddress = nameServerAddress;
Producer = new NewLife.RocketMQ.Producer
{
NameServerAddress = NameServerAddress,
Topic = Topic,
Group = Group
};
Producer.Start();
}
public SendResult Send(string tag, string body)
{
var message = new Message
{
Topic = Topic,
Keys = $"Doubi_{DateTime.Now:yyMMddHHmmssfff}_{Guid.NewGuid().ToString().Substring(0, 8)}",
Tags = tag,
BodyString = body
};
var sendResult = Producer.Publish(message);
Producer.Dispose();
return sendResult;
}
}
大家是否能看出来锅在哪
可以看到在每次发送完成之后,就调用了 Producer.Dispose 方法释放了生产者
因此为了使用以上逗比的封装,就需要每次都创建一个 RocketProducer 的实例去发送一条消息。简化的测试代码大概如下
代码语言:javascript复制 /// <summary>
/// 生产者
/// </summary>
/// 以下是逗比代码
private void RegisterMqTaskProducerDoubi()
{
var group = _rocketMqConfiguration["Group"];
var nameServerAddress = _rocketMqConfiguration["NameServerAddress"];
var topic = _rocketMqConfiguration["Topic"];
var rocketMqConfiguration =
new RocketMqConfiguration(@group, nameServerAddress);
_logger.LogInformation($"RegisterMqTaskProducer NameServerAddress={nameServerAddress};Group={group};Topic={topic}");
Task.Run(async () =>
{
int n = 0;
while (_foo)
{
// 每次都新建一个,用来挖坑
var producer = new NewLife.RocketMQ.Producer
{
NameServerAddress = rocketMqConfiguration.NameServerAddress,
Topic = topic,
Group = rocketMqConfiguration.Group,
};
try
{
_logger.LogInformation("Start Producer");
producer.Start();
_logger.LogInformation("Finish Start Producer");
}
catch (Exception e)
{
_logger.LogWarning(e, e.ToString());
return;
}
var message = $"Message{n}";
_logger.LogInformation($"StartSend Topic={topic} Message={message}");
var result = producer.Publish(new Message()
{
Keys = $"Key{n}",
Topic = topic,
Tags = "Tag",
BodyString = message
});
producer.Dispose();
_logger.LogInformation($"FinishSend Result={result.Status};MessageId={result.MsgId};BrokerName={result.Queue.BrokerName};QueueOffset={result.QueueOffset}");
await Task.Delay(TimeSpan.FromSeconds(1));
n ;
if (n > 10000)
{
break;
}
}
});
}
private bool _foo = true;
大概就是每次都新建一个 Producer 用来发送
消费者的代码如下
代码语言:javascript复制 class FakeConsumer
{
public FakeConsumer(ILogger<MqHostedService> logger, IConfigurationSection rocketMqConfiguration, string name)
{
_logger = logger;
_rocketMqConfiguration = rocketMqConfiguration;
Name = name;
}
/// <summary>
/// 消费者
/// </summary>
public void RegisterMqTaskConsumer()
{
var group = _rocketMqConfiguration["Group"];
var nameServerAddress = _rocketMqConfiguration["NameServerAddress"];
var topic = _rocketMqConfiguration["Topic"];
_logger.LogInformation($"RegisterMqTaskConsumer NameServerAddress={nameServerAddress};Group={group};Topic={topic}");
var consumer = new Consumer
{
Group = group,
NameServerAddress = nameServerAddress,
Topic = topic,
};
consumer.BatchSize = 1; // 一次消费一个任务
consumer.AutoSchedule = true;
consumer.ConsumerInterval = 1000; // 消费间隔:1s
consumer.OnConsume = OnConsume;
consumer.Start();
}
private bool OnConsume(MessageQueue messageQueue, MessageExt[] messages)
{
try
{
_logger.CCloudInfo($" {Name} Message={messages[0].BodyString}",
string.Empty);
}
catch (Exception e)
{
_logger.CCloudInfo($"{messageQueue.QueueId} {messageQueue.BrokerName} MessageCount={messages.Length}",
e);
}
Task.Delay(TimeSpan.FromSeconds(5)).Wait();
_logger.CCloudInfo($"--{Name}-- Message={messages[0].BodyString}",
string.Empty);
// 返回 true 表示这个消息被消费成功
// 返回 false 表示这个消息消费失败,将会再次被投到消费者,但不一定再次被这个实例收到
//return _random.Next(10) > 5;
return true;
}
private Random _random = new Random();
private readonly ILogger<MqHostedService> _logger;
public string Name { get; }
private IConfigurationSection _rocketMqConfiguration;
}
在配置文件里面写上具体的配置,大概代码如下,请将具体的配置修改为你的消息队列服务器配置
代码语言:javascript复制{
"Logging":
{
"LogLevel":
{
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"RocketMq":
{
"Group": "DoubiLindexi",
"NameServerAddress": "doubi-test-rockermq-1.gz.lindexi.com",
"Topic": "Foo"
},
"AllowedHosts": "*"
}
执行此进程两次,只让单个进程调用到 RegisterMqTaskProducerDoubi 方法用来生产消息。此时可以看到只有单个进程可以收到消息,而另一个进程不能收到消息
更改生产者代码作为用一个 NewLife.RocketMQ.Producer 创建消息,如下面代码
代码语言:javascript复制 /// <summary>
/// 生产者
/// </summary>
private void RegisterMqTaskProducer()
{
var group = _rocketMqConfiguration["Group"];
var nameServerAddress = _rocketMqConfiguration["NameServerAddress"];
var topic = _rocketMqConfiguration["Topic"];
var rocketMqConfiguration =
new RocketMqConfiguration(@group, nameServerAddress);
_logger.LogInformation($"RegisterMqTaskProducer NameServerAddress={nameServerAddress};Group={group};Topic={topic}");
var producer = new NewLife.RocketMQ.Producer
{
NameServerAddress = rocketMqConfiguration.NameServerAddress,
Topic = topic,
Group = rocketMqConfiguration.Group,
};
try
{
_logger.LogInformation("Start Producer");
producer.Start();
_logger.LogInformation("Finish Start Producer");
}
catch (Exception e)
{
_logger.LogWarning(e, e.ToString());
return;
}
Task.Run(async () =>
{
while (_foo)
{
// 用来让单个进程生产消息,加入断点,然后拖到下一个语句
await Task.Delay(1000);
}
int n = 0;
while (_foo)
{
var message = $"Message{n}";
_logger.LogInformation($"StartSend Topic={topic} Message={message}");
var result = producer.Publish(new Message()
{
Keys = $"Key{n}",
Topic = topic,
Tags = "Tag",
BodyString = message
});
_logger.LogInformation($"FinishSend Result={result.Status};MessageId={result.MsgId};BrokerName={result.Queue.BrokerName};QueueOffset={result.QueueOffset}");
await Task.Delay(TimeSpan.FromSeconds(1));
n ;
if (n > 10000)
{
break;
}
}
});
}
此时可以看到,多个进程都能收到消息
所以如果消息队列的消息只有被有限个消费者进行消费,请了解自己的代码,是否每次发送消息都使用独立的生产者
本文会经常更新,请阅读原文: https://blog.lindexi.com/post/NewLife-的-RocketMQ-的生产者每次都是新实例将只由一个消费者消费.html ,以避免陈旧错误知识的误导,同时有更好的阅读体验。
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。欢迎转载、使用、重新发布,但务必保留文章署名林德熙(包含链接: https://blog.lindexi.com ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。