阻塞队列—LinkedBlockingQueue源码分析


阻塞队列—LinkedBlockingQueue源码分析文章插图
作者公众号:一角钱技术(org_yijiaoqian)
前言
阻塞队列—LinkedBlockingQueue源码分析文章插图
LinkedBlockingQueue 由链接节点支持的可选有界队列 , 是一个基于链表的无界队列(理论上有界) , 队列按照先进先出的顺序进行排序 。 LinkedBlockingQueue不同于ArrayBlockingQueue , 它如果不指定容量 , 默认为 Integer.MAX_VALUE , 也就是无界队列 。 所以为了避免队列过大造成机器负载或者内存爆满的情况出现 , 我们在使用的时候建议手动传一个队列的大小 。
队列创建BlockingQueue blockingQueue = new LinkedBlockingQueue<>();上面这段代码中 , blockingQueue 的容量将设置为 Integer.MAX_VALUE。
应用场景多用于任务队列 , 单线程发布任务 , 任务满了就停止等待阻塞 , 当任务被完成消费少了又开始负责发布任务 。
我们来看一个例子:
package com.niuh.queue.linked;import org.apache.commons.lang.RandomStringUtils;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicLong;public class TestLinkedBlockingQueue {private static LinkedBlockingQueue queue = new LinkedBlockingQueue();// 线程控制开关private final CountDownLatch latch = new CountDownLatch(1);// 线程池private final ExecutorService pool;// AtomicLong 计数 生产数量private final AtomicLong output = new AtomicLong(0);// AtomicLong 计数销售数量private final AtomicLong sales = new AtomicLong(0);// 是否停止线程private final boolean clear;public TestLinkedBlockingQueue(boolean clear) {this.pool = Executors.newCachedThreadPool();this.clear = clear;}public void service() throws InterruptedException {Consumer a = new Consumer(queue, sales, latch, clear);pool.submit(a);Producer w = new Producer(queue, output, latch);pool.submit(w);latch.countDown();}public static void main(String[] args) {TestLinkedBlockingQueue t = new TestLinkedBlockingQueue(false);try {t.service();} catch (InterruptedException e) {e.printStackTrace();}}}/** * 消费者(销售产品) */class Consumer implements Runnable {private final LinkedBlockingQueue queue;private final AtomicLong sales;private final CountDownLatch latch;private final boolean clear;public Consumer(LinkedBlockingQueue queue, AtomicLong sales, CountDownLatch latch, boolean clear) {this.queue = queue;this.sales = sales;this.latch = latch;this.clear = clear;}public void run() {try {latch.await(); // 放闸之前老实的等待着for (; ; ) {sale();Thread.sleep(500);}} catch (InterruptedException e) {if (clear) { // 响应中断请求后,如果有要求则销售完队列的产品后再终止线程cleanWarehouse();} else {System.out.println("Seller Thread will be interrupted...");}}}public void sale() {System.out.println("==取take=");try {String item = queue.poll(50, TimeUnit.MILLISECONDS);System.out.println(item);if (item != null) {sales.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数}} catch (InterruptedException e) {e.printStackTrace();}}/*** 销售完队列剩余的产品*/private void cleanWarehouse() {try {while (queue.size() > 0) {sale();}} catch (Exception ex) {System.out.println("Seller Thread will be interrupted...");}}}/** * 生产者(生产产品) * */class Producer implements Runnable {private LinkedBlockingQueue queue;private CountDownLatch latch;private AtomicLong output;public Producer() {}public Producer(LinkedBlockingQueue queue, AtomicLong output, CountDownLatch latch) {this.queue = queue;this.latch = latch;this.output = output;}public void run() {try {latch.await(); // 线程等待for (; ; ) {work();Thread.sleep(100);}} catch (InterruptedException e) {System.out.println("Producer thread will be interrupted...");}}/*** 工作*/public void work() {try {String product = RandomStringUtils.randomAscii(3);boolean success = queue.offer(product, 100, TimeUnit.MILLISECONDS);if (success) {output.incrementAndGet();// 可以声明long型的参数获得返回值,作为日志的参数}} catch (InterruptedException e) {e.printStackTrace();}}}工作原理LinkedBlockingQueue内部由单链表实现 , 只能从head取元素 , 从tail添加元素 。 添加元素和获取元素都有独立的锁 , 也就是说LinkedBlockingQueue是读写分离的 , 读写操作可以并行执行 。 LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全 。
向无限队列添加元素的所有操作都将永远不会阻塞 , [注意这里不是说不会加锁保证线程安全] , 因此它可以增长到非常大的容量 。
使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。 否则 , 内存可能会填满 , 然后就会得到一个 OutOfMemory 异常 。