Java及Dubbo异步编程

2021-09-02 16:45:21 浏览数 (1)

一、Java异步编程

1、Future

JDK5提供Future用于实现异步,一般配合线程池执行,示例代码如下:

代码语言:javascript复制
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(() -> {
      Thread.sleep(2000);
      return "hello world";
    });
    System.out.println(future.get());

Future.get方法也提供带超时时间的参数,这样就不用阻塞调用方了,有兴趣的可以搜索下相关资料。

上面是通过调用Future.get获取结果,会阻塞执行线程,属于同步调用;

如果想不阻塞可以调用Future.isDone判断是否完成。

Future使用有诸多不便:

1)、异步再加其它的异步比较麻烦

2)、判断是否完成需要调用方不断轮询,效率低下

如上面所说,如果通过异步方式,则需要调用方不断的轮询isDone方法来查询是否完成;

2、CompletableFuture

JDK8提供了CompletableFuture的解决方案,先看示例代码:

代码语言:javascript复制
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()-> {
try {
        Thread.sleep(2000);
        System.out.println("第1步休眠完毕");
      } catch(Exception ex){

      }
      return  1;
  });
  completableFuture.thenApply((i)->{
      try {
          Thread.sleep(2000);
          System.out.println("第2步休眠完毕");
      } catch(Exception ex){

      }
      return i 1;
  }).whenComplete((r, e)->{
      System.out.println(r);
  });

CompletableFuture通过thenApply叠加其它的CompletableFuture;

CompletableFuture也解决了要不断轮询isDone方法的问题,通过设置whenComplete回调就可以在操作完成时执行相应的操作。

二、Dubbo异步编程

Dubbo异步编程分2方面:消费方和提供方。

1、消费方(Consumer)

先看Dubbo2.6的方式:

代码语言:javascript复制
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
    referenceConfig.setApplication(new ApplicationConfig("first-dubbo-consumer"));
    referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    referenceConfig.setInterface(GreetingService.class);
    referenceConfig.setTimeout(5000);
    referenceConfig.setVersion("1.0.0");
    referenceConfig.setGroup("dubbo");

    // 2. 设置为异步
    referenceConfig.setAsync(true);

    // 3. 直接返回null
    GreetingService greetingService = referenceConfig.get();
    System.out.println(greetingService.sayHello("world"));

    // 4.异步执行回调
    ((FutureAdapter) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {

      @Override
      public void done(Object response) {
        System.out.println("result:"   response);
      }

      @Override
      public void caught(Throwable exception) {
        System.out.println("error:"   exception.getLocalizedMessage());
      }
    });

通过调用((FutureAdapter) RpcContext.getContext().getFuture()).getFuture().setCallback设置ResponseCallback回调,实现done和caught就可以了。

再看Dubbo2.7的方式:

代码语言:javascript复制
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
    referenceConfig.setApplication(new ApplicationConfig("first-dubbo-consumer"));
    referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    referenceConfig.setInterface(GreetingService.class);
    referenceConfig.setTimeout(30000);
    referenceConfig.setVersion("1.0.0");
    referenceConfig.setGroup("dubbo");

    // 2. 设置为异步
    referenceConfig.setAsync(true);

    // 3. 直接返回null
    GreetingService greetingService = referenceConfig.get();
    System.out.println(greetingService.sayHello("world"));

    // 4.异步执行回调
    CompletableFuture<String> future = RpcContext.getContext().getCompletableFuture();
    future.whenComplete((v, t) -> {
      if (t != null) {
        t.printStackTrace();
      } else {
        System.out.println(v);
      }

    });

调用RpcContext.getContext().getCompletableFuture()得到CompletableFuture对象,然后调用whenComplete设置回调方法。

2、服务提供方(Provider)

Dubbo支持使用AsyncContext实现异步执行,代码如下:

代码语言:javascript复制
final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
      new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
      new ThreadPoolExecutor.CallerRunsPolicy());
final AsyncContext asyncContext = RpcContext.startAsync();
    executor.execute(() -> {
      //如果要使用上下文,则必须要放在第一句执行
      asyncContext.signalContextSwitch();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      asyncContext.write("Hello "   name   " "   RpcContext.getContext().getAttachment("company"));
    });

三、总结

1、JDK5提供了Future的异步编程方式,只能说功能有限,使用还有诸多不便;

2、JDK8提供CompletableFuture的异步编程方式,解决了Future使用上的不便;

3、Dubbo从2方面提供不同的能力,对于消费方2.6提供FutureAdapter.setCallback的方式支持异步,2.7通过RpcContext.getContext().getCompletableFuture()得到CompletableFuture对象;对于服务提供方则通过AsyncContext实现对异步的支持;

0 人点赞