傻大方


首页 > 潮·科技 > >

高并发进阶 Exchanger 双方栅栏源码深度解析( 四 )



按关键词阅读:

当然也有可以指定超时时间的方法:
public V exchange(V x, long timeout, TimeUnit unit)throws InterruptedException, TimeoutException {Object v;Object item = (x == null) ? NULL_ITEM : x;long ns = unit.toNanos(timeout);if ((arena != null ||(v = slotExchange(item, true, ns)) == null)if (v == TIMED_OUT)throw new TimeoutException();return (v == NULL_ITEM) ? null : (V)v;}这两个方法本身并不难理解 。
因为所有的复杂度都被封装在了 slotExchange 和 arenaExchange 这两个方法中 , 也是本文的重点 。
Unsafe 机制这些知道是通过 Unsafe 机制操作的即可 , 后面会用到 。
// Unsafe mechanicsprivate static final sun.misc.Unsafe U;private static final long BOUND;private static final long SLOT;private static final long MATCH;private static final long BLOCKER;private static final int ABASE;static {int s;try {U = sun.misc.Unsafe.getUnsafe();Class ek = Exchanger.class;Class nk = Node.class;Class ak = Node[].class;Class tk = Thread.class;BOUND = U.objectFieldOffset(ek.getDeclaredField("bound"));SLOT = U.objectFieldOffset(ek.getDeclaredField("slot"));MATCH = U.objectFieldOffset(nk.getDeclaredField("match"));BLOCKER = U.objectFieldOffset(tk.getDeclaredField("parkBlocker"));s = U.arrayIndexScale(ak);// ABASE absorbs padding in front of element 0ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);} catch (Exception e) {throw new Error(e);}if ((s }slotExchange 方法最核心的只有 2 个方法 , 个人的理解就是 slot 进行交换 , 然后等待 arean 唤醒 。
/** * 在启用竞技场之前一直使用交换功能 。请参阅算法笔记 。* * @param item 待交换的元素 * @param timed 是否需要等待 * @param ns 超时时间 * @return 另一个线程的项目; 如果启用了竞技场或线程在完成之前被中断 , 则返回null;或者 或TIMED_OUT(如果超时和超时) * @author 老马啸西风 */private final Object slotExchange(Object item, boolean timed, long ns) {// 获取 threadlocal 中的节点Node p = participant.get();// 获取当前线程Thread t = Thread.currentThread();// 如果线程被中断 , 直接返回 nullif (t.isInterrupted()) // preserve interrupt status so caller can recheckreturn null;for (Node q;;) {// q = slot 不为空if ((q = slot) != null) {// 通过 CAS 设置元素成功if (U.compareAndSwapObject(this, SLOT, q, null)) {// 当前线程待交换的元素Object v = q.item;// exchange 操作传递过来的 item 元素q.match = item;// q 阻塞的线程Thread w = q.parked;// 如果有 , 则进行唤醒if (w != null)U.unpark(w);return v;}// 在竞争中创建竞技场 , 但继续直到 slot 为 null//bound 则是上次Exchanger.boundif (NCPU > 1}else if (arena != null)// 直接返回 , 进入arenaExchange逻辑处理return null; // caller must reroute to arenaExchangeelse {p.item = item;if (U.compareAndSwapObject(this, SLOT, null, p))break;p.item = null;}}// await release// 等待释放// 等待的方式就是:spin+yeild+blockint h = p.hash;long end = timed ? System.nanoTime() + ns : 0L;int spins = (NCPU > 1) ? SPINS : 1;Object v;while ((v = p.match) == null) {// 自旋 , 直至spins不大于0if (spins > 0) {// 伪随机算法 ,目的是等h小于0(随机的)h ^= h << 1;h ^= h >>> 3;h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();else if (h < 0}else if (slot != p)// 重置自旋的数量 , 并重试spins = SPINS;// 如果线程没被中断 , 且arena还没被创建 , 并且没有超时else if (!t.isInterrupted()// 挂在此结点上的阻塞着的线程p.parked = t;if (slot == p)// 挂起当前线程在 Node 节点 , 等下一个使用该节点交换的线程唤醒U.park(false, ns);// 被唤醒后 , 清空 parked 信息p.parked = null;// 清空对应的阻塞对象U.putObject(t, BLOCKER, null);}// 超时或其他(取消) , 给其他线程腾出slotelse if (U.compareAndSwapObject(this, SLOT, p, null)) {v = timedbreak;}}// 重置归位U.putOrderedObject(p, MATCH, null);p.item = null;p.hash = h;return v;}流程梳理为了便于大家理解 , 我们把流程梳理一遍:

  1. 检查slot是否为空(null) , 不为空 , 说明已经有线程在此等待 , 尝试占领该槽位 , 如果占领成功 , 与等待线程交换数据 , 并唤醒等待线程 , 交易结束 , 返回 。
  2. 如果占领槽位失败 , 创建arena , 但要继续【步骤1】尝试抢占slot , 直至slot为空 , 或者抢占成功 , 交易结束返回 。
  3. 如果slot为空 , 则判断arena是否为空 , 如果arena不为空 , 返回null , 重新路由到arenaExchange方法
  4. 如果arena为空 , 说明当前线程是先到达的 , 尝试占有slot , 如果成功 , 将slot标记为自己占用 , 跳出循环 , 继续【步骤5】 , 如果失败 , 则继续【步骤1】
  5. 当前线程等待被释放 , 等待的顺序是先自旋(spin) , 不成功则让出CPU时间片(yield) , 最后还不行就阻塞(block) , spin -> yield -> block


    稿源:(未知)

    【傻大方】网址:http://www.shadafang.com/c/111T3142H020.html

    标题:高并发进阶 Exchanger 双方栅栏源码深度解析( 四 )


上一篇:预算不够想换新?千元机考虑这三款,性价比高配置强

下一篇:百元蓝牙耳机的又一选择:长续航酷狗M3Pro能量圈颈带式耳机