RabbitMQ集群整合SpringBoot2.x

2019-08-01 18:32:35 浏览数 (1)

RabbitMQ相信大家已经再熟悉不过了,作为业界四大主流消息中间件之一(Apache RocketMQ、Apache Kafka、Apache ActiveMQ、RabbitMQ),它具有非常好的性能和可靠性的集群模式,不仅仅在各大互联网大厂中广泛使用(比如同程艺龙、美团点评等),而且在互联网金融行业也常常被作为首选!

同时MQ技术也是晋级技术骨干、团队核心的必备技术。

SpringBoot作为互联网开发利器已经不需要我再过多介绍什么,接下来我们一起从零开始构建RabbitMQ、并且与SpringBoot2.x的整合吧!

一、安装RabbitMQ集群十步走!(3.6.5版本、采用rpm安装方式,我们要安装3个节点的集群,192.168.11.71 192.168.11.72 192.168.11.73、以71节点为例:)

第一步:下载所需依赖包

Rabbitmq基于erlang语言编写,所以必须要有。

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

socat包为秘钥包,如果没有会提示缺少此依赖。

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

rabbitmq-server-3.6.5核心服务包。

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

第二步:安装并配置

安装命令:

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安装成功后:修改配置

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件

修改:loopback_users 中的 <<"guest">>,只保留guest

第三步:3个节点同时进行前两步骤操作:(71、72、73)

第四步:启停单点服务与开启管控台

/etc/init.d/rabbitmq-server start | stop | status | restart

分别启动三个节点,然后执行命令启动控制台插件如下:

rabbitmq-plugins enable rabbitmq_management最后使用 guest/guest登录成功即可!

第五步:接下来进行集群构建:

71、72、73任意一个节点为Master(这里选择71为Master)

也就是说我们需要把71的Cookie文件同步到72、73节点上去,先停止所有服务器:/etc/init.d/rabbitmq-server stop

然后进入制定目录(/var/lib/rabbitmq/)并远程copy 文件到72、73节点,如下操作:scp /var/lib/rabbitmq/.erlang.cookie到192.168.11.72和192.168.11.73中

第六步:组成集群

首先启动三个节点:rabbitmq-server -detached

然后把72和73分别加入到71中,组成集群 [--ram]为节点以什么方式加入到集群中

ram为内存 存储 默认不加为disk磁盘存储,操作如下:

node72:rabbitmqctl stop_app

node72:rabbitmqctl join_cluster [--ram] rabbit@bhz71

node72:rabbitmqctl start_app

第七步:修改集群名称:

rabbitmqctl set_cluster_name rabbitmq_cluster1

第八步:查看集群状态:

rabbitmqctl cluster_status ,如下所示表示集群构建OK!

第九步:构建镜像队列,任意节点执行命令如下:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

第十步:查看管控台发现集群已经构建成功

二.RabbitMQ与SpringBoot2.x整合

生产者端:

第一步:pom.xml配置如下

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bfxy</groupId>
    <artifactId>rabbitmq-springboot-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-springboot-producer</name>
    <description>rabbitmq-springboot-producer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

第二步:application.properties配置文件

代码语言:javascript复制
spring.rabbitmq.addresses=192.168.11.71:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true  #confirm模式
spring.rabbitmq.publisher-returns=true   #return机制
spring.rabbitmq.template.mandatory=true  #与return机制结合配置次属性

第三步:编写RabbitSender生产端代码

代码语言:javascript复制
package com.bfxy.springboot.producer;

import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  

    //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: "   correlationData);
            System.err.println("ack: "   ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };

    //回调函数: return返回
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: "   exchange   ", routingKey: " 
                  routingKey   ", replyCode: "   replyCode   ", replyText: "   replyText);
        }
    };

    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id   时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }

}

消费者:

第一步:pom文件同生产者一致

第二步:application.properties配置文件

代码语言:javascript复制
spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手工签收
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

第三步:RabbitRecever消费端代码

代码语言:javascript复制
package com.bfxy.springboot.conusmer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: "   message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}

生产端测试:

代码语言:javascript复制
package com.bfxy.springboot;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.bfxy.springboot.producer.RabbitSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Autowired
    private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Test
    public void testSender1() throws Exception {
         Map<String, Object> properties = new HashMap<>();
         properties.put("number", "12345");
         properties.put("send_time", simpleDateFormat.format(new Date()));
         rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
    }

}

0 人点赞