大数据量文件导入实时更新进度条实现

2022-11-18 15:32:18 浏览数 (1)

前言

物联网设备采集到的实时数据以csv格式文件存储,需要定时导入到mongoDB数据库,数据文件大概20多M(天),10万左右数据量。

概述

前端基于VUE AntDesign实现UI及进度条,后端采用Java Spring Boot。服务器端采用redis存储处理进度,前端以特定key定时调用获取后端redis存储的处理进度信息。

后端代码

数据上传导入接口:

代码语言:javascript复制
// 导入方法
@PostMapping("/importdata")
@ResponseBody
@RepeatSubmit(interval = 1000, message = "请求过于频繁")
public AjaxResult importData(MultipartFile file,String wid,String equipmentId, boolean updateSupport)
{
    String message = "ok";
    try{
        message = System.currentTimeMillis()   "_"   UUID.randomUUID().toString();
        AsyncManager.me().execute(asyncTask(file.getInputStream(),wid,equipmentId,message,file.getSize()));
        return AjaxResult.success(message);
    }catch (Exception e){
        e.printStackTrace();
        message = e.getMessage();
        return  AjaxResult.error(message);
    }
}

上传后异步线程处理数据导入,并以时间 uuid生成唯一标识,返回前端。

查询处理进度接口:

代码语言:javascript复制
@GetMapping("/importdata/{uuid}")
@ResponseBody
public AjaxResult getProgressValue(@PathVariable("uuid") String uuid)
{
    String message = "";
    try{
        String error = redisCache.getCacheObject(uuid  "error");

        if(error != null && !error.isEmpty()){
            return  AjaxResult.error(error);
        }
        String data = redisCache.getCacheObject(uuid);
        if(data == "" || data == null){
            return  AjaxResult.error(uuid   "此key值不存在");
        }
        return AjaxResult.success(message,data);
    }catch (Exception e){
        e.printStackTrace();
        message = e.getMessage();
        return  AjaxResult.error(message);
    }
}

根据前端传入的标识key从redis里获取处理进度,如果异步线程出现异常,error信息也缓存在redis里。

采用InputStream流式处理数据导入,使用BufferedReader方式按行读取数据,然后200条数据以batch的方式保存到mongodb数据库。

代码语言:javascript复制
InputStreamReader ir = null;
BufferedReader br = null;
try{
    Long totalSize = Long.valueOf(0);
    List<Wits58> list = new ArrayList<Wits58>();
    List<String> row = new ArrayList<String>();
    List<String> fields = new ArrayList<String>();
    ir = new InputStreamReader(is,"UTF-8");
br =new BufferedReader(ir);





setProgress(uuid,"0%");
while ((line = br.readLine()) != null){
    totalSize = totalSize   Long.valueOf(line.getBytes().length);
row.clear();





list.add(entity);
if(list.size()>200){
    importCSVData(list,wid,equipmentId);
    list.clear();

}





String p = String.format("%.1f", ((totalSize.doubleValue() / fileSize.doubleValue()) * 100))   "";

setProgress(uuid,p);

处理完毕:

代码语言:javascript复制
redisCache.setCacheObject(uuid,
        "100");
setProgress(uuid,"100");

出现异常:

代码语言:javascript复制
redisCache.setCacheObject(uuid  "error",
e.getMessage());

数据入库:

代码语言:javascript复制
private void importCSVData(List<Wits58> dataRow,String wid,String equipmentId){
    List<MongoWits58> importMongoList = new ArrayList<MongoWits58>();
    for (Wits58 item :dataRow) {
        item.setEquipmentID(equipmentId);
        item.setWid(wid);
        MongoWits58 mongoWits = MongoWits58.convertToWits58(item);
        importMongoList.add(mongoWits);

    }
    iMongoWits58Service.addWits58Batch(importMongoList,equipmentId);
}

前端代码

页面:

代码语言:javascript复制
<a-form-model-item label="请选择从采集设备导出CSV格式数据文件" :rules="[{ required: true, message: '请选择从采集设备导出CSV格式数据文件!' }]"  >

      <a-upload

        :file-list="fileList"

        :show-upload-list="true"

        :remove="handleRemove"

        :before-upload="beforeUpload"

      >

        <a-button> <a-icon type="upload" /> 选择文件</a-button>

      </a-upload>



      </a-form-model-item>



      <a-form-model-item label="数据导入完成之前,请勿关闭此页面"  >

      <div class="baseinfo">

        <a-progress :percent="percent" strokeWidth="26" class="baseProgess" />

      </div>

     </a-form-model-item>

     

     <div class="bottom-control">

       <a-button  :disabled="fileList.length === 0"

      :loading="uploading" type="primary" @click="submitForm">

             {{ uploading ? '导入中...' : '导入' }}

          </a-button>

     </div>

代码

代码语言:javascript复制
methods: {



    handleRemove(file) {

      //const index = this.fileList.indexOf(file);

      //const newFileList = this.fileList.slice();

      //newFileList.splice(index, 1);

      this.fileList = [];

    },

    beforeUpload(file) {

        const isCSV = file.type === 'application/vnd.ms-excel';

        if (!isCSV) {

           

            this.$message.error(`${file.name} 文件格式不正确,请选择CSV文件上传。`);

            return Upload.LIST_IGNORE

        }



        this.fileList = [file];

        return true;

    },





/** 提交按钮 */

    submitForm: function () {



      if(this.form.wellName == "" || this.form.wellName == undefined){

        this.$message.error(`请输入井号`);

        return false;

      }



      if(this.fileList == false){

        this.$message.error(`请选择CSV文件上传。`);

        return false;

      }

     

      if (this.form.equipmentId !== undefined) {        

            const formData = new FormData();

            this.fileList.forEach(file => {

                formData.append("file", file);

            });



            formData.append("wid", this.form.wellName);

            formData.append("equipmentId", this.form.equipmentCode);

            this.form.formData = formData;

            this.uploading = true;

            importData(formData).then(response => {

              console.info(response.msg)  ;

              this.uploadKey = response.msg;

             

              //this.open = false

              //this.uploading = false;

              this.$emit('ok')

            });



            this.processVisible = true

            this.percent = 0

            let that = this

            let interval = setInterval(function() {




              getImportData(that.uploadKey).then(response =>{

                var msg = response.msg;

                console.info(msg)

                if(!!msg){

                  clearInterval(interval);

                  this.$message.error(

                '导入失败',

                3

                )

                  that.uploading = false;

                }



                var percentCount = response.data;

                if(percentCount == "100"){

                  clearInterval(interval);

                 

                  that.$message.success(

                  '实时数据导入成功',

                  3

                  );



                  that.processVisible = false

                   



                  that.open = false

                  that.uploading = false;



                }



                that.percent = percentCount;

               

              },response =>{



                clearInterval(interval);

                  that.processVisible = false

                  that.uploading = false;



              }

               

              )

总结

虽然是小功能,也碰到一些问题,最初的设计直接从文件流读取全部数据

到List,然后导入mongodb,因为能获取记录总数和当前处理数,可精确

记录处理的进度。发现有时用户数据文件很大,会出现内存问题。更改为

按行读边读边入库的模式。按文件大小和已处理数据大小的比率计算处理

进度。期间也考虑过把数据文件存储到服务器上后台单开线程做数据导入

处理,这样还需要专门设计查看线程执行情况的功能,最后放弃了。

0 人点赞