ActiveMQ消息中间件简单配置

2023-11-27 14:50:04 浏览数 (2)

消息中间件

消息中间件

什么是消息中间件

概述

消息中间件可以理解成就是一个服务软件,保存信息的容器,比如生活中的快递云柜. 我们把数据放到消息中间件当中, 然后通知对应的服务进行获取 消息中间件是在消息的传输过程中保存信息的容器

消息中间件应用场景

  1. 使用消息服务器当做大的队列使用, 先进先出, 来处理高并发写入操作
  2. 使用消息服务器可以将业务系统的串行执行改为并行执行, 处理效率高, 更合理的榨取服务器的性能.

同步与异步技术

同步技术

dubbo是一中同步技术, 实时性高, controller调用service项目, 调用就执行, 如果service项目中的代码没有执行完, controller里面的代码一致等待结果.

异步技术

mq消息中间件技术(jms) 是一种异步技术, 消息发送方, 将消息发送给消息服务器, 消息服务器未必立即处理.什么时候去处理, 主要看消息服务器是否繁忙, 消息进入服务器后会进入队列中, 先进先出.实时性不高.

JMS

概述:

jms的全称叫做Java message service (Java消息服务) jms是jdk底层定义的规范 各大厂商都是实现这个规范的技术

jms消息服务器同类型技术

ActiveMQ:是apache的一个比较老牌的消息中间件, 它比较均衡, 既不是最安全的, 也不是最快的. RabbitMQ:是阿里巴巴的一个消息中间件, 更适合金融类业务, 它对数据的安全性比较高.能够保证数据不丢失. Kafka:Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;适合处理海量数据。

JMS中支持的消息类型:

TextMessage: 一个字符串对象 MapMessage:key-value ObjectMessage:一个序列化的 Java 对象 BytesMessage:一个字节的数据流 StreamMessage:Java 原始值的数据流

JMS中的两种发送模式

点对点模式

一个发送方, 一个接收方. 也可以多个发送方, 一个接收方, 主要是接收方必须是第一个.

订阅发布模式

一个发送方, 多个接收方. 发送方也可以是多个, 主要看接收方, 接收方必须是多个

ActiveMQ安装

链接: https://pan.baidu.com/s/1B0ZW3_Z3xcamUCniNjd10Q 提取码: avr2

  1. 将apache-activemq-5.12.0-bin.tar.gz 上传至Linux服务器
  1. 解压此文件 tar zxvf apache-activemq-5.12.0-bin.tar.gz
  2. 为apache-activemq-5.12.0目录赋权 chmod 777 apache-activemq-5.12.0
  3. 进入apache-activemq-5.12.0bin目录赋与执行权限 cd /usr/local/apache-activemq-5.12.0/bin chmod 755 activemq
  4. 启动 ./activemq start
  5. 在浏览器当中输入http://192.168.0.106:8161/ ( ip:8161)
  1. 进入管理页面
  1. 用户名和密码都是 admin
说明
  1. Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
  2. Number Of Consumers :消费者 这个是消费者端的消费者数量
  3. Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
  4. Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。

消息服务器小例子

  1. 创建普通maven Jar工程
  1. 引入pom依赖
代码语言:javascript复制
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>
点对点模式Queue

创建QueueProducer

代码语言:javascript复制
public class QueueProducer {
    public static void main(String[] args) throws Exception{
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象, 指定发送的队列名称, 队列名称可以随意起名, 但是发送到哪里, 就要从哪里去接收
        Queue queue = session.createQueue("test-queue");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

创建QueueConsumer

代码语言:javascript复制
public class QueueConsumer {
    public static void main(String[] args) throws Exception{
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:" textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();

    }
}

运行QueueProducer后运行QueueConsumer

订阅发布模式Topic

TopicConsumer1

代码语言:javascript复制
public class TopicConsumer1 {
    public static void main(String[] args) throws Exception {
//1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:" textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();

    }
}

TopicConsumer2

代码语言:javascript复制
public class TopicConsumer2 {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);

        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:" textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

TopicProducer

代码语言:javascript复制
public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();

    }
}

0 人点赞