CountDownlatch 使用:
它经常用于监听某些初始化操作 等初始化执行完毕后 通知主线程继续工作。
说多了没用 上代码:
代码语言:javascript复制import java.util.concurrent.CountDownLatch;
public class UseCountDownLatch {
public static void main(String[] args) {
final CountDownLatch countDown = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("进入线程t1" "等待其他线程处理完成...");
countDown.await();
System.out.println("t1线程继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t2线程进行初始化操作...");
Thread.sleep(3000);
System.out.println("t2线程初始化完毕,通知t1线程继续...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t3线程进行初始化操作...");
Thread.sleep(4000);
System.out.println("t3线程初始化完毕,通知t1线程继续...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
t3.start();
}
}
运行结果:
结果分析:
线程1执行到countDown.await();
发生阻塞 因为 new CountDownLatch(2);
则需要等到两个线程发出通知(countDown.countDown())才能继续进行
也就是在线程2和线程3完事后线程1再继续执行
应用场景 :zookeeper建立完连接前阻塞 建立连接后再进行zookeeper的其他操作
CyclicBarrier
barrier 是障碍的意思 意思每个线程都准备好了然后在各在执行 就像运动员都ready后才各自开跑
上代码:
代码语言:javascript复制import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name " 准备OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name " Go!!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3); // 3
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}
运行结果:
结果分析: 三个线程各自准备 传入的CyclicBarrier 是同一个 然后等第三个人准备完事后 三个线程同时打印System.out.println(name " Go!!");
两者区别:
CountDownLatch 是一个线程阻塞 等n个线程完事后再执行 CountDownLatch 针对一个线程 CyclicBarrier 是多个线程处于阻塞中 然后到达一定数量后各自执行 CyclicBarrier 针对多个线程
Callable和Future使用:
future模式jdk给我们封装好了 用就行了
future模式非常适合在处理很耗时的业务逻辑使用 可以有效减少系统的响应时间 提高系统的吞吐量。
代码语言:javascript复制package com.bjsxt.height.concurrent019;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class UseFuture implements Callable<String>{
private String para;
public UseFuture(String para){
this.para = para;
}
/**
* 这里是真实的业务逻辑,其执行可能很慢
*/
@Override
public String call() throws Exception {
//模拟执行耗时
Thread.sleep(5000);
String result = this.para "处理完成";
return result;
}
//主控制函数
public static void main(String[] args) throws Exception {
String queryStr = "query";
//构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
//创建一个固定线程的线程池且线程数为1,
ExecutorService executor = Executors.newFixedThreadPool(2);
//这里提交任务future,则开启线程执行RealData的call()方法执行
//submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值
Future f1 = executor.submit(future); //单独启动一个线程去执行的
Future f2 = executor.submit(future2);
System.out.println("请求完毕");
try {
//这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
System.out.println("处理实际的业务逻辑...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
System.out.println("数据:" future.get());
System.out.println("数据:" future2.get());
executor.shutdown();
}
}
运行结果:
两个线程并行执行 然后返回结果 一个线程执行5秒 两个线程并行执行也是耗时5秒 节省了时间 这两个线程和主线程独立的 主线程执行到future.get()
阻塞 需要等待线程完成后返回结果
Semaphore 信号量
PV (page view) 网站的总访问量 页面浏览量或点击量 用户每刷新一次就会被记录一次。 UV (unique Visitor) 访问网站的一台电脑客户端为一个访客 一般0点到24点相同ip的客户端只记录一次 QPS(query per second)即每秒查询数 RT (response time) 即请求响应时间 这个指标非常关键 直接说明前端用户的体验 因此系统设计都想降低rt时间
Semaphore可以做限流
代码语言:javascript复制import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class UseSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index ) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire();
System.out.println("Accessing: " NO);
//模拟实际业务逻辑
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release();
} catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(semp.getQueueLength());
// 退出线程池
exec.shutdown();
}
}
许可只能取五次 然后有线程释放许可后 其他线程才能获取许可 执行方法