1、向ActiveMQ中放入消息
代码语言:java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.BytesMessage;
import javax.jms.BytesMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Putmsg {
// tcp 地址, tcp://localhost:61616
private String url;
private String user;
private String pwd;
//目标,队列或Topic名称
private String qName;
Session session = null;
MessageProducer producer = null;
//目标,TOPIC相关
TopicSession tsession = null;
TopicPublisher publisher = null;
/**
*
* @param url
* @param user
* @param pwd
* @param qName
*/
public Putmsg(String url, String user, String pwd, String qName){
this.url = url;
this.user = user;
this.pwd = pwd;
this.qName = qName;
}
/**
* <b>function:</b> 发送消息
* @param session
* @param producer
* @throws Exception
*/
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < 5; i ) {
String message = "发送消息第" (i 1) "条";
BytesMessage text = session.createBytesMessage();
System.out.println(message);
producer.send(text);
}
}
/**
* 将指定数据放入到AMQ中
* @param destPath 目录下所有文本,放入到AMQ中
* @throws Exception
*/
public void sendMsg4Path(String destPath){
try {
File direct=new File(destPath);
File[] tempList = direct.listFiles();
System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
int count = 0;
for (int i = 0; i < tempList.length; i ) {
if (tempList[i].isFile()) {
try {
//遍历文件并生成对应的字节码文件到目录中
File file = new File(tempList[i].getAbsolutePath());
//可以换成工程目录下的其他文本文件
FileInputStream fis= new FileInputStream(file);
//获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
ByteArrayOutputStream bytestream = new ByteArrayOutputStream();
// ByteArrayOutputStream 是OutputStream的一个实现类
int ch = 0;
//byte[] msg = null;
while (true) {
//取得文本对应的16进制数据
ch = fis.read();
if(ch==-1) break;
//将FileInputStream 的内容写到 ByteArrayOutputStream 中
bytestream.write(ch);
}
bytestream.close();
//关闭文件
fis.close();
byte imgdata[] = bytestream.toByteArray();
BytesMessage text = session.createBytesMessage();
text.writeBytes(imgdata);
producer.send(text);
//TODO setReadOnlyBody(true),输出其长度
text.reset();
System.out.println("len = " text.getBodyLength());
count = i 1;
// System.out.println("Put the " count " file into the MQ! " tempList[i]);
} catch (Exception e) {
e.printStackTrace();
}
}//判断是否为文件
}//在指定目录下循环取文件
System.out.println("Put " count " files all fininshed!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 将指定数据放入到AMQ中
* @param destPath 目录
* @param fileName 文件名,放入到AMQ中的内容
* @throws Exception
*/
public void sendMsg4File(String destPath, String fileName){
try {
File direct=new File(destPath);
File[] tempList = direct.listFiles();
System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
int count = 0;
for (int i = 0; i < tempList.length; i ) {
if (tempList[i].isFile() && tempList[i].getName().contains(fileName)) {
try {
//遍历文件并生成对应的字节码文件到目录中
File file = new File(tempList[i].getAbsolutePath());
//可以换成工程目录下的其他文本文件
FileInputStream fis= new FileInputStream(file);
//获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
ByteArrayOutputStream bytestream = new ByteArrayOutputStream();
// ByteArrayOutputStream 是OutputStream的一个实现类
int ch = 0;
//byte[] msg = null;
while (true) {
//取得文本对应的16进制数据
ch = fis.read();
if(ch==-1) break;
//将FileInputStream 的内容写到 ByteArrayOutputStream 中
bytestream.write(ch);
}
byte imgdata[] = bytestream.toByteArray();
bytestream.close();
fis.close();
BytesMessage text = session.createBytesMessage();
text.writeBytes(imgdata);
producer.send(text);
count =1;
// System.out.println("Put the " count " file into the MQ! " tempList[i]);
} catch (Exception e) {
e.printStackTrace();
}
}//判断是否为文件
}//在指定目录下循环取文件
System.out.println("Put " count " files all fininshed!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 将指定数据放入到AMQ中
* @param destPath 目录,或放入到AMQ中的内容
* 当传入的参数既不是目录也不是文件,就把该参数放入到AMQ中
* @throws Exception
*/
public void sendMsg4Str(String msg){
BytesMessage message;
try {
message = session.createBytesMessage();
byte[] bmsg = msg.getBytes();
message.writeBytes(bmsg);
System.out.println(msg);
producer.send(message);
// TODO
message.reset();
System.out.println("len = " message.getBodyLength());
} catch (JMSException e) {
e.printStackTrace();
}
}
public void putmsg2amq() throws Exception {
Connection connection = null;
try {
// 创建链接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(
user, pwd, url);
// 通过工厂创建一个连接
connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(qName);
// 创建消息生产者
producer = session.createProducer(destination);
// 设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
/**
* 数据同步,将具体报文内容发送到AMQ
* @param msg
* @throws Exception
*/
public void putmsg2amq(String msg) throws Exception {
Connection connection = null;
// Session session = null;
try {
// 创建链接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(
user, pwd, url);
// 通过工厂创建一个连接
connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(qName);
// 创建消息生产者
producer = session.createProducer(destination);
// 设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMsg4Str(msg);
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
/**
* 将指定目录下所有文件中的内容,发送到AMQ
* @param msg
* @throws Exception
*/
public void putmsg2amqPath(String path) {
Connection connection = null;
try{
try {
// 创建链接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(
user, pwd, url);
// 通过工厂创建一个连接
connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(qName);
// 创建消息生产者
producer = session.createProducer(destination);
// 设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMsg4Path(path);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 将指定目录下,指定文件的内容发送到AMQ
* @param msg
* @throws Exception
*/
public void putmsg2amqFile(String path, String file) {
Connection connection = null;
try{
try {
// 创建链接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(
user, pwd, url);
// 通过工厂创建一个连接
connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(qName);
// 创建消息制作者
producer = session.createProducer(destination);
// 设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMsg4File(path, file);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
*
* @param msg
* @throws Exception
*/
public void putmsg2amqPath(String destPath, String filename) {
Connection connection = null;
try{
try {
// 创建链接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(
user, pwd, url);
// 通过工厂创建一个连接
connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(qName);
// 创建消息制作者
producer = session.createProducer(destination);
// 设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try {
File direct=new File(destPath);
File[] tempList = direct.listFiles();
System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
int count = 0;
for (int i = 0; i < tempList.length; i ) {
if (tempList[i].isFile()) {
try {
//遍历文件并生成对应的字节码文件到目录中
File file = new File(tempList[i].getAbsolutePath());
//可以换成工程目录下的其他文本文件
FileInputStream fis= new FileInputStream(file);
//获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
ByteArrayOutputStream bytestream = new ByteArrayOutputStream();
// ByteArrayOutputStream 是OutputStream的一个实现类
int ch = 0;
while (true) {
//取得文本对应的16进制数据
ch = fis.read();
if(ch==-1) break;
//将FileInputStream 的内容写到 ByteArrayOutputStream 中
bytestream.write(ch);
}
bytestream.close();
//关闭文件
fis.close();
byte imgdata[] = bytestream.toByteArray();
BytesMessage text = session.createBytesMessage();
text.writeBytes(imgdata);
producer.send(text);
count = i 1;
// System.out.println("Put the " count " file into the MQ! " tempList[i]);
} catch (Exception e) {
e.printStackTrace();
}
}//判断是否为文件
}//在指定目录下循环取文件
System.out.println("Put " count " files all fininshed!");
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 向TOPIC中放入消息
* @throws Exception
*/
public void putmsg2Topic(String path){
TopicConnection connection = null;
try{
try {
// 创建链接工厂
TopicConnectionFactory factory = new ActiveMQConnectionFactory(
user,
pwd,
url);
// 通过工厂创建一个连接
connection = factory.createTopicConnection();
// 启动连接
connection.start();
// 创建一个session会话
tsession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Topic topic = tsession.createTopic(qName);
// 创建消息发送者
publisher = tsession.createPublisher(topic);
// 设置持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(tsession, publisher);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
// 关闭释放资源
if (tsession != null) {
tsession.close();
}
if (connection != null) {
connection.close();
}
}
}catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 将指定数据放入到AMQ的TOPIC中
* @param destPath 目录下所有文本,放入到AMQ TOPIC中
* @throws Exception
*/
public void sendMsgTopic4Path(String destPath){
try {
File direct=new File(destPath);
File[] tempList = direct.listFiles();
System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
int count = 0;
for (int i = 0; i < tempList.length; i ) {
if (tempList[i].isFile()) {
try {
//遍历文件并生成对应的字节码文件到目录中
File file = new File(tempList[i].getAbsolutePath());
//可以换成工程目录下的其他文本文件
FileInputStream fis= new FileInputStream(file);
//获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
ByteArrayOutputStream bytestream = new ByteArrayOutputStream();
// ByteArrayOutputStream 是OutputStream的一个实现类
int ch = 0;
//byte[] msg = null;
while (true) {
//取得文本对应的16进制数据
ch = fis.read();
if(ch==-1) break;
//将FileInputStream 的内容写到 ByteArrayOutputStream 中
bytestream.write(ch);
}
bytestream.close();
//关闭文件
fis.close();
byte imgdata[] = bytestream.toByteArray();
BytesMessage text = tsession.createBytesMessage();
text.writeBytes(imgdata);
publisher.send(text);
count = i 1;
// System.out.println("Put the " count " file into the MQ! " tempList[i]);
} catch (Exception e) {
e.printStackTrace();
}
}//判断是否为文件
}//在指定目录下循环取文件
System.out.println("Put " count " files all fininshed!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、从ActiveMQ中取出消息
代码语言:java
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
* <b>function:</b> 消息接收者
*/
/**
 * Receives a single {@link BytesMessage} from an ActiveMQ queue.
 */
public class Getmsg {
    private String url;
    private String user;
    private String pwd;
    // Destination: queue or topic name
    private String qName;
    Session session = null;
    MessageProducer producer = null;
    // Topic-related fields (not used by the methods of this class)
    TopicSession tsession = null;
    TopicPublisher publisher = null;

    /**
     * @param url   broker address
     * @param user  user name passed to the connection factory
     * @param pwd   password passed to the connection factory
     * @param qName queue name
     */
    public Getmsg(String url, String user, String pwd, String qName) {
        this.url = url;
        this.user = user;
        this.pwd = pwd;
        this.qName = qName;
    }

    /**
     * Connects to the queue, waits up to 100 ms for one message and closes the connection.
     *
     * @return the received message, or {@code null} when nothing arrived in time, the
     *         message was not a {@link BytesMessage}, or an error occurred (errors are
     *         printed to standard error)
     */
    public BytesMessage getmsg() {
        BytesMessage text = null;
        Connection connection = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(user, pwd, url);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(qName);
            MessageConsumer consumer = session.createConsumer(destination);
            // receive(100) returns null when no message arrives within 100 ms.
            Message message = consumer.receive(100);
            if (message instanceof BytesMessage) {
                text = (BytesMessage) message;
            } else if (message != null) {
                // Previously an unchecked cast failed here and the failure was swallowed.
                System.err.println("Ignoring non-bytes message of type "
                        + message.getClass().getName());
            }
        } catch (Exception e) {
            // Was e.getStackTrace(), which discards the exception without reporting it.
            e.printStackTrace();
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        return text;
    }

    /**
     * Fetches one message from the {@code testmq} queue and writes its body to
     * {@code D:/test.txt}.
     */
    public static void main(String[] args) throws Exception {
        Getmsg g = new Getmsg("tcp://localhost:61616", "amq", "123456", "testmq");
        BytesMessage bm = g.getmsg();
        if (bm == null) {
            // Nothing to write; this used to fail with a NullPointerException.
            System.out.println("No message received");
            return;
        }
        int msgLenth = (int) bm.getBodyLength();
        byte[] bmArr = new byte[msgLenth];
        bm.readBytes(bmArr);
        File file = new File("D:/test.txt");
        FileOutputStream fos = new FileOutputStream(file);
        try {
            BufferedOutputStream bs = new BufferedOutputStream(fos);
            bs.write(bmArr);
            bs.flush();
        } finally {
            // Closed in finally so a failed write does not leak the file handle.
            fos.close();
        }
    }
}
代码语言:java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
/**
* <b>function:</b> 消息接收者
*/
public class Jmx4Amq {
private String uri;
private String user;
private String pwd;
//目标,队列或Topic名称
private String qName;
private BrokerViewMBean mBean = null;
private MBeanServerConnection connection = null;
private JMXConnector connector = null;
/**
*
* @param url
* @param user
* @param pwd
* @param qName
*/
public Jmx4Amq(String uri, String user, String pwd){
this.uri = uri;
this.user = user;
this.pwd = pwd;
}
/**
* 对JMX连接中的对象进行初始化
*/
public void getStatus(){
try {
HashMap<String, Object> prop = new HashMap<String, Object>();
//jmx.password
String[] au = {user,pwd};
prop.put(JMXConnector.CREDENTIALS, au);
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"
uri "/jmxrmi");
connector = JMXConnectorFactory.connect(url, prop);
connector.connect();
connection = connector.getMBeanServerConnection();
// 需要注意的是,这里的jms-broker必须和上面配置的名称相同
ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=Broker_Name");
mBean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection,
name, BrokerViewMBean.class, true);
for(ObjectName queueName : mBean.getQueues()) {
QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.
newProxyInstance(connection, queueName, QueueViewMBean.class, true);
// 消息队列名称
System.out.println(queueMBean.getName());
// 队列中剩余的消息数
System.out.println(queueMBean.getQueueSize());
// 消费者数
System.out.println(queueMBean.getConsumerCount());
// 入队数
System.out.println(queueMBean.getEnqueueCount());
// 出队数
System.out.println(queueMBean.getDequeueCount());
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 清空队列中的数据
*/
public void clearMsg(){
try {
//遍历AMQ中的对象
for(ObjectName queueName : mBean.getQueues()) {
QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler
.newProxyInstance(connection, queueName, QueueViewMBean.class, true);
//找到匹配队列,执行purge操作
if(queueMBean.getName().equals(qName)){
queueMBean.purge();
break;
}
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 关闭JMX连接
*/
public void closeJmxConn(){
try {
connector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
//从AMQ取得数据
Jmx4Amq g = new Jmx4Amq("localhost:11099","admin","cacikf88");
g.getStatus();
}
}