SynchronousQueue 同步队列入门使用&源码详解

SynchronousQueue
SynchronousQueue 同步队列入门使用&源码详解文章插图
思维导图
是什么SynchronousQueue 是这样一种阻塞队列 , 其中每个 put 必须等待一个 take , 反之亦然 。
简而言之:线程安全 , 阻塞 。
入门案例我们定义两个线程 , 一个负责写入 , 一个负责读取 。
import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;/** * @author 老马啸西风 * @since 1.0.0 */public class SynchronousQueueDemo {public static void main(String[] args) {SynchronousQueue queue = new SynchronousQueue<>();new Writer(queue).start();new Reader(queue).start();}private static class Writer extends Thread {SynchronousQueue queue;public Writer(SynchronousQueue queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 0; i < 5; i++) {System.out.println("开始设置第 " + i + " 个元素");try {TimeUnit.SECONDS.sleep(2);queue.put(i);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消息读取者*/private static class Reader extends Thread {SynchronousQueue queue;public Reader(SynchronousQueue queue) {this.queue = queue;}@Overridepublic void run() {while (true) {try {System.out.println("读取信息: " + queue.take() + "\n");} catch (InterruptedException e) {e.printStackTrace();}}}}}对应的日志信息如下:
开始设置第 0 个元素开始设置第 1 个元素读取信息: 0开始设置第 2 个元素读取信息: 1开始设置第 3 个元素读取信息: 2开始设置第 4 个元素读取信息: 3读取信息: 4可以看到当元素被设置之后 , 就会被立刻读取 。 这个在实际使用过程中还是非常便利的 , 比轮训优雅多了 。
源码分析类定义public class SynchronousQueue extends AbstractQueueimplements BlockingQueue, java.io.Serializable {// 转换类实现 , 也是最核心的一个属性 。// 后面会详细讲解private transient volatile Transferer transferer;}【SynchronousQueue 同步队列入门使用&源码详解】实现了阻塞队列接口 , 继承自 AbstractQueue 抽象队列 。
实际上对应的 put/take 方法 , 经过 transfer 的封装之后 , 都变得非常简单 。
我们本文的核心在于对 Transferer 的解析 。
构造器SynchronousQueue 也是支持是否为公平锁模式的 。
默认为非公平模式 。
是否公平取决于使用的 Transfer 实现子类 。
public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue() : new TransferStack();}put 方法/** * Adds the specified element to this queue, waiting if necessary for * another thread to receive it. */public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();}}take 方法/** * Retrieves and removes the head of this queue, waiting if necessary * for another thread to insert it. */public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();}算法笔记下面是源码中的一部分算法笔记 , 不会出现在文档中 。
此类实现W. N. Scherer III和M. L. Scott所著的“不带条件同步的并发对象的无阻塞”中描述的双堆栈和双队列算法的扩展 。
第十八届年度大会 (2004年10月 , 分布式计算)(另请参见) 。
(Lifo)堆栈用于非公平模式 , (Fifo)队列用于公平模式 。
两者的性能通常相似 。
Fifo通常在竞争下支持更高的吞吐量 , 但是Lifo在常见应用程序中保持更高的线程局部性 。
双队列(和类似的堆栈)是在任何给定时间保存“数据”(由put操作提供的项 , 或“请求”)的插槽 , 表示 take 操作 , 或者为空 。
对 fulfill 的调用(即 , 从保存数据的队列中请求元素的调用 , 反之亦然)使互补节点出队 。
SynchronousQueue 同步队列入门使用&amp;源码详解文章插图
互补节点
这些队列最有趣的功能是 , 任何操作都可以弄清楚队列所处的模式 , 并且无需锁就可以采取相应的措施 。
队列和堆栈都扩展了抽象类Transferer , 它们定义了执行放置或取出操作的单个方法 。
将它们统一为一个方法 , 因为在双重数据结构中 , 放置和取出操作是对称的 , 因此几乎所有代码都可以合并 。
最终的传输方法长远来看 , 但比分解成几乎重复的部分要容易得多 。
队列和堆栈数据结构在概念上有很多相似之处 , 但具体细节很少 。
为简单起见 , 它们保持不同 , 以便以后可以分别发展 。