ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
MQ:message queue,称之为消息中间件,又叫消息队列。 以双方约定好的方式,接收发送消息,用于网络间的传输;
核心特性:
1.解耦
2.异步
3.降流
MQ的同类产品:
kafka: 大数据
RabbitMQ:Erlang语言,协议是AMQP(Advanced Message Queueing Protoco)
RocketMQ:阿里封装
ActiveMQ :Java语言,协议是JMS(Java Messaging Service )
官网地址-http://activemq.apache.org
下载Linux版本,用于支持高可用
1.上传至/opt目录下,解压
2.配置环境变量
3.启动命令:./activemq start
4:默认后端端口号61616,程序8161
5.进入网页控制台:localhost:8161/admin user:admin pw:admin
安装成功!
从上面的信息简单看下:首页|队列|主题|订阅|连接|网络|调度|发送
用于显示消息的管理。
MQ-HelloWord
引入坐标
代码语言:javascript复制<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
消息:
创建提供者:
代码语言:javascript复制package com.atkk.Controller;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author zhaokkstart
* @create 2019-06-23 16:17
*/
public class MQProduce {
private static final String ACTIVEMQ_URL="tcp://192.168.235.128:61616";
private static final String ACTIVEMQ_name="zhaokk";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_name);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i ) {
TextMessage textMessage = session.createTextMessage("mq消息" i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("---消息发布");
}
}
消息的消费者:
代码语言:javascript复制package com.atkk.Controller;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author zhaokkstart
* @create 2019-06-23 16:17
*/
public class MQConsumer {
private static final String ACTIVEMQ_URL="tcp://192.168.235.128:61616";
private static final String ACTIVEMQ_name="zhaokk";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_name);
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage receive = (TextMessage) consumer.receive();
if(null!=receive){
System.out.println("接受消息" receive.getText());
}else{
break;
}
}
consumer.close();
session.close();
connection.close();
System.out.println("---消息接受");
}
}
当发送消息后,队列应出现一个提供者,3条消息
当接收消息后,队列应出现一个消费者,接收消息
JMS开发的基本步骤:
消息模式:
点对点:一对一
订阅:一对多
消息默认是用不过期的,
消息的可靠性 如何保证
1.持久性
2.事务
3.签收 Acknowledge
代码语言:javascript复制Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
这里的boolean就是是否开启事务,后面的就是签收方式为自动签收构造方法:
构造方法:
其中:
void setDeliveryMode(int var1) throws JMSException;
持久化方法:DeliveryMode.PERSISTENT传入int值表示支持持久化
特点:队列的消息只能消费一次
总结:这种三种角色的技术理念,
类比dubbo的提供者,注册中心,消费者
redis的主从机制等理念,技术选型需要有消息订阅,降低系统负担,异步解耦的痛点,从而并不是所有业务场景都适合与MQ。
ActiveMQ的官方文档:
http://activemq.apache.org/components/classic/documentation
对比与所有中间件的功能,维度,可理解其他产品的使用,
RabbitMQ 推荐CSDN博主:CherryCHong的博客地址
https://blog.csdn.net/a1786223749/article/details/86138970