@adamhand
2018-12-22T03:22:02.000000Z
字数 8410
阅读 1250
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
| 方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 检查方法 | element() | peek() | 不可用 | 不可用 |
IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。true。移除方法,则是从队列里拿出一个元素,如果没有则返回null。put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。JDK7提供了7个阻塞队列。分别是:
| 队列 | 有界性 | 锁 | 数据结构 |
|---|---|---|---|
| ArrayBlockingQueue | 有界 | 加锁 | 数组 |
| LinkedBlockingQueue | 有界 | 加锁 | 单链表 |
| PriorityBlockingQueue | 无界 | 加锁 | 堆 |
| DelayQueue | 无界 | 加锁 | 堆 |
| SynchronousQueue | 有界 | 无锁(CAS) | - |
| LinkedTransferQueue | 无界 | 无锁(CAS) | 单链表 |
| LinkedBlockingDeque | 无界 | 加锁 | 双链表 |
ReentrantLock默认实现非公平锁,但是可以使用一个带参的构造函数实现公平锁。
public int compareTo(Delayed other) {if (other == this) // compare zero ONLY if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask x = (ScheduledFutureTask)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long d = (getDelay(TimeUnit.NANOSECONDS) -other.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);}
private static final int NOW = 0; // for untimed poll, tryTransferprivate static final int ASYNC = 1; // for offer, put, addprivate static final int SYNC = 2; // for transfer, takeprivate static final int TIMED = 3; // for timed poll, tryTransfer
① NOW :在取数据的时候,如果没有数据,则直接返回,无需阻塞等待。
② ASYNC:入队的操作都不会阻塞,也就是说,入队后线程会立即返回,不需要等到消费者线程来取数据。
③ SYNC :取数据的时候,如果没有数据,则会进行阻塞等待。
④ TIMED : 取数据的时候,如果没有数据,则会进行超时阻塞等待。
由上面的分析可以看到,阻塞队列有两种:加锁和不加锁。
加锁的队列是使用ReentrantLock的Condition的await()和signal()方法来实现生产者和消费者之间通信的,当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。而await()方法调用的是LockSupport.park()方法,这个park方法调用的又是调用的unsafe.park()方法实现队列的阻塞的。
不加锁的队列使用的是CAS算法+LockSupport.park()/unpark()方法来实现的。
下面以ArrayBlockingQueue为例看一下。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码如下:
private final Condition notFull;private final Condition notEmpty;public ArrayBlockingQueue(int capacity, boolean fair) {//省略其他代码notEmpty = lock.newCondition();notFull = lock.newCondition();}public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();insert(e);} finally {lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return extract();} finally {lock.unlock();}}private void insert(E x) {items[putIndex] = x;putIndex = inc(putIndex);++count;notEmpty.signal();}
当往队列里插入一个元素时,如果队列不可用,阻塞生产者主要通过LockSupport.park(this);来实现:
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。
public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);unsafe.park(false, 0L);setBlocker(t, null);}
unsafe.park是个native方法,这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。
继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代码如下:
void os::PlatformEvent::park() {int v ;for (;;) {v = _Event ;if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;}guarantee (v >= 0, "invariant") ;if (v == 0) {// Do this the hard way by blocking ...int status = pthread_mutex_lock(_mutex);assert_status(status == 0, status, "mutex_lock");guarantee (_nParked == 0, "invariant") ;++ _nParked ;while (_Event < 0) {status = pthread_cond_wait(_cond, _mutex);// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...// Treat this the same as if the wait was interruptedif (status == ETIME) { status = EINTR; }assert_status(status == 0 || status == EINTR, status, "cond_wait");}-- _nParked ;// In theory we could move the ST of 0 into _Event past the unlock(),// but then we'd need a MEMBAR after the ST._Event = 0 ;status = pthread_mutex_unlock(_mutex);assert_status(status == 0, status, "mutex_unlock");}guarantee (_Event >= 0, "invariant") ;}}
pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用WaitForSingleObject实现的。
在我们的业务中通常会有一些需求是这样的:
那么这类业务我们可以总结出一个特点:需要延迟工作。由此的情况,就是我们的DelayQueue应用需求的产生。
我们在网咖或者网吧上网时会用到一个网吧综合系统,其中有一个主要功能就是给每一位网民计时,用户充值一定金额会有相应的上网时常,这里我们用DelayQueue模拟实现一下:用DelayQueue存储网民(Wangmin类),每一个考生都有自己的名字和完成试卷的时间,Wangba线程对DelayQueue进行监控,从队列中取出到时间的网民执行下机操作。
public class Wangmin implements Delayed {private String name;//身份证private String id;//截止时间private long endTime;//定义时间工具类private TimeUnit timeUnit = TimeUnit.SECONDS;public Wangmin(String name,String id,long endTime){this.name=name;this.id=id;this.endTime = endTime;}public String getName(){return this.name;}public String getId(){return this.id;}/*** 用来判断是否到了截止时间*/@Overridepublic long getDelay(TimeUnit unit) {//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);return endTime - System.currentTimeMillis();}/*** 相互批较排序用*/@Overridepublic int compareTo(Delayed delayed) {Wangmin w = (Wangmin)delayed;return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;}}
public class WangBa implements Runnable {private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();public boolean yingye =true;/*** 上机*/public void shangji(String name,String id,int money){Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");this.queue.add(man);}// 下机public void xiaji(Wangmin man){System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");}@Overridepublic void run() {while(yingye){try {Wangmin man = queue.take();xiaji(man);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String args[]){try{System.out.println("网吧开始营业");WangBa siyu = new WangBa();Thread shangwang = new Thread(siyu);shangwang.start();siyu.shangji("路人甲", "123", 1);siyu.shangji("路人乙", "234", 10);siyu.shangji("路人丙", "345", 5);}catch(Exception e){e.printStackTrace();}}}
聊聊并发(七)——Java中的阻塞队列
Java 并发 --- 阻塞队列总结
Java并发编程-阻塞队列(BlockingQueue)的实现原理
java并发之SynchronousQueue实现原理
使用delayedQueue实现你本地的延迟队列
DelayedQueue学习笔记