阻塞队列(4)LinkedBlockingDeque源码详解
问题
- LinkedBlockingDeque 是什么?
- 优缺点?
- 应用场景?
- 源码实现?
- 个人启发?
所谓双向是指可以从队列的头和尾同时操作 , 并发只是线程安全的实现 , 阻塞允许在入队出队不满足条件时挂起线程 , 这里说的队列是指支持FIFO/FILO实现的链表 。
- 要想支持阻塞功能 , 队列的容量一定是固定的 , 否则无法在入队的时候挂起线程 。 也就是capacity是final类型的 。
- 既然是双向链表 , 每一个结点就需要前后两个引用 , 这样才能将所有元素串联起来 , 支持双向遍历 。 也即需要prev/next两个引用 。
- 双向链表需要头尾同时操作 , 所以需要first/last两个节点 , 当然可以参考LinkedList那样采用一个节点的双向来完成 , 那样实现起来就稍微麻烦点 。
- 既然要支持阻塞功能 , 就需要锁和条件变量来挂起线程 。 这里使用一个锁两个条件变量来完成此功能 。
文章插图
优缺点优点当然是功能足够强大 , 同时由于采用一个独占锁 , 因此实现起来也比较简单 。 所有对队列的操作都加锁就可以完成 。 同时独占锁也能够很好的支持双向阻塞的特性 。
凡事有利必有弊 。 缺点就是由于独占锁 , 所以不能同时进行两个操作 , 这样性能上就大打折扣 。 从性能的角度讲LinkedBlockingDeque要比LinkedQueue要低很多 , 比CocurrentLinkedQueue就低更多了 , 这在高并发情况下就比较明显了 。
前面分析足够多的Queue实现后 , LinkedBlockingDeque的原理和实现就不值得一提了 , 无非是在独占锁下对一个链表的普通操作 。
使用案例我们还是来看一个生产者消费者的例子 。
生产者
private static class Producer implements Runnable{private BlockingDeque queue;public Producer(BlockingDeque queue) {this.queue = queue;}@Overridepublic void run() {while(true) {try {Integer num = ThreadLocalRandom.current().nextInt(100);queue.put(num);System.out.println(String.format("%s producer a num %d",Thread.currentThread().getName(),num));Thread.sleep(1000);} catch (InterruptedException e1) {e1.printStackTrace();}}}}
消费者private static class Consumer implements Runnable{private BlockingDeque queue;public Consumer(BlockingDeque queue) {this.queue = queue;}@Overridepublic void run() {while(true) {try {System.out.println(String.format("%s consume a num %d",Thread.currentThread().getName(),queue.take()));} catch (InterruptedException e) {e.printStackTrace();}}}}
测试代码public static void main(String[] args) {BlockingDeque queue = new LinkedBlockingDeque<>(100);new Thread(new Producer(queue),"Producer").start();new Thread(new Consumer(queue),"Consumer").start();}
测试日志:Producer producer a num 62Consumer consume a num 62Producer producer a num 19Consumer consume a num 19Producer producer a num 26Consumer consume a num 26Consumer consume a num 39Producer producer a num 39
序列化有趣的是此类支持序列化 , 但是Node并不支持序列化 , 因此fist/last就不能序列化 , 那么如何完成序列化/反序列化过程呢?private void writeObject(java.io.ObjectOutputStream s)throws java.io.IOException {lock.lock();try {// Write out capacity and any hidden stuffs.defaultWriteObject();// Write out all elements in the proper order.for (Node p = first; p != null; p = p.next)s.writeObject(p.item);// Use trailing null as sentinels.writeObject(null);} finally {lock.unlock();}}private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();count = 0;first = null;last = null;// Read in all elements and place in queuefor (;;) {E item = (E)s.readObject();if (item == null)break;add(item);}}
描述的是LinkedBlockingDeque序列化/反序列化的过程 。 序列化时将真正的元素写入输出流 , 最后还写入了一个null 。读取的时候将所有对象列表读出来 , 如果读取到一个null就表示结束 。
这就是为什么写入的时候写入一个null的原因 , 因为没有将count写入流 , 所以就靠null来表示结束 , 省一个整数空间 。
源码解析接口
/** * @since 1.6 * @authorDoug Lea * @param the type of elements held in this collection */public class LinkedBlockingDequeextends AbstractQueueimplements BlockingDeque, java.io.Serializable {
双向链表节点/** Doubly-linked list node class */static final class Node {/*** The item, or null if this node has been removed.*/E item;/*** One of:* - the real predecessor Node* - this Node, meaning the predecessor is tail* - null, meaning there is no predecessor*/Node
- 二十三、Python队列实现多线程(下篇)
- 突击并发编程JUC系列-阻塞队列 BlockingQueue
- Redis+NodeJS实现能处理海量数据的异步任务队列系统
- 一次 ES-APM 导致的概率性大量线程阻塞问题排查
- 一篇文章搞懂同步、异步、阻塞、非阻塞、BIO、NIO和AIO
- python爬虫高级教程:多线程队列,生产消费模式爬虫
- Redis 异步消息队列与延时队列
- 阻塞队列—LinkedBlockingQueue源码分析
- springboot + rocketmq实现简单消息队列
- SynchronousQueue 同步队列入门使用&源码详解