代码语言:javascript复制
package com.shi.page;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.Test;
/**
*
* @author: SHF
* @date: 2018年3月16日 上午8:48:10
* @Description:消息队列测试类
*/
public class ActiveMQTest {
/**
* 点到点形式 发送 消息 生产者
* @throws Exception
*/
@Test
public void queueProducerTest()throws Exception{
//1.创建一个连接工厂对象,需要指定服务的ip和端口
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
//2.使用工厂对象创建一个Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接,调用Connection对象的start方法
connection.start();
//4.创建一个Session对象
//第一个参数:是否开启事物。如果开启事物第二个参数无意义。一般不开启事物。
//第二个参数:应答模式,一般:自动应答,手动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,俩种形式queue,topic,现在使用queue
Queue queue = session.createQueue("test-queue");//Queue extends Destination
//6.使用Session对象创建一个producer对象
MessageProducer producer = session.createProducer(queue);
//7.创建一个Message对像,可以使用TextMessage
/*TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("你要发送的消息");*/
TextMessage textMessage = session.createTextMessage("queue你要发送的消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
/**
* 点到点接受消息 消费者
* @throws Exception
*/
@Test
public void queueConsumerTest()throws Exception{
//1 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
//2 创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3 开启连接
connection.start();
//4 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 创建一个Destination对象 queue对象
Queue queue = session.createQueue("test-queue");
//6 使用Session对象创建一个消费者对象
MessageConsumer consumer = session.createConsumer(queue);
//7 接受消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message paramMessage) {
// 接受到消息的回调函数
TextMessage testMessage=(TextMessage) paramMessage;
try {
//8 打印消息
System.out.println(testMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();//等待接受消息
//9 关闭连接
consumer.close();
session.close();
connection.close();
}
/**
* 一对多 发送消息 生产者
* @throws Exception
*/
@Test
public void topicProducerTest()throws Exception{
//1.创建一个连接工厂对象,需要指定服务的ip和端口
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
//2.使用工厂对象创建一个Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接,调用Connection对象的start方法
connection.start();
//4.创建一个Session对象
//第一个参数:是否开启事物。如果开启事物第二个参数无意义。一般不开启事物。
//第二个参数:应答模式,一般:自动应答,手动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,俩种形式queue,topic,现在使用topic
Topic topic = session.createTopic("test-topic");//Queue extends Destination
//6.使用Session对象创建一个producer对象
MessageProducer producer = session.createProducer(topic);
//7.创建一个Message对像,可以使用TextMessage
/*TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("你要发送的消息");*/
TextMessage textMessage = session.createTextMessage("topic你要发送的消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
/**
* 一对多接受消息 消费者
* @throws Exception
*/
@Test
public void topicConsumerTest()throws Exception{
//1 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
//2 创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3 开启连接
connection.start();
//4 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 创建一个Destination对象 topic对象
Topic topic = session.createTopic("test-topic");
//6 使用Session对象创建一个消费者对象
MessageConsumer consumer = session.createConsumer(topic);
//7 接受消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message paramMessage) {
// 接受到消息的回调函数
TextMessage testMessage=(TextMessage) paramMessage;
try {
//8 打印消息
System.out.println(testMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic消费者3 已经启动...");
System.in.read();//等待接受消息
//9 关闭连接
consumer.close();
session.close();
connection.close();
}
}