我们都知道线程间的通信可以使用BlockingQueue,那么为什么BlockingQueue可以进行线程间的通信呢?其实就在于BlockingQueue的锁机制。重入锁ReentrantLock带有一个Condition的条件,可以进行线程等待和唤醒的功能,而BlockingQueue正是使用了这一机制来进行线程间的通信的。从LinkedBlockingQueue的jdk的源码可以看到
代码语言:javascript复制/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
我们来看一组示例代码
代码语言:javascript复制public final class PCData {
private final int intData;
public PCData(int intData) {
this.intData = intData;
}
public PCData(String d) {
this.intData = Integer.valueOf(d);
}
public int getIntData() {
return intData;
}
@Override
public String toString() {
return "data:" intData;
}
}
保存数据的对象类
代码语言:javascript复制public class Consumer implements Runnable {
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id=" Thread.currentThread().getId());
Random r = new Random();
try {
while (true) {
PCData data = queue.take();
if (null != data) {
int re = data.getIntData() * data.getIntData();
System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
消费者线程,这里面有一个PCData data = queue.take();这个take()方法就是线程通信的关键。从jdk源码来看
代码语言:javascript复制public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//优先处理中断信号
takeLock.lockInterruptibly();
try {
//从队列内取数,当队列内没数的时候进行等待中断
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
当队列内没有数据可取的时候,当前线程被中断,等待队列内有数据为止继续执行。而不是简单取个数出来那么简单。
代码语言:javascript复制public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id=" Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data " is put into queue");
if (!queue.offer(data,2, TimeUnit.SECONDS)) {
System.out.println("failed to put data: " data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
生产者,这里有一个方法queue.offer(data,2, TimeUnit.SECONDS),来看看他的jdk源码
代码语言:javascript复制public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//插入一个数据后,c从-1加到0
if (c == 0)
//唤醒take()方法的等待,继续执行
signalNotEmpty();
return true;
}
代码语言:javascript复制private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
由上面的代码可以看到,如果队列中一旦有数据就会唤醒取数的线程继续取数,取完继续等待中断。
代码语言:javascript复制public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingQueue<>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
由此执行可以看到消费者会一直等待生产者放入数据进行消费。
运行结果:
start producer id=11 start producer id=13 start producer id=12 start Consumer id=14 start Consumer id=15 start Consumer id=16 data:1 is put into queue 1*1=1 data:2 is put into queue 2*2=4 data:3 is put into queue 3*3=9 data:4 is put into queue 4*4=16 data:5 is put into queue data:6 is put into queue 5*5=25 6*6=36 data:7 is put into queue 7*7=49 data:8 is put into queue 8*8=64 data:9 is put into queue 9*9=81 data:10 is put into queue 10*10=100 data:11 is put into queue 11*11=121 data:12 is put into queue 12*12=144 data:13 is put into queue 13*13=169 data:14 is put into queue 14*14=196 data:15 is put into queue 15*15=225 data:16 is put into queue 16*16=256 data:17 is put into queue 17*17=289 data:18 is put into queue 18*18=324 data:19 is put into queue data:20 is put into queue 19*19=361 data:21 is put into queue 20*20=400 data:22 is put into queue 21*21=441 22*22=484 data:23 is put into queue data:24 is put into queue data:25 is put into queue data:26 is put into queue data:27 is put into queue data:28 is put into queue 23*23=529 24*24=576 data:29 is put into queue data:30 is put into queue data:31 is put into queue 25*25=625 26*26=676 27*27=729 28*28=784 data:32 is put into queue data:33 is put into queue data:34 is put into queue 29*29=841 data:35 is put into queue 30*30=900 31*31=961 data:36 is put into queue 32*32=1,024 33*33=1,089 data:37 is put into queue data:38 is put into queue data:39 is put into queue 34*34=1,156 data:40 is put into queue data:41 is put into queue data:42 is put into queue data:43 is put into queue 35*35=1,225 36*36=1,296 data:44 is put into queue 37*37=1,369 38*38=1,444 39*39=1,521 data:45 is put into queue data:46 is put into queue data:47 is put into queue 40*40=1,600 41*41=1,681 data:48 is put into queue data:49 is put into queue data:50 is put into queue 42*42=1,764 data:51 is put into queue data:52 is put into queue 43*43=1,849 44*44=1,936 45*45=2,025 data:53 is put into queue data:54 is put into queue data:55 is put into queue data:56 is put into queue data:57 is put into queue 46*46=2,116 47*47=2,209 48*48=2,304 49*49=2,401 data:58 is put into queue 50*50=2,500 data:59 is put into queue 51*51=2,601 52*52=2,704 53*53=2,809 data:60 is put into queue 54*54=2,916 55*55=3,025 data:61 is put into queue data:62 is put into queue 56*56=3,136 57*57=3,249 58*58=3,364 59*59=3,481 60*60=3,600 data:63 is put into queue 61*61=3,721 62*62=3,844 63*63=3,969
但这个程序的弊端也是非常明显的,在高并发的场合,完全使用锁和阻塞来实现线程同步,性能并不优越。
当然还有一种特殊的队列——延迟队列,也是BlockingQueue接口实现类的一种——DelayQueue
源码开头
代码语言:javascript复制public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
//优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
代码语言:javascript复制 private final Condition available = lock.newCondition();
代码语言:javascript复制 private Thread leader = null;
它跟LinkedBlockingQueue不同,它只有一个锁跟一个条件跟一个优先队列。
先简单介绍一下优先队列PriorityQueue
代码语言:javascript复制@Data
@AllArgsConstructor
public class Student implements Comparable {
private int score;
private String name;
/**
* 对其以分数进行比较
* @param o
* @return
*/
public int compareTo(Object o) {
Student current = (Student)o;
if (current.getScore() > this.getScore()) {
return 1;
}else if (current.getScore() == this.getScore()) {
return 0;
}
return -1;
}
}
代码语言:javascript复制public class PriorityMain {
public static void main(String[] args) {
final PriorityQueue<Student> queue = new PriorityQueue();
Student p1=new Student(95,"张三");
Student p2=new Student(89,"李四");
Student p3=new Student(69,"孙七");
Student p4=new Student(67,"王五");
Student p5=new Student(92,"赵六");
queue.add(p1);
queue.add(p2);
queue.add(p3);//add 和offer效果一样。
queue.offer(p4);//add 方法实现,其实就是调用了offer
queue.offer(p5);
for (Student student:queue) {
System.out.println(student.toString());
}
System.out.println("-----------------------");
while (!queue.isEmpty()) {
System.out.println(queue.poll());
}
}
}
运行结果:
Student(score=95, name=张三) Student(score=92, name=赵六) Student(score=69, name=孙七) Student(score=67, name=王五) Student(score=89, name=李四) ----------------------- Student(score=95, name=张三) Student(score=92, name=赵六) Student(score=89, name=李四) Student(score=69, name=孙七) Student(score=67, name=王五)
从下盘的结果可以看出,优先队列的取出(poll)出来的总是分数最高的。不过这个队列并不是线程安全的。
再回到延迟队列DelayQueue,它的数据存储对象需要实现一个接口——Delayed,并具体实现两个方法compairTo和GetDelay
代码语言:javascript复制@Data
public class Message implements Delayed {
//消息id
private int id;
// 消息内容
private String body;
// 延迟时长,这个是必须的属性因为要按照这个判断延时时长
private long excuteTime;
public Message(int id,String body,long delayTime) {
this.id = id;
this.body = body;
//将延迟时长(单位毫秒)转化成纳秒
this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime,TimeUnit.MILLISECONDS) System.nanoTime();
}
/**
* 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
* @param unit
* @return
*/
public long getDelay(TimeUnit unit) {
return unit.convert(this.excuteTime - System.nanoTime(),TimeUnit.NANOSECONDS);
}
/**
* 自定义实现比较方法返回 1 0 -1三个参数
* @param o
* @return
*/
public int compareTo(Delayed o) {
Message msg = (Message)o;
return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1:(Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
}
}
消费者线程
代码语言:javascript复制@AllArgsConstructor
public class Cosumer implements Runnable {
private DelayQueue<Message> queue;
public void run() {
while (true) {
try {
long time = System.currentTimeMillis();
Message take = queue.take();
System.out.println("消费消息id:" take.getId() "消息体:" take.getBody());
System.out.println("共使用" (System.currentTimeMillis() - time));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这里同样有一个从延迟队列取出queue.take(),我们来看一下JDK的源码(加了注释),由源码可以看出,Message take = queue.take()要在延迟时间过后且队列中有消息,才可以拿到该消息,否则就一直等待和中断。
代码语言:javascript复制public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//优先处理中断
lock.lockInterruptibly();
try {
for (;;) {
//获取优先队列的队首元素
E first = q.peek();
//如果队首元素为空,等待中断
if (first == null)
available.await();
else {
//取得延期时间
long delay = first.getDelay(NANOSECONDS);
//如果到期,从队列中弹出消息
if (delay <= 0)
return q.poll();
//未到期
first = null; // don't retain ref while waiting
//该线程依然在运行,中断等待
if (leader != null)
available.await();
else { //该线程未运行,获取当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//在延迟时间之前一直等待
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果该线程未运行且优先队列首元素不为空,唤醒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
最后是main方法
代码语言:javascript复制public class DelayMain {
public static void main(String[] args) {
DelayQueue<Message> queue = new DelayQueue();
Message m1 = new Message(1,"world",3000);
Message m2 = new Message(2,"hello",10000);
queue.offer(m2);
queue.offer(m1);
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new Cosumer(queue));
exec.shutdown();
}
}
运行结果
消费消息id:1消息体:world 共使用2999 消费消息id:2消息体:hello 共使用7000
不过程序并没有执行完成,因为queue一直在中断等待。
无锁框架:Disruptor
在Disruptor中,使用环形队列(RingBuffer)来代替普通线性队列。
要使用Disruptor,需要先引用他的jar,pom中配置如下
代码语言:javascript复制<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
现在我们同样来实现上面的业务逻辑
代码语言:javascript复制public class PCData {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
数据对象类
代码语言:javascript复制public class PCDataFactory implements EventFactory<PCData> {
@Override
public PCData newInstance() {
return new PCData();
}
}
数据工厂
代码语言:javascript复制public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData pcData) throws Exception {
System.out.println(Thread.currentThread().getId() ":Event: --" pcData.getValue() * pcData.getValue() "--");
}
}
消费者
代码语言:javascript复制public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb) {
long sequence = ringBuffer.next();
try {
PCData event = ringBuffer.get(sequence);
event.setValue(bb.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
生产者
代码语言:javascript复制public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor, ProducerType.MULTI,new YieldingWaitStrategy());
//这里起了4个消费者线程
disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer(),new Consumer(),new Consumer());
disruptor.start();
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0;true;l ) {
bb.putLong(0,l);
producer.pushData(bb);
Thread.sleep(100);
System.out.println("add data " l);
}
}
}
主方法中YieldingWaitStrategy()有4种策略
1、BlockingWaitStrategy:类似于BlockingQueue,性能不佳
2、SleepingWaitStrategy:比较保守,适合对延时要求不是特别高的场合。
3、YieldingWaitStrategy():低延时场合,对延时有较为严格的要求,可以使用这个策略,最好计算机实际线程大于程序使用线程。
4、BusySpinWaitStrategy():最疯狂的等待策略。就是一个死循环。对延迟非常苛刻的场合可以考虑使用它。物理CPU数大于程序使用线程。
运行结果:
11:Event: --0-- add data 0 12:Event: --1-- add data 1 13:Event: --4-- add data 2 14:Event: --9-- add data 3 11:Event: --16-- add data 4 12:Event: --25-- add data 5 13:Event: --36-- add data 6 14:Event: --49-- add data 7 11:Event: --64-- add data 8 12:Event: --81-- add data 9 13:Event: --100-- add data 10 14:Event: --121-- add data 11 11:Event: --144-- add data 12 12:Event: --169-- add data 13 13:Event: --196-- add data 14 14:Event: --225-- add data 15 11:Event: --256-- add data 16 12:Event: --289-- add data 17 13:Event: --324-- add data 18 14:Event: --361-- add data 19 11:Event: --400-- add data 20 12:Event: --441-- add data 21 13:Event: --484-- add data 22 14:Event: --529-- add data 23 11:Event: --576-- add data 24 12:Event: --625-- add data 25 13:Event: --676-- add data 26 14:Event: --729-- add data 27 11:Event: --784-- add data 28 12:Event: --841-- add data 29 13:Event: --900-- add data 30 14:Event: --961-- add data 31 11:Event: --1024-- add data 32 12:Event: --1089-- add data 33 13:Event: --1156-- add data 34 14:Event: --1225-- add data 35 11:Event: --1296-- add data 36 12:Event: --1369-- add data 37 13:Event: --1444-- add data 38 14:Event: --1521-- add data 39 11:Event: --1600-- add data 40 12:Event: --1681-- add data 41 13:Event: --1764-- add data 42 14:Event: --1849-- add data 43 11:Event: --1936-- add data 44 12:Event: --2025-- add data 45 13:Event: --2116-- add data 46 14:Event: --2209-- add data 47 11:Event: --2304-- add data 48 12:Event: --2401-- add data 49 13:Event: --2500-- add data 50 14:Event: --2601-- add data 51 11:Event: --2704-- add data 52 12:Event: --2809-- add data 53 13:Event: --2916-- add data 54 14:Event: --3025-- add data 55 11:Event: --3136-- add data 56 12:Event: --3249-- add data 57 13:Event: --3364-- add data 58 14:Event: --3481-- add data 59 11:Event: --3600-- add data 60 12:Event: --3721-- add data 61 13:Event: --3844-- add data 62 14:Event: --3969-- add data 63 11:Event: --4096-- add data 64 12:Event: --4225-- add data 65 13:Event: --4356-- add data 66 14:Event: --4489-- add data 67 11:Event: --4624-- add data 68 12:Event: --4761-- add data 69 13:Event: --4900-- add data 70 14:Event: --5041-- add data 71 11:Event: --5184-- add data 72 12:Event: --5329-- add data 73 13:Event: --5476-- add data 74 14:Event: --5625-- add data 75 11:Event: --5776-- add data 76 12:Event: --5929-- add data 77 13:Event: --6084-- add data 78 14:Event: --6241-- add data 79 11:Event: --6400-- add data 80 12:Event: --6561-- add data 81 13:Event: --6724-- add data 82 14:Event: --6889-- add data 83 11:Event: --7056-- add data 84 12:Event: --7225-- add data 85 13:Event: --7396--
最后说明的是Disruptor使用的是无锁技术,性能比BlockingQueue至少高一个数量级以上。