阻塞队列—LinkedBlockingQueue源码分析( 三 )

我们再看看put方法中用到的几个其他方法 , 先来看看 enqueue(Node node) 方法:
private void enqueue(Node node) {last = last.next = node;}用一张图来看看往队列里依次放入元素A和元素B , 如下:
阻塞队列—LinkedBlockingQueue源码分析文章插图
接下来我们看看signalNotEmpty , 顺带着看signalNotFull方法 。
private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}为什么要这么写?因为signal的时候要获取到该signal对应的Condition对象的锁才行 。
offer(E e)public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node node = new Node(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {// 队列有可用空间 , 放入node节点 , 判断放入元素后是否还有可用空间 ,// 如果有 , 唤醒下一个添加线程进行添加操作 。if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}可以看到offer仅仅对put方法改动了一点点 , 当队列没有可用元素的时候 , 不同于put方法的阻塞等待 , offer方法直接方法false 。
offer(E e, long timeout, TimeUnit unit)public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {// 等待超时时间nanos , 超时时间到了返回falsewhile (count.get() == capacity) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}该方法只是对offer方法进行了阻塞超时处理 , 使用了Condition的awaitNanos来进行超时等待 , 这里为什么要用while循环?因为awaitNanos方法是可中断的 , 为了防止在等待过程中线程被中断 , 这里使用while循环进行等待过程中中断的处理 , 继续等待剩下需等待的时间 。
出队方法入队列的方法说完后 , 我们来说说出队列的方法 。 LinkedBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求 , 如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);
take()public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 队列为空 , 阻塞等待while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();// 队列中还有元素 , 唤醒下一个消费线程进行消费if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 移除元素之前队列是满的 , 唤醒生产线程进行添加元素if (c == capacity)signalNotFull();return x;}take方法看起来就是put方法的逆向操作 , 它总共做了以下情况的考虑:
  • 队列为空 , 阻塞等待
  • 队列不为空 , 从对首获取并移除一个元素 , 如果消费后还有元素在队列中 , 继续唤醒下一个消费线程进行元素移除 。 如果放之前队列是满元素的情况 , 移除完后需要唤醒生产线程进行添加元素 。
我们来看看dequeue方法
private E dequeue() {// 获取到head节点Node h = head;// 获取到head节点指向的下一个节点Node first = h.next;// head节点原来指向的节点的next指向自己 , 等待下次gc回收h.next = h; // help GC// head节点指向新的节点head = first;// 获取到新的head节点的item值E x = first.item;// 新head节点的item值设置为nullfirst.item = null;return x;}我们结合注释和图来看一下链表算法:
阻塞队列—LinkedBlockingQueue源码分析文章插图
其实这个写法看起来很绕 , 我们其实也可以这么写:
private E dequeue() {// 获取到head节点Node h = head;// 获取到head节点指向的下一个节点 , 也就是节点ANode first = h.next;// 获取到下下个节点 , 也就是节点BNode next = first.next;// head的next指向下下个节点 , 也就是图中的B节点h.next = next;// 得到节点A的值E x = first.item;first.item = null; // help GCfirst.next = first; // help GCreturn x;}poll()public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}