主要内容
代码语言:javascript复制1. 什么是异步
2. 软件设计中如何实现异步操作
2.1 Callback机制
2.1.1 asynchronous callback
2.1.2 Event-Listener
2.2 Future机制
2.2.1 Future
2.2.2 Promise
3. 异步编程的优势和不足
4. 总结
1.什么是异步
程序或系统中关于异步的概念使用的比较多,那么什么是异步呢?下面举个生活中最常见的情景来进行说明:
叫外卖
外卖
这个例子中,订餐者作为Caller,一份蛋炒饭的制作以及配送作为一个Task,Caller只需要打个电话给餐厅,说明需求即可,然后就可以继续执行自己的任务"学习"。而待完成的Task交由对方来完成,Caller只需要在Task完成后得到通知即可。这就是一个典型的异步过程。
2.软件设计中如何实现异步操作
软件开发过程中我们经常会遇到异步的情况,比如:网络编程中的异步IO,Web开发中的异步Servlet,Ajax等等。从程序设计的角度来说,异步操作的实现主要可以通过以下两种方式实现:
- 异步回调机制
- Future机制
2.1 异步Callback机制
Callback指的就是回调机制,回调机制通常指的是将可执行的code作为参数传递给其它的code,并在合适的时机执行。当然,这里的“合适的时机”可能是异步的,也可能是同步的。前者就是我们要讨论的异步Callback机制。
异步Callback机制在具体实现上也会有不同的方案,比如:普通的回调函数或事件监听模式上面所有的方法均是基于回调函数来完成异步操作的,无非是对回调函数进行封装而已。
2.1.1 asynchronous callback
在c语言中,可以以函数指针的形式来实现回调函数的传递,但是我们知道Java中是不支持函数指针的,不过别忘了!我们还有接口呢!
情景描述
模拟ZooKeeper中client端异步对server端进行操作,这里就只模拟create node的操作。
Callback
上图是整个创建流程的时序图,步骤如下:
- 实例化Client客户端
- 启动Client内部的Worker线程
- User调用asyncCreate方法向RemoteServer发起创建Node节点请求。
- 将请求信息封装成Packet对象,加入Client内部的BlockQueue中。
- Worker不断轮询BlockQueue,通过take()方法取出队列中的待发送Packet。
- 将请求发送到RemoteServer
- 接受从RemoteServer返回的响应。
- 回调Callback接口的process方法。
代码语言:javascript复制Test Code
@Test
public void testCreateNode() throws InterruptedException{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger(0);
Client client = new Client("localhost",8888);
long begin = System.currentTimeMillis();
client.asyncCreate("exist NodeInfo", new CallBack(){
@Override
public void process(int rc, Object response, Object ctx) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {}
count.incrementAndGet();
latch.countDown();
Assert.assertEquals("I'm context", ctx);
}
}, "I'm context");
//asyncCreate return immediately
Assert.assertTrue((System.currentTimeMillis() - begin) < 1000);
latch.await();
Assert.assertEquals(1, count.intValue());
}
代码语言:javascript复制CallBack接口实现如下:
/**
* 模拟异步回调接口
*
* @author wqx
*
*/
public interface CallBack{
/**
* 回调函数
*
* @param rc: result code
* @param response
* @param ctx : context
*/
void process(int rc, Object response,Object ctx);
}
回调函数process中有三个参数:
- rc: 返回码
- response:返回值对象
- ctx:上下文对象context
代码语言:javascript复制信息载体Packet实现如下:
/**
* Packet对象:封装request和response对象
*
* @author wqx
*
*/
public class Packet{
private Object request;
private CallBack cb;
private Object ctx;
private Object response;
private int errorCode;
//。。。
}
Client端实现
Client的设计很简单,既然需要实现异步,那么就只要将任务交给别人做喽。这里的别人就是一个叫Worker的工作线程。Client设计如下:
代码语言:javascript复制public class Client {
private final BlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<Packet>();
private Worker worker;
public Client(String host, int port){
worker = new Worker(host,port);
worker.start();
}
/**
*
* @param nodeInfo:模拟节点信息
* @param cb:回调函数
* @param ctx:上下文信息context
*/
public void asyncCreate(String nodeInfo,CallBack cb, Object ctx){
Packet packet = new Packet();
packet.setRequest(nodeInfo);
packet.setCb(cb);
packet.setCtx(ctx);
outgoingQueue.offer(packet);
}
class Worker extends Thread {//...}
}
当第三方调用Client的asyncCreate方法后,asyncCreate做的只是将参数封装如Packet对象中,并添加入outgoingQueue发送队列,然后立即返回。可见,这一过程并不会发生阻塞。那么Worker的任务也就很明确了:不多的从outgoingQueue中取出Packet对象,然后发送到Server端,然后接收Server端返回的信息。
代码语言:javascript复制 class Worker extends Thread {
private String host;
private int port;
public Worker(String host, int port){
this.host = host;
this.port = port;
setDaemon(true);
}
public void run(){
Packet packet = null;
try {
packet = outgoingQueue.take();
} catch (InterruptedException e) {}
Object resp = sendPacket(packet);
Packet p = (Packet)resp;
//执行回调函数
packet.getCb().process(p.getErrorCode(), p.getResponse(), p.getCtx());
}
public Object sendPacket(Packet packet){
Object resp = null;
try{
Socket socket = new Socket(host,port);
OutputStream oos = socket.getOutputStream();
InputStream ois = socket.getInputStream();
try{
oos.write(JSON.toJSONString(packet).getBytes());
oos.flush();
byte[] buf = new byte[1024];
int recvSize = ois.read(buf);
String text = new String(buf,0,recvSize);
resp = JSON.parseObject(text, Packet.class);
}finally{
if(oos != null){
oos.close();
}
if(ois != null){
ois.close();
}
socket.close();
}
}catch(Exception e){}
return resp;
}
}
完整源码
2.1.2 Event-Listener
监听器模式:事件源经过事件的封装传给监听器,当事件源触发事件后,监听器接收到事件对象可以回调事件的方法.这一处理方法我们平时接触的非常多了,Servlet中HttpSessionListener、ServletContextListener等。
这一思路体现的软件设计基本原则是:重要的状态变更需要发送事件并留出监听接口。
情景描述
系统每晚定时进行批处理任务,如果任务失败则需要进行报警操作。
Test Code
测试用的批处理任务类DummyBatchTask,继承了抽象类BatchTask,根据传输入的type值,执行不同的操作,这主要方便测试。
代码语言:javascript复制public class DummyBatchTask extends BatchTask {
/*
* 测试类型
*/
private int type;
public DummyBatchTask(String taskName, int type){
super(taskName);
this.type = type;
}
@Override
void process() {
System.out.println(taskName " begin....");
switch(type){
case 1:
//process normally
break;
case 2:
//exception case
throw new NullPointerException();
}
System.out.println(taskName " completed....");
}
}
用户自定义监听类BatchTaskListener:
代码语言:javascript复制public class BatchTaskListener implements Listener<BatchTask> {
private DummyWarningService warningService;
public BatchTaskListener(DummyWarningService warningService){
this.warningService = warningService;
}
@Override
public void onSuccess(Event<BatchTask> event) {
warningService.dummyWarning(event.getElement(),"Success");
}
@Override
public void onException(Event<BatchTask> event, Throwable t) {
warningService.dummyWarning(event.getElement(),t);
}
}
下面是测试类EventListenerTest的单元测试用例:
代码语言:javascript复制 @Test
public void testTaskSuccess() {
DummyWarningService warningService = new DummyWarningService();
BatchTaskListener listener = new BatchTaskListener(warningService);
DummyBatchTask task = new DummyBatchTask("DummyTask-01",1);
BatchTaskManager manager = new BatchTaskManager();
manager.task(task)
.listener(listener)
.process();
Assert.assertEquals("DummyTask-01", warningService.taskName);
Assert.assertEquals("Success", warningService.warningMsg);
}
@Test
public void testTaskException() {
DummyWarningService warningService = new DummyWarningService();
BatchTaskListener listener = new BatchTaskListener(warningService);
DummyBatchTask task = new DummyBatchTask("DummyTask-02",2);
BatchTaskManager manager = new BatchTaskManager();
manager.task(task)
.listener(listener)
.process();
Assert.assertEquals("DummyTask-02", warningService.taskName);
Assert.assertEquals(NullPointerException.class, warningService.warningMsg.getClass());
}
测试源码
设计
涉及到的主要组件如下:
- BatchTask:批处理任务抽象类,用户只要继承该类,实现其process方法即可。
- BatchTaskManager:BatchTask实例化后提交给BatchTaskManager,BatchTaskManager负责BatchTask的执行和监控。
- Event:事件接口
- EventType:事件类型
- BatchTaskEvent:具体的事件
- Listener:监听接口
(1)首先看下BatchTask的实现:
代码语言:javascript复制public abstract class BatchTask {
public String taskName;
public BatchTask(String taskName){
this.taskName = taskName;
}
public String getTaskName(){
return taskName;
}
abstract void process() throws TimeoutException;
}
BatchTask结构很简单,有一个成员属性taskName,具体的任务类需要继承BatchTask,并实现父类中的process方法。
(2) Listener接口是一个泛型回调接口,当被监听的实体对象的状态发生变化的时候,就需要触发监听器的相应方法。这里的Listener接口中只有两个方法,监听两种状态:成功或异常。这其实需要更具具体业务具体分析。
代码语言:javascript复制/**
* Listener接口
*
* @author wqx
*
* @param <T>
*/
public interface Listener<T> {
/**
* success
*
* @param event
*/
public void onSuccess(Event<T> event);
/**
* failure
*
* @param event
* @param t
*/
void onException(Event<T> event, Throwable t);
}
(3) 这里最重要的就是BatchTaskManager,其作为供Client端使用的API,提供了一系列操作BatchTask和listener的接口。下面是code:
代码语言:javascript复制public class BatchTaskManager {
private BatchTask task;
private Listener<BatchTask> listener;
public BatchTaskManager task(BatchTask task){
this.task = task;
return this;
}
public BatchTaskManager listener(Listener<BatchTask> listener){
this.listener = listener;
return this;
}
public void process(){
if(task == null)
throw new IllegalArgumentException("Task is null");
boolean success = false;
try{
task.process();
success = true;
}catch(Throwable t){
listener.onException(new BatchTaskEvent(task,EventType.ERROR), t);
}
if(success){
listener.onSuccess(new BatchTaskEvent(task,EventType.SUCCESS));
}
}
}
在BatchTaskManager的process方法中实现了批处理任务task的执行操作,同时对于任务状态的变更留出了监听接口。
其它组件源码
2.2 Future机制
Future表示一个异步计算的结果,并提供相应方法来判断任务是否已经完成或者取消,以及获取任务结果或取消任务。最熟悉的莫过于java.util.concurrent.Future。而Promise可以认为是一个可写的Future,调用者可以通过Promise标记任务是失败或成功。下面介绍下这两种组件。
2.2.1 Future
Future既然是异步任务的抽象,那么任务提交后我们就可以做别的事情了,提交后的任何时刻都可能询问任务是否完成isDone?是否被取消isCancelled?可能随时取消任务cancel,也能通过get()方法来获取任务结果。juc中的FutureTask是Future唯一实现,表示一种抽象的可生产结果的计算。FutureTask表示的计算通过Callable实现,Callable相当于有返回值的Runnable。这里以Future的实现类FutureTask为例:
代码语言:javascript复制private static ExecutorService exec;
private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;
@Test
public void testFuture() throws InterruptedException, ExecutionException{
FutureTask<String> f = new FutureTask<String>(new Callable<String>(){
@Override
public String call() throws Exception {
String result = "Hello World";
return result;
}
});
exec.submit(f);
Assert.assertTrue(!f.isDone());
Assert.assertEquals("Hello World", f.get());
}
java.util.concurrent.Future的不足
首先从上面的例子中可以看到,如果想要获取异步任务的结果,我们需要调用Future的get()方法,这个操作会阻塞到异步任务完成为止。这其实和异步编程思想是违背的。
通常在异步编程中,我们只要明确任务完成后做什么操作,而不是等待任务的结果。
这也是juc中的Future在功能上的很明显的缺陷。不过,幸运的是它的改进方案很多,比如:Guava中的ListenableFuture;Netty中的自定义的Future。两者的实现方式类似,都是通过向Future注册一个callback函数,只要异步任务一完成,则直接调用该回调函数。以Netty中的Future为例,使用过程如下:
代码语言:javascript复制//submit a task to thread pool
Future<?> f = exec.submit(new Runnable() { ... });
f.addListener(new FutureListener<?> {
public void operationComplete(Future<?> f) {
//operationComplete to be executed once the task is complete
}
});
2.2.2 Promise
Promise实际上就是一个可写的Future,它是Future的一种改进。什么是可写的Future?对于2.2.1小节中介绍的Future来说,从主流程的角度,只有通过cancel一种方式来改变其状态。那么Promise作为可写的Future,其对于Future的改变可以通过多个操作实现。一般Promise都会实现一系列setXXX方法用以改变Future状态。下面以Netty中的Promise为例进行说明,接口如下:
代码语言:javascript复制/**
* Special {@link Future} which is writable.
*/
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
Promise<V> setFailure(Throwable cause);
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
}
可以看到Promise中的setSuccess和setFailure两个方法,通过这两个方法就可以从外部改变其状态。此外还可以看到添加和移除Listener的操作接口,这其实就是对java.util.concurrent.Future的不足的改进,正如上一小节所说的那样。下面截取Netty中Bootstrap中的connect(SocketAddress remoteAddress)方法的一部分实现:
代码语言:javascript复制 if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
Bootstrap发起连接涉及到两个操作:(1)register,(2)connect。在Netty中所有的操作都是异步的,上面代码中的regFuture代表一个register异步操作结果。如果注册操作完成了,即:regFuture.isDone() = true,那么进行连接操作,具体由doConnect0实现。否则,向regFuture添加一个监听器,只有当register操作完成后,才会进行doConnect0操作。
代码语言:javascript复制regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
Netty中出现大量的利用GenericFutureListener机制代替Future的get方法。原因很简单:异步操作的时间是无法预测的,如果不设置超时时间,即用get()方法,那么会导致该线程被长时间阻塞。而设置了超时时间,即用get(timeout,TimeUnit)方法,这个超时的时间由无法精准预测。此时,利用异步通知机制回调GenericFutureListener是最佳方案。
3 异步编程的优势和不足
异步编程的优势
异步编程的优势是显而易见的,当我们的业务逻辑中有一部分的任务属于耗时型Task的时候,可以将这样的任务分发给别的线程进行处理,当前线程可以继续进行工作。这可以极大提升了程序的性能。此外,当我们的业务中存在对外界的依赖(这里的“对外界的依赖”指的是:比如网络连接的建立、SQL连接的建立、和外部系统的通信等等),异步实现的方案可以有效、便捷的处理各种失败、异常的情况,增强程序的健壮性。
异步编程的不足
首先,对于习惯于顺序编程的人来说,任务的顺序执行更加有条理性,Task1到TaskN一步一步执行,每一步的结果作为下面步骤的输入,最终得到我们想要的结果。而异步编程模型中,如何在主流程中获取异步结果是一个问题。此外,异步编程通常涉及到多线程的并发情况,线程安全方面需要做保证,这无疑增加了编程的复杂度。
4 总结
本文介绍了几种常见的异步编程模型,通过简要的代码实现了其主要原理特性。并对异步编程模型的优缺点进行了简单概括。
- Callback机制:Callback的实现相对简单,但是需要额外注意多线程环境中的安全性问题。适用于回调函数中仅需要完成简单任务的情况。
- Future机制:Future机制及其衍生的Promise可以实现在主流程中获取异步结果,对于复杂的异步任务有更加良好的可控性,这点优于Callback。