最大努力通知【分布式事务解决方案】

2021-05-14 16:20:25 浏览数 (1)

一、概述


最大努力通知也是一种解决分布式事务的方案,下面是一个充值的例子:

通过上边的例子我们总结最大努力通知方案的目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括: 【1】有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。 【2】消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同: 【1】解决方案思想不同:可靠消息一致性发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。 最大努力通知,发起通知方尽最大的努力将业务处理结果通知给接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。 【2】两者的业务应用场景不同:可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。 【3】技术解决方向不同:可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)

二、解决方案


方案一:本方案是利用 MQ的 ack机制由 MQ向接收通知方发送通知,流程如下:

方案二:本方案也是利用 MQ的 ack机制,与方案一不同的是应用程序向接收通知方发送通知,如下图:

方案一和方案二的不同点: 1、方案一中接收通知方与 MQ对接,即接收通知方监听 MQ,此方案主要应用与内部应用之间的通知。 2、方案二中由通知程序与 MQ对接,通知程序监听MQ,收到 MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

三、RocketMQ实现最大努力通知型事务


业务说明:本实例通过 RocketMQ中间件实现最大努力通知分布式事务,模拟充值过程。 本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是 bank1数据库,其中有张三账户。充值系统的数据库使用 bank1_pay数据库,记录了账户的充值记录。 业务流程如下图:

导入 Maven依赖:

代码语言:javascript复制
<dependency> 
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq‐spring‐boot‐starter</artifactId> 
    <version>2.0.2</version> 
</dependency>

配置 RocketMQ: 在 application-local.propertis中配置 RocketMQ nameServer地址及生产组:

代码语言:javascript复制
rocketmq.producer.group = producer_bank2 
rocketmq.name‐server = 127.0.0.1:9876

核心代码支付系统服务端实现,发送通知消息服务。实现如下功能: 【1】充值接口; 【2】充值完成要通知; 【3】充值结果查询接口;

代码语言:javascript复制
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService {

    @Autowired
    AccountPayDao accountPayDao;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    //插入充值记录
    @Transactional
    @Override
    public AccountPay insertAccountPay(AccountPay accountPay) {
        int success = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
        if(success>0){
            //发送通知,使用普通消息发送通知
            accountPay.setResult("success");
            rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
            return accountPay;
        }
        return null;
    }

    //查询充值记录,接收通知方调用此方法来查询充值结果
    @Override
    public AccountPay getAccountPay(String txNo) {
        AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
        return accountPay;
    }
}

核心代码:被通知服务 Service端代码实现如下:

代码语言:javascript复制
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    PayClient payClient;

    //更新账户金额
    @Override
    @Transactional
    public void updateAccountBalance(AccountChangeEvent accountChange) {
        //幂等校验
        if(accountInfoDao.isExistTx(accountChange.getTxNo())>0){
            return ;
        }
        int i = accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
        //插入事务记录,用于幂等控制
        accountInfoDao.addTx(accountChange.getTxNo());
    }

    //远程调用查询充值结果,这里的事务ID指的是消息的唯一ID,如果是事务生成的ID,消息接收端在没有收到消息的时候是不可能知道的。
    @Override
    public AccountPay queryPayResult(String tx_no) {

        //远程调用
        AccountPay payresult = payClient.payresult(tx_no);
        if("success".equals(payresult.getResult())){
            //更新账户金额
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号
            accountChangeEvent.setAmount(payresult.getPayAmount());//金额
            accountChangeEvent.setTxNo(payresult.getId());//充值事务号
            updateAccountBalance(accountChangeEvent);
        }
        return payresult;
    }
}

核心代码:被通知服务监听类代码如下:

代码语言:javascript复制
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;

@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1")
public class NotifyMsgListener implements RocketMQListener<AccountPay> {

    @Autowired
    AccountInfoService accountInfoService;

    //接收消息
    @Override
    public void onMessage(AccountPay accountPay) {
        log.info("接收到消息:{}", JSON.toJSONString(accountPay));
        if("success".equals(accountPay.getResult())){
            //更新账户金额
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(accountPay.getAccountNo());
            accountChangeEvent.setAmount(accountPay.getPayAmount());
            accountChangeEvent.setTxNo(accountPay.getId());
            accountInfoService.updateAccountBalance(accountChangeEvent);
        }
        log.info("处理消息完成:{}", JSON.toJSONString(accountPay));
    }
}

四、小结


最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务; 最大努力通知方案需要实现如下功能: 【1】消息重复通知机制; 【2】消息校对机制;

本文来源程序猿进阶,由javajgs_com转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处

0 人点赞