RabbitMQ消息队列的发布-消费

2023-10-21 18:41:36 浏览数 (2)

1. 生产者 

RabbitMQ_Producer

代码语言:javascript复制
    static void Main(string[] args)
        {
            string path = AppDomain.CurrentDomain.BaseDirectory;
            string tag = path.Split('/', '\').Last(s => !string.IsNullOrEmpty(s));
            Console.WriteLine($"这里是 {tag} 启动了。。");



            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。
            factory.UserName = "guest";//默认用户名,用户可以在服务端自定义创建,有相关命令行
            factory.Password = "guest";//默认密码

            using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。
            {
                //创建一个通道,这个就是Rabbit自己定义的规则了,如果自己写消息队列,这个就可以开脑洞设计了
                //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
                using (var channel = connection.CreateModel())
                {
                    //删除队列
                    channel.QueueDelete("ProducerWrites");
                    //删除交换机
                    channel.ExchangeDelete("ProducerWritesExChange");
                    

                    //创建队列
                    channel.QueueDeclare(queue: "ProducerWrites", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //创建交换机
                    channel.ExchangeDeclare(exchange: "ProducerWritesExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //交换机和队列绑定
                    channel.QueueBind(queue: "ProducerWrites", exchange: "ProducerWritesExChange", routingKey: "advanced", arguments: null);
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"生产者{tag}已准备就绪~~~");

                    // 1. 生产消息
                    for (int i = 0; true; i  )
                    {
                        IBasicProperties basicProperties = channel.CreateBasicProperties();
                        basicProperties.Persistent = true;
                        //basicProperties.DeliveryMode = 2;
                        string message = $"{tag}:生产者生产消息_{i   1}";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "ProducerWritesExChange",
                                          routingKey: "advanced",
                                          basicProperties: basicProperties,
                                          body: body);
                        Console.WriteLine($"{message} 已发送~");
                        Thread.Sleep(500);
                    }            
                
                }
            }
          
        }

2. 消费者

RabbitMQ_Consumer

代码语言:javascript复制
  static void Main(string[] args)
        {
            string path = AppDomain.CurrentDomain.BaseDirectory;
            string tag = path.Split('/', '\').Last(s => !string.IsNullOrEmpty(s));
            Console.WriteLine($"这里是 {tag} 启动了。。");

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        channel.QueueDeclare(queue: "ProducerWrites", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.ExchangeDeclare(exchange: "ProducerWritesExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        channel.QueueBind(queue: "ProducerWrites", exchange: "ProducerWritesExChange", routingKey: "advanced", arguments: null);

                        //2. 消费消息
                        //rabbitMq消费消息是通过事件驱动的:
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received  = (model, ea) =>  //如果有消息进入到Rabbitmq,就会触发这个事件来完成消息的消费;
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"{tag}接受消息: {message}");
                        };
                        channel.BasicConsume(queue: "ProducerWrites",
                                     autoAck: true,
                                     consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }
        }

3. 准备1个生产者,2个消费者效果图

0 人点赞