前言
循环依赖分为2类:
- RPC服务间(dubbo、http)循环依赖
- 应用间循环依赖
- Dubbo缺省会在启动时检查依赖的服务是否可用,不可用时会抛出异常,防止Spring初始化完成。这种情况我们就叫做RPC服务间循环依赖。出现了循环依赖,必须有一方先启动。所以这种问题是一定需要解决的。
- 应用间循环依赖大致情况如下: A应用调用B应用的服务,B应用也会调用A应用的服务,无论是间接调用还是直接调用。 这种循环依赖刚开始不会出现问题 ,但随着代码变更,有可能会发展为RPC服务间循环依赖。
可以通过check=”false”关闭检查来避免 Dubbo的循环依赖的报错,但是我认为这个只是权益之计。
应用间循环依赖
当前我们应用中并没有出现RPC服务间循环调用,但是出现了应用间循环调用。下面就是这个?就是这种情况,我简单描述下大致的情况。
- marketing-base会调用site-base服务中的SiteGroupService、SiteService接口用来查询集团、店铺、微店的信息。
- site-base里有监听集团初始化的消息,然后执行集团初始化,过程中会调用marketing-base的服务以初始化文章、海报数据。
上面这个例子是应用间循环依赖,一不小心可能写出暴雷的代码。这里应该尽快把这个坑填掉。
处理方案
针对应用间循环依赖,大致的解决办法
团队协作模式:
- 重新划分归属,把职责给一个方向
- 共享内核(提出一个公共服务)
通信集成模式:
- MQ解耦服务间的依赖
由于上述的2个服务早已开发完毕,且比较成熟的应用,而且直接的依赖还是比较少的。此时MQ看起来是一个比较好的方案了。
这里考虑到site-base
比marketing-base
是更加基础的服务,所以图一中上边的调用流程不变,下方更改为site-base
发送消息,然后marketing-base
来消费消息的方式。如图二所示。
Show me Code
首先,我们需要向运维或者自己在单机环境下,新创建一个Topic。我这里新创建一个名叫: newcar_siteinit_basicinfo_test 的主题。
生产者配置
在config/user/beans中添加一个spring xml 配置文件 application-mq.xml。
application-mq.xml
代码语言:javascript复制 1 <!--ons config-->
2 <bean id="producerConfig" class="com.souche.optimus.mq.aliyunons.ONSProducerConnConfig">
3 <property name="accessKey" value="${ons.access.key}"/>
4 <property name="secretKey" value="${ons.secret.key}"/>
5 </bean>
6
7 <!-- 集团初始化 发送者 -->
8 <bean id="groupInitProducerInvoker" class="com.souche.optimus.mq.aliyunons.ONSProducerInvoker">
9 <property name="producerId" value="${ons.groupinit.producer.id}"/>
10 <property name="config" ref="producerConfig"/>
11 </bean>
12
13 <bean id="groupInitProducer" class="com.souche.optimus.mq.aliyunons.ONSProducer">
14 <property name="topic" value="${ons.groupinit.topic}"/>
15 <property name="invoker" ref="groupInitProducerInvoker"/>
16 </bean>
ons.properties
增加ons.properties 在config/optimus/properties文件夹下
代码语言:javascript复制1ons.access.key = K8pfCPRU6gL2lldi
2ons.secret.key = U3lYVGl3L9nb23cEMogWcUVziLJ2T7
3
4#初始化主题
5ons.groupinit.topic = newcar_siteinit_basicinfo_test
6ons.groupinit.producer.id = PID_newcar_siteinit_basicinfo_test
生产者逻辑代码
代码语言:javascript复制 1private void sendMsg(SiteGroupInfoDO siteGroupInfoDO) {
2 if (siteGroupInfoDO.getId() == null) {
3 return;
4 }
5 Map<String, Object> map = new HashMap<>();
6 map.put("id",siteGroupInfoDO.getId());
7 map.put("groupCode",siteGroupInfoDO.getGroupCode());
8 map.put("groupName",siteGroupInfoDO.getGroupName());
9 map.put("appName",siteGroupInfoDO.getAppName());
10 map.put("domainSuffix",siteGroupInfoDO.getDomainSuffix());
11 String uuid = UUIDUtil.getID();
12 mqProducer.send(map, uuid, CommonConstant.MQ_TAG);
13 LOGGER.info("发送消息:body: {}, keys: {}, tag: {}", map, uuid, CommonConstant.MQ_TAG);
14 }
- 每个消息都需要有一个唯一的业务ID, 即第二个参数, 也是强制要求传入的. 可以用订单ID, 用户ID, 如果不好确定就直接用UUIDUtil.getId()
- 第三个参数, 尽量用一个队列的不同的tag去区分一个业务/系统中的不同消息, 尽量不要用*或者默认的值
消费者配置
application-mq.xml
代码语言:javascript复制1 <bean id="contentPlatformMsgConsumer" class="com.souche.marketing.base.biz.mq.ContentPlatformMsgConsumer"/>
2 <bean id="tgcArticleInvoker" class="com.souche.optimus.mq.aliyunons.ONSConsumerInvoker">
3 <property name="config" ref="consumerConfig"/>
4 <property name="reciver" ref="contentPlatformMsgConsumer"/>
5 <property name="consumerId" value="${mq.ons.consumer.newcar.article.id}"/>
6 <property name="topic" value="${mq.ons.consumer.newcar.article.topic}"/>
7 <!--<property name="tag" value="${mq.aliyun.car.center.tag}"/> <!–tag选填, 如果不填将监听该topic下的所有消息–>-->
8 <property name="enabled" value="true"/>
9 </bean>
这里有一个很重要的点是,我们的消费者ID必须从阿里云控制台上建立或者联系运维创建,图示如下:
ons.properties
代码语言:javascript复制1ons.access.key = K8pfCPRU6gL2lldi
2ons.secret.key = U3lYVGl3L9nb23cEMogWcUVziLJ2T7
3
4#初始化主题
5mq.ons.consumer.newcar.siteinit.topic = newcar_siteinit_basicinfo_test
6mq.ons.consumer.newcar.siteinit.id = GID_newcar_siteinit_basicinfo_test
消费者逻辑代码
代码语言:javascript复制 1/**
2 * @author james mu
3 * @date 2019/11/7 10:16
4 */
5public class GroupInitMsgConsumer implements MQConsumer {
6
7 public static final Logger LOGGER = LoggerFactory.getLogger(GroupInitMsgConsumer.class);
8
9 @Resource(name = "articleServiceForManageProvider")
10 private ArticleServiceForManage articleServiceForManage;
11
12 @Resource(name = "posterServiceForAppProvider")
13 private PosterServiceForApp posterServiceForApp;
14
15 @Override
16 public ConsumeResult onRecived(Map<String, Object> map) {
17 if (CollectionUtils.isEmpty(map)) {
18 LOGGER.warn("msg is empty");
19 return ConsumeResult.CommitMessage;
20 }
21 JSONObject msg = new JSONObject(map);
22 Integer groupId = msg.getInteger("id");
23 String groupCode = msg.getString("groupCode");
24 String groupName = msg.getString("groupName");
25 String appName = msg.getString("appName");
26 String domainSuffix = msg.getString("domainSuffix");
27 LOGGER.info("groupId: {}, groupCode: {}, groupName: {}, appName: {}, domainSuffix: {}", groupId, groupCode, groupName, appName, domainSuffix);
28
29 if (groupId == null) {
30 LOGGER.warn("param is miss");
31 return ConsumeResult.CommitMessage;
32 } else {
33 articleServiceForManage.importSubjectForGroup(groupId);
34 SiteGroupInfoDTO siteGroupInfoDTO = new SiteGroupInfoDTO();
35 siteGroupInfoDTO.setId(groupId);
36 siteGroupInfoDTO.setGroupCode(groupCode);
37 siteGroupInfoDTO.setGroupName(groupName);
38 siteGroupInfoDTO.setAppName(appName);
39 siteGroupInfoDTO.setDomainSuffix(domainSuffix);
40 posterServiceForApp.initPosterByGroup(siteGroupInfoDTO);
41 }
42 return ConsumeResult.CommitMessage;
43 }
44
45}
在消费消息的业务代码中,大家一定要注意处理幂等性的问题,防止多次消费消息,导致业务的出错。在此,相信大家已经了解怎么清除循环依赖的思路和处理了。