SynchronousQueue 同步队列入门使用&源码详解( 三 )
> k = QNode.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}transfer 核心方法基本算法是循环尝试执行以下两个操作之一:
- 如果队列明显为空或持有相同模式的节点 , 请尝试将节点添加到等待队列中 , 等待被实现(或取消)并返回匹配项 。
- 如果队列显然包含等待项 , 并且此调用是互补模式 , 请尝试通过对等待节点的CAS'ing item字段进行出队并使其出队 , 然后返回匹配项来实现 。
循环以空检查开始 , 以防止看到未初始化的头或尾值 。
这在当前的 SynchronousQueue 中永远不会发生 , 但是如果调用者持有对传输者的非易失性/最终引用(non-volatile/final) , 则可能会发生这种情况 。
无论如何 , 这里的检查是因为将空检查放在循环的顶部 , 通常比隐式散布(implicitly interspersed)检查要快 。
/** * 放入或者获取一个元素 * @author 老马啸西风 */@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) {QNode s = null; // constructed/reused as neededboolean isData = http://kandian.youth.cn/index/(e != null);for (;;) {QNode t = tail;QNode h = head;// 元素未初始化完成 , 自旋 。if (t == null || h == null)continue;// 如果为空 , 或者持有相同模式的节点if (h == t || t.isData == isData) {QNode tn = t.next;// 元素被其他线程修改 , 重来if (t != tail)continue;// 尝试节点加入到 tailif (tn != null) {advanceTail(t, tn);continue;}// 时间不等人 , 直接返回 nullif (timed// 设置新元素失败 , 重来if (s == null)s = new QNode(e, isData);if (!t.casNext(null, s))continue;// 添加到尾部等待advanceTail(t, s);// 旋转/阻塞 , 直到满足节点s为止 。Object x = awaitFulfill(s, e, timed, nanos);// 等待被取消 , 清空元素 , 并且返回 nullif (x == s) {clean(t, s);return null;}// 元素已经不再队列中了 ,if (!s.isOffList()) {// not already unlinked// 设置 s 为新的头结点advanceHead(t, s);if (x != null)// and forget fieldss.item = s;s.waiter = null;}return (x != null) ? (E)x : e;} else {// 互补模式 complementary-mode// 这里就是上面说的第二种算法模式QNode m = h.next;// 已经被其他线程修改 , 重来if (t != tail || m == null || h != head)continue;// inconsistent readObject x = m.item;// 元素 CAS 失败 , 执行出队 , 进行重试 。if (isData == (x != null) ||// m already fulfilledx == m ||// m cancelled!m.casItem(x, e)) {// lost CASadvanceHead(h, m);// dequeue and retrycontinue;}// 成功设置advanceHead(h, m);// successfully fulfilledLockSupport.unpark(m.waiter);return (x != null) ? (E)x : e;}}}
awaitFulfill 等待直到节点满足条件/** * 旋转/阻止 , 直到满足节点s为止 。* * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled * @author 老马啸西风 */Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {/* Same idea as TransferStack.awaitFulfill */// deadline 是第一生产力// 我们设置一个超时时间 , 避免一直等待下去 。final long deadline = timed ? System.nanoTime() + nanos : 0L;// 这 3 目运算符用的人脑袋疼 。Thread w = Thread.currentThread();int spins = ((head.next == s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {// 如果当前线程被打断 , 尝试取消 。if (w.isInterrupted())s.tryCancel(e);Object x = s.item;// 如果 s.item 与 e 不等 , 直接返回 xif (x != e)return x;if (timed) {// 计算超时时间 , 超时之后尝试取消 。nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel(e);continue;}}// 更新自旋次数if (spins > 0)--spins;// 如果 waiter 为 null,设置为当前线程 。else if (s.waiter == null)s.waiter = w;else if (!timed)// 通过 LockSupport 进行 park , 区别只是是否有超时时间 。LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}}
clean 清空操作在任何给定时间 , 列表中的一个节点都不能删除-最后插入的节点 。为了解决这个问题 , 如果我们不能删除s , 我们将其前身保存为 cleanMe , 首先删除之前保存的版本 。
可以始终删除节点s或先前保存的节点中的至少一个 , 因此该操作始终终止 。
/** * 使用原始的前任pred摆脱已取消的节点s 。* @author 老马啸西风 */void clean(QNode pred, QNode s) {// 清空 waiter 信息s.waiter = null;while (pred.next == s) { //如果已取消链接 , 请提早返回QNode h = head;QNode hn = h.next;//设置已取消的第一个节点为headif (hn != nullcontinue;}// 保证一致性读QNode t = tail;// Ensure consistent read for tail// 为空 , 直接返回if (t == h)return;// 不一致 , 重试QNode tn = t.next;if (t != tail)continue;// 尝试设置 tail 信息if (tn != null) {advanceTail(t, tn);continue;}// 如果 s 节点不是尾巴节点 , 尝试进行 unspliceif (s != t) {QNode sn = s.next;if (sn == s || pred.casNext(s, sn))return;}QNode dp = cleanMe;//尝试取消链接先前取消的节点if (dp != null) {QNode d = dp.next;QNode dn;if (d == null ||// d is gone ord == dp ||// d is off list or!d.isCancelled() ||// d not cancelled or(d != t// s 已经保存在信息中 , 直接返回if (dp == pred)return;} else if (casCleanMe(null, pred))return;// Postpone cleaning s}}
- 新机|中兴公布全新渠道及市场策略,两款新机同步亮相
- 公安|共享位置文图同步声像同传!大连公安推出微信微博在线报警
- 太方便了!手机照片同步到电脑竟然这么简单,整理归类更轻松
- 摄像头|软硬件同步升级 2021年手机还能有什么新花样?
- 24问:一主多从的半同步复制,到底是哪个slave拖慢了性能
- 二十三、Python队列实现多线程(下篇)
- 揭秘|虾秘功能大揭秘之店铺同步&数据总览
- 越南|富士康计划部分生产线迁至越南,供应商或同步迁移,苹果布局多年
- 门店|新零售添生力军 绿叶购全国200家门店今日同步开业
- 升级|抢走华为订单!台积电2022年量产4nm芯片,苹果A16将同步升级