Java 使用 Jackson编写大型 JSON 文件

2021-09-24 10:38:01 浏览数 (6)

有时您需要将大量数据导出为 JSON 到一个文件。也许是“将所有数据导出到 JSON”,或者 GDPR“可移植性权利”,您实际上需要这样做。

与任何大型数据集一样,您不能将其全部放入内存并将其写入文件。这需要一段时间,它从数据库中读取大量条目,您需要小心不要使此类导出使整个系统过载或耗尽内存。

幸运的是,在 ​JacksonSequenceWriter​和可选的管道流的帮助下,这样做相当简单。这是它的样子:

private ObjectMapper jsonMapper =new ObjectMapper();
private ExecutorService executorService = Executors.newFixedThreadPool(5);
 
@Async
public ListenableFuture<Boolean> export(UUID customerId) {
    try (PipedInputStream in =new PipedInputStream();
            PipedOutputStream pipedOut =new PipedOutputStream(in);
            GZIPOutputStream out =new GZIPOutputStream(pipedOut)) {
     
        Stopwatch stopwatch = Stopwatch.createStarted();
 
        ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();
 
        try(SequenceWriter sequenceWriter = writer.writeValues(out)) {
            sequenceWriter.init(true);
         
            Future<?> storageFuture = executorService.submit(() ->
                   storageProvider.storeFile(getFilePath(customerId), in));
 
            int batchCounter =0;
            while (true) {
                List<Record> batch = readDatabaseBatch(batchCounter++);
                for (Record record : batch) {
                    sequenceWriter.write(entry);
                }
                if (batch.isEmpty()) {
                    // if there are no more batches, stop.
                    break;
                }
            }
 
            // wait for storing to complete
            storageFuture.get();
 
            // send the customer a notification and a download link
            notifyCustomer(customerId);
        } 
 
        logger.info("Exporting took {} seconds", stopwatch.stop().elapsed(TimeUnit.SECONDS));
 
        return AsyncResult.forValue(true);
    }catch (Exception ex) {
        logger.error("Failed to export data", ex);
        return AsyncResult.forValue(false);
    }
}

代码做了几件事:

  • 使用 ​SequenceWriter ​连续写入记录。它使用 ​OutputStream ​进行初始化,所有内容都写入其中。这可以是简单的 ​FileOutputStream​,也可以是下面讨论的管道流。请注意,这里的命名有点误导——​writeValues(out)​听起来你是在指示作者现在写点什么;相反,它将其配置为稍后使用特定的流。
  • 用​SequenceWriter​初始化​true​,意思是“包裹在数组中”。您正在编写许多相同的记录,因此它们应该在最终的 JSON 中表示一个数组。
  • 使用​PipedOutputStream​和​PipedInputStream​将​SequenceWriter​链接到​InputStream​然后传递给存储服务的 ​an ​。如果我们明确地处理文件,就没有必要了——只需传递 ​aFileOutputStream​就可以了。但是,您可能希望以不同的方式存储文件,例如在 Amazon S3 中,并且 ​putObject ​调用需要一个 ​InputStream​,从中读取数据并将其存储在 ​S3 ​中。因此,实际上,您正在写入直接写入 ​InputStream ​的 ​OutputStream​,当尝试从中读取时,会将所有内容写入另一个 ​OutputStream
  • 存储文件是在单独的线程中调用的,这样写入文件不会阻塞当前线程,其目的是从数据库中读取。同样,如果使用简单的 ​FileOutputStream​,则不需要这样做。
  • 整个方法被标记为​@Async (spring) ​以便它不会阻塞执行——它在准备好时被调用并完成(使用具有有限线程池的内部 Spring 执行程序服务)
  • 数据库批量读取代码这里没有显示,因为它因数据库而异。关键是,您应该批量获取数据,而不是 ​SELECT * FROM X​。
  • OutputStream ​被包裹在 ​GZIPOutputStream ​中,因为像 JSON 这样带有重复元素的文本文件可以从压缩中显着受益

主要工作是由 Jackson 的 SequenceWriter 完成的,需要清楚的点是 - 不要假设您的数据会适合内存。它几乎从不这样做,因此以批处理和增量写入的方式进行所有操作。


0 人点赞