如何在控制台实现一个进度条—多线程处理升级版

2024-09-09 20:53:22 浏览数 (1)

如何在控制台实现一个进度条—多线程处理升级版

一、介绍

在以前,使用Java在控制台实现了一个进度条,文章如下

如何在控制台实现一个进度条-腾讯云开发者社区-腾讯云 (tencent.com)

但评论反应出来了一个BUG,这次在实现多线程处理队列的同时,一并解决掉了这个BUG

二、代码

1)原本的代码修复

首先是父类,主要是对startNumendNum那块做了些修复

代码语言:java复制
 package com.banmoon.utils.processbar;
 ​
 import cn.hutool.core.lang.Assert;
 import lombok.Getter;
 ​
 import java.util.Iterator;
 import java.util.function.Consumer;
 ​
 /**
  * @author banmoon
  * @date 2024/03/08 15:52:13
  */
 @Getter
 @SuppressWarnings("NullableProblems")
 public abstract class ProcessBarUtil<T> implements Iterable<T> {
 ​
     /**
      * 遍历的数据
      */
     private final Iterable<T> data;
 ​
     /**
      * 开始数量
      */
     private final Integer startNum;
 ​
     /**
      * 当前的进度
      */
     private Integer current;
 ​
     /**
      * 结束数量
      */
     private final Integer endNum;
 ​
     /**
      * 进度条当前位置
      */
     private Integer processCurrentNum;
 ​
     /**
      * 进度条总数
      */
     private final Integer processTotalNum = 100;
 ​
     public ProcessBarUtil(Iterable<T> data, Integer startNum, Integer current, Integer endNum) {
         this.data = data;
         this.startNum = startNum;
         this.current = current;
         this.endNum = endNum;
     }
 ​
     /**
      * 添加进度
      *
      * @param num 本次进度数量
      */
     public synchronized void add(Integer num) {
         this.current  = num;
         this.processCurrentNum = (int) (100.0 * (current - startNum) / (endNum - startNum));
         updateProcessBar(processCurrentNum, processTotalNum);
     }
 ​
     /**
      * 更新进度条
      *
      * @param current 当前进度
      */
     public synchronized void update(Integer current) {
         this.current = current;
         this.processCurrentNum = (int) (100.0 * (current - startNum) / (endNum - startNum));
         updateProcessBar(processCurrentNum, processTotalNum);
     }
 ​
     protected abstract void updateProcessBar(Integer processCurrentNum, Integer processTotalNum);
 ​
     @Override
     public Iterator<T> iterator() {
         Assert.notNull(data, "不允许操作");
         return data.iterator();
     }
 ​
     @Override
     public void forEach(Consumer<? super T> action) {
         Assert.notNull(data, "不允许操作");
         data.forEach(t -> {
             action.accept(t);
             add(1);
         });
     }
 }

然后是它的实现类,控制台实现,主要新增了进度的展示,当前处理数量/total

代码语言:java复制
 package com.banmoon.utils.processbar;
 ​
 import cn.hutool.core.collection.IterUtil;
 ​
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 ​
 /**
  * 控制台打印输出进度条
  *
  * @author banmoon
  * @date 2024/03/08 16:05:26
  */
 public class ConsoleProcessBarUtil<T> extends ProcessBarUtil<T> {
 ​
     private final OutputStream outputStream;
 ​
     /**
      * 未完成的
      */
     private static final char incomplete = '░';
 ​
     /**
      * 完成的
      */
     private static final char complete = '█';
 ​
 ​
     public ConsoleProcessBarUtil(Iterable<T> data, Integer startNum, Integer current, Integer endNum) {
         super(data, startNum, current, endNum);
         outputStream = System.out;
     }
 ​
     public static <T> ProcessBarUtil<T> init(Iterable<T> data) {
         return new ConsoleProcessBarUtil<>(data, 0, 0, IterUtil.size(data));
     }
 ​
     public static ProcessBarUtil<?> init(Integer startNum, Integer endNum) {
         return new ConsoleProcessBarUtil<>(null, startNum, startNum, endNum);
     }
 ​
     @Override
     protected void updateProcessBar(Integer processCurrentNum, Integer processTotalNum) {
         String processBar = generateProcessBar(processCurrentNum, processTotalNum);
         try {
             outputStream.write("r".getBytes());
             outputStream.write(processBar.getBytes());
         } catch (IOException e) {
             throw new RuntimeException("an error occurred while printing the progress bar");
         }
     }
 ​
     private String generateProcessBar(Integer processCurrentNum, Integer processTotalNum) {
         StringBuilder sb = new StringBuilder("[");
         Stream.generate(() -> complete)
                 .limit(processCurrentNum)
                 .forEach(sb::append);
         IntStream.range(processCurrentNum, processTotalNum)
                 .mapToObj(i -> incomplete)
                 .forEach(sb::append);
         sb.append("] ");
         sb.append(getCurrent()).append("/").append(getEndNum());
         if (processCurrentNum >= processTotalNum) {
             sb.append("t完成n");
         } else {
             sb.append("t").append(processCurrentNum).append("%");
         }
         return sb.toString();
     }
 ​
 }

简单来说,就是通过System.out.wirte("r".getBytes());重置控制台光标,再打印出新的进度条,

重复重置,打印的步骤,直到100%截止,从肉眼上看,就像进度一直在变化,进度条的目的也就达成了。

2)扩展升级版

上面这个还不够,因为上面这个是单线程的,如何扩展使其变成一个多线程执行的任务呢?

本来是想使用继承,再扩展一下ConsoleProcessBarUtil.java,偶然间想起以前的一篇文章

多用组合,少用继承

好吧,那我转变一下思路,采用组合的方式来完成这段代码;


2.1)并发工具类

首先,我们需要一个多线程的一个工具类,主要实现下面的功能

  • 可以管理一个线程池
  • 可以控制最大并发
  • 可以控制完成后再退出

正好,如下使用Semaphore.javaCountDownLatch.java即可

这都是JUC中的工具类,如果不明白的,可以去看看我的这边文章,包会的

Java的juc并发编程包 | 半月无霜 (banmoon.top)

代码语言:java复制
 package com.banmoon.utils;
 ​
 import cn.hutool.core.lang.Opt;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
 ​
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.function.Consumer;
 ​
 @Getter
 @Setter
 public class ConcurrentUtil {
 ​
     private final Semaphore semaphore;
 ​
     private final CountDownLatch countDownLatch;
 ​
     private final ExecutorService threadPool;
 ​
     public ConcurrentUtil(int countDownLatchSize, ExecutorService threadPool) {
         this(20, countDownLatchSize, threadPool);
     }
 ​
     public ConcurrentUtil(int semaphoreSize, int countDownLatchSize, ExecutorService threadPool) {
         this.semaphore = new Semaphore(semaphoreSize);
         this.countDownLatch = new CountDownLatch(countDownLatchSize);
         this.threadPool = threadPool;
     }
 ​
     public void execute(Runnable execute, Consumer<Exception> exceptionConsumer) {
         threadPool.execute(() -> {
             try {
                 semaphore.acquire();
                 execute.run();
             } catch (Exception e) {
                 Opt.ofNullable(exceptionConsumer).peek(a -> a.accept(e));
             } finally {
                 semaphore.release();
                 countDownLatch.countDown();
             }
         });
     }
 ​
     @SneakyThrows
     public void await() {
         countDownLatch.await();
     }
 ​
 }

主要执行在execute()方法,传入Runnable实现,以及异常消费处理,即可使用这个类

2.2)并发的控制台进度条工具类

好的,上面的并发工具类有了,那么接下来就是将ConsoleProcessBarUtil.javaConcurrentUtil.java组合起来使用

代码语言:java复制
 package com.banmoon.utils.processbar;
 ​
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.lang.Assert;
 import cn.hutool.core.thread.ThreadUtil;
 import com.banmoon.utils.ConcurrentUtil;
 import lombok.Setter;
 ​
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 ​
 /**
  * 并发的控制台进度条工具类
  *
  * @param <T>
  */
 @Setter
 public class ConcurrentProcessBarUtil<T> {
 ​
     /**
      * 遍历的数据
      */
     private Collection<T> data;
 ​
     /**
      * 遍历的数据总数
      */
     private Integer totalSize;
 ​
     /**
      * 需要分批处理的数据
      */
     private List<Collection<T>> dataList;
 ​
     /**
      * 控制台进度条工具类
      */
     private ProcessBarUtil<T> processBarUtil;
 ​
     /**
      * 并发工具类
      */
     private ConcurrentUtil concurrentUtil;
 ​
     private ConcurrentProcessBarUtil(Collection<T> data, Integer totalSize, ProcessBarUtil<T> processBarUtil) {
         this.data = data;
         this.totalSize = totalSize;
         this.processBarUtil = processBarUtil;
     }
 ​
     /**
      * 初始化数据列表
      */
     public static <T> ConcurrentProcessBarUtil<T> initData(Collection<T> data) {
         ProcessBarUtil<T> processBarUtil = ConsoleProcessBarUtil.init(data);
         return new ConcurrentProcessBarUtil<>(data, data.size(), processBarUtil);
     }
 ​
     /**
      * 通过条数将list均分
      */
     public ConcurrentProcessBarUtil<T> splitDataByLimit(int limit) {
         Assert.isTrue(limit > 0, "数量必须大于0");
         Integer pieces = Convert.toInt(Math.ceil((double) totalSize / limit));
         dataList = listSplitList(pieces, limit);
         return this;
     }
 ​
     /**
      * 通过页数将list均分
      */
     public ConcurrentProcessBarUtil<T> splitDataByPieces(int pieces) {
         Assert.isTrue(pieces > 0, "数量必须大于0");
         int limit = Convert.toInt(Math.ceil(totalSize / (double) pieces));
         dataList = listSplitList(pieces, limit);
         return this;
     }
 ​
     /**
      * 将list均分
      */
     private List<Collection<T>> listSplitList(int pieces, int limit) {
         Assert.notNull(data, "数据不能为空,请先初始化数据");
         return Stream.iterate(0, n -> n   limit)
                 .limit(pieces)
                 .map(startIndex -> CollUtil.sub(data, Math.min(startIndex, totalSize), Math.min(startIndex   limit, totalSize)))
                 .filter(CollUtil::isNotEmpty)
                 .collect(Collectors.toList());
     }
 ​
     /**
      * 设置并发线程池
      */
     public ConcurrentProcessBarUtil<T> threadPool(Integer threadNum, String prefix) {
         Assert.notNull(dataList, "数据不能为空,请先拆分数据");
         ExecutorService executorService = Executors.newFixedThreadPool(threadNum, ThreadUtil.newNamedThreadFactory(prefix, true));
         this.concurrentUtil = new ConcurrentUtil(dataList.size(), executorService);
         return this;
     }
 ​
     /**
      * 设置并发线程池
      */
     public ConcurrentProcessBarUtil<T> threadPool(Integer semaphoreSize, Integer threadNum, String prefix) {
         Assert.notNull(dataList, "数据不能为空,请先拆分数据");
         ExecutorService executorService = Executors.newFixedThreadPool(threadNum, ThreadUtil.newNamedThreadFactory(prefix, true));
         this.concurrentUtil = new ConcurrentUtil(semaphoreSize, dataList.size(), executorService);
         return this;
     }
 ​
     /**
      * 执行
      */
     public void execute(Consumer<T> consumer) {
         Assert.notNull(concurrentUtil, "线程池不能为空,请先设置线程池");
         for (Collection<T> list : dataList) {
             concurrentUtil.execute(() -> list.forEach(t -> {
                 consumer.accept(t);
                 processBarUtil.add(1);
             }), Exception::printStackTrace);
         }
         concurrentUtil.await();
     }
 ​
 }

可以看到上面每个方法都有断言,使用上遵循initData()splitDataByLimit() | splitDataByPieces()threadPool()execute()的顺序来进行

理解如下

  1. 先设置需要处理的数据列表,这里设置的是总的数据
  2. 再对这些数据进行拆分,拆分出来的数据,每一个线程处理一份,一份多少条数据自己看清楚决定
  3. 接着设置线程池,目前仅有两个重载方法,使用的都是Executors.newFixedThreadPool(),可以按需进行重载,总之这边设置线程池就对了
  4. 最后就是执行了,传入一个消费者,进行处理

三、验证执行

执行代码验证,如下准备了2000条的数据,每300条分一批,交给线程池去执行,线程池的线程数为20

代码语言:java复制
 package com.banmoon;
 ​
 import cn.hutool.core.stream.StreamUtil;
 import cn.hutool.core.util.RandomUtil;
 import com.banmoon.utils.processbar.ConcurrentProcessBarUtil;
 ​
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 ​
 /**
  * @author banmoon
  * @date 2024/03/08 12:05:30
  */
 public class ConsoleProcessBarMain {
 ​
     public static void main(String[] args) {
         List<Integer> list = StreamUtil.of(0, i -> i   1, 2000)
                 .collect(Collectors.toList());
         ConcurrentProcessBarUtil
                 .initData(list)
                 .splitDataByLimit(300)
                 .threadPool(20, "thread-")
                 .execute(ll -> {
                     try {
                         // 模拟业务处理
                         TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(200, 500));
                     } catch (InterruptedException ignored) {
                         
                     }
                 });
     }
 ​
 }

运行查看结果

动画动画

四、最后

控制台打印进度条,实际上并不是一种特别有用的做法,它有一定的局限性

例如,我在处理数据过程中,不能打印自己的日志了,所以我还在寻求其他的进度条实现

目前在看JavaFx,在数据处理的同时,弹出一个窗口

  • 布局上方是进度条
  • 布局下方是一个控制台,用来滚动日志

想法很好,后面找时机写写看

0 人点赞