RabbitMQ 封装

2021-12-05 18:01:22 浏览数 (1)

参考Abp事件总线的用法,对拷贝的Demo进行简单封装

定义 RabbitMQOptions 用于配置

代码语言:javascript复制
{ 
  "MyRabbitMQOptions": {
    "UserName": "admin",
    "Password": "admin",
    "Host": "192.168.124.220",
    "Port": 5672,
    "ExchangeName": "PerryExchange"
  }
}
代码语言:javascript复制
    public class MyRabbitMQOptions
    {
        public string UserName { get; set; }
        public string Password { get; set; }
        public string Host { get; set; }
        public int Port { get; set; }
        public string ExchangeName { get; set; } = "";
    }

定义 QueueNameAttribute 控制队列名字

代码语言:javascript复制
    /// <summary>
    /// 定义队列名字,优先级高于类完整名
    /// </summary>
    [AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
    public class QueueNameAttribute : Attribute
    {
        public string QueueName { get; }
        public QueueNameAttribute(string queueName)
        {
            QueueName = queueName;
        }
    }

定义 IMyPublisher<T>,通过注入某个类型的 IMyPublisher<T>,自动序列化对象并发布到配置好的MQ里

代码语言:javascript复制
    /// <summary>
    /// 用于注入使用
    /// </summary>
    public interface IMyPublisher<T> where T : class
    {
        Task PublishAsync(T data, Encoding encoding = null);
    }
代码语言:javascript复制
    public class MyPublisher<T> : IMyPublisher<T>, IDisposable where T : class
    {
        private readonly MyRabbitMQOptions _myOptions;
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _queueName;


        /// <summary>
        /// 非注入时使用此构造方法
        /// </summary>
        public MyPublisher(IConnection connection)
        {
            _connection = connection;
        }

        /// <summary>
        /// 依赖注入自动走这个构造方法
        /// </summary>
        /// <param name="optionsMonitor"></param>
        /// <param name="factory"></param>
        public MyPublisher(IOptionsMonitor<MyRabbitMQOptions> optionsMonitor, ConnectionFactory factory)
        {
            _myOptions = optionsMonitor.CurrentValue;
            _connection = factory.CreateConnection();

            // 创建通道
            _channel = _connection.CreateModel();

            // 声明一个Exchange
            _channel.ExchangeDeclare(_myOptions.ExchangeName, ExchangeType.Direct, false, false, null);


            var type = typeof(T);
            // 获取类上的QueueNameAttribute特性,如果不存在则使用类的完整名
            var attr = type.GetCustomAttribute<QueueNameAttribute>();
            _queueName = string.IsNullOrWhiteSpace(attr?.QueueName) ? type.FullName : attr.QueueName;

            // 声明一个队列 
            _channel.QueueDeclare(_queueName, false, false, false, null);

            //将队列绑定到交换机
            _channel.QueueBind(_queueName, _myOptions.ExchangeName, _queueName, null);
        }

        /// <summary>
        /// 发布消息
        /// </summary>
        public Task PublishAsync(T data, Encoding encoding = null)
        {
            // 对象转 object[] 发送
            var msg = JsonConvert.SerializeObject(data);
            byte[] bytes = (encoding ?? Encoding.UTF8).GetBytes(msg);
            _channel.BasicPublish(_myOptions.ExchangeName, _queueName, null, bytes);

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            // 结束
            _channel.Close();
            _connection.Close();
        }
    }

定义 IMyEventHandler<T> ,供 NetCore 项目注入使用,配置后,可以在程序启动的时候,找到该接口所有的实现类,并开启消费者

代码语言:javascript复制
    /// <summary>
    /// Handler的配置
    /// </summary>
    public class MyEventHandlerOptions
    {
        /// <summary>
        /// 禁用 byte[] 解析
        /// </summary>
        public bool DisableDeserializeObject { get; set; } = false;
        /// <summary>
        /// 配置Encoding
        /// </summary>
        public Encoding Encoding { get; set; } = Encoding.UTF8;
    }
代码语言:javascript复制
    public abstract class MyEventHandler<T> : IMyEventHandler<T> where T : class
    {
        private IModel _channel;
        private string _queueName;
        private EventingBasicConsumer _consumer;
        public MyEventHandlerOptions Options = new()
        {
            DisableDeserializeObject = false
        };

        public void Begin(IConnection connection)
        {
            var type = typeof(T);
            // 获取类上的QueueNameAttribute特性,如果不存在则使用类的完整名
            var attr = type.GetCustomAttribute<QueueNameAttribute>();
            _queueName = string.IsNullOrWhiteSpace(attr?.QueueName) ? type.FullName : attr.QueueName;

            //创建通道
            _channel = connection.CreateModel();

            _consumer = new EventingBasicConsumer(_channel);
            _consumer.Received  = MyReceivedHandler;
            //消费者
            _channel.BasicConsume(_queueName, false, _consumer);
        }

        // 收到消息后
        private void MyReceivedHandler(object sender, BasicDeliverEventArgs e)
        {
            try
            {
                // 如果未配置禁用则不解析,后面抽象方法的data参数会始终为空
                if (!Options.DisableDeserializeObject)
                {
                    T data = null;
                    // 反序列化为对象
                    var message = Options.Encoding.GetString(e.Body);
                    data = JsonConvert.DeserializeObject<T>(message);
                    OnReceivedAsync(data, message).Wait();

                    // 确认该消息已被消费
                    _channel?.BasicAck(e.DeliveryTag, false);
                }
            }
            catch (Exception ex)
            {
                OnConsumerException(ex);
            }
        }

        /// <summary>
        /// 收到消息 
        /// </summary>
        /// <param name="data">解析后的对象</param>
        /// <param name="message">消息原文</param> 
        /// <remarks>Options.DisableDeserializeObject为true时,data始终为null</remarks>
        public abstract Task OnReceivedAsync(T data, string message);

        /// <summary>
        /// 异常
        /// </summary>
        /// <param name="ex">派生类不重写的话,异常被隐藏</param>
        public virtual void OnConsumerException(Exception ex)
        {

        }
    }

给依赖注入写一些拓展方法

代码语言:javascript复制
    public static class MyRabbiteMQExtensions
    {
        /// <summary>
        /// 初始化消息队列,并添加Publisher到IoC容器
        /// </summary>
        /// <remarks>从Configuration读取"MyRabbbitMQOptions配置项"</remarks>
        public static IServiceCollection AddMyRabbitMQ(this IServiceCollection services, IConfiguration configuration)
        {
            #region 配置项
            // 从Configuration读取"MyRabbbitMQOptions配置项
            var optionSection = configuration.GetSection("MyRabbitMQOptions");

            // 这个myOptions是当前方法使用
            MyRabbitMQOptions myOptions = new();
            optionSection.Bind(myOptions);

            // 加了这行,才可以注入IOptions<MyRabbitMQOptions>或者IOptionsMonitor<MyRabbitMQOptions>
            services.Configure<MyRabbitMQOptions>(optionSection);
            #endregion

            // 加了这行,才可以注入任意类型参数的 IMyPublisher<> 使用
            services.AddTransient(typeof(IMyPublisher<>), typeof(MyPublisher<>));

            // 创建一个工厂对象,并配置单例注入
            services.AddSingleton(new ConnectionFactory
            {
                UserName = myOptions.UserName,
                Password = myOptions.Password,
                HostName = myOptions.Host,
                Port = myOptions.Port
            });

            return services;
        }

        /// <summary>
        /// IServiceCollection的拓展方法,用于发现自定义的EventHandler并添加到服务容器
        /// </summary> 
        /// <param name="types">包含了自定义Handler的类集合,可以使用assembly.GetTypes()</param> 
        /// <remarks>遍历所有types,将继承自IMyEventHandler的类注册到容器</remarks>
        public static IServiceCollection AddMyRabbitMQEventHandlers(this IServiceCollection services, Type[] types)
        {
            var baseType = typeof(IMyEventHandler);

            foreach (var type in types)
            {
                // baseType可以放type,并且type不是baseType
                if (baseType.IsAssignableFrom(type) && baseType != type)
                {
                    // 瞬态注入配置
                    services.AddTransient(typeof(IMyEventHandler), type);
                }
            }

            return services;
        }

        /// <summary>
        /// 给app拓展方法
        /// </summary>
        /// <remarks>
        /// 在IoC容器里获取到所有继承自IMyEvetnHandler的实现类,并开启消费者
        /// </remarks>
        public static IApplicationBuilder UseMyEventHandler(this IApplicationBuilder app)
        {
            var handlers = app.ApplicationServices.GetServices(typeof(IMyEventHandler));
            var factory = app.ApplicationServices.GetService<ConnectionFactory>();

            // 遍历调用自定义的Begin方法
            foreach (var h in handlers)
            {
                var handler = h as IMyEventHandler;
                handler?.Begin(factory.CreateConnection());
            }

            return app;
        }
    }

在Net6 WebApi中使用

program.cs

代码语言:javascript复制
    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddControllers();


    // 添加MyRabbitMQ到services
    builder.Services.AddMyRabbitMQ(builder.Configuration);
    builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());

    var app = builder.Build();

    // 使用 MyEventHandler
    app.UseMyEventHandler();


    app.MapControllers();
    app.Run();

定义ETO

代码语言:javascript复制
    [QueueName("perry.test")]
    public class PerryTest
    {
        public Guid Id { get; set; }
        public string? Name { get; set; }
        public int Count { get; set; }
        public string? Remark { get; set; }
    }

定义EventHandler

代码语言:javascript复制
    public class PerryTestEventHandler : MyEventHandler<PerryTest>
    {
        public override Task OnReceivedAsync(PerryTest data, string message)
        {
            Console.WriteLine(message);
            return Task.CompletedTask;
        }

        public override void OnConsumerException(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

控制器中检查是否可以正常使用

代码语言:javascript复制
    [Route("api")]
    [ApiController]
    public class TestController : ControllerBase
    {
        public IMyPublisher<PerryTest> TestPublisher { get; }

        public TestController(IMyPublisher<PerryTest> testPublisher)
        {
            TestPublisher = testPublisher;
        }
        [HttpGet("test")]
        public async Task<string> TestAsync()
        {
            var data = new PerryTest()
            {
                Id = Guid.NewGuid(),
                Name = "AAA",
                Count = 123,
                Remark = "哈哈哈"
            };

            await TestPublisher.PublishAsync(data);

            return "发送了一个消息";
        }
    }

运行截图

参考 .NET Core 使用RabbitMQ,拷贝了一些Demo

文章里的生产者Demo

代码语言:javascript复制
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();
    //声明一个队列
    channel.QueueDeclare("hello", false, false, false, null);

    Console.WriteLine("nRabbitMQ连接成功,请输入消息,输入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish("", "hello", null, sendBytes);
    } while (input.Trim().ToLower()!="exit");
    channel.Close();
    connection.Close();

文章里的消费者Demo

代码语言:javascript复制
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();

    //事件基本消费者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

    //接收到消息事件
    consumer.Received  = (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);
        Console.WriteLine($"收到消息: {message}");
        //确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
    };
    //启动消费者 设置为手动应答消息
    channel.BasicConsume("hello", false, consumer);
    Console.WriteLine("消费者已启动");
    Console.ReadKey();
    channel.Dispose();
    connection.Close();

0 人点赞