一、消息三大属性:
1、消息头
2、消息体
3、消息属性
二、消息类型
1、TextMessage
2、MapMessage
3、ObjectMessage
4、BytesMessage
5、StreamMessage
三、消息持久化
一、参数说明
1、非持久化
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
非持久化,当mq宕机后消息不存在
2、持久化(消息默认是持久化)
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
持久化,当mq宕机后消息存在
二、持久化消息
1、非持久化队列
生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/2 17:04
* @Version: 1.0
*/
public class ActiveMQTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建队列
Queue queue=session.createQueue(QUEUE_NAME);
//5、创建消息生产者,队列模式
MessageProducer messageProducer = session.createProducer(queue);
//6、通过messageProducer生产三条消息发送到MQ消息队列中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//消息属性
textMessage.setStringProperty("c01","vip");
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
//9、数据非持久化
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/3 8:47
* @Version: 1.0
*/
public class ActiveMQConsumer {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、这里接受的queue的名称要和发送者的一致
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//6、通过监听的方式消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果message不等于null并且属于TextMessage类型(因为消息发送的类型是TextMessage,所以这里判断是否是这个类型)
if(null!=message&&message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
try {
System.out.println(textMessage.getText());
//获取消息属性
System.out.println("消息属性--->" textMessage.getStringProperty("c01"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//7、保证控制台一直在运行
System.in.read();
//8、闭资源
consumer.close();
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
2、非持久化主题
生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/4 9:29
* @Version: 1.0
*/
public class ActiveMQTopicTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建主题
Topic topic=session.createTopic(TOPIC_NAME);
//5、创建消息生产者,主题模式
MessageProducer messageProducer = session.createProducer(topic);
//6、通过messageProducer生产三条消息发送到MQ消息主题中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/4 9:43
* @Version: 1.0
*/
public class ActiveMQTopicConsumer {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、这里接受的queue的名称要和发送者的一致
Topic topic = session.createTopic(TOPIC_NAME);
//5、创建消费者
MessageConsumer consumer = session.createConsumer(topic);
//6、通过监听的方式消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果message不等于null并且属于TextMessage类型(因为消息发送的类型是TextMessage,所以这里判断是否是这个类型)
if(null!=message&&message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//7、保证控制台一直在运行
System.in.read();
//8、闭资源
consumer.close();
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
3、持久化队列
生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/2 17:04
* @Version: 1.0
*/
public class ActiveMQTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建队列
Queue queue=session.createQueue(QUEUE_NAME);
//5、创建消息生产者,队列模式
MessageProducer messageProducer = session.createProducer(queue);
//6、通过messageProducer生产三条消息发送到MQ消息队列中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//消息属性
textMessage.setStringProperty("c01","vip");
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
//9、数据持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/3 8:47
* @Version: 1.0
*/
public class ActiveMQConsumer {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//队列名称
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、这里接受的queue的名称要和发送者的一致
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//6、通过监听的方式消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果message不等于null并且属于TextMessage类型(因为消息发送的类型是TextMessage,所以这里判断是否是这个类型)
if(null!=message&&message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
try {
System.out.println(textMessage.getText());
//获取消息属性
System.out.println("消息属性--->" textMessage.getStringProperty("c01"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//7、保证控制台一直在运行
System.in.read();
//8、闭资源
consumer.close();
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
4、持久化主题
生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/4 9:29
* @Version: 1.0
*/
public class ActiveMQTopicTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建主题
Topic topic=session.createTopic(TOPIC_NAME);
//5、创建消息生产者,主题模式
MessageProducer messageProducer = session.createProducer(topic);
//持久化数据
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
//6、通过messageProducer生产三条消息发送到MQ消息主题中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.bdqn.test
* @Author: huat
* @Date: 2020/1/4 9:43
* @Version: 1.0
*/
public class ActiveMQTopicConsumer {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
//订阅名称
connection.setClientID("test01");
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、这里接受的queue的名称要和发送者的一致
Topic topic = session.createTopic(TOPIC_NAME);
//5、持久化的订阅者,第一个参数为订阅主题名称,第二个为备注
TopicSubscriber topicSubscriber=session.createDurableSubscriber(topic,"remakr");
//6、启动连接
connection.start();
//receive等待消息,不限制时间
Message message = topicSubscriber.receive();
while (null!=message){
TextMessage textMessage=(TextMessage)message;
System.out.println("******------>" textMessage.getText());
//等待五秒钟
message=topicSubscriber.receive(5000L);
}
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
主题持久化注意事项
1、如果消费者注册关闭后,消息提供者在消费者关闭后在发送消息,消费者再打开,会收到消息提供者发送的消息。
2、如果在消费者没有注册之前,消息提供者发送消息,消费者不会收到之前的消息
总结:
1、一定要先运行一次消费者,等同于向MQ注册,类似我订阅了这个主题
2、然后在运行消费者发送消息
3、无论消费者是否在线,都会收到消息,不在线的话,下次连接时会把没有收到过的消息全部接受。