【BCVP】实现基于 Redis 的消息队列

2022-04-11 15:47:05 浏览数 (1)

聆听自己的声音

如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯。

话说上回书我们说到了,Redis的使用修改《【BCVP更新】StackExchange.Redis的异步开发方式》,通过异步的时候,基本上会解决StackExRedis组件使用过程中,可能在并发的时候遇到的问题,而且该组件也是微软官方推荐的(参考微软微服务框架eShopOnContainers),如果一定要抬杠说不好用,其实是没必要的。那今天我们继续往下说,简单说下如何基于Redis实现消息队列。

目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ等这几种。当然常见的还是基于RabbitMQ来实现的,Redis份额稍微小了一点,但是因为Redis的仓储、缓存等多个方面的好处,使得Redis也是很火。

1 什么是消息队列

这个其实我今天不打算重点讲,因为我详细每个人能看这篇文章,肯定都知道消息队列的相关内容,但是为了不那么突兀,我就从网上粘贴几块基本概念,了解一二:

基本概念:

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

最终可以实现解耦的目的。

下面通过一个简单的架构模型来解释:

  • Producer:消息生产者,负责产生和发送消息到Broker。
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个Queue。
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。

有哪些优缺点:

从上边的定义中,我们可以看出来,优点主要是三块:

异步、流量削峰与流控、解耦

这三个优点在高并发等三高场景还是很有必要的,甚至说是十分必要的。

典型的广播模式,一个消息可以发布到多个消费者;

消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息;

比如我们某宝下订单,或某6抢车票,那都是放到队列里缓冲的,要是都用服务端等待,可能早就崩了,当然实际上比这个复杂的多。 而且,通过订阅发布的模式,异步执行,这样就会大大缓解时间压力。

但是,随之而来的弊端也是有的: 比如为了异步,就是接收者必须轮询消息队列,才能收到最近的消息。然后还有就是不能达到实时性,说白了就是用空间换时间,从而降低瓶颈。

消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。不能保证每个消费者接收的时间是一致的。若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。

五种常见模式

简单模式Hello World 功能:一个生产者P发送消息到队列Q,一个消费者C接收

工作队列模式Work Queue 功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

发布/订阅模式Publish/Subscribe 功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

路由模式Routing 说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

通配符(主题)模式Topic 说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

更多具体的内容呢,自己感兴趣多去搜索下吧,肯定还是有很多其他问题的,我这里就不铺开了讲了,下边咱们就说说,如何在Blog.Core里添加队列吧。

2 订阅发布相关配置案例

案例有很多,自己可以根据情况自定义。

那既然要讲东西,肯定不能随便放一个算法,肯定是需要一个小demo,一个应用场景,这样更有助于初学者去理解,之前考虑了很多,一直没有想好在BlogCore里边使用什么案例场景来说一说消息队列,最后实在是没办法,只能说日志了,万事不决就说日志,好像软件开发都是这么举例的。

这里说一下,假设我们自定义了一个日志记录的方法,就是在txt里写数据,其实我现在也是这么用的,平时肯定会一边查一边写,如果并发高一下,肯定就会出现死锁或者异常的出现,那我们就可以把写日志放到消息队列里,缓冲一下,然后在写一个订阅者,专门来“盯着”队列,一有消息传过来,就写到日志文件里,这样就能很好的实现相应的目的。如果不缓冲下,有时候日志可能高达几万条,瞬间爆炸。

那说了这个小场景,接下来就简单的模拟一下吧。

1、定义消息队列操作类与接口

既然要发布和订阅消息,肯定就需要有相应的操作方法,在上一篇文章中,我新建了一个RedisBasketRepository.cs的操作类,那我们还继续在这个类文件里写吧,注意,这个实现类和接口,已经注册到服务容器了,如果你第一次操作,可以参考文章开头上篇文章内容:

代码语言:javascript复制
 /// <summary>
 /// 根据key获取RedisValue
 /// </summary>
 /// <typeparam name="T"></typeparam>
 /// <param name="redisKey"></param>
 /// <returns></returns>
 public async Task<RedisValue[]> ListRangeAsync(string redisKey)
 {
     return await _database.ListRangeAsync(redisKey);
 }

 /// <summary>
 /// 在列表头部插入值。如果键不存在,先创建再插入值
 /// </summary>
 /// <param name="redisKey"></param>
 /// <param name="redisValue"></param>
 /// <returns></returns>
 public async Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1)
 {
     return await _database.ListLeftPushAsync(redisKey, redisValue);
 }
 
 /// <summary>
 /// 在列表尾部插入值。如果键不存在,先创建再插入值
 /// </summary>
 /// <param name="redisKey"></param>
 /// <param name="redisValue"></param>
 /// <returns></returns>
 public async Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1)
 {
     return await _database.ListRightPushAsync(redisKey, redisValue);
 }

 /// <summary>
 /// 移除并返回存储在该键列表的第一个元素  反序列化
 /// </summary>
 /// <param name="redisKey"></param>
 /// <returns></returns>
 public async Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class
 {
     return JsonConvert.DeserializeObject<T>(await _database.ListLeftPopAsync(redisKey));
 }

我这里只是简单的Copy出来几个做例子,总的一共有12个,当然你也可以自定义增加或删除某些不必要的,核心的可以看出来,都是根据redisKey来操作的

代码语言:javascript复制

  Task<RedisValue[]> ListRangeAsync(string redisKey);
  Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1);
  Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1);
  Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1);
  Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class;
  Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class;
  Task<string> ListLeftPopAsync(string redisKey, int db = -1);
  Task<string> ListRightPopAsync(string redisKey, int db = -1);
  Task<long> ListLengthAsync(string redisKey, int db = -1);
  Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1);
  Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1);
  Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);
  Task ListClearAsync(string redisKey, int db = -1);

2、如何发布消息与接收消息

上边定义好了相应的操作方法以后,就很简单了,我们来发布一条消息来试试:

代码语言:javascript复制
 [HttpGet]
 [AllowAnonymous]
 public async Task RedisMq()
 {
     var msg = "这里是一条日志";
     await _redisBasketRepository.ListLeftPushAsync(RedisMqKey.Loging, msg);
 }

就是这么简单,构造函数注入以后,直接调用相应的方法,就把消息msg推送到了队列里了,这里的redisKey,我用了常量定义,具体可操作Blog.Core源代码。

现在是发布消息特别简单,只需要一行接口,那如何去获取呢,在上边的获取方法中,我们定义的是:

代码语言:javascript复制
Task<RedisValue[]> ListRangeAsync(string redisKey);

这个方法也是可以的,只不过我们需要对其进行转换,毕竟存的msg是字符串string类型的,但是这里的返回类型的RedisValue[],所以需要劈里啪啦转化一下。

但是这里有一个问题,就是如何去定时获取呢,也就是如何设计一个订阅者进行消费消息呢,这需要思考下,当然比较简单的就是while(true){},可能平时就是这么使用的,不过还是不是那么爽快,可以写一个组件来处理,简单快捷,正好,有一个大佬已经封装好了,我们可以直接拿来用,如果你有什么问题,可以给他提issue。

3、InitQ组件来订阅消息

在nuget中,可以直接安装组个组件:

代码语言:javascript复制
<PackageReference Include="InitQ" Version="1.0.0.4" />

他的开源地址是:

https://github.com/wmowm/Initq

使用方法很简单,可以参考他的README里的介绍:

1、先添加服务

代码语言:javascript复制
 /// <summary>
/// Redis 消息队列 启动服务
/// </summary>
public static class RedisInitMqSetup
{
    public static void AddRedisInitMqSetup(this IServiceCollection services)
    {
        if (services == null) throw new ArgumentNullException(nameof(services));

        services.AddInitQ(m =>
        {
            //时间间隔
            m.SuspendTime = 5000;
            //redis服务器地址
            m.ConnectionString = "127.0.0.1:6379";
            //对应的订阅者类,需要new一个实例对象,当然你也可以传参,比如日志对象
            m.ListSubscribe = new List<IRedisSubscribe>() { new RedisSubscribe()};
            //显示日志
            m.ShowLog = false;
        });
    }
}

2、定义订阅者

代码语言:javascript复制
 public class RedisSubscribe : IRedisSubscribe
 {
     [Subscribe(RedisMqKey.Loging)]
     private async Task SubRedisLoging(string msg)
     {
         Console.WriteLine($"队列{RedisMqKey.Loging} 消费到/接受到 消息:{msg}");

         await Task.CompletedTask;
     }
 }

整体很简单,继承接口,然后添加上特性,这个特性里的参数,就是我们消息发布的时候的那个key,然后方法的参数,就是对应的消息msg,是不是很简单。

当然这里你可以传递一个日志的对象实例,这样就把日志信息分流到了队列里,然后队列走到这个订阅者里,由这里进行缓冲,然后把日志填充到日志文件,从而达到减峰的目的。

最终的效果可以看看:

好啦,今天的redis消息队列已经说完了,还是很简单的,其中重点还是那五种模式要自己好好了解下,然后整体过程自己把握把握,至于RabbitMQ,这个以后再说吧。

END

0 人点赞