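Preface
Real-time data collected by IoT devices is stored as CSV files and has to be imported into a MongoDB database on a regular basis. Each daily file is a little over 20 MB and holds roughly 100,000 records.
Overview
The front end uses Vue with Ant Design for the UI and the progress bar; the back end is Java Spring Boot. The server stores the import progress in Redis, and the front end polls periodically with a specific key to fetch the progress information stored there.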
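Back-end code
Upload and import endpoint:
// Import endpoint
@PostMapping("/importdata")
@ResponseBody
@RepeatSubmit(interval = 1000, message = "Too many requests")
public AjaxResult importData(MultipartFile file, String wid, String equipmentId, boolean updateSupport)
{
    String message = "ok";
    try {
        // Unique key (timestamp + UUID) that the front end later uses to poll for progress
        message = System.currentTimeMillis() + "_" + UUID.randomUUID().toString();
        // Hand the import to an asynchronous task and return the key straight away
        AsyncManager.me().execute(asyncTask(file.getInputStream(), wid, equipmentId, message, file.getSize()));
        return AjaxResult.success(message);
    } catch (Exception e) {
        e.printStackTrace();
        message = e.getMessage();
        return AjaxResult.error(message);
    }
}
After the upload, an asynchronous thread handles the data import. A unique identifier is generated from the current timestamp plus a UUID and returned to the front end.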
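The hand-off described above can be sketched in plain Java. This is only an illustration under stated assumptions: `ImportSubmitter`, `submit` and `shutdownAndWait` are hypothetical names, and a standard `ExecutorService` stands in for the project's `AsyncManager`, whose internals the article does not show.

```java
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not the article's AsyncManager): runs work on a
// background thread and returns a tracking key immediately.
class ImportSubmitter {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    /** Builds a key from the current time and a random UUID, as the article does. */
    static String newKey() {
        return System.currentTimeMillis() + "_" + UUID.randomUUID();
    }

    /** Queues the task with its key and returns the key without waiting for the task. */
    String submit(Consumer<String> task) {
        String key = newKey();
        pool.execute(() -> task.accept(key));
        return key;
    }

    /** Stops accepting work and waits for queued tasks; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The caller gets the key back before the task has run, which is what lets the HTTP request return quickly while the import continues in the background.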
Progress query endpoint:
@GetMapping("/importdata/{uuid}")
@ResponseBody
public AjaxResult getProgressValue(@PathVariable("uuid") String uuid)
{
    String message = "";
    try {
        // An error recorded by the asynchronous task takes priority over the progress value
        String error = redisCache.getCacheObject(uuid + "error");
        if (error != null && !error.isEmpty()) {
            return AjaxResult.error(error);
        }
        String data = redisCache.getCacheObject(uuid);
        // Check for null first, and compare content with isEmpty() rather than ==
        if (data == null || data.isEmpty()) {
            return AjaxResult.error(uuid + ": this key does not exist");
        }
        return AjaxResult.success(message, data);
    } catch (Exception e) {
        e.printStackTrace();
        message = e.getMessage();
        return AjaxResult.error(message);
    }
}
The progress is read from Redis using the key passed in by the front end. If the asynchronous thread throws an exception, the error message is cached in Redis as well.
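The lookup order (error first, then progress, then "unknown key") can be shown without a Redis server. The sketch below is an assumption-laden stand-in: `ProgressStore` and `Lookup` are hypothetical names, and a `ConcurrentHashMap` replaces the article's `redisCache`. Only the `key + "error"` convention is taken from the article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the Redis cache used in the article.
class ProgressStore {
    /** Result of a lookup: ok == true carries a progress value, otherwise an error text. */
    static final class Lookup {
        final boolean ok;
        final String value;
        Lookup(boolean ok, String value) { this.ok = ok; this.value = value; }
    }

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    void setProgress(String key, String percent) {
        cache.put(key, percent);
    }

    void setError(String key, String message) {
        // Exception messages can be null, and ConcurrentHashMap rejects null values.
        cache.put(key + "error", message == null ? "unknown error" : message);
    }

    Lookup lookup(String key) {
        String error = cache.get(key + "error");
        if (error != null && !error.isEmpty()) {
            return new Lookup(false, error);            // failure reported by the worker
        }
        String progress = cache.get(key);
        if (progress == null || progress.isEmpty()) {
            return new Lookup(false, key + ": this key does not exist");
        }
        return new Lookup(true, progress);              // normal progress value
    }
}
```

Keeping the error under a separate key means a failed import is reported even if a progress value was written earlier for the same key.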
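The import is processed as a stream from the InputStream: a BufferedReader reads the data line by line, and every 200 rows are saved to MongoDB as one batch.
InputStreamReader ir = null;
BufferedReader br = null;
try {
    long totalSize = 0L;
    List<Wits58> list = new ArrayList<Wits58>();
    List<String> row = new ArrayList<String>();
    List<String> fields = new ArrayList<String>();
    ir = new InputStreamReader(is, "UTF-8");
    br = new BufferedReader(ir);
    // Progress is stored as a plain number so it matches the final "100"
    setProgress(uuid, "0");
    String line;
    while ((line = br.readLine()) != null) {
        // Bytes consumed so far, measured in the same charset as the reader
        totalSize = totalSize + line.getBytes("UTF-8").length;
        row.clear();
        // (the parsing of the line into a Wits58 entity is not shown in the original listing)
        list.add(entity);
        // Flush to MongoDB once 200 rows have accumulated
        if (list.size() >= 200) {
            importCSVData(list, wid, equipmentId);
            list.clear();
        }
        // Progress = bytes processed / file size, as a percentage
        String p = String.format("%.1f", (totalSize * 100.0) / fileSize);
        setProgress(uuid, p);
    }
    // Rows still in the list at this point need one final importCSVData call;
    // the rest of the method is not shown in the original listing.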
On completion:
redisCache.setCacheObject(uuid, "100");
setProgress(uuid, "100");
On exception:
redisCache.setCacheObject(uuid + "error", e.getMessage());
Saving to the database:
private void importCSVData(List<Wits58> dataRow, String wid, String equipmentId) {
    List<MongoWits58> importMongoList = new ArrayList<MongoWits58>();
    for (Wits58 item : dataRow) {
        // Tag every row with the equipment and well it belongs to
        item.setEquipmentID(equipmentId);
        item.setWid(wid);
        MongoWits58 mongoWits = MongoWits58.convertToWits58(item);
        importMongoList.add(mongoWits);
    }
    // Write the whole batch in one call
    iMongoWits58Service.addWits58Batch(importMongoList, equipmentId);
}
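Putting the back-end steps together, here is a minimal, self-contained sketch of the read-by-line, save-in-batches, report-by-bytes loop. It is not the author's implementation: `StreamingImporter`, `saveBatch` and `reportProgress` are hypothetical names, raw lines stand in for `Wits58` entities, and the callbacks stand in for MongoDB and Redis. Two choices are assumptions of this sketch only: one extra byte is counted per line for the terminator that `readLine()` strips, and progress is reported once per batch rather than once per line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

// Hypothetical sketch of a streaming import with byte-based progress.
class StreamingImporter {
    static final int BATCH_SIZE = 200;

    static void run(InputStream in, long fileSize,
                    Consumer<List<String>> saveBatch, Consumer<String> reportProgress) {
        long bytesRead = 0;
        long total = Math.max(1, fileSize);              // avoid dividing by zero
        List<String> batch = new ArrayList<>(BATCH_SIZE);
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            reportProgress.accept("0");
            String line;
            while ((line = reader.readLine()) != null) {
                // +1 approximates the line terminator (assumes "\n" line endings).
                bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1;
                batch.add(line);
                if (batch.size() >= BATCH_SIZE) {
                    saveBatch.accept(new ArrayList<>(batch));   // copy, then reuse the buffer
                    batch.clear();
                    // Cap below 100 so only the final report signals completion.
                    double pct = Math.min(99.9, bytesRead * 100.0 / total);
                    reportProgress.accept(String.format(Locale.ROOT, "%.1f", pct));
                }
            }
            if (!batch.isEmpty()) {
                saveBatch.accept(new ArrayList<>(batch));       // rows left over after the loop
            }
            reportProgress.accept("100");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Only one batch is held in memory at a time, so memory use depends on the batch size rather than the file size. The trade-off is that the percentage is an estimate based on bytes, not an exact record count.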
Front-end code
Template:
<a-form-model-item label="Select the CSV data file exported from the acquisition device" :rules="[{ required: true, message: 'Please select the CSV data file exported from the acquisition device!' }]" >
  <a-upload
    :file-list="fileList"
    :show-upload-list="true"
    :remove="handleRemove"
    :before-upload="beforeUpload"
  >
    <a-button> <a-icon type="upload" /> Choose file</a-button>
  </a-upload>
</a-form-model-item>
<a-form-model-item label="Do not close this page until the import has finished" >
  <div class="baseinfo">
    <a-progress :percent="percent" :stroke-width="26" class="baseProgess" />
  </div>
</a-form-model-item>
<div class="bottom-control">
  <a-button :disabled="fileList.length === 0"
    :loading="uploading" type="primary" @click="submitForm">
    {{ uploading ? 'Importing...' : 'Import' }}
  </a-button>
</div>
Script:
methods: {
    handleRemove(file) {
        // Only one file is ever kept, so removing it simply empties the list
        this.fileList = [];
    },
    beforeUpload(file) {
        // Browsers report CSV files as 'application/vnd.ms-excel' or 'text/csv' depending on the platform
        const isCSV = file.type === 'application/vnd.ms-excel' || file.type === 'text/csv';
        if (!isCSV) {
            this.$message.error(`${file.name} is not in the right format; please choose a CSV file.`);
            return false;
        }
        this.fileList = [file];
        // Return false so that a-upload does not send the file itself; submitForm uploads it
        return false;
    },
    /** Submit button */
    submitForm: function () {
        if (!this.form.wellName) {
            this.$message.error(`Please enter the well number`);
            return false;
        }
        if (this.fileList.length === 0) {
            this.$message.error(`Please choose a CSV file to upload.`);
            return false;
        }
        if (this.form.equipmentId !== undefined) {
            const formData = new FormData();
            this.fileList.forEach(file => {
                formData.append("file", file);
            });
            formData.append("wid", this.form.wellName);
            formData.append("equipmentId", this.form.equipmentCode);
            this.form.formData = formData;
            this.uploading = true;
            this.processVisible = true
            this.percent = 0
            let that = this
            importData(formData).then(response => {
                // The back end returns the tracking key in msg
                console.info(response.msg);
                that.uploadKey = response.msg;
                that.$emit('ok')
                // Start polling only once the key is known
                let interval = setInterval(function() {
                    getImportData(that.uploadKey).then(response => {
                        var msg = response.msg;
                        console.info(msg)
                        if (msg) {
                            // A non-empty msg is treated as a failure
                            clearInterval(interval);
                            // Use `that`: `this` is not the component inside the timer callback
                            that.$message.error('Import failed', 3)
                            that.uploading = false;
                            return;
                        }
                        var percentCount = response.data;
                        if (percentCount == "100") {
                            clearInterval(interval);
                            that.$message.success('Real-time data imported successfully', 3);
                            that.processVisible = false
                            that.open = false
                            that.uploading = false;
                        }
                        that.percent = Number(percentCount);
                    }, response => {
                        // Request failed: stop polling and reset the UI
                        clearInterval(interval);
                        that.processVisible = false
                        that.uploading = false;
                    })
                }, 1000); // the polling period is cut off in the original listing; 1000 ms is an assumption
            });
        }
    },
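Summary
Small as this feature is, it still ran into a few problems. The first design read the whole file stream into a List and then imported it into MongoDB; because the total record count and the number of records processed so far were both known, progress could be tracked exactly. It turned out, however, that some users' data files are very large and this caused memory problems, so the import was changed to read line by line and write to the database as it goes, with progress computed as the ratio of bytes processed to the file size. Storing the data file on the server and running the import on a dedicated background thread was also considered, but that would have required a separate feature for monitoring the thread's execution, so the idea was dropped.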