为什么要用消息队列
分布式中 service之间相互调用的时候 存在耦合 比如这边添加完商品后做同步索引库处理,添加商品就是在数据库中插入一条数据,而同步索引库这个功能一般写在solr的service层里,这时候就会有出现服务间的耦合 因此我们需要一个中间商来赚差价。。 是需要一个中间件来传递信息。
我们用activeMQ
1.1. ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/*订阅模式*,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage – Java原始值的数据流
· MapMessage–一套名称-值对
· TextMessage–一个字符串对象
· ObjectMessage–一个序列化的 Java对象
· BytesMessage–一个字节的数据流
· BytesMessage–一个字节的数据流
1.2. 安装环境:
1、需要jdk
2、安装Linux系统。生产环境都是Linux系统。
1.3. 安装步骤
第一步: 把ActiveMQ 的压缩包上传到Linux系统。
第二步:解压缩。
第三步:启动。
使用bin目录下的activemq命令启动:
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2
进入管理后台:
http://192.168.25.168:8161/admin
用户名:admin
密码:admin
测试类:
代码语言:javascript复制package cn.e3mall.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class ActiveMqTest {
/**
* 点对点形式发送消息
* <p>Title: testQueueProducer</p>
* <p>Description: </p>
* @throws Exception
*/
@Test
public void testQueueProducer() throws Exception {
//1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
//2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用Connection对象的start方法。
connection.start();
//4、创建一个Session对象。
//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
Queue queue = session.createQueue("test-queue");
//6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
//7、创建一个Message对象,可以使用TextMessage。
/*TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello Activemq");*/
TextMessage textMessage = session.createTextMessage("hello activemq");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
/**
* 点对点接收消息 接收后 生产的消息就不存在了
* <p>Title: testQueueConsumer</p>
* <p>Description: </p>
* @throws Exception
*/
@Test
public void testQueueConsumer() throws Exception {
//创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
//创建一个连接对象
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个Destination对象。queue对象
Queue queue = session.createQueue("test-queue");
//使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(queue);
//接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待接收消息
//不按回车键不执行下面的代码
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
/**
* 广播形式 生产者
* <p>Title: testTopicProducer</p>
* <p>Description: </p>
* @throws Exception
*/
@Test
public void testTopicProducer() throws Exception {
//创建连接工厂对象 需要指定服务的ip及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
//不开启事务 应答模式:自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建Destination对象 两种形式的queue topic
// topic 没人消费者接收 消息丢失
Topic topic = session.createTopic("test-queue");
MessageProducer messageProducer = session.createProducer(topic);
TextMessage textMessage = session.createTextMessage("topic message");
messageProducer.send(textMessage);
messageProducer.close();
session.close();
connection.close();
}
/**
* 广播形式 消费者 运行多次 产生多个消费者
* <p>Title: testTopicConsumer</p>
* <p>Description: </p>
* @throws Exception
*/
@Test
public void testTopicConsumer() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//从队列接受消息
Topic topic = session.createTopic("test-queue");
//创建消费者对象
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//不按回车键不执行下面的代码
System.out.println("topic消费者3启动...");
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
activeMQ与spring整合
1.1. 生产者
第一步:引用相关的jar包。
代码语言:javascript复制 <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
第二步:配置Activemq整合spring。配置ConnectionFactory
applicationContext-activemq.xml
代码语言:javascript复制<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.130:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>
</beans>
消息队列生产者测试类:
代码语言:javascript复制package cn.e3mall.activemq;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class ActiveMqSpring {
@Test
public void sendMessage() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//从容器中获得JmsTemplate对象。
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//从容器中获得一个Destination对象。
Destination destination = (Destination) applicationContext.getBean("queueDestination");
//发送消息
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("send activemq message");
}
});
}
}
1.2. 消费者
消费者中配置
applicationContext-activemq.xml
代码语言:javascript复制<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.130:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>
<bean id="myMessageListener" class="cn.e3mall.search.message.MyMessageListener" />
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<!-- 监听商品添加消息 同步索引库 -->
<bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
实现MessageListener接口
代码语言:javascript复制package cn.e3mall.search.message;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//取消息内容
TextMessage textMessage = (TextMessage)message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者测试类:
只需要加载文件就行了 因为配置文件中已经配置好了
代码语言:javascript复制package cn.e3mall.activemq;
import java.io.IOException;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MessageConsumer {
@Test
public void msgConsumer() throws Exception {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
System.in.read();
}
}
实际开发
那么在实际开发中怎么用呢
在添加商品的时候发送一条消息 消息中包含添加商品的id信息 完事之后 solr层的service接收信息 根据id从数据库中查询出商品信息 添加到索引库
添加商品 发送消息:
代码语言:javascript复制package cn.e3mall.service.impl;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.filter.function.regexMatchFunction;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import cn.e3mall.common.jedis.JedisClient;
import cn.e3mall.common.pojo.EasyUIDataGridResult;
import cn.e3mall.common.util.E3Result;
import cn.e3mall.common.util.IDUtils;
import cn.e3mall.common.util.JsonUtils;
import cn.e3mall.mapper.TbItemDescMapper;
import cn.e3mall.mapper.TbItemMapper;
import cn.e3mall.pojo.TbItem;
import cn.e3mall.pojo.TbItemDesc;
import cn.e3mall.pojo.TbItemExample;
import cn.e3mall.pojo.TbItemExample.Criteria;
//import cn.e3mall.service.ItemService;
import cn.e3mall.service.ItemService;
@Service
public class ItemServiceImpl implements ItemService {
@Autowired
private TbItemMapper itemMapper;
@Autowired
private TbItemDescMapper tbItemDescMapper;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private JedisClient jedisClient;
//默认使用id注入
@Resource
private Destination topicDestination;
@Value("${REDIS_ITEM_PRE}")
private String REDIS_ITEM_PRE;
@Value("${ITEM_CACHE_EXPIRE}")
private Integer ITEM_CACHE_EXPIRE;
@Override
public TbItem geTbItemById(long itemId) {
//查询缓存
try {
String json = jedisClient.get(REDIS_ITEM_PRE ":" itemId ":BASE");
if(StringUtils.isNotBlank(json)){
TbItem tbItem = JsonUtils.jsonToPojo(json, TbItem.class);
return tbItem;
}
} catch (Exception e) {
e.printStackTrace();
}
//缓存中没有 查询数据库
TbItemExample example = new TbItemExample();
Criteria criteria = example.createCriteria();
criteria.andIdEqualTo(itemId);
List<TbItem> list = itemMapper.selectByExample(example);
if (list!=null&&list.size()>0) {
try {
//结果添加到缓存
jedisClient.set(REDIS_ITEM_PRE ":" itemId ":BASE", JsonUtils.objectToJson(list.get(0)));
//设置过期时间
jedisClient.expire(REDIS_ITEM_PRE ":" itemId ":BASE",ITEM_CACHE_EXPIRE);
} catch (Exception e) {
e.printStackTrace();
}
return list.get(0);
}
return null;
}
@Override
public EasyUIDataGridResult getItemList(int page, int rows) {
//设置分页信息
PageHelper.startPage(page, rows);
//执行查询
TbItemExample example =new TbItemExample();
List<TbItem> list = itemMapper.selectByExample(example);
EasyUIDataGridResult result=new EasyUIDataGridResult();
result.setRows(list);
//取分页结果
PageInfo<TbItem> pageInfo =new PageInfo<>(list);
long total = pageInfo.getTotal();
result.setTotal(total);
return result;
}
@Override
public E3Result addItem(TbItem item, String desc) {
//生成Id
final long itemId = IDUtils.genItemId();
//补全item属性
item.setId(itemId);
//1-正常,2-下架,3-删除
item.setStatus((byte)1);
item.setCreated(new Date());
item.setUpdated(new Date());
//向商品插入数据
itemMapper.insert(item);
//创建商品描述表对应的pojo对象
TbItemDesc itemDesc =new TbItemDesc();
itemDesc.setItemId(itemId);
itemDesc.setItemDesc(desc);
itemDesc.setCreated(new Date());
itemDesc.setUpdated(new Date());
//向商品描述表插入数据
tbItemDescMapper.insert(itemDesc);
//发送商品添加消息
jmsTemplate.send(topicDestination,new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(itemId "");
}
});
//返回成功
return E3Result.ok();
}
@Override
public TbItemDesc geTbItemDescById(long itemId) {
try {
String json = jedisClient.get(REDIS_ITEM_PRE ":" itemId ":DESC");
if(StringUtils.isNotBlank(json)){
TbItemDesc itemDesc = JsonUtils.jsonToPojo(json, TbItemDesc.class);
return itemDesc;
}
} catch (Exception e) {
e.printStackTrace();
}
TbItemDesc itemDesc = tbItemDescMapper.selectByPrimaryKey(itemId);
try {
//结果添加到缓存
jedisClient.set(REDIS_ITEM_PRE ":" itemId ":DESC", JsonUtils.objectToJson(itemDesc));
//设置过期时间
jedisClient.expire(REDIS_ITEM_PRE ":" itemId ":DESC",ITEM_CACHE_EXPIRE);
} catch (Exception e) {
e.printStackTrace();
}
return itemDesc;
}
}
接收消息:
代码语言:javascript复制package cn.e3mall.search.message;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import cn.e3mall.common.pojo.SearchItem;
import cn.e3mall.search.mapper.ItemMapper;
/**
* 监听商品添加消息 接收消息后 将对应商品添加到索引库
* <p>Title: ItemAddMessageListener</p>
* <p>Description: </p>
* <p>Company: www.itcast.cn</p>
* @version 1.0
*/
public class ItemAddMessageListener implements MessageListener {
@Autowired
private ItemMapper itemMapper;
@Autowired
private SolrServer solrServer;
/**
* 根据id获取商品信息 完事之后创建文档对象 文档对象中添加域 完事之后导入到索引库
* <p>Title: onMessage</p>
* <p>Description: </p>
* @param message
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
String text;
try {
text = textMessage.getText();
Long itemId = new Long(text);
//有可能还没提交商品 但是消息已经发出了 此时搜到的商品可能为空 因此要等待一段时间
Thread.sleep(1000);
SearchItem searchItem = itemMapper.getItemById(itemId);
SolrInputDocument solrInputDocument = new SolrInputDocument();
//添加域
solrInputDocument.addField("id",searchItem.getId());
solrInputDocument.addField("item_title",searchItem.getTitle());
solrInputDocument.addField("item_sell_point",searchItem.getSell_point());
solrInputDocument.addField("item_price",searchItem.getPrice());
solrInputDocument.addField("item_image",searchItem.getImage());
solrInputDocument.addField("item_category_name",searchItem.getCategory_name());
solrServer.add(solrInputDocument);
solrServer.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
配置文件:改一下Listener的配置就可以了
代码语言:javascript复制<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="itemAddMessageListener" />
</bean>