今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思.
一: 使用场景:
消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有这深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。在使用队列后,用户的请求发给队列后立即返回(当然不能直接给用户提示订单提交成功,京东上提示:您“您提交了订单,请等待系统确认”),再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。 那么在babasport这个项目中, 我们可以在上架的时候使用消息队列的模式: 我们之前在点击一款商品上架的时候, 我们需要分成2步, 第一: 更新商品表中该商品的上架状态. 第二: 将该商品信息保存到Solr服务器中. 那么如果我们使用了消息队列后, 第二步就可以使用发送message来异步完成.
消息队列可以接收消息和 发送消息
消息队列类型:
队列:一对一聊天 私聊 QQ
主题(订阅模式):一对多聊天 群聊 QQ 名词解释:
二, 代码原型 ActiveMQ需要部署到Linux系统下, 这里就不再做概述. 这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.
上架代码原型: 项目构件图:
未使用ActiveMQ前ProductServiceImpl.cs:
代码语言:javascript复制 1 //上架
2 public void isShow(Long[] ids){
3 Product product = new Product();
4 product.setIsShow(true);
5 for (final Long id : ids) {
6 //上下架状态
7 product.setId(id);
8 productDao.updateByPrimaryKeySelective(product);
9
10 //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移.
11 //TODO 保存商品信息到Solr服务器
12 SolrInputDocument doc = new SolrInputDocument();
13 //ID
14 doc.setField("id", id);
15 //名称
16 Product p = productDao.selectByPrimaryKey(id);
17 doc.setField("name_ik", p.getName());
18 //图片URL
19 doc.setField("url", p.getImgUrls()[0]);
20 //品牌 ID
21 doc.setField("brandId", p.getBrandId());
22 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
23 SkuQuery skuQuery = new SkuQuery();
24 skuQuery.createCriteria().andProductIdEqualTo(id);
25 skuQuery.setOrderByClause("price asc");
26 skuQuery.setPageNo(1);
27 skuQuery.setPageSize(1);
28 List<Sku> skus = skuDao.selectByExample(skuQuery);
29 doc.setField("price", skus.get(0).getPrice());
30 //...时间等 剩下的省略
31
32 try {
33 solrServer.add(doc);
34 solrServer.commit();
35 } catch (Exception e) {
36 // TODO Auto-generated catch block
37 e.printStackTrace();
38 }
39
40
41
42
43 //TODO 静态化
44 }
45 }
上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造: 使用ActiveMQ后的ProductServiceImpl.cs:
代码语言:javascript复制 1 //上架
2 public void isShow(Long[] ids){
3 Product product = new Product();
4 product.setIsShow(true);
5 for (final Long id : ids) {
6 //上下架状态
7 product.setId(id);
8 productDao.updateByPrimaryKeySelective(product);
9
10 //发送商品ID到ActiveMQ即可.
11 jmsTemplate.send(new MessageCreator() {
12
13 @Override
14 public Message createMessage(Session session) throws JMSException {
15
16 return session.createTextMessage(String.valueOf(id));
17 }
18 });
19
20 //TODO 静态化
21 }
22 }
接着就是配置消息发送方(JMS生产者) mq.xml:
代码语言:javascript复制 1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
25 <!-- 连接工厂, 此工厂由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 连接地址
28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 设置用户名及密码 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置连接池管理工厂 -->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工厂 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 设置最大连接数 -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工厂交给Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工厂 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring单例工厂 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 设置默认的目标管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 </bean>
57 </beans>
配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码. 接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的. 接着就是再将连接池交由Spring管理. 最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应 就是依赖于这个管道. 接下来我们就看下消息的接收方(JMS消费者)的一些东西: 消费者的目录结构:(Solr)
Solr项目中的ActiveMQ配置文件mq.xml:
代码语言:javascript复制 1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
25 <!-- 连接工厂, 此工厂由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 连接地址
28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 设置用户名及密码 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置连接池管理工厂 ,由Apache提供.-->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工厂 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 设置最大连接数 -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工厂交给Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工厂 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring单例工厂 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 设置默认的目标管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 </bean>
57
58 <!-- 实例化一个监听到消息后 处理此消息的类 -->
59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60
61 <!-- 配置实时监听器 -->
62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63 <!-- 配置工厂, 需要配置spirng的工厂 -->
64 <property name="connectionFactory" ref="singleConnectionFactory"/>
65 <!-- 设置监听的目标 -->
66 <property name="destinationName" value="pId"/>
67 <!-- 监听后获取消息的类, 接收监听到的消息 -->
68 <property name="messageListener" ref="customMessageListener"></property>
69 </bean>
70 </beans>
我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理. 接下来看看监听器的处理方法做了些什么事情: CustomMessageListener.java:
代码语言:javascript复制 1 /*
2 * 接收MQ中的消息
3 */
4 public class CustomMessageListener implements MessageListener{
5 @Autowired
6 private SearchService searchService;
7
8 @Override
9 public void onMessage(Message message) {
10 //先将接收到的消息强转为ActiveMQ类型的消息
11 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
12 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
13 try {
14 String id = amtm.getText();
15 System.out.println("接收到的ID:" id);
16 searchService.insertProductToSolr(Long.parseLong(id));
17 } catch (JMSException e) {
18 // TODO Auto-generated catch block
19 e.printStackTrace();
20 }
21 }
因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了. 接下来就看保存商品信息到Solr服务器的逻辑: SearchServiceImpl.java:
代码语言:javascript复制 1 //保存商品信息到Solr服务器中, 通过ActiveMQ
2 public void insertProductToSolr(Long productId){
3 //TODO 保存商品信息到Solr服务器
4 SolrInputDocument doc = new SolrInputDocument();
5 //ID
6 doc.setField("id", productId);
7 //名称
8 Product p = productDao.selectByPrimaryKey(productId);
9 doc.setField("name_ik", p.getName());
10 //图片URL
11 doc.setField("url", p.getImgUrls()[0]);
12 //品牌 ID
13 doc.setField("brandId", p.getBrandId());
14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
15 SkuQuery skuQuery = new SkuQuery();
16 skuQuery.createCriteria().andProductIdEqualTo(productId);
17 skuQuery.setOrderByClause("price asc");
18 skuQuery.setPageNo(1);
19 skuQuery.setPageSize(1);
20 List<Sku> skus = skuDao.selectByExample(skuQuery);
21 doc.setField("price", skus.get(0).getPrice());
22 //...时间等 剩下的省略
23
24 try {
25 solrServer.add(doc);
26 solrServer.commit();
27 } catch (Exception e) {
28 // TODO Auto-generated catch block
29 e.printStackTrace();
30 }
31 }
这样就比较明朗了, ActiveMQ 队列就是这样来实现的. ====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.
2016/09/04 20:32 更新 上面已经说了 消息的队列模式, 及点对点发送消息, 那么接下来就来说下 消息的一对多模式, 也就是 发布/订阅模式. 项目原型: 当商品上架后(babasport-product), 发送消息id给solr(babasport-solr)来将商品信息保存到solr服务器和cms(babasport-cms)来对商品详情页面做页面静态化. =================== babasport-product: 结构图:
babasport-product下的项目结构图:
ProductServiceImpl.java中的上架:
代码语言:javascript复制 1 @Autowired
2 private JmsTemplate jmsTemplate;
3
4 //上架
5 public void isShow(Long[] ids){
6 Product product = new Product();
7 product.setIsShow(true);
8 for (final Long id : ids) {
9 //上下架状态
10 product.setId(id);
11 productDao.updateByPrimaryKeySelective(product);
12
13 //发送商品ID到ActiveMQ即可.
14 jmsTemplate.send(new MessageCreator() {
15
16 @Override
17 public Message createMessage(Session session) throws JMSException {
18
19 return session.createTextMessage(String.valueOf(id));
20 }
21 });
22 }
23 }
mq.xml:
代码语言:javascript复制 1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
25 <!-- 连接工厂, 此工厂由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 连接地址
28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 设置用户名及密码 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置连接池管理工厂 -->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工厂 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 设置最大连接数 -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工厂交给Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工厂 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring单例工厂 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 设置默认的目标管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->
57 <property name="pubSubDomain" value="true"/>
58 </bean>
59 </beans>
这里面的最大的变化就是将消息发布模式改为了: publish subject. ============================================ babasport-solr:
mq.xml配置文件:
代码语言:javascript复制 1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
3 xmlns:context="http://www.springframework.org/schema/context"
4 xmlns:aop="http://www.springframework.org/schema/aop"
5 xmlns:tx="http://www.springframework.org/schema/tx"
6 xmlns:task="http://www.springframework.org/schema/task"
7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
8 xsi:schemaLocation="http://www.springframework.org/schema/beans
9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10 http://www.springframework.org/schema/mvc
11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12 http://www.springframework.org/schema/context
13 http://www.springframework.org/schema/context/spring-context-4.0.xsd
14 http://www.springframework.org/schema/aop
15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16 http://www.springframework.org/schema/tx
17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18 http://www.springframework.org/schema/task
19 http://www.springframework.org/schema/task/spring-task-4.0.xsd
20 http://code.alibabatech.com/schema/dubbo
21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
25 <!-- 连接工厂, 此工厂由Apache提供 -->
26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27 <!-- 连接地址
28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
29 -->
30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31 <!-- 设置用户名及密码 -->
32 <property name="userName" value="admin"></property>
33 <property name="password" value="admin"></property>
34 </bean>
35
36 <!-- 配置连接池管理工厂 ,由Apache提供.-->
37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38 <!-- 注入工厂 -->
39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40 <!-- 设置最大连接数 -->
41 <property name="maxConnections" value="5"></property>
42 </bean>
43
44 <!-- 把上面的工厂交给Spring管理 -->
45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46 <!-- 注入上面的工厂 -->
47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48 </bean>
49
50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52 <!-- 注入Spring单例工厂 -->
53 <property name="connectionFactory" ref="singleConnectionFactory"></property>
54 <!-- 设置默认的目标管道 -->
55 <property name="defaultDestinationName" value="pId"/>
56 </bean>
57
58 <!-- 实例化一个监听到消息后 处理此消息的类 -->
59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60
61 <!-- 配置实时监听器 -->
62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63 <!-- 配置工厂, 需要配置spirng的工厂 -->
64 <property name="connectionFactory" ref="singleConnectionFactory"/>
65 <!-- 设置监听的目标 -->
66 <property name="destinationName" value="pId"/>
67 <!-- 监听后获取消息的类, 接收监听到的消息 -->
68 <property name="messageListener" ref="customMessageListener"></property>
69 <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->
70 <property name="pubSubDomain" value="true"/>
71 </bean>
72 </beans>
SearchServiceImpl.java: 保存商品信息到Solr服务器中, 通过ActiveMQ
代码语言:javascript复制 1 //保存商品信息到Solr服务器中, 通过ActiveMQ
2 public void insertProductToSolr(Long productId){
3 //TODO 保存商品信息到Solr服务器
4 SolrInputDocument doc = new SolrInputDocument();
5 //ID
6 doc.setField("id", productId);
7 //名称
8 Product p = productDao.selectByPrimaryKey(productId);
9 doc.setField("name_ik", p.getName());
10 //图片URL
11 doc.setField("url", p.getImgUrls()[0]);
12 //品牌 ID
13 doc.setField("brandId", p.getBrandId());
14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
15 SkuQuery skuQuery = new SkuQuery();
16 skuQuery.createCriteria().andProductIdEqualTo(productId);
17 skuQuery.setOrderByClause("price asc");
18 skuQuery.setPageNo(1);
19 skuQuery.setPageSize(1);
20 List<Sku> skus = skuDao.selectByExample(skuQuery);
21 doc.setField("price", skus.get(0).getPrice());
22 //...时间等 剩下的省略
23
24 try {
25 solrServer.add(doc);
26 solrServer.commit();
27 } catch (Exception e) {
28 // TODO Auto-generated catch block
29 e.printStackTrace();
30 }
31 }
CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:
代码语言:javascript复制 1 public class CustomMessageListener implements MessageListener{
2 @Autowired
3 private SearchService searchService;
4
5 @Override
6 public void onMessage(Message message) {
7 //先将接收到的消息强转为ActiveMQ类型的消息
8 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
9 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
10 try {
11 String id = amtm.getText();
12 System.out.println("接收到的ID:" id);
13 searchService.insertProductToSolr(Long.parseLong(id));
14 } catch (JMSException e) {
15 // TODO Auto-generated catch block
16 e.printStackTrace();
17 }
18 }
19 }
=============================== babasport-cms: mq.xml:
代码语言:javascript复制 1 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
2 <!-- 连接工厂, 此工厂由Apache提供 -->
3 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
4 <!-- 连接地址
5 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
6 -->
7 <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
8 <!-- 设置用户名及密码 -->
9 <property name="userName" value="admin"></property>
10 <property name="password" value="admin"></property>
11 </bean>
12
13 <!-- 配置连接池管理工厂 ,由Apache提供.-->
14 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
15 <!-- 注入工厂 -->
16 <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
17 <!-- 设置最大连接数 -->
18 <property name="maxConnections" value="5"></property>
19 </bean>
20
21 <!-- 把上面的工厂交给Spring管理 -->
22 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
23 <!-- 注入上面的工厂 -->
24 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
25 </bean>
26
27 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
28 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
29 <!-- 注入Spring单例工厂 -->
30 <property name="connectionFactory" ref="singleConnectionFactory"></property>
31 <!-- 设置默认的目标管道 -->
32 <property name="defaultDestinationName" value="pId"/>
33 </bean>
34
35 <!-- 实例化一个监听到消息后 处理此消息的类 -->
36 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
37
38 <!-- 配置实时监听器 -->
39 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
40 <!-- 配置工厂, 需要配置spirng的工厂 -->
41 <property name="connectionFactory" ref="singleConnectionFactory"/>
42 <!-- 设置监听的目标 -->
43 <property name="destinationName" value="pId"/>
44 <!-- 监听后获取消息的类, 接收监听到的消息 -->
45 <property name="messageListener" ref="customMessageListener"></property>
46 <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->
47 <property name="pubSubDomain" value="true"/>
48 </bean>
CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:
代码语言:javascript复制 1 public class CustomMessageListener implements MessageListener{
2 @Autowired
3 private StaticPageService staticPageService;
4 @Autowired
5 private CMSService cmsService;
6
7 @Override
8 public void onMessage(Message message) {
9 //先将接收到的消息强转为ActiveMQ类型的消息
10 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
11 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
12 try {
13 String id = amtm.getText();
14 System.out.println("CMS接收到的ID:" id);
15 Map<String, Object> root = new HashMap<String, Object>();
16
17 Product product = cmsService.selectProductById(Long.parseLong(id));
18 List<Sku> skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));
19 //去掉重复的颜色
20 Set<Color> colors = new HashSet<Color>();
21 for (Sku sku : skus) {
22 colors.add(sku.getColor());
23 }
24 root.put("colors", colors);
25 root.put("product", product);
26 root.put("skus", skus);
27
28 staticPageService.index(root, id);
29 } catch (JMSException e) {
30 // TODO Auto-generated catch block
31 e.printStackTrace();
32 }
33 }
34 }
StaticPageServiceImpl.java: 静态化页面的核心类:
代码语言:javascript复制 1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{
2 //SpringMvc 管理 conf
3 private Configuration conf;
4 public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) {
5 this.conf = freeMarkerConfig.getConfiguration();
6 }
7
8 //静态化页面的方法
9 public void index(Map<String, Object> root, String id){
10 //输出目录: 通过getPath方法获取的是绝对路径
11 String path = getPath("/html/product/" id ".html");
12 File f = new File(path);
13 File parentFile = f.getParentFile();
14 if(!parentFile.exists()){
15 parentFile.mkdirs();
16 }
17
18 //spring中已经设置了模板路径:<property name="templateLoaderPath" value="/WEB-INF/ftl/" />
19 Writer out = null;
20
21 try {
22 //读
23 Template template = conf.getTemplate("product.html");
24
25 //设置输出的位置
26 //写
27 out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8");
28 template.process(root, out);
29 } catch (Exception e) {
30 // TODO Auto-generated catch block
31 e.printStackTrace();
32 }finally {
33 if (out != null)
34 {
35 try {
36 out.close();
37 } catch (IOException e) {
38 // TODO Auto-generated catch block
39 e.printStackTrace();
40 }
41 }
42
43 }
44
45 }
46
47 //获取webapp下的html文件夹所在的位置
48 //将相对路径转换为绝对路径
49 public String getPath(String path){
50 return servletContext.getRealPath(path);
51 }
52
53 private ServletContext servletContext;
54 @Override
55 public void setServletContext(ServletContext servletContext) {
56 this.servletContext = servletContext;
57 }
58 }
Spring管理 freemarkerConfig配置类:
代码语言:javascript复制 1 <!-- 配置freemarker 实现类 -->
2 <bean class="cn.itcast.core.service.StaticPageServiceImpl">
3 <property name="freeMarkerConfig">
4 <bean class="org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer">
5 <!-- 设置模板所在目录或文件夹的位置, 相对路径 -->
6 <property name="templateLoaderPath" value="/WEB-INF/ftl/" />
7 <!-- 设置默认编码集 -->
8 <property name="defaultEncoding" value="UTF-8"></property>
9 </bean>
10 </property>
11 </bean>
更多关于freemarker的讲解请关注我以后的博客... 关于ActiveMQ的内容就更新到这么多.