《Java高并发编程详解》第19章:Future设计模式

2021-10-14 14:27:09 浏览数 (1)

简介

假设有个任务需要执行比较长的的时间,通常需要等待任务执行结束或者出错才能返回结果,在此期间调用者只能陷人阻塞苦苦等待,对此,Future设计模式提供了一种凭据式的解决方案。在我们日常生活中,关于凭据的使用非常多见,比如你去某西服手工作坊

想订做一身合体修身的西服,西服的制作过程比较漫长,少则一个礼拜,多则一个月,你不可能一直待在原地等待,一般来说作坊会为你开一个凭据,此凭据就是Future,在接下来的任意日子里你可以凭借此凭据到作坊获取西服。在本章中,我们将通过程序的方式实现Future设计模式,让读者体会这种设计的好处。

自JDK1.5起,Java提供了比较强大的Future接口,在JDK1.8时更是引人了CompletableFuture,其结合函数式接口可实现更强大的功能,由于本书不涉及讨论并发包的知识点,读者可自行查阅。

Future设计模式实现

图19-1是Future设计模式所涉及的关键接口和它们之间的关系UML图,其中FutureTest用于测试。

接口定义

1. Future接口设计

Future提供了获取计算结果和判断任务是否完成的两个接口,其中获取计算结果将会导致调用阻塞(在任务还未完成的情况下),相关代码如清单所示。

代码语言:javascript复制
public interface Future<T>{
    //返回计算后的结果,该方法会陷入阻塞状态
    T get() throws InterruptedException;
    
    //判断任务是否已经被执行完成
    boolean done( ) ;
}

2. FutureService接口设计

FutureService主要用于提交任务,提交的任务主要有两种,第一种不需要返回值,第二种则需要获得最终的计算结果。FutureService 接口中提供了对FutureServiceImpl构建的工厂方法,JDK 8中不仅支持default方法还支持静态方法,JDK 9甚至还支持接口私有方法。

FutureService接口的设计代码如清单所示。

代码语言:javascript复制
public interface FutureService<IN, OUT>{
    //提交不需要返回值的任务,Future.get方法返回的将会是null
    Future<?> submit ( Runnable runnable) ;
    
    //提交需要返回值的任务,其中Task接口代替了Runnable接口
    Future<OUT> submit(Task<IN, OUT> task, IN input);
    
    //使用静态方法创建一个FutureService的实现
    static <T, R> FutureService<T, R> newService( ){
        return new FutureServiceImpl<>();
    }
}

3. Task接口设计

Task接口主要是提供给调用者实现计算逻辑之用的,可以接受一个参数并且返回最终的计算结果,这一点非常类似于JDK1.5中的Callable接口,Task接口的设计代码如清单19-3所示。

代码语言:javascript复制
@FunctionalInterface
public interface Task<IN, OUT>{
    //给定一个参数,经过计算返回结果
    OUT get(IN input);
}

程序实现

1. FutureTask

FutureTask是Future的一个实现,除了实现Future中定义的get()以及done() 方法,还额外增加了protected 方法finish,该方法主要用于接收任务被完成的通知,FutureTask接口的设计代码如清单19-4所示。

代码语言:javascript复制
public class FutureTask<T> implements Future <T>{
    //计算结果
    private T result;
    
    //任务是否完成
    private boolean isDone = false ;
    
    //定义对象锁
    private final Object LOCK = new Object();
    
    @Override
    public T get() throws InterruptedException {
        synchronized (LOCK){
            //当任务还没完成时,调用get方法会被挂起而进入阻塞
            while (!isDone){
                LocK.wait();
            }
            //返回最终计算结果
            return result;
        }
    }
    
    //finish方法主要用于为FutureTask设置计算结果
    protected void finish(T result) {
        synchronized (LOCK) {
            //balking设计模式
            if (isDone) {
                return;
            }
            //计算完成,为result指定结果,并且将isDone设为true,同时唤醒阻塞中的线程
            this.result = result;
            this.isDone = true;
            LOCK.notifyA1l();
        }
    }
    
    //返回当前任务是否已经完成
    @Override
    public boolean done( )
        return isDone;
    }
}

FutureTask中充分利用了线程间的通信wait和notifyAll,当任务没有被完成之前通过get方法获取结果,调用者会进人阻塞,直到任务完成并接收到其他线程的唤醒信号,finish方法接收到了任务完成通知,唤醒了因调用get而进人阻塞的线程。

2. FutureServicelmpl

FutureServiceImpl的主要作用在于当提交任务时创建一个新的线程来受理该任务,进而达到任务异步执行的效果。在FutureServiceImpl的submit方法中,分别启动了新的线程运行任务,起到了异步的作用,在任务最终运行成功之后,会通知FutureTask任务已完成。

Future 的使用以及技巧总结

Future直译是“未来”的意思,主要是将一些耗时的操作交给一个线程去执行,从而达到异步的目的,提交线程在提交任务和获得计算结果的过程中可以进行其他的任务执行,而不至于傻傻等待结果的返回。

我们提供了两种任务的提交(无返回值和有返回值)方式,在这里分别对其进行测试。

  • 无返回值的任务提交测试如下:
代码语言:javascript复制
//定义不需要返回值的FutureService
FutureService<Void, Void> service = FutureService.newService();
//submit方法为立即返回的方法
Future<?> future = service.submit(() ->
    try{
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e){
        e. printStackTrace();
    }
    System.out.println("I am finish done.");
});
//get方法会使当前线程进入阻塞
future.get();

上面的测试代码中提交了一个无返回值的任务,当调用了submit 方法之后会立即返回不再进人阻塞。

有返回值的任务提交测试如下:

代码语言:javascript复制
//定义有返回值的FutureService
FutureService<string, Integer> service = FutureService.newService() ;
//submit方法会立即返回
Future<Integer> future = service.submit(input -> {
    try
        TimeUnit.SECONDS.sleep(10);
    } catch ( InterruptedException e) {
        e.printStackTrace( ) ;
    }
    return input.length();
},"Hello");
//get方法使当前线程进入阻塞,最终会返回计算的结果
System.out.println( future.get());

上面的测试提交了一个有返回值类型的任务,用于计算字符串的长度,最后的计算结果将会返回输人字符串的长度。

至此,Future 模式设计已讲解完毕,虽然我们提交任务时不会进入任何阻塞,但是当调用者需要获取结果的时候,还是有可能陷人阻塞直到任务完成,其实这个问题不仅在我们设计的Future中有,在JDK 1.5 时期也存在,直到JDK 1.8 引人了CompletableFuture才得到了完美的增强,那么在此期间各种开源项目中都给出了各自的解决方案,比如Google的Guava Toolkit就提供了ListenableFuture用于支持任务完成时回调的方式。

增强FutureService使其支持回调

使用任务完成时回调的机制可以让调用者不再进行显式地通过get的方式获得数据而导致进入阻塞,可在提交任务的时候将回调接口一并注人,在这里对FutureService接口稍作修改,修改代码如清单19-6所示。

代码语言:javascript复制
//增加回调接口Callback,当任务执行结束之后,Callback会得到执行
@Override
public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback ){
    final FutureTask<OUT> future = new FutureTask<>();
    new Thread(( ) -> {
        OUT result = task.get(input) ;
        future.finish(result) ;
        //执行回调接口
        if (null != callback) {
            callback. call(result) ;
        }
    }, getNextName()).start() ;
    return future;
}

修改后的submit方法,增加了一个Callback参数,主要用来接受并处理任务的计算结果,当提交的任务执行完成之后,会将结果传递给Callback接口进行进一步的执行, 这样在提交任务之后不再会因为通过get方法获得结果而陷人阻塞。

Callback接口非常简单,非常类似于JDK 8中的Consumer函数式接口,Callback 接口的代码如清单19-7所示。

代码语言:javascript复制
@FunctionalInterface
public interface Callback<T> {
    //任务完成后会调用该方法,其中T为任务执行后的结果
    void call(T t) ;
}

好了,我们再测试一下增加了Callback之后的Future任务提交,代码如下:

代码语言:javascript复制
public static void main(String[] args) throws InterruptedException {
    FutureService<String, Integer> service = FutureService.newService();
    service.submit(input -> {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e. printStackTrace( ) ;
        }
        return input.length();
    },"Hello", System.out::println);
}

System.out:println是一个Lambda表达式的静态推导,其作用就是实现call方法,通过升级后的程序你会发现,我们再也不需要通过get的方式来获得结果了,当然你也可以继续使用get 方法获得最终的计算结果。

本章总结

当某个任务运行需要较长的时间时,调用线程在提交任务之后的徒劳等待对CPU资源来说是一种浪费,在等待的这段时间里,完全可以进行其他任务的执行,这种场景完全符合Future设计模式的应用,虽然我们实现了一个简单的Future设计,但是仍旧存在诸多缺陷,读者在阅读完本章内容之后可以对其进行再次增强。

  • 将提交的任务交给线程池运行,比如我们在第8章自定义的线程池。
  • Get方法没有超时功能,如果获取一一个计算结果在规定的时间内没有返回,则可以抛出异常通知调用线程。
  • Future未提供Cancel功能,当任务提交之后还可以对其进行取消。
  • 任务运行时出错未提供回调方式。
  • 其他需要改进的地方请读者自行思考并解决。

0 人点赞