如何开发一套分布式接口重推系统

2022-12-05 13:51:09 浏览数 (1)

功能介绍

业务开发中会有方法重推的需求,比如调用第三方系统接口,如果调用失败,需要能够重推,重推后需要更新业务信息,例如业务单据的状态更新为:推送失败(成功)。

因此决定写一套通用的接口重推功能,能实现自动重推和手动重推。并且记录的接口调用的信息。

该功能模块目录如下:

主要功能为:

代码语言:javascript复制
1.在需要记录日志的方法上面添加注解
2.调用方法,调用信息记录数据库
3.调用重试方法,重试调用异常方法

重试

日志记录注解

注解EnableLog说明:

代码语言:javascript复制
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableLog {
    OperationType type();

    /**
     * 接口提供方
     *
     * @return
     */
    String provider();

    String currentSystem() default StringUtils.EMPTY;

    /**
     * 类型 IN-本系统对外api or OUT-调的外部接口
     *
     * @return
     */
    ModelType model();

    /**
     * 成功标志Str
     *
     * @return
     */
    String successStr() default StringUtils.EMPTY;

    /**
     * 允许重试次数,默认-1,即可无限重试
     *
     * @return
     */
    long allowRetry() default -1;

    /**
     * 是否自动重试 0-否
     * @return
     */
    String needAuto() default SystemConstant.NO;

    /**
     * 方法描述
     *
     * @return
     */
    String desc() default StringUtils.EMPTY;
}

调用示例

代码语言:javascript复制
@EnableLog(type = OperationType.INSERT, provider = "OA", model = ModelType.OUT, successStr = "success", desc = "OA接口-消息提醒")
    public String sendNotify(OaTaskMessage oaTaskMessage) {
        OaMessageParam oaMessageParam = BeanUtils.copy(oaTaskMessage, OaMessageParam.class);
        oaMessageParam.setOpenMode("5");
        oaMessageParam.setModuleCode("srm");
        oaMessageParam.setMsgType("1");
        return oaApiClient.sendMessage(oaMessageParam);
    }

参数说明:

代码语言:javascript复制
1.type:目标方法的操作类型。增,删,改,查
2.provider:接口来源方
3.model:IN-本系统对外api or OUT-调的外部接口
4.successStr:指定接口调成功的返回
5.目标方法的描述

重试方法 (手动)

如果标出的方法调用异常,那么就可以执行这个重试方法。该方法主要执行逻辑为:

代码语言:javascript复制
1.在日志记录表获取指定主键id的异常状态方法调用记录
2.重复执行次数校验
3.反射执行方法
4.更细记录状态

重试方法(自动)

该系统默认手动重试,如果在@EnableLog注解中我们设置needAuto=true,那么系统自动重试,自动重试功能如下

代码语言:javascript复制
对注解标注的方法放入redis阻塞队列
异常重试线程消费阻塞队列
代码语言:javascript复制
//放入异常队列
exceptionResultQueueService.pushExceptionQueue(logId);

重试线程消费阻塞队列中的值:

代码语言:javascript复制
private void loop() {
    String name = Thread.currentThread().getName();
    log.info("异常方法重试线程启动,当前线程:【{}】", name);
    while (!Thread.interrupted()) {
        String apiLogId = (String) redisUtils.blockRightPop(EXCEPTION_QUEUE_KEY);
        LogExecutor.LogMethodParam param = new LogExecutor.LogMethodParam();
        param.setLogId(apiLogId);
        logExecutor.executorMethod(param);

        //每隔5s循环一次。减少资源消耗
        try {
         Thread.sleep(5000);
        } catch (InterruptedException e) {
         e.printStackTrace();
        }
    }
}

异常重试后业务处理扩展

如果重新执行方法后需要处理业务逻辑,可以使用这个扩展接口

代码语言:javascript复制
@Component
public class ExampleRetryResultHandler implements RetryResultHandler {
    @Override
    public String resultHandler(Object obj) {
        //重试方法执行后业务处理
        return null;
    }

    @Override
    public String invokeMethodStr() {
        return "example";
    }

    @Override
    public String methodName() {
        return "sendTaskMessage";
    }
}

只需实现RetryResultHandler接口即可。

RetryResultHandler说明如下:

代码语言:javascript复制
public interface RetryResultHandler {
    /**
     * 方法结果处理器
     * @param obj
     * @return
     */
    String resultHandler(Object obj);

    /**
     * 该方法的父层调用方法,全路径
     * @return
     */
    String invokeMethodStr();

    /**
     * Enable注解标注的方法名称
     * @return
     */
    String methodName();
}

业务处理逻辑写在resultHandler()方法即可。invokeMethodStr()为该方法的父层方法(全路径),methodName()为该方法的方法名称。

至此,一套方法日志记录,方法重试的系统完成。

分布式系统调用

在分布式系统中,各系统如何调用这个公用的重试方法呢,我们需要将上述功能封装到一个SDK中,需要调用的系统就引入这个SDK。各业务系统中引入SDK后编写Controller层方法:

代码语言:javascript复制
@RestController
@RequestMapping("/apiLog")
public class ApiLogController {
    @Autowired
    private SysLogApiClient sysLogApiClient;

    /**
    * 执行调用异常的方法
    * @return
    */
    @ApiMetadata(actionName = "执行调用异常的方法", permissionLevel = PermissionLevel.PermissionRequired)
    @PostMapping(value = "executorExceptionMethod")
    @ResponseBody
    public Object executorExceptionMethod(@RequestBody LogMethodParam param) {
      return sysLogApiClient.executorExceptionMethod(param);
    }
}

同时我们需要一个公共服务,该公共服务需要写一个共前端调用的重试接口,前端传入logId,通过路由执行各个业务系统中SDK的executorExceptionMethod方法。

这样就可以实现在分布式系统中调用公共方法去执行重试了。

本片内容到这里就结束了。

0 人点赞