Java8 多线程及并行计算demo
代码语言:javascript复制#接口
public interface RemoteLoader {
String load();
default void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
#实现类
public class CustomerInfoService implements RemoteLoader{
@Override
public String load() {
this.delay();
return "基本信息";
}
}
public class LabelService implements RemoteLoader{
@Override
public String load() {
this.delay();
return "学习标签";
}
}
public class LearnRecordService implements RemoteLoader{
@Override
public String load() {
this.delay();
return "学习信息";
}
}
public class WatchRecordService implements RemoteLoader{
@Override
public String load() {
this.delay();
return "观看服务";
}
}
#测试类
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 参考:https://blog.csdn.net/Alecor/article/details/113405297
*/
public class TestSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// sync();
// testFuture();
// testParallelStream();
// testCompletableFuture();
// testCompletableFuture2();
// testCompletableFuture3();
testCompletableFuture4();
}
/**
* [基本信息, 学习信息]
* 总共花费时间:2036
*/
public static void sync(){
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaderList = Arrays.asList(new CustomerInfoService(),new LearnRecordService());
List<String> customerDetail = remoteLoaderList.stream().map(RemoteLoader::load).collect(Collectors.toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" (end - start));
}
/**
* [基本信息, 学习信息]
* 总共花费时间:1037
*/
public static void testFuture() {
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
List<Future<String>> futures = remoteLoaders.stream()
.map(remoteLoader -> executorService.submit(remoteLoader::load))
.collect(Collectors.toList());
List<String> customerDetail = futures.stream()
.map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" (end - start));
}
/**
* [基本信息, 学习信息, 学习标签, 观看服务]
* 总共花费时间:1081
*/
public static void testParallelStream() {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList( new CustomerInfoService(),
new LearnRecordService(),
new LabelService(),
new WatchRecordService());
List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(Collectors.toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" (end - start));
}
/**
* doSomething...
* Finish
*/
public static void testCompletableFuture() {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
doSomething();
future.complete("Finish");
} catch (Exception e) {
future.completeExceptionally(e);
} //任务执行完成后 设置返回的结果
}).start();
System.out.println(future.join()); //获取任务线程返回的结果
}
private static void doSomething() {
System.out.println("doSomething...");
}
/**
* doSomething...
* Finish
*
* @throws ExecutionException
* @throws InterruptedException
*/
public static void testCompletableFuture2() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
doSomething();
return "Finish";
});
System.out.println(future.get());
}
/**
* [基本信息, 学习信息, 学习标签, 观看服务]
* 总共花费时间:1079
*
* @throws ExecutionException
* @throws InterruptedException
*/
public static void testCompletableFuture3() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(
new CustomerInfoService(),
new LearnRecordService(),
new LabelService(),
new WatchRecordService());
List<CompletableFuture<String>> completableFutures = remoteLoaders
.stream()
.map(loader -> CompletableFuture.supplyAsync(loader::load).exceptionally(throwable -> "Throwable exception message:" throwable.getMessage()))
.collect(Collectors.toList());
List<String> customerDetail = completableFutures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" (end - start));
}
/**
* [基本信息, 学习信息, 学习标签, 观看服务]
* 总共花费时间:1051
*
* @throws ExecutionException
* @throws InterruptedException
*/
public static void testCompletableFuture4() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<RemoteLoader> remoteLoaders = Arrays.asList(
new CustomerInfoService(),
new LearnRecordService(),
new LabelService(),
new WatchRecordService());
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50));
List<CompletableFuture<String>> completableFutures = remoteLoaders
.stream()
.map(loader -> CompletableFuture.supplyAsync(loader::load, executorService))
.collect(Collectors.toList());
List<String> customerDetail = completableFutures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(customerDetail);
long end = System.currentTimeMillis();
System.out.println("总共花费时间:" (end - start));
}
}