突击并发编程JUC系列-阻塞队列 BlockingQueue

突击并发编程JUC系列演示代码地址:
作者:故人
出处:
什么是阻塞队列阻塞队列(BlockingQueue)是一个支持两个附加操作的队列 。 这两个附加的操作支持阻塞的插入和移除方法 。

  • 支持阻塞的插入方法:意思是当队列满时 , 队列会阻塞插入元素的线程 , 直到队列不满 。
  • 支持阻塞的移除方法:意思是在队列为空时 , 获取元素的线程会等待队列变为非空 。
阻塞队列常用于生产者和消费者的场景 , 生产者是向队列里添加元素的线程 , 消费者是从队列里取元素的线程 。 阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器 。
插入和移除操作的4种处理方式
突击并发编程JUC系列-阻塞队列 BlockingQueue文章插图
  • 抛出异常:当队列满时 , 如果再往队列里插入元素 , 会抛出 IllegalStateException ("Queue full")异常 。 当队列空时 , 从队列里获取元素会抛出 NoSuchElementException 异常 。
  • 返回特殊值:当往队列插入元素时 , 会返回元素是否插入成功 , 成功返回true 。 如果是移除方法 , 则是从队列里取出一个元素 , 如果没有则返回 null。
  • 一直阻塞:当阻塞队列满时 , 如果生产者线程往队列里 put 元素 , 队列会一直阻塞生产者线程 , 直到队列可用或者响应中断退出 。 当队列空时 , 如果消费者线程从队列里 take 元素 , 队列会阻塞住消费者线程 , 直到队列不为空 。
  • 超时退出:当阻塞队列满时 , 如果生产者线程往队列里插入元素 , 队列会阻塞生产者线程一段时间 , 如果超过了指定的时间 , 生产者线程就会退出 。
如果是无界阻塞队列 , 队列不可能会出现满的情况 , 所以使用 put 或 offer 方法永远不会被阻塞 , 而且使用offer方法时 , 该方法永远返回 true 。
突击并发编程JUC系列-阻塞队列 BlockingQueue文章插图
ArrayBlockingQueueArrayBlockingQueue 是一个用数组实现的有界阻塞队列 。 此队列按照先进先出(FIFO)的原则对元素进行排序 。
默认情况下不保证线程公平的访问队列 , 所谓公平访问队列是指阻塞的线程 , 可以按照阻塞的先后顺序访问队列 , 即先阻塞线程先访问队列 。 非公平性是对先等待的线程是非公平的 , 当队列可用时 , 阻塞的线程都可以争夺访问队列的资格 , 有可能先阻塞的线程最后才访问队列 。 为了保证公平性 , 通常会降低吞吐量 。
阻塞式写方法在 ArrayBlockingQueue 中提供了两个阻塞式写方法 , 分别如下(在该队列中 , 无论是阻塞式写方法还是非阻塞式写方法 , 都不允许写入null) 。
void put(E e)boolean offer(E e, long timeout, TimeUnit unit)put() 方法示例public class ArrayBlockingQueueExample1 {public static void main(String[] args) {ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);try {queue.put("class 1");queue.put("class 2");queue.put("class 3");// 超过指定得容量当前线程阻塞queue.put("class 4");} catch (InterruptedException e) {e.printStackTrace();}}}非阻塞式写方法当队列已满时写入数据 , 如果不想使得当前线程进入阻塞 , 那么就可以使用非阻塞式的写操作方法 。
boolean add(E e)boolean offer(E e)add() 方法示例public class ArrayBlockingQueueExample2 {public static void main(String[] args) {ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);queue.add("class 1");queue.add("class 2");queue.add("class 3");//超过指定容量 抛出异常queue.add("class 4");}}// 抛出异常阻塞式读方法E take()E poll(long timeout, TimeUnit unit)take() 方法示例public class ArrayBlockingQueueExample3 {public static void main(String[] args) {ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);queue.add("class 1");queue.add("class 2");queue.add("class 3");try {// 取出对头元素System.out.println(queue.take());} catch (InterruptedException e) {e.printStackTrace();}// 队列大小System.out.println(queue.size());}}//class 1// 2非阻塞式读方法E poll()E peek()public class ArrayBlockingQueueExample4 {public static void main(String[] args) {ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);// 队列无元素 直接返回 nullSystem.out.println(queue.poll( ));System.out.println(queue.peek( ));}}// null// null部分源码public void put(E e) throws InterruptedException {// 检查元素checkNotNull(e);final ReentrantLock lock = this.lock;// 获取锁lock.lockInterruptibly();try {// 元素满 一直阻塞 , 队列非满时 , 被唤醒while (count == items.length)notFull.await();// 入队enqueue(e);} finally {lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 获取锁lock.lockInterruptibly();try {// 队列为空 等待while (count == 0)notEmpty.await();// 出队return dequeue();} finally {lock.unlock();}}