阻塞队列—LinkedBlockingQueue源码分析( 二 )


源码分析定义LinkedBlockingQueue的类继承关系如下:
阻塞队列—LinkedBlockingQueue源码分析文章插图
其包含的方法定义如下:
阻塞队列—LinkedBlockingQueue源码分析文章插图
成员属性/*** 节点类 , 用于存储数据*/static class Node {E item;Node next;Node(E x) { item = x; }}/** 阻塞队列的大小, 默认为Integer.MAX_VALUE */private final int capacity;/** 当前阻塞队列中的元素个数 */private final AtomicInteger count = new AtomicInteger();/** * 阻塞队列的头节点 */transient Node head;/** * 阻塞队列的尾节点 */private transient Node last;/** 获取并移除元素时使用的锁 , 如take , poll , etc */private final ReentrantLock takeLock = new ReentrantLock();/** notEmpty 条件对象 , 当队列没有数据时用于挂起执行删除的线程 */private final Condition notEmpty = takeLock.newCondition();/** 添加元素时使用的锁 , 如 put , offer , etc */private final ReentrantLock putLock = new ReentrantLock();/** notFull 条件对象 , 每当队列数据已满时用于挂起执行添加的线程 */private final Condition notFull = putLock.newCondition();从上面的属性我们知道 , 每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点 , 添加的链表队列中 , 其中head和last分别指向队列的头结点和尾结点 。 与ArrayBlockingQueue不同的是 , LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制 , 也就是说 , 添加和删除操作并不是互斥操作 , 可以同时进行 , 这样也就可以大大提高吞吐量 。
这里如果不指定队列的容量大小 , 也就是使用默认的Integer.MAX_VALUE , 如果存在添加速度大于删除速度时候 , 有可能会内存溢出 , 这点在使用前希望慎重考虑 。
另外 , LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程 。
构造函数默认的构造函数和最后一个构造函数创建的队列大小都为 Integer.MAX_VALUE , 只有第二个构造函数用户可以指定队列的大小 。 第二个构造函数最后初始化了last和head节点 , 让它们都指向了一个元素为null的节点 。
最后一个构造函数使用了putLock来进行加锁 , 但是这里并不是为了多线程的竞争而加锁 , 只是为了放入的元素能立即对其他线程可见 。
public LinkedBlockingQueue() {// 默认大小为Integer.MAX_VALUEthis(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node(null);}public LinkedBlockingQueue(Collection c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node(e));++n;}count.set(n);} finally {putLock.unlock();}}入队方法LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求 , 入队操作有如下几种:

  • void put(E e);
  • boolean offer(E e);
  • boolean offer(E e, long timeout, TimeUnit unit) 。
其中:
  • offer方法有两个重载版本 , 只有一个参数的版本 , 如果队列满了就返回false , 否则加入到队列中 , 返回true , add方法就是调用此版本的offer方法;另一个带时间参数的版本 , 如果队列满了则等待 , 可指定等待的时间 , 如果这期间中断了则抛出异常 , 如果等待超时了则返回false , 否则加入到队列中返回true;
  • put方法跟带时间参数的offer方法逻辑一样 , 不过没有等待的时间限制 , 会一直等待直到队列有空余位置了 , 再插入到队列中 , 返回true 。
put(E e)public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node node = new Node(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 获取锁中断putLock.lockInterruptibly();try {//判断队列是否已满 , 如果已满阻塞等待while (count.get() == capacity) {notFull.await();}// 把node放入队列中enqueue(node);c = count.getAndIncrement();// 再次判断队列是否有可用空间 , 如果有唤醒下一个线程进行添加操作if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 如果队列中有一条数据 , 唤醒消费线程进行消费if (c == 0)signalNotEmpty();}小结put方法来看 , 它总共做了以下情况的考虑: