【Node】大数据导出

2022-02-23 14:44:03 浏览数 (1)

简单的导出接口,无非就是处理数据,写入文件,返回文件响应

但是如果处理超大的数据,比如几百万条甚至以上,服务压力就很大,这样处理就肯定会挂掉

所以我们对导出接口做了一波优化

简单说

数据分批处理 文件流 逐段响应流

1、文件流,是为了节省内存

2、数据分批处理,同样也是节省内存,一次性处理太多数据同样消耗内存

3、逐段响应流,是为了避免大数据处理耗时太久,用户端一直转圈无实际响应,导致体验不好,所以数据处理完一批之后就马上响应,及时反馈。

本文实践性,主要是代码

文件流

以流的形式处理文件可以节省内存,不需要把整个文件都放入内存中,只需要分配一小块内存缓冲区

下面这个图就十分地形象

图片来自

https://www.cnblogs.com/vajoy/p/6349817.html

举一个例子来自 你所需要知道的关于 Node.js Streams 的一切

下面将会使用 流 [fs.createFileStream] 读取整个文件 [fs.readFile] 的 形式 来 对比一下 内存的消耗

1、首先创造一个400M的 文件

代码语言:javascript复制
const fs = require("fs");
const file = fs.createWriteStream("./big.file");
for (let i = 0; i <= 1e6; i  ) {
  file.write(
    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.n"
  );
}
file.end();

2、fs.readFile 读取整个文件

代码语言:javascript复制
const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
})
server.listen(8000);

运行时内存大概占用 6M

当开始请求服务器时,内存飙升到 432M

readFile 读取文件把 整个文件都 放到了内存中,如果多并发几个请求,估计内存直接爆炸

3、fs.readFileStream 读取文件流

代码语言:javascript复制
const fs = require("fs");
const server = require("http").createServer();
server.on("request", (req, res) => {
  const src = fs.createReadStream("./big.file");
  src.pipe(res);
});
server.listen(8000);

Node 内存仅仅升高了 十几M

4、npm 包

之前我们是使用 【node-xlsx】这个npm 包去处理 excel 导出,直接build 完了整个文件放在内存中之后,然后才开始响应给用户端,数据大起来之后就直接 over 了

代码语言:javascript复制
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
  if (err) throw err;
  const arrData=[...] // 几百万条数据
  const buffer = xlsx.build([{ name: '名字', data: arrData }]);
  res.end(buffer);
});
server.listen(8000);

现在我们换了一个 可以直接文件流的 npm 包 【exceljs】,可以直接新建一个 stream,减少内存消耗

大概用法流程是这样

代码语言:javascript复制
const ExcelJS = require('exceljs');
const tempFilePath = `./xlsx/${Date.now()}.xlsx`;

// 创建一个流式写入器,指定写入的文件路径
const workbook = new ExcelJS.stream.xlsx.WorkbookWriter({
  filename: tempFilePath,
}); 
// 添加工作表
const sheet = workbook.addWorksheet('My Sheet');
// 往工作表一条条插入数据
for (let i = 0; i < 10; i  ) {
  sheet.addRow(['xxxx', 'xxx']); // 添加行,commit()是将添加的行提交
}
// 提交工作表 My Sheet
sheet.commit(); 
// 数据写入完成,交工作簿
workbook.commit()

然后就会在 xlsx 目录下 看到一个 excel 文件

上面我们创建文件流的的时候,指定了一个文件路径,是因为得有同一个写入数据的端

当我们在作为接口响应的时候,就没必要指定文件了

因为 HTTP 响应对象(也就是上面代码中的 res)是一个可写流

我们通过exceljs 可以直接拿到 读流,然后把 两者pipe 起来就可以了

比如下面这样

代码语言:javascript复制
const server = require('http').createServer();
const ExcelJS = require('exceljs');
var data = ['xxxx', 'xxxxx', 'xxxxxxxx'];

server.on('request', async (req, res) => {
  // 创建一个流,直接可以获取 文件流
  const workbook = new ExcelJS.stream.xlsx.WorkbookWriter(); 
  // 添加工作表
  const sheet = workbook.addWorksheet('My Sheet');
  // 添加行,commit()是将添加的行提交
  for (let i = 0; i < 10; i  ) {
    sheet.addRow(data); 
  }
  // 提交工作表 My Sheet
  sheet.commit(); 
  // 交工作簿,即写入文件
  workbook.commit(); 
  // 连接 响应流 和 文件读流
  workbook.stream.pipe(res);
});
server.listen(8000);

exceljs 更多api 用法请看

https://github.com/exceljs/exceljs/blob/master/README_zh.md

数据分批处理

在我们实际的运行情况中,发现虽然用了文件流,但是下载大文件时内存依旧会爆炸,因为一次性处理 几十万甚至上百万的数据,也仍然十分消耗内存

并且我们还会对查出来的数据 做二次处理,比如 翻译字段等工作,使得消耗就更大

以 处理 一百万数据为例

1、一次性处理

直接一个for循环处理所有数据

代码语言:javascript复制
const server = require("http").createServer();
const ExcelJS = require("exceljs");

var data=['xxxx','xxxxx','xxxxxxxx']
server.on("request", async (req, res) => {
  // 创建一个流
  const workbook = new ExcelJS.stream.xlsx.WorkbookWriter();
  // 添加工作表
  const sheet = workbook.addWorksheet("My Sheet"); 
  // 添加行,commit()是将添加的行提交
  for (let i = 0; i < 1000000; i  ) {
    sheet.addRow(data); 
  }
  // 提交工作表
  sheet.commit(); 
  await workbook.commit(); // 交工作簿,即写入文件
  // 连接 响应流 和 文件读流
  workbook.stream.pipe(res);
});
server.listen(8000);

结果内存爆炸了,直接溢出了,服务失去响应

2、数据分页 异步任务

所以我们通常要避免一次性运算处理太多数据的时候,都是采用分页处理,这里也是

处理完一页数据之后,添加一个异步任务去查询接着处理下一页数据

代码语言:javascript复制
const server = require("http").createServer();
const ExcelJS = require("exceljs");
var data=['xxxx','xxxxx','xxxxxxxx']
server.on("request", async (req, res) => {
  const workbook = new ExcelJS.stream.xlsx.WorkbookWriter();
  const moreConfig = {
    page: 0,
    limit: 5,
  };
  const totalCount = 1000000;
  const totalPage = totalCount / moreConfig.limit;
  // 添加工作表
  const sheet = workbook.addWorksheet("My Sheet"); 
  const write = async () => {
    if (moreConfig.page >= totalPage) {
      console.log("交工作簿");
      await workbook.commit(); // 交工作簿,即写入
      return;
    }
    console.log("处理page", moreConfig.page);
    for (let i = 0; i < moreConfig.limit; i  ) {
      sheet.addRow(data).commit(); // 添加行,commit()是将添加的行提交
    }
    moreConfig.page  = 1;
    new Promise((res) => {
      res(write());
    });
  };
  // 连接 响应流 和 文件读流
  workbook.stream.pipe(res);
  write();
});
server.listen(8000);

同样是处理一百万条数据,内存根本没有消耗多少,并且时间也很快

逐段响应流

在前面的例子中,我们都是处理完全部数据再返回,但是就算是分页处理,当数据量大起来之后,总耗时难免加长,甚至可能长达十几分钟,用户端就一直转圈 没有任何反馈,让人无法知道结果,根本不知道服务是否还正常,体验十分不好

所以最好是处理完一页数据之后,就响应给用户端,让用户端知道下载有在进行。

像这样的效果

这里我们还是用了 exceljs 这个 npm 包去新建文件流,然后马上返回 文件流响应,接着才开始异步分页往 流里面添加数据,从而让用户端一直接收数据

代码语言:javascript复制
const server = require('http').createServer();
const ExcelJS = require('exceljs');

var data = ['xxxx', 'xxxxx', 'xxxxxxxx'];

server.on('request', async (req, res) => {
  // 创建一个流,不指定文件,直接获取文件流
  const workbook = new ExcelJS.stream.xlsx.WorkbookWriter();
  const sheet = workbook.addWorksheet('My Sheet'); // 添加工作表
  const moreConfig = {
    page: 0,
    limit: 5000,
  };
  const totalPage = 100; // 加入一百页
  const write = async () => {
    if (moreConfig.page >= totalPage) {
      console.log('交工作簿');
      await workbook.commit(); // 交工作簿,即写入
      return;
    }
    console.log('n当前处理 page ', moreConfig.page);
    for (let i = 0; i < moreConfig.limit; i  ) {
      sheet.addRow(data).commit(); // 添加行,commit()是将添加的行提交
    }
    moreConfig.page  = 1;
    // 使用 setTimeout 模拟查询数据库时间
    new Promise((res) => {
      setTimeout(() => {
        res(write()); 
      }, 100);
    });
  };
  // 告诉浏览器这是一个二进制文件
  res.setHeader(
    'Content-Type',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
  );
  // 告诉浏览器这是一个需要下载的文件
  res.setHeader(
    'Content-Disposition',
    `attachment; filename=${encodeURIComponent('中文')}.xlsx`
  );
  workbook.stream.pipe(res);
  Promise.resolve().then(write);
});

server.listen(8000)

另外,这里要特别注重说明一个问题

如果你使用 koa 去处理接口的话,需要锁住 async 这个包版本为 3.2.0

问题是因为 之后的版本 使用了 微任务 替代了 宏任务,会导致 exceljs 生成的文件流 提前中断

更具体是因为我们使用的 exceljs 这个包

它依赖的链路是

exceljs 依赖了 archiver ,而 archiver 依赖了 async

async 是一个工具函数包,主要是扩展异步 js 的功能

在 3.2.1 版本,使用 微任务 queueMicrotask 替代 宏任务 setImmediate

这其实是一个 breaking change,但是作者只是作为一个 patch 小版本发布

导致安装依赖的时候,自动升级到最新版本

看下会出问题的示例代码

代码语言:javascript复制
const ExcelJS = require('exceljs');
const app = new koa();
const myRouter = new Router();

app.use(myRouter.routes()).use(myRouter.allowedMethods())

myRouter.get('/exceljsStreamPage', async (ctx, next) => {
  const workbook = new ExcelJS.stream.xlsx.WorkbookWriter();
  const write = async () => {...处理数据};
  ctx.body = workbook.stream;
  Promise.resolve().then(write);
});
app.listen(9004, function (a) {
  console.log('启动成功');
})

问题主要在两行代码

代码语言:javascript复制
const workbook = new ExcelJS.stream.xlsx.WorkbookWriter();

和这个

代码语言:javascript复制
ctx.body = workbook.stream;

在 我们使用 exceljs 新建流的时候,它内部使用了 archiver, 而 archiver 会使用 async 新建一些 异步任务

async v3.2.1 中,优先使用了 queueMicrotask

ctx.body 赋值同样也是一个异步任务,但是使用的是 Promise

然而 queueMicrotask 执行比 Promise 快,导致 还没有把 文件流 连接 响应流时

exceljs 添加的异步任务就开始执行了,然后触发了内部的一些判断条件,文件流就中断了

而 async 3.2.0 之前使用的是 setImmediate,是宏任务,比 Promise 要慢

也就是在 ctx.body 连接文件流成功之后 才开始执行任务

写一个执行顺序的小例子

代码语言:javascript复制
function KoaCompose(fn) {
  return Promise.resolve(fn());
}
function middleware() {
  console.log('执行 middleware');
  setImmediate(() => {
    console.log('middleware 中 setImmediate');
  });
  queueMicrotask(() => {
    console.log('middleware 中 queueMicrotask');
  });
}
KoaCompose(middleware).then(() => {
  console.log('ctx.body 连接流');
});

执行结果如下

然后给这个作者提了一个 issue,原谅我蹩脚的英文,希望作者能看懂

https://github.com/caolan/async/issues/1788

不知道他啥时候解决,现在也没有回

0 人点赞