一、概述
最大努力通知也是一种解决分布式事务的方案,下面是一个充值的例子:
通过上边的例子我们总结最大努力通知方案的目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括: 【1】有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。 【2】消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。
最大努力通知与可靠消息一致性有什么不同: 【1】解决方案思想不同:可靠消息一致性发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。 最大努力通知,发起通知方尽最大的努力将业务处理结果通知给接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。 【2】两者的业务应用场景不同:可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。 【3】技术解决方向不同:可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)
二、解决方案
方案一:本方案是利用 MQ的 ack机制由 MQ向接收通知方发送通知,流程如下:
方案二:本方案也是利用 MQ的 ack机制,与方案一不同的是应用程序向接收通知方发送通知,如下图:
方案一和方案二的不同点: 1、方案一中接收通知方与 MQ对接,即接收通知方监听 MQ,此方案主要应用与内部应用之间的通知。 2、方案二中由通知程序与 MQ对接,通知程序监听MQ,收到 MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。
三、RocketMQ实现最大努力通知型事务
业务说明:本实例通过 RocketMQ中间件实现最大努力通知分布式事务,模拟充值过程。 本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是 bank1数据库,其中有张三账户。充值系统的数据库使用 bank1_pay数据库,记录了账户的充值记录。 业务流程如下图:
导入 Maven依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq‐spring‐boot‐starter</artifactId>
<version>2.0.2</version>
</dependency>
配置 RocketMQ: 在 application-local.propertis中配置 RocketMQ nameServer地址及生产组:
代码语言:javascript复制rocketmq.producer.group = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876
【核心代码】:支付系统服务端实现,发送通知消息服务。实现如下功能: 【1】充值接口; 【2】充值完成要通知; 【3】充值结果查询接口;
代码语言:javascript复制import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService {
@Autowired
AccountPayDao accountPayDao;
@Autowired
RocketMQTemplate rocketMQTemplate;
//插入充值记录
@Transactional
@Override
public AccountPay insertAccountPay(AccountPay accountPay) {
int success = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
if(success>0){
//发送通知,使用普通消息发送通知
accountPay.setResult("success");
rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
return accountPay;
}
return null;
}
//查询充值记录,接收通知方调用此方法来查询充值结果
@Override
public AccountPay getAccountPay(String txNo) {
AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
return accountPay;
}
}
核心代码:被通知服务 Service端代码实现如下:
代码语言:javascript复制import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
PayClient payClient;
//更新账户金额
@Override
@Transactional
public void updateAccountBalance(AccountChangeEvent accountChange) {
//幂等校验
if(accountInfoDao.isExistTx(accountChange.getTxNo())>0){
return ;
}
int i = accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
//插入事务记录,用于幂等控制
accountInfoDao.addTx(accountChange.getTxNo());
}
//远程调用查询充值结果,这里的事务ID指的是消息的唯一ID,如果是事务生成的ID,消息接收端在没有收到消息的时候是不可能知道的。
@Override
public AccountPay queryPayResult(String tx_no) {
//远程调用
AccountPay payresult = payClient.payresult(tx_no);
if("success".equals(payresult.getResult())){
//更新账户金额
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号
accountChangeEvent.setAmount(payresult.getPayAmount());//金额
accountChangeEvent.setTxNo(payresult.getId());//充值事务号
updateAccountBalance(accountChangeEvent);
}
return payresult;
}
}
核心代码:被通知服务监听类代码如下:
代码语言:javascript复制import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1")
public class NotifyMsgListener implements RocketMQListener<AccountPay> {
@Autowired
AccountInfoService accountInfoService;
//接收消息
@Override
public void onMessage(AccountPay accountPay) {
log.info("接收到消息:{}", JSON.toJSONString(accountPay));
if("success".equals(accountPay.getResult())){
//更新账户金额
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setTxNo(accountPay.getId());
accountInfoService.updateAccountBalance(accountChangeEvent);
}
log.info("处理消息完成:{}", JSON.toJSONString(accountPay));
}
}
四、小结
最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务; 最大努力通知方案需要实现如下功能: 【1】消息重复通知机制; 【2】消息校对机制;
本文来源程序猿进阶,由javajgs_com转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处