消息队列之ActiveMQ

2023-10-12 14:15:25 浏览数 (2)

为什么要用消息队列

分布式中 service之间相互调用的时候 存在耦合 比如这边添加完商品后做同步索引库处理,添加商品就是在数据库中插入一条数据,而同步索引库这个功能一般写在solr的service层里,这时候就会有出现服务间的耦合 因此我们需要一个中间商来赚差价。。 是需要一个中间件来传递信息。

我们用activeMQ

1.1. ActiveMQ的消息形式

对于消息的传递有两种类型:

一种是点对点的,即一个生产者和一个消费者一一对应;

另一种是发布/*订阅模式*,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  · StreamMessage – Java原始值的数据流

  · MapMessage–一套名称-值对

  · TextMessage–一个字符串对象

  · ObjectMessage–一个序列化的 Java对象

  · BytesMessage–一个字节的数据流

  · BytesMessage–一个字节的数据流

1.2. 安装环境:

1、需要jdk

2、安装Linux系统。生产环境都是Linux系统。

1.3. 安装步骤

第一步: 把ActiveMQ 的压缩包上传到Linux系统。

第二步:解压缩。

第三步:启动。

使用bin目录下的activemq命令启动:

[root@localhost bin]# ./activemq start

关闭:

[root@localhost bin]# ./activemq stop

查看状态:

[root@localhost bin]# ./activemq status

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2

进入管理后台:

http://192.168.25.168:8161/admin

用户名:admin

密码:admin

测试类:

代码语言:javascript复制
package cn.e3mall.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

public class ActiveMqTest {

    /**
     * 点对点形式发送消息
     * <p>Title: testQueueProducer</p>
     * <p>Description: </p>
     * @throws Exception
     */
    @Test
    public void testQueueProducer() throws Exception {
        //1、创建一个连接工厂对象,需要指定服务的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
        //2、使用工厂对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        //3、开启连接,调用Connection对象的start方法。
        connection.start();
        //4、创建一个Session对象。
        //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
        //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
        Queue queue = session.createQueue("test-queue");
        //6、使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        //7、创建一个Message对象,可以使用TextMessage。
        /*TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("hello Activemq");*/
        TextMessage textMessage = session.createTextMessage("hello activemq");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
    }

    /**
     * 点对点接收消息 接收后 生产的消息就不存在了
     * <p>Title: testQueueConsumer</p>
     * <p>Description: </p>
     * @throws Exception
     */
    @Test
    public void testQueueConsumer() throws Exception {
        //创建一个ConnectionFactory对象连接MQ服务器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
        //创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //开启连接
        connection.start();
        //使用Connection对象创建一个Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建一个Destination对象。queue对象
        Queue queue = session.createQueue("test-queue");
        //使用Session对象创建一个消费者对象。
        MessageConsumer consumer = session.createConsumer(queue);
        //接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                //打印结果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        //等待接收消息
        //不按回车键不执行下面的代码
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }

    /**
     * 广播形式 生产者
     * <p>Title: testTopicProducer</p>
     * <p>Description: </p>
     * @throws Exception
     */
    @Test
    public void testTopicProducer() throws Exception {
        //创建连接工厂对象 需要指定服务的ip及端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //不开启事务 应答模式:自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建Destination对象 两种形式的queue topic 
        // topic 没人消费者接收 消息丢失
        Topic topic = session.createTopic("test-queue");
        MessageProducer messageProducer = session.createProducer(topic);
        TextMessage textMessage = session.createTextMessage("topic message");
        messageProducer.send(textMessage);
        messageProducer.close();
        session.close();
        connection.close();

    }

    /**
     * 广播形式 消费者 运行多次 产生多个消费者
     * <p>Title: testTopicConsumer</p>
     * <p>Description: </p>
     * @throws Exception
     */
    @Test
    public void testTopicConsumer() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //从队列接受消息
        Topic topic = session.createTopic("test-queue");
        //创建消费者对象
        MessageConsumer messageConsumer = session.createConsumer(topic);
        messageConsumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //不按回车键不执行下面的代码
        System.out.println("topic消费者3启动...");
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

activeMQ与spring整合

1.1. 生产者

第一步:引用相关的jar包。

代码语言:javascript复制
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>

第二步:配置Activemq整合spring。配置ConnectionFactory

applicationContext-activemq.xml

代码语言:javascript复制
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.130:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemAddTopic" />
    </bean>

</beans>

消息队列生产者测试类:

代码语言:javascript复制
package cn.e3mall.activemq;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMqSpring {

    @Test
    public void sendMessage() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //从容器中获得JmsTemplate对象。
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        //从容器中获得一个Destination对象。
        Destination destination = (Destination) applicationContext.getBean("queueDestination");
        //发送消息
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("send activemq message");
            }
        });
    }
}
1.2. 消费者

消费者中配置

applicationContext-activemq.xml

代码语言:javascript复制
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.130:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemAddTopic" />
    </bean>
    <bean id="myMessageListener" class="cn.e3mall.search.message.MyMessageListener" />
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
    <!-- 监听商品添加消息 同步索引库 -->
    <bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener"/>
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

实现MessageListener接口

代码语言:javascript复制
package cn.e3mall.search.message;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        //取消息内容
        TextMessage textMessage = (TextMessage)message;
        try {
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

消费者测试类:

只需要加载文件就行了 因为配置文件中已经配置好了

代码语言:javascript复制
package cn.e3mall.activemq;

import java.io.IOException;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageConsumer {

    @Test
    public void msgConsumer() throws Exception {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        System.in.read();
    }
}
实际开发

那么在实际开发中怎么用呢

在添加商品的时候发送一条消息 消息中包含添加商品的id信息 完事之后 solr层的service接收信息 根据id从数据库中查询出商品信息 添加到索引库

添加商品 发送消息:

代码语言:javascript复制
package cn.e3mall.service.impl;


import java.util.Date;
import java.util.List;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.filter.function.regexMatchFunction;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;

import cn.e3mall.common.jedis.JedisClient;
import cn.e3mall.common.pojo.EasyUIDataGridResult;
import cn.e3mall.common.util.E3Result;
import cn.e3mall.common.util.IDUtils;
import cn.e3mall.common.util.JsonUtils;
import cn.e3mall.mapper.TbItemDescMapper;
import cn.e3mall.mapper.TbItemMapper;
import cn.e3mall.pojo.TbItem;
import cn.e3mall.pojo.TbItemDesc;
import cn.e3mall.pojo.TbItemExample;
import cn.e3mall.pojo.TbItemExample.Criteria;
//import cn.e3mall.service.ItemService;
import cn.e3mall.service.ItemService;

@Service
public class ItemServiceImpl implements ItemService {

    @Autowired
    private TbItemMapper itemMapper;
    @Autowired
    private TbItemDescMapper tbItemDescMapper;
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    private JedisClient jedisClient;
    //默认使用id注入
    @Resource
    private Destination topicDestination;

    @Value("${REDIS_ITEM_PRE}")
    private String REDIS_ITEM_PRE;
    @Value("${ITEM_CACHE_EXPIRE}")
    private Integer ITEM_CACHE_EXPIRE;

    @Override
    public TbItem geTbItemById(long itemId) {
        //查询缓存
        try {
            String json = jedisClient.get(REDIS_ITEM_PRE ":" itemId ":BASE");
            if(StringUtils.isNotBlank(json)){
                TbItem tbItem = JsonUtils.jsonToPojo(json, TbItem.class);
                return tbItem;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        //缓存中没有 查询数据库
        TbItemExample example = new TbItemExample();
        Criteria criteria = example.createCriteria();
        criteria.andIdEqualTo(itemId);
        List<TbItem> list = itemMapper.selectByExample(example);
        if (list!=null&&list.size()>0) {
            try {
                //结果添加到缓存
                jedisClient.set(REDIS_ITEM_PRE ":" itemId ":BASE", JsonUtils.objectToJson(list.get(0)));
                //设置过期时间
                jedisClient.expire(REDIS_ITEM_PRE ":" itemId ":BASE",ITEM_CACHE_EXPIRE);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return list.get(0);
        }
        return null;
    }

    @Override
    public EasyUIDataGridResult getItemList(int page, int rows) {
        //设置分页信息
        PageHelper.startPage(page, rows);
        //执行查询
        TbItemExample example =new TbItemExample();
        List<TbItem> list = itemMapper.selectByExample(example);
        EasyUIDataGridResult result=new EasyUIDataGridResult();
        result.setRows(list);
        //取分页结果
        PageInfo<TbItem> pageInfo =new PageInfo<>(list);
        long total = pageInfo.getTotal();
        result.setTotal(total);

        return result;
    }

    @Override
    public E3Result addItem(TbItem item, String desc) {
        //生成Id
        final long itemId = IDUtils.genItemId();
        //补全item属性
        item.setId(itemId);
        //1-正常,2-下架,3-删除
        item.setStatus((byte)1);
        item.setCreated(new Date());
        item.setUpdated(new Date());
        //向商品插入数据
        itemMapper.insert(item);
        //创建商品描述表对应的pojo对象
        TbItemDesc itemDesc =new TbItemDesc();
        itemDesc.setItemId(itemId);
        itemDesc.setItemDesc(desc);
        itemDesc.setCreated(new Date());
        itemDesc.setUpdated(new Date());
        //向商品描述表插入数据
        tbItemDescMapper.insert(itemDesc);
        //发送商品添加消息
        jmsTemplate.send(topicDestination,new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(itemId "");
            }
        });
        //返回成功
        return E3Result.ok();
    }

    @Override
    public TbItemDesc geTbItemDescById(long itemId) {
        try {
            String json = jedisClient.get(REDIS_ITEM_PRE ":" itemId ":DESC");
            if(StringUtils.isNotBlank(json)){
                TbItemDesc itemDesc = JsonUtils.jsonToPojo(json, TbItemDesc.class);
                return itemDesc;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        TbItemDesc itemDesc = tbItemDescMapper.selectByPrimaryKey(itemId);
        try {
            //结果添加到缓存
            jedisClient.set(REDIS_ITEM_PRE ":" itemId ":DESC", JsonUtils.objectToJson(itemDesc));
            //设置过期时间
            jedisClient.expire(REDIS_ITEM_PRE ":" itemId ":DESC",ITEM_CACHE_EXPIRE);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return itemDesc;
    }

}

接收消息:

代码语言:javascript复制
package cn.e3mall.search.message;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;

import cn.e3mall.common.pojo.SearchItem;
import cn.e3mall.search.mapper.ItemMapper;

/**
 * 监听商品添加消息 接收消息后 将对应商品添加到索引库
 * <p>Title: ItemAddMessageListener</p>
 * <p>Description: </p>
 * <p>Company: www.itcast.cn</p> 
 * @version 1.0
 */
public class ItemAddMessageListener implements MessageListener {

    @Autowired
    private ItemMapper itemMapper;
    @Autowired
    private SolrServer solrServer;

    /**
     * 根据id获取商品信息 完事之后创建文档对象 文档对象中添加域 完事之后导入到索引库
     * <p>Title: onMessage</p>
     * <p>Description: </p>
     * @param message
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage)message;
        String text;
        try {
            text = textMessage.getText();
            Long itemId = new Long(text);
            //有可能还没提交商品 但是消息已经发出了 此时搜到的商品可能为空 因此要等待一段时间
            Thread.sleep(1000);
            SearchItem searchItem = itemMapper.getItemById(itemId);
            SolrInputDocument solrInputDocument = new SolrInputDocument();
            //添加域
            solrInputDocument.addField("id",searchItem.getId());
            solrInputDocument.addField("item_title",searchItem.getTitle());
            solrInputDocument.addField("item_sell_point",searchItem.getSell_point());
            solrInputDocument.addField("item_price",searchItem.getPrice());
            solrInputDocument.addField("item_image",searchItem.getImage());
            solrInputDocument.addField("item_category_name",searchItem.getCategory_name());
            solrServer.add(solrInputDocument);
            solrServer.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

配置文件:改一下Listener的配置就可以了

代码语言:javascript复制
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="itemAddMessageListener" />
    </bean>

0 人点赞