ActiveMQ的入门程序

2019-08-01 10:09:20 浏览数 (2)

代码语言:javascript复制
package com.shi.page;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.Test;

/**
 * 
 * @author: SHF
 * @date: 2018年3月16日 上午8:48:10
 * @Description:消息队列测试类
 */
public class ActiveMQTest {

	/**
	 * 点到点形式 发送 消息 生产者
	 * @throws Exception
	 */
	@Test
	public void queueProducerTest()throws Exception{
		//1.创建一个连接工厂对象,需要指定服务的ip和端口
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2.使用工厂对象创建一个Connection对象
		Connection connection = connectionFactory.createConnection();
		//3.开启连接,调用Connection对象的start方法
		connection.start();
		//4.创建一个Session对象
				//第一个参数:是否开启事物。如果开启事物第二个参数无意义。一般不开启事物。
		 		//第二个参数:应答模式,一般:自动应答,手动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.使用Session对象创建一个Destination对象,俩种形式queue,topic,现在使用queue
		Queue queue = session.createQueue("test-queue");//Queue extends Destination
		//6.使用Session对象创建一个producer对象
		MessageProducer producer = session.createProducer(queue);
		//7.创建一个Message对像,可以使用TextMessage
		/*TextMessage textMessage=new ActiveMQTextMessage();
		textMessage.setText("你要发送的消息");*/
		TextMessage textMessage = session.createTextMessage("queue你要发送的消息");
		//8.发送消息
		producer.send(textMessage);
		//9.关闭资源
		producer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 点到点接受消息  消费者
	 * @throws Exception
	 */
	@Test
	public void queueConsumerTest()throws Exception{
		//1 创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2 创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//3 开启连接
		connection.start();
		//4 使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建一个Destination对象 queue对象
		Queue queue = session.createQueue("test-queue");
		//6 使用Session对象创建一个消费者对象
		MessageConsumer consumer = session.createConsumer(queue);
		//7 接受消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message paramMessage) {
				// 接受到消息的回调函数
				TextMessage testMessage=(TextMessage) paramMessage;
				try {
					//8 打印消息
					System.out.println(testMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.in.read();//等待接受消息
		//9 关闭连接
		consumer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 一对多 发送消息  生产者
	 * @throws Exception
	 */
	@Test
	public void topicProducerTest()throws Exception{
		//1.创建一个连接工厂对象,需要指定服务的ip和端口
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2.使用工厂对象创建一个Connection对象
		Connection connection = connectionFactory.createConnection();
		//3.开启连接,调用Connection对象的start方法
		connection.start();
		//4.创建一个Session对象
				//第一个参数:是否开启事物。如果开启事物第二个参数无意义。一般不开启事物。
		 		//第二个参数:应答模式,一般:自动应答,手动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.使用Session对象创建一个Destination对象,俩种形式queue,topic,现在使用topic
		Topic topic = session.createTopic("test-topic");//Queue extends Destination
		//6.使用Session对象创建一个producer对象
		MessageProducer producer = session.createProducer(topic);
		//7.创建一个Message对像,可以使用TextMessage
		/*TextMessage textMessage=new ActiveMQTextMessage();
		textMessage.setText("你要发送的消息");*/
		TextMessage textMessage = session.createTextMessage("topic你要发送的消息");
		//8.发送消息
		producer.send(textMessage);
		//9.关闭资源
		producer.close();
		session.close();
		connection.close();
	}
	
	
	/**
	 * 一对多接受消息  消费者
	 * @throws Exception
	 */
	@Test
	public void topicConsumerTest()throws Exception{
		//1 创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2 创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//3 开启连接
		connection.start();
		//4 使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 创建一个Destination对象 topic对象
		Topic topic = session.createTopic("test-topic");
		//6 使用Session对象创建一个消费者对象
		MessageConsumer consumer = session.createConsumer(topic);
		//7 接受消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message paramMessage) {
				// 接受到消息的回调函数
				TextMessage testMessage=(TextMessage) paramMessage;
				try {
					//8 打印消息
					System.out.println(testMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.out.println("topic消费者3 已经启动...");
		System.in.read();//等待接受消息
		//9 关闭连接
		consumer.close();
		session.close();
		connection.close();
	}
}

0 人点赞