使用Spring AOP实现异步文件上传

2022-08-29 12:09:25 浏览数 (1)

点击上方“芋道源码”,选择“设为星标”

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

  • 原创 | Java 2021 超神之路,很肝~
  • 中文详细注释的开源项目
  • RPC 框架 Dubbo 源码解析
  • 网络应用框架 Netty 源码解析
  • 消息中间件 RocketMQ 源码解析
  • 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析
  • 作业调度中间件 Elastic-Job 源码解析
  • 分布式事务中间件 TCC-Transaction 源码解析
  • Eureka 和 Hystrix 源码解析
  • Java 并发源码

来源:juejin.cn/post/

7102343528525037576

  • 前言
  • 代码与实现
  • 结语

前言

相信很多系统里都有这一种场景:用户上传Excel,后端解析Excel生成相应的数据,校验数据并落库。这就引发了一个问题:如果Excel的行非常多,或者解析非常复杂,那么解析 校验的过程就非常耗时。如果接口是一个同步的接口,则非常容易出现接口超时,进而返回的校验错误信息也无法展示给前端,这就需要从功能上解决这个问题。一般来说都是启动一个子线程去做解析工作,主线程正常返回,由子线程记录上传状态 校验结果到数据库。同时提供一个查询页面用于实时查询上传的状态和校验信息。

多线程处理导入excel

进一步的,如果我们每一个上传的任务都写一次线程池异步 日志记录的代码就显得非常冗余。同时,非业务代码也侵入了业务代码导致代码可读性下降。从通用性的角度上讲,这种业务场景非常适合模板方法的设计模式。即设计一个抽象类,定义上传的抽象方法,同时实现记录日志的方法,例如:

代码语言:javascript复制
//伪代码,省略了一些步骤
@Slf4j
public abstract class AbstractUploadService<T> {
   public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("-upload-pool-%d")
      .setPriority(Thread.NORM_PRIORITY).build();
   public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
      TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());

   protected abstract String upload(List<T> data);

   protected void execute(String userName, List<T> data) {
      // 生成一个唯一编号
      String uuid = UUID.randomUUID().toString().replace("-", "");
      uploadExecuteService.submit(() -> {
         // 记录日志
         writeLogToDb(uuid, userName, updateTime, "导入中");
         // 一个字符串,用于记录upload的校验信息
         String errorLog = "";
         //执行上传
         try {
            errorLog = upload(data);
            writeSuccess(uuid, "导入中", updateTime);
         } catch (Exception e) {
            LOGGER.error("导入错误", e);
            //计入导入错误日志
            writeFailToDb(uuid, "导入失败", e.getMessage(), updateTime);
         }
         /**
          * 检查一下upload是不是返回了错误日志,如果有,需要注意记录
          *
          * 因为错误日志可能比较长,
          * 可以写入一个文件然后上传到公司的文件服务器,
          * 然后在查看结果的时候允许用户下载该文件,
          * 这里不展开只做示意
          */
         if (StringUtils.isNotEmpty(errorLog)) {
            writeFailToDb(uuid, "导入失败", errorLog, updateTime);
         }

      });
   }
}

如上文所示,模板方法的方式虽然能够极大地减少重复代码,但是仍有下面两个问题:

  • upload方法得限定死参数结构,一旦有变化,不是很容易更改参数类型or数量
  • 每个上传的service还是要继承一下这个抽象类,还是不够简便和优雅

为解决上面两个问题,我也经常进行思考,结果在某次自定义事务提交or回滚的方法的时候得到了启发。这个上传的逻辑过程和事务提交的逻辑过程非常像,都是在实际操作前需要做初始化操作,然后在异常或者成功的时候做进一步操作。这种完全可以通过环装切面的方式实现,由此,我写了一个小轮子给团队使用。(当然了,这个小轮子在本人所在的大团队内部使用的很好,但是不一定适合其他人,但是思路一样,大家可以扩展自己的功能)

「多说无益,上代码!」

基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

代码与实现

首先定义一个日志实体

代码语言:javascript复制
public class FileUploadLog {
   private Integer id;
    // 唯一编码
    private String batchNo;
    // 上传到文件服务器的文件key
    private String key;
    // 错误日志文件名
    private String fileName;
    //上传状态
    private Integer status;
    //上传人
    private String createName;
    //上传类型
    private String uploadType;
    //结束时间
    private Date endTime;
    // 开始时间
    private Date startTime;
}

然后定义一个上传的类型枚举,用于记录是哪里操作的

代码语言:javascript复制
public enum UploadType {
   未知(1,"未知"),
   类型2(2,"类型2"),
   类型1(3,"类型1");
   
   private int code;
   private String desc;
   private static Map<Integer, UploadType> map = new HashMap<>();
   static {
      for (UploadType value : UploadType.values()) {
         map.put(value.code, value);
      }
   }

   UploadType(int code, String desc) {
      this.code = code;
      this.desc = desc;
   }

   public int getCode() {
      return code;
   }

   public String getDesc() {
      return desc;
   }

   public static UploadType getByCode(Integer code) {
      return map.get(code);
   }
}

最后,定义一个注解,用于标识切点

代码语言:javascript复制
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Upload {
   // 记录上传类型
   UploadType type() default UploadType.未知;
}

然后,编写切面

代码语言:javascript复制
@Component
@Aspect
@Slf4j
public class UploadAspect {
   public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("upload-pool-%d")
      .setPriority(Thread.NORM_PRIORITY).build();
   public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
      TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());


   @Pointcut("@annotation(com.aaa.bbb.Upload)")
   public void uploadPoint() {}

   @Around(value = "uploadPoint()")
   public Object uploadControl(ProceedingJoinPoint pjp) {
       // 获取方法上的注解,进而获取uploadType
      MethodSignature signature = (MethodSignature)pjp.getSignature();
      Upload annotation = signature.getMethod().getAnnotation(Upload.class);
      UploadType type = annotation == null ? UploadType.未知 : annotation.type();
      // 获取batchNo
      String batchNo = UUID.randomUUID().toString().replace("-", "");
      // 初始化一条上传的日志,记录开始时间
      writeLogToDB(batchNo, type, new Date)
      // 线程池启动异步线程,开始执行上传的逻辑,pjp.proceed()就是你实现的上传功能
      uploadExecuteService.submit(() -> {
         try {
            String errorMessage = pjp.proceed();
            // 没有异常直接成功
            if (StringUtils.isEmpty(errorMessage)) {
                // 成功,写入数据库,具体不展开了
                writeSuccessToDB(batchNo);
            } else {
                // 失败,因为返回了校验信息
                fail(errorMessage, batchNo);
            }
         } catch (Throwable e) {
            LOGGER.error("导入失败:", e);
            // 失败,抛了异常,需要记录
            fail(e.toString(), batchNo);
         }
      });
      return new Object();
   }

   private void fail(String message, String batchNo) {
       // 生成上传错误日志文件的文件key
      String s3Key = UUID.randomUUID().toString().replace("-", "");
      // 生成文件名称
      String fileName = "错误日志_"  
         DateUtil.dateToString(new Date(), "yyyy年MM月dd日HH时mm分ss秒")   ExportConstant.txtSuffix;
      String filePath = "/home/xxx/xxx/"   fileName;
      // 生成一个文件,写入错误数据
      File file = new File(filePath);
      OutputStream outputStream = null;
      try {
         outputStream = new FileOutputStream(file);
         outputStream.write(message.getBytes());

      } catch (Exception e) {
         LOGGER.error("写入文件错误", e);
      } finally {
         try {
            if (outputStream != null)
               outputStream.close();
         } catch (Exception e) {
            LOGGER.error("关闭错误", e);
         }
      }
      // 上传错误日志文件到文件服务器,我们用的是s3
      upFileToS3(file, s3Key);
      // 记录上传失败,同时记录错误日志文件地址到数据库,方便用户查看错误信息
      writeFailToDB(batchNo, s3Key, fileName);
      // 删除文件,防止硬盘爆炸
      deleteFile(file)
   }

}

至此整个异步上传功能就完成了,是不是很简单?(笑)

那么怎么使用呢?更简单,只需要在service层加入注解即可,顶多就是把错误信息return出去。

代码语言:javascript复制
@Upload(type = UploadType.类型1)
public String upload(List<ClassOne> items)  {
   if (items == null || items.size() == 0) {
      return;
   }
   //校验
   String error = uploadCheck(items);
   if (StringUtils.isNotEmpty) {
       return error;
   }
   //删除旧的
   deleteAll();
   //插入新的
   batchInsert(items);
}

基于 Spring Cloud Alibaba Gateway Nacos RocketMQ Vue & Element 实现的后台管理系统 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

结语

写了个小轮子提升团队整体开发效率感觉真不错。程序员的最高品质就是解放双手(偷懒?),然后成功的用自己写的代码把自己干毕业。。。。。。



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

代码语言:javascript复制
文章有帮助的话,在看,转发吧。谢谢支持哟 (*^__^*)

0 人点赞