一、基本概念:
ActiveMQ中共分为两种:queue和topic
queue:在点对点消息传递域中,目的地被称为队列(一对一)
topic:在发布订阅消息中,目的地被称为主题(一对多)
特点:1、生产者将消息发布到topic中,每个消息可以有多个消费者,属于一对多的关系
2、生产者和消费者有时间上的相关性,订阅某个主题的消费者只能消费自他订阅以后发布到消息
3、生产者生产消息时,topic是不保存消息它是无状态不落地的,假如无人订阅就生产消息即生产了一条废消息,所以一般先启动消费者,再启动生产者;
二、创建maven工程,并引入依赖,这里我创建的springboot项目,所以引入的依赖如下:
依赖:
代码语言:javascript复制<!--activemq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
三、主题测试
消息生产者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/4 9:29
* @Version: 1.0
*/
public class ActiveMQTopicTest {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题),这里是创建主题
Topic topic=session.createTopic(TOPIC_NAME);
//5、创建消息生产者,主题模式
MessageProducer messageProducer = session.createProducer(topic);
//6、通过messageProducer生产三条消息发送到MQ消息主题中
for (int i=0;i<3;i ){
//7、创建消息
TextMessage textMessage = session.createTextMessage("msg----->" i);//创建一个文本消息
//8、通过messageProducer发送给mq
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消息消费者
代码语言:javascript复制import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ProjectName: springbootActiveMQ
* @Package: cn.**.test
* @Author: huat
* @Date: 2020/1/4 9:43
* @Version: 1.0
*/
public class ActiveMQTopicConsumer {
//url路径
private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
//主题名称
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1、创建连接工厂
//如果账号密码没有修改的话,账号密码默认均为admin
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
//如果账号密码修改的话
//第一个参数为账号,第二个为密码,第三个为请求的url
//ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
try {
//2、通过连接工厂获取连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建session会话
//里面会有两个参数,第一个为事物,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、这里接受的topic的名称要和发送者的一致
Topic topic = session.createTopic(TOPIC_NAME);
//5、创建消费者
MessageConsumer consumer = session.createConsumer(topic);
//6、通过监听的方式消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果message不等于null并且属于TextMessage类型(因为消息发送的类型是TextMessage,所以这里判断是否是这个类型)
if(null!=message&&message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//7、保证控制台一直在运行
System.in.read();
//8、闭资源
consumer.close();
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}