如何在控制台实现一个进度条—多线程处理升级版
一、介绍
在以前,使用Java
在控制台实现了一个进度条,文章如下
如何在控制台实现一个进度条-腾讯云开发者社区-腾讯云 (tencent.com)
但评论反应出来了一个BUG
,这次在实现多线程处理队列的同时,一并解决掉了这个BUG
二、代码
1)原本的代码修复
首先是父类,主要是对startNum
、endNum
那块做了些修复
package com.banmoon.utils.processbar;
import cn.hutool.core.lang.Assert;
import lombok.Getter;
import java.util.Iterator;
import java.util.function.Consumer;
/**
* @author banmoon
* @date 2024/03/08 15:52:13
*/
@Getter
@SuppressWarnings("NullableProblems")
public abstract class ProcessBarUtil<T> implements Iterable<T> {
/**
* 遍历的数据
*/
private final Iterable<T> data;
/**
* 开始数量
*/
private final Integer startNum;
/**
* 当前的进度
*/
private Integer current;
/**
* 结束数量
*/
private final Integer endNum;
/**
* 进度条当前位置
*/
private Integer processCurrentNum;
/**
* 进度条总数
*/
private final Integer processTotalNum = 100;
public ProcessBarUtil(Iterable<T> data, Integer startNum, Integer current, Integer endNum) {
this.data = data;
this.startNum = startNum;
this.current = current;
this.endNum = endNum;
}
/**
* 添加进度
*
* @param num 本次进度数量
*/
public synchronized void add(Integer num) {
this.current = num;
this.processCurrentNum = (int) (100.0 * (current - startNum) / (endNum - startNum));
updateProcessBar(processCurrentNum, processTotalNum);
}
/**
* 更新进度条
*
* @param current 当前进度
*/
public synchronized void update(Integer current) {
this.current = current;
this.processCurrentNum = (int) (100.0 * (current - startNum) / (endNum - startNum));
updateProcessBar(processCurrentNum, processTotalNum);
}
protected abstract void updateProcessBar(Integer processCurrentNum, Integer processTotalNum);
@Override
public Iterator<T> iterator() {
Assert.notNull(data, "不允许操作");
return data.iterator();
}
@Override
public void forEach(Consumer<? super T> action) {
Assert.notNull(data, "不允许操作");
data.forEach(t -> {
action.accept(t);
add(1);
});
}
}
然后是它的实现类,控制台实现,主要新增了进度的展示,当前处理数量/total
package com.banmoon.utils.processbar;
import cn.hutool.core.collection.IterUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
* 控制台打印输出进度条
*
* @author banmoon
* @date 2024/03/08 16:05:26
*/
public class ConsoleProcessBarUtil<T> extends ProcessBarUtil<T> {
private final OutputStream outputStream;
/**
* 未完成的
*/
private static final char incomplete = '░';
/**
* 完成的
*/
private static final char complete = '█';
public ConsoleProcessBarUtil(Iterable<T> data, Integer startNum, Integer current, Integer endNum) {
super(data, startNum, current, endNum);
outputStream = System.out;
}
public static <T> ProcessBarUtil<T> init(Iterable<T> data) {
return new ConsoleProcessBarUtil<>(data, 0, 0, IterUtil.size(data));
}
public static ProcessBarUtil<?> init(Integer startNum, Integer endNum) {
return new ConsoleProcessBarUtil<>(null, startNum, startNum, endNum);
}
@Override
protected void updateProcessBar(Integer processCurrentNum, Integer processTotalNum) {
String processBar = generateProcessBar(processCurrentNum, processTotalNum);
try {
outputStream.write("r".getBytes());
outputStream.write(processBar.getBytes());
} catch (IOException e) {
throw new RuntimeException("an error occurred while printing the progress bar");
}
}
private String generateProcessBar(Integer processCurrentNum, Integer processTotalNum) {
StringBuilder sb = new StringBuilder("[");
Stream.generate(() -> complete)
.limit(processCurrentNum)
.forEach(sb::append);
IntStream.range(processCurrentNum, processTotalNum)
.mapToObj(i -> incomplete)
.forEach(sb::append);
sb.append("] ");
sb.append(getCurrent()).append("/").append(getEndNum());
if (processCurrentNum >= processTotalNum) {
sb.append("t完成n");
} else {
sb.append("t").append(processCurrentNum).append("%");
}
return sb.toString();
}
}
简单来说,就是通过System.out.wirte("r".getBytes());
重置控制台光标,再打印出新的进度条,
重复重置,打印的步骤,直到100
%截止,从肉眼上看,就像进度一直在变化,进度条的目的也就达成了。
2)扩展升级版
上面这个还不够,因为上面这个是单线程的,如何扩展使其变成一个多线程执行的任务呢?
本来是想使用继承,再扩展一下ConsoleProcessBarUtil.java
,偶然间想起以前的一篇文章
多用组合,少用继承
好吧,那我转变一下思路,采用组合的方式来完成这段代码;
2.1)并发工具类
首先,我们需要一个多线程的一个工具类,主要实现下面的功能
- 可以管理一个线程池
- 可以控制最大并发
- 可以控制完成后再退出
正好,如下使用Semaphore.java
、CountDownLatch.java
即可
这都是JUC
中的工具类,如果不明白的,可以去看看我的这边文章,包会的
Java的juc并发编程包 | 半月无霜 (banmoon.top)
代码语言:java复制 package com.banmoon.utils;
import cn.hutool.core.lang.Opt;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
@Getter
@Setter
public class ConcurrentUtil {
private final Semaphore semaphore;
private final CountDownLatch countDownLatch;
private final ExecutorService threadPool;
public ConcurrentUtil(int countDownLatchSize, ExecutorService threadPool) {
this(20, countDownLatchSize, threadPool);
}
public ConcurrentUtil(int semaphoreSize, int countDownLatchSize, ExecutorService threadPool) {
this.semaphore = new Semaphore(semaphoreSize);
this.countDownLatch = new CountDownLatch(countDownLatchSize);
this.threadPool = threadPool;
}
public void execute(Runnable execute, Consumer<Exception> exceptionConsumer) {
threadPool.execute(() -> {
try {
semaphore.acquire();
execute.run();
} catch (Exception e) {
Opt.ofNullable(exceptionConsumer).peek(a -> a.accept(e));
} finally {
semaphore.release();
countDownLatch.countDown();
}
});
}
@SneakyThrows
public void await() {
countDownLatch.await();
}
}
主要执行在execute()
方法,传入Runnable
实现,以及异常消费处理,即可使用这个类
2.2)并发的控制台进度条工具类
好的,上面的并发工具类有了,那么接下来就是将ConsoleProcessBarUtil.java
和ConcurrentUtil.java
组合起来使用
package com.banmoon.utils.processbar;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.thread.ThreadUtil;
import com.banmoon.utils.ConcurrentUtil;
import lombok.Setter;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 并发的控制台进度条工具类
*
* @param <T>
*/
@Setter
public class ConcurrentProcessBarUtil<T> {
/**
* 遍历的数据
*/
private Collection<T> data;
/**
* 遍历的数据总数
*/
private Integer totalSize;
/**
* 需要分批处理的数据
*/
private List<Collection<T>> dataList;
/**
* 控制台进度条工具类
*/
private ProcessBarUtil<T> processBarUtil;
/**
* 并发工具类
*/
private ConcurrentUtil concurrentUtil;
private ConcurrentProcessBarUtil(Collection<T> data, Integer totalSize, ProcessBarUtil<T> processBarUtil) {
this.data = data;
this.totalSize = totalSize;
this.processBarUtil = processBarUtil;
}
/**
* 初始化数据列表
*/
public static <T> ConcurrentProcessBarUtil<T> initData(Collection<T> data) {
ProcessBarUtil<T> processBarUtil = ConsoleProcessBarUtil.init(data);
return new ConcurrentProcessBarUtil<>(data, data.size(), processBarUtil);
}
/**
* 通过条数将list均分
*/
public ConcurrentProcessBarUtil<T> splitDataByLimit(int limit) {
Assert.isTrue(limit > 0, "数量必须大于0");
Integer pieces = Convert.toInt(Math.ceil((double) totalSize / limit));
dataList = listSplitList(pieces, limit);
return this;
}
/**
* 通过页数将list均分
*/
public ConcurrentProcessBarUtil<T> splitDataByPieces(int pieces) {
Assert.isTrue(pieces > 0, "数量必须大于0");
int limit = Convert.toInt(Math.ceil(totalSize / (double) pieces));
dataList = listSplitList(pieces, limit);
return this;
}
/**
* 将list均分
*/
private List<Collection<T>> listSplitList(int pieces, int limit) {
Assert.notNull(data, "数据不能为空,请先初始化数据");
return Stream.iterate(0, n -> n limit)
.limit(pieces)
.map(startIndex -> CollUtil.sub(data, Math.min(startIndex, totalSize), Math.min(startIndex limit, totalSize)))
.filter(CollUtil::isNotEmpty)
.collect(Collectors.toList());
}
/**
* 设置并发线程池
*/
public ConcurrentProcessBarUtil<T> threadPool(Integer threadNum, String prefix) {
Assert.notNull(dataList, "数据不能为空,请先拆分数据");
ExecutorService executorService = Executors.newFixedThreadPool(threadNum, ThreadUtil.newNamedThreadFactory(prefix, true));
this.concurrentUtil = new ConcurrentUtil(dataList.size(), executorService);
return this;
}
/**
* 设置并发线程池
*/
public ConcurrentProcessBarUtil<T> threadPool(Integer semaphoreSize, Integer threadNum, String prefix) {
Assert.notNull(dataList, "数据不能为空,请先拆分数据");
ExecutorService executorService = Executors.newFixedThreadPool(threadNum, ThreadUtil.newNamedThreadFactory(prefix, true));
this.concurrentUtil = new ConcurrentUtil(semaphoreSize, dataList.size(), executorService);
return this;
}
/**
* 执行
*/
public void execute(Consumer<T> consumer) {
Assert.notNull(concurrentUtil, "线程池不能为空,请先设置线程池");
for (Collection<T> list : dataList) {
concurrentUtil.execute(() -> list.forEach(t -> {
consumer.accept(t);
processBarUtil.add(1);
}), Exception::printStackTrace);
}
concurrentUtil.await();
}
}
可以看到上面每个方法都有断言,使用上遵循initData()
、splitDataByLimit() | splitDataByPieces()
、threadPool()
、execute()
的顺序来进行
理解如下
- 先设置需要处理的数据列表,这里设置的是总的数据
- 再对这些数据进行拆分,拆分出来的数据,每一个线程处理一份,一份多少条数据自己看清楚决定
- 接着设置线程池,目前仅有两个重载方法,使用的都是
Executors.newFixedThreadPool()
,可以按需进行重载,总之这边设置线程池就对了 - 最后就是执行了,传入一个消费者,进行处理
三、验证执行
执行代码验证,如下准备了2000
条的数据,每300
条分一批,交给线程池去执行,线程池的线程数为20
package com.banmoon;
import cn.hutool.core.stream.StreamUtil;
import cn.hutool.core.util.RandomUtil;
import com.banmoon.utils.processbar.ConcurrentProcessBarUtil;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author banmoon
* @date 2024/03/08 12:05:30
*/
public class ConsoleProcessBarMain {
public static void main(String[] args) {
List<Integer> list = StreamUtil.of(0, i -> i 1, 2000)
.collect(Collectors.toList());
ConcurrentProcessBarUtil
.initData(list)
.splitDataByLimit(300)
.threadPool(20, "thread-")
.execute(ll -> {
try {
// 模拟业务处理
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(200, 500));
} catch (InterruptedException ignored) {
}
});
}
}
运行查看结果
四、最后
控制台打印进度条,实际上并不是一种特别有用的做法,它有一定的局限性
例如,我在处理数据过程中,不能打印自己的日志了,所以我还在寻求其他的进度条实现
目前在看JavaFx
,在数据处理的同时,弹出一个窗口
- 布局上方是进度条
- 布局下方是一个控制台,用来滚动日志
想法很好,后面找时机写写看