activeMQ发布订阅模式中中常用工具类

2019-04-16 14:03:47 浏览数 (2)

代码语言:javascript复制
package com.jms;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.clapper.util.logging.Logger;

import com.pzoom.dsa.common.util.Log;
import com.pzoom.dsa.nerd.mysql.DBQueryHelper;

public class Jms
{
  static ConnectionFactory connectionFactory;
  static Connection connection = null;
  static Session session;
  static Map<String, MessageProducer> sendQueues = new ConcurrentHashMap<String, MessageProducer>();

  static Map<String, MessageConsumer> getQueues = new ConcurrentHashMap<String, MessageConsumer>();

  static Log log=Log.getLogger(DBQueryHelper.class);

  static {
    connectionFactory = new ActiveMQConnectionFactory(
      ActiveMQConnection.DEFAULT_USER, 
      ActiveMQConnection.DEFAULT_PASSWORD, 
      "tcp://10.100.100.100:61616?wireFormat.maxInactivityDuration=0");
    try
    {
      connection = connectionFactory.createConnection();

      connection.start();

      session = connection.createSession(Boolean.FALSE.booleanValue(), 
        1);
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }

  static MessageProducer getMessageProducer(String name) {
    if (sendQueues.containsKey(name))
      return ((MessageProducer)sendQueues.get(name));
    try
    {
      Destination destination = session.createQueue(name);
      MessageProducer producer = session.createProducer(destination);
      sendQueues.put(name, producer);
      return producer;
    } catch (JMSException e) {
      e.printStackTrace();
    }

    return ((MessageProducer)sendQueues.get(name));
  }

  static MessageConsumer getMessageConsumer(String name) {
    if (getQueues.containsKey(name))
      return ((MessageConsumer)getQueues.get(name));
    try
    {
      Destination destination = session.createQueue(name);
      MessageConsumer consumer = session.createConsumer(destination);
      getQueues.put(name, consumer);
      return consumer;
    } catch (JMSException e) {
      e.printStackTrace();
    }

    return ((MessageConsumer)getQueues.get(name));
  }

  public static void sendMessage(String queue, String text) {
    try {
      TextMessage message = session.createTextMessage(text);
      getMessageProducer(queue).send(message);
     // log.info("sendMessage "   queue   "tt"   text);
    }
    catch (JMSException e) {
      e.printStackTrace();
    }
  }
  
 
  
  public static String getMessage(String queue)
  {
    try {
      TextMessage message = (TextMessage)getMessageConsumer(queue).receive(10000L);
      if (message != null) 
      return message.getText();
    } catch (JMSException e) {
      e.printStackTrace();
    }
	return null;
  }

  public static void close() {
    try {
      session.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
    try {
      connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

0 人点赞