使用JAVA获取ActiveMQ队列数据和状态

2022-11-13 13:31:20 浏览数 (2)

1、向ActiveMQ中放入消息

代码语言:java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.BytesMessage;
import javax.jms.BytesMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;



public class Putmsg {
    // tcp 地址, tcp://localhost:61616
    private String url;
    private String user;
    private String pwd;
    //目标,队列或Topic名称
    private String qName;
    Session session = null;
    MessageProducer producer = null;
    //目标,TOPIC相关
    TopicSession tsession = null;
    TopicPublisher publisher = null;

    
    /**
     * 
     * @param url
     * @param user
     * @param pwd
     * @param qName
     */
    public Putmsg(String url, String user, String pwd, String qName){
        this.url = url;
        this.user = user;
        this.pwd = pwd;
        this.qName = qName;
    }
    
    /**
     * <b>function:</b> 发送消息
     * @param session
     * @param producer
     * @throws Exception
     */    
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < 5; i  ) {
            String message = "发送消息第"   (i   1)   "条";
            BytesMessage text = session.createBytesMessage();
            
            System.out.println(message);
            producer.send(text);
        }
    }
    
    /**
     * 将指定数据放入到AMQ中
     * @param destPath 目录下所有文本,放入到AMQ中
     * @throws Exception
     */
    public void sendMsg4Path(String destPath){
        try {            
              File direct=new File(destPath);
              
              File[] tempList = direct.listFiles();
              System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
    
              int count = 0;
              
              for (int i = 0; i < tempList.length; i  ) {
                  
                  if (tempList[i].isFile()) {
                      try {
                        //遍历文件并生成对应的字节码文件到目录中
                        File file = new File(tempList[i].getAbsolutePath());
                        
                        //可以换成工程目录下的其他文本文件
                        FileInputStream fis= new FileInputStream(file);
                        //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                        ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                        // ByteArrayOutputStream 是OutputStream的一个实现类 
                        int ch = 0;
                        //byte[] msg = null;
                        
                        while (true) {
                            //取得文本对应的16进制数据
                            ch = fis.read(); 
                            if(ch==-1) break;
                            //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                            bytestream.write(ch);  
                        }
                        
                        bytestream.close();
                        //关闭文件
                        fis.close();
                        
                        byte imgdata[] = bytestream.toByteArray();

                        BytesMessage text = session.createBytesMessage();
                        text.writeBytes(imgdata);                
                        
                        
                        producer.send(text);
                        
                        //TODO setReadOnlyBody(true),输出其长度
                        text.reset();
                        System.out.println("len = "   text.getBodyLength());
                        
                        count = i   1;
//                        System.out.println("Put the "   count  " file into the MQ! "    tempList[i]);
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                  }//判断是否为文件
               
              }//在指定目录下循环取文件
              
            
            System.out.println("Put "  count  " files all fininshed!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
        
        
    /**
         * 将指定数据放入到AMQ中
         * @param destPath 目录
         * @param fileName 文件名,放入到AMQ中的内容
         * @throws Exception
    */
    public void sendMsg4File(String destPath, String fileName){
        try {            
              File direct=new File(destPath);
                  
              File[] tempList = direct.listFiles();
              System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
        
              int count = 0;
                  
              for (int i = 0; i < tempList.length; i  ) {
                      
                  if (tempList[i].isFile() && tempList[i].getName().contains(fileName)) {
                      try {
                           //遍历文件并生成对应的字节码文件到目录中
                           File file = new File(tempList[i].getAbsolutePath());
                            
                            //可以换成工程目录下的其他文本文件
                            FileInputStream fis= new FileInputStream(file);
                            //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                            ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                            // ByteArrayOutputStream 是OutputStream的一个实现类 
                            int ch = 0;
                            //byte[] msg = null;
                            
                            while (true) { 
                                //取得文本对应的16进制数据
                                ch = fis.read(); 
                                if(ch==-1) break;
                                //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                                bytestream.write(ch);  
                            }
                            
                            byte imgdata[] = bytestream.toByteArray();
                            bytestream.close();
                            fis.close();
                            
                            BytesMessage text = session.createBytesMessage();
                            text.writeBytes(imgdata);
                            
                            producer.send(text);
                            count  =1;
    //                        System.out.println("Put the "   count  " file into the MQ! "    tempList[i]);
                            
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                      }//判断是否为文件
                   
                  }//在指定目录下循环取文件
                  
                
                System.out.println("Put "  count  " files all fininshed!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    /**
         * 将指定数据放入到AMQ中
         * @param destPath 目录,或放入到AMQ中的内容
         * 当传入的参数既不是目录也不是文件,就把该参数放入到AMQ中
         * @throws Exception
    */
    public void sendMsg4Str(String msg){                  
        BytesMessage message;
        try {
            message = session.createBytesMessage();                
                
            byte[] bmsg = msg.getBytes();
            message.writeBytes(bmsg);
            System.out.println(msg);
                    
            producer.send(message);
                    
            // TODO
            message.reset();
            System.out.println("len = "   message.getBodyLength());
                    
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void putmsg2amq() throws Exception {
        
        Connection connection = null;

        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    user, pwd, url);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(qName);
            // 创建消息生产者
            producer = session.createProducer(destination);
            // 设置持久化模式
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            sendMessage(session, producer);
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * 数据同步,将具体报文内容发送到AMQ
     * @param msg
     * @throws Exception
     */
    public void putmsg2amq(String msg) throws Exception {
            
            Connection connection = null;
    //        Session session = null;
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息生产者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                
                sendMsg4Str(msg);
                
            } catch (Exception e) {
                throw e;
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }

    /**
     * 将指定目录下所有文件中的内容,发送到AMQ
     * @param msg
     * @throws Exception
     */
    public void putmsg2amqPath(String path) {
            
       Connection connection = null;
       try{
           try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息生产者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                sendMsg4Path(path);
                
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    
    }

    /**
     * 将指定目录下,指定文件的内容发送到AMQ
     * @param msg
     * @throws Exception
     */
    public void putmsg2amqFile(String path, String file) {
            
        Connection connection = null;
        try{
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息制作者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                sendMsg4File(path, file);
                
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    
    }

    /**
     * 
     * @param msg
     * @throws Exception
     */
    public void putmsg2amqPath(String destPath, String filename) {
            
        Connection connection = null;

        try{
            try {
                // 创建链接工厂
                ConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, pwd, url);
                // 通过工厂创建一个连接
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue(qName);
                // 创建消息制作者
                producer = session.createProducer(destination);
                // 设置持久化模式
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                try {            
                    File direct=new File(destPath);
                    
                    File[] tempList = direct.listFiles();
                    System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
          
                    int count = 0;
                    
                    for (int i = 0; i < tempList.length; i  ) {
                        
                        if (tempList[i].isFile()) {
                            try {
                                //遍历文件并生成对应的字节码文件到目录中
                                File file = new File(tempList[i].getAbsolutePath());
                              
                                //可以换成工程目录下的其他文本文件
                                FileInputStream fis= new FileInputStream(file);
                                //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                                ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                                // ByteArrayOutputStream 是OutputStream的一个实现类 
                                int ch = 0;
                              
                                while (true) { 
                                    //取得文本对应的16进制数据
                                    ch = fis.read(); 
                                    if(ch==-1) break;
                                    //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                                    bytestream.write(ch);  
                                }
                              
                                bytestream.close();
                                //关闭文件
                                fis.close();
                              
                                byte imgdata[] = bytestream.toByteArray();

                                BytesMessage text = session.createBytesMessage();
                                text.writeBytes(imgdata);                        
                              
                              
                                producer.send(text);
                                
                                count = i   1;
//                              System.out.println("Put the "   count  " file into the MQ! "    tempList[i]);
                              
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                        }//判断是否为文件
                        
                    }//在指定目录下循环取文件

                  System.out.println("Put "  count  " files all fininshed!");
              } catch (Exception e) {
                  e.printStackTrace();
              }
               
                
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    
    }
    
    
    /**
     * 向TOPIC中放入消息
     * @throws Exception
     */
    public void putmsg2Topic(String path){
        
        TopicConnection connection = null;

        try{
            try {
                // 创建链接工厂
                TopicConnectionFactory factory = new ActiveMQConnectionFactory(
                        user, 
                        pwd, 
                        url);
                // 通过工厂创建一个连接
                connection = factory.createTopicConnection();
                // 启动连接
                connection.start();
                // 创建一个session会话
                tsession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Topic topic = tsession.createTopic(qName);
                // 创建消息发送者
                publisher = tsession.createPublisher(topic);
                // 设置持久化模式
                publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(tsession, publisher);
                
            } catch (Exception e) {
               System.out.println(e.getMessage());
            } finally {
                // 关闭释放资源
                if (tsession != null) {
                    tsession.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }catch (Exception e) {
               System.out.println(e.getMessage());
        }
    }

    /**
         * 将指定数据放入到AMQ的TOPIC中
         * @param destPath 目录下所有文本,放入到AMQ TOPIC中
         * @throws Exception
         */
    public void sendMsgTopic4Path(String destPath){
            try {            
                  File direct=new File(destPath);
                  
                  File[] tempList = direct.listFiles();
                  System.out.println("该目录下需要放入到MQ的文件个数:" tempList.length);
        
                  int count = 0;
                  
                  for (int i = 0; i < tempList.length; i  ) {
                      
                      if (tempList[i].isFile()) {
                          try {
                            //遍历文件并生成对应的字节码文件到目录中
                            File file = new File(tempList[i].getAbsolutePath());
                            
                            //可以换成工程目录下的其他文本文件
                            FileInputStream fis= new FileInputStream(file);
                            //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类;
                            ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); 
                            // ByteArrayOutputStream 是OutputStream的一个实现类 
                            int ch = 0;
                            //byte[] msg = null;
                            
                            while (true) {
                                //取得文本对应的16进制数据
                                ch = fis.read(); 
                                if(ch==-1) break;
                                //将FileInputStream 的内容写到 ByteArrayOutputStream 中
                                bytestream.write(ch);  
                            }
                            
                            bytestream.close();
                            //关闭文件
                            fis.close();
                            
                            byte imgdata[] = bytestream.toByteArray();
    
                            BytesMessage text = tsession.createBytesMessage();
                            text.writeBytes(imgdata);                        
                            
                            
                            publisher.send(text);
                            count = i   1;
    //                        System.out.println("Put the "   count  " file into the MQ! "    tempList[i]);
                            
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                      }//判断是否为文件
                   
                  }//在指定目录下循环取文件
                  
                
                System.out.println("Put "  count  " files all fininshed!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
}

2、从ActiveMQ中取出消息

代码语言:java
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;


/**
 * Sample consumer that takes a single {@link BytesMessage} from an ActiveMQ queue.
 */
public class Getmsg {
    private String url;
    private String user;
    private String pwd;
    // Destination name (queue or topic)
    private String qName;
    Session session = null;
    MessageProducer producer = null;
    // Topic-related handles (not used by the methods in this class)
    TopicSession tsession = null;
    TopicPublisher publisher = null;

    /**
     * @param url   broker address, e.g. {@code tcp://localhost:61616}
     * @param user  broker user name
     * @param pwd   broker password
     * @param qName name of the queue to read from
     */
    public Getmsg(String url, String user, String pwd, String qName) {
        this.url = url;
        this.user = user;
        this.pwd = pwd;
        this.qName = qName;
    }

    /**
     * Connects to the queue, waits up to 100 ms for one message and disconnects.
     *
     * @return the received message, or {@code null} when nothing arrived in time, the
     *         message was not a {@link BytesMessage}, or the broker call failed (the
     *         failure is printed)
     */
    public BytesMessage getmsg() {
        BytesMessage text = null;

        Connection connection = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(user, pwd, url);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(qName);
            MessageConsumer consumer = session.createConsumer(destination);

            // receive(timeout) returns null when no message shows up within 100 ms.
            Message message = consumer.receive(100);

            if (message instanceof BytesMessage) {
                text = (BytesMessage) message;
            } else if (message != null) {
                // The session auto-acknowledges, so this message is consumed even though it is skipped.
                System.out.println("Ignoring non-bytes message of type " + message.getClass().getName());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close each resource in its own try so one failure does not skip the other.
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

        return text;
    }

    /**
     * Reads one message from queue {@code testmq} and writes its body to
     * {@code D:/test.txt}.
     *
     * @param args unused
     * @throws Exception if reading the message body or writing the file fails
     */
    public static void main(String[] args) throws Exception {
        Getmsg g = new Getmsg("tcp://localhost:61616", "amq", "123456", "testmq");

        BytesMessage bm = g.getmsg();
        if (bm == null) {
            System.out.println("No BytesMessage received from queue testmq");
            return;
        }

        int msgLenth = (int) bm.getBodyLength();
        byte[] bmArr = new byte[msgLenth];
        bm.readBytes(bmArr);

        File file = new File("D:/test.txt");
        // Closing the buffered stream also closes the underlying file stream.
        BufferedOutputStream bs = new BufferedOutputStream(new FileOutputStream(file));
        try {
            bs.write(bmArr);
        } finally {
            bs.close();
        }
    }
}
3、通过JMX获取ActiveMQ队列状态

代码语言:java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;

import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;



/**
 * <b>function:</b> 消息接收者
 */
public class Jmx4Amq {
	private String uri;
    private String user;
    private String pwd;
    //目标,队列或Topic名称
    private String qName;
    private BrokerViewMBean mBean = null;
    private MBeanServerConnection connection = null;
    private JMXConnector connector = null;

    
    
    /**
     * 
     * @param url
     * @param user
     * @param pwd
     * @param qName
     */
    public Jmx4Amq(String uri, String user, String pwd){
    	this.uri = uri;
    	this.user = user;
    	this.pwd = pwd;
    }
    
    /**
     * 对JMX连接中的对象进行初始化
     */
    public void getStatus(){
        try {
        	
        	HashMap<String, Object> prop = new HashMap<String, Object>(); 
        	//jmx.password
        	String[] au = {user,pwd};
            prop.put(JMXConnector.CREDENTIALS, au); 
        	
        	JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" 
            		uri "/jmxrmi");
            connector = JMXConnectorFactory.connect(url, prop);
            connector.connect();
            connection = connector.getMBeanServerConnection();

             // 需要注意的是,这里的jms-broker必须和上面配置的名称相同
            ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=Broker_Name");
            mBean =  (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection,  
            		name, BrokerViewMBean.class, true);
            
            for(ObjectName queueName : mBean.getQueues()) {
            	QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.
            			newProxyInstance(connection, queueName, QueueViewMBean.class, true);

            	// 消息队列名称
            	System.out.println(queueMBean.getName());
            	// 队列中剩余的消息数
            	System.out.println(queueMBean.getQueueSize());
            	// 消费者数
            	System.out.println(queueMBean.getConsumerCount());
            	// 入队数
            	System.out.println(queueMBean.getEnqueueCount());
            	// 出队数
            	System.out.println(queueMBean.getDequeueCount());
            }
        }catch(Exception e){
        	e.printStackTrace();
        }
    }
    
    
    /**
     * 清空队列中的数据
     */
    public void clearMsg(){
        try {        	
            //遍历AMQ中的对象
            for(ObjectName queueName : mBean.getQueues()) {
                QueueViewMBean queueMBean =  (QueueViewMBean)MBeanServerInvocationHandler
                			.newProxyInstance(connection, queueName, QueueViewMBean.class, true);
                //找到匹配队列,执行purge操作
                if(queueMBean.getName().equals(qName)){
                	queueMBean.purge();
                	break;
                }
            }
            
        }catch(Exception e){
        	e.printStackTrace();
        }
    }
    
    /**
     * 关闭JMX连接
     */
    public void closeJmxConn(){
        try {
			connector.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
    }
    
    public static void main(String[] args) throws Exception {
    	//从AMQ取得数据
    	Jmx4Amq g = new Jmx4Amq("localhost:11099","admin","cacikf88");
        g.getStatus();
    }
}

0 人点赞