傻大方


首页 > 潮·科技 > >

高并发进阶 Exchanger 双方栅栏源码深度解析( 五 )



按关键词阅读:

  • 如果超时(设置超时的话)或被中断 , 则退出循环 。
  • 最后 , 重置数据 , 下次重用 , 返回结果 , 结束 。
  • arenaExchange 方法/** * 启用竞技场时交换功能 。请参阅上面算法说明 。* * @param item 待交换的非 null 元素 * @param timed 如果等待已计时 , 则为true * @param ns 如果定时 , 则为最大等待时间 , 否则为0L * @return 另一个线程的项目; 或null(如果被中断); 或TIMED_OUT(如果超时和超时) */private final Object arenaExchange(Object item, boolean timed, long ns) {// arena 这个变量是在 slotExchange 中初始化的 。Node[] a = arena;// 获取当前线程的 Node 节点信息Node p = participant.get();for (int i = p.index;;) {// access slot at iint b, m, c; long j;// j is raw array offset// 从场地中选出偏移地址为(i << ASHIFT) + ABASE的内存值 , 也即真正可用的Node 。 这是一个 volatile 变量 。Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);// 如果对象存在 , 且通过 CAS 设置成功 。// 此槽位不为null, 说明已经有线程在这里等了 , 重新将其设置为null, CAS操作if (q != null// release// 将当前线程携带的数据交给等待线程q.match = item;// 可能存在的等待线程Thread w = q.parked;if (w != null)// 唤醒等待的线程U.unpark(w);// 返回结果 ,交易成功return v;}// 有效交换位置 , 且槽位为空else if (i <= (m = (b = bound)// offer// 通过 CAS 设置 slot 成功if (U.compareAndSwapObject(a, j, null, p)) {// 计算超时时间long end = (timed// 当前线程Thread t = Thread.currentThread(); // wait// 一直循环 , 直到有别的线程来交换 , 或超时 , 或中断for (int h = p.hash, spins = SPINS;;) {// 检查是否有别的线程来交换数据Object v = p.match;// 有则返回if (v != null) {// match重置 , 等着下次使用U.putOrderedObject(p, MATCH, null);// 清空 , 下次接着使用p.item = null;// clear for next usep.hash = h;// 交换结束 , 返回成功 。return v;}else if (spins > 0) {// 自旋 , 这个和 slotExchange 类似 , 是一个伪随机 。h ^= h << 1;h ^= h >>> 3;h ^= h << 10; // xorshiftif (h == 0)// initialize hashh = SPINS | (int)t.getId();// SPINS >>> 1, 一半的概率else if (h < 0// two yields per wait}// 别的线程已经到来 , 正在准备数据 , 自旋等待else if (U.getObjectVolatile(a, j) != p)spins = SPINS;// releaser hasn't set match yetelse if (!t.isInterrupted()// emulate LockSupport// 挂在此结点上的阻塞着的线程p.parked = t;// minimize windowif (U.getObjectVolatile(a, j) == p)// 阻塞 ,等着被唤醒或中断U.park(false, ns);// 被唤醒后 , 清空数据p.parked = null;// 解除阻塞对象U.putObject(t, BLOCKER, null);}else if (U.getObjectVolatile(a, j) == p// 重置元素p.item = null;p.hash = h;// 索引减半 , 为的是快速找到汇合点(最左侧)i = p.index >>>= 1;// descend// 保留中断状态 , 以便调用者可以重新检查 , Thread.interrupted() 会清除中断状态标记if (Thread.interrupted())return null;// 超时if (timed// 重新开始break;// expired; restart}}}else// 重置p.item = null;// clear offer}else {// 别的线程更改了bound , 重置collides为0, i的情况如下:当i != m, 或者m = 0时 , i = m; 否则 , i = m-1; 从右往左遍历if (p.bound != b) {// stale; resetp.bound = b;p.collides = 0;// index 左移i = (i != m || m == 0) ? m : m - 1;}else if ((c = p.collides) < m || m == FULL ||!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {p.collides = c + 1;// 更新bound, 高位递增 , 低位 +1i = (i == 0) ? m : i - 1;// cyclically traverse}else// 槽位增长i = m + 1;// growp.index = i;}}}流程梳理为了便于大家理解 , 流程梳理如下:
    1. 从场地中选出偏移地址为(i << ASHIFT) + ABASE的内存值 , 也即第i个真正可用的Node , 判断其槽位是否为空 , 为空 , 进入【步骤2】;不为空 , 说明有线程在此等待 , 尝试抢占该槽位 , 抢占成功 , 交换数据 , 并唤醒等待线程 , 返回 , 结束;没有抢占成功 , 进入【步骤9】
    2. 检查索引(i vs m)是否越界 , 越界 , 进入【步骤9】;没有越界 , 进入下一步 。
    3. 尝试占有该槽位 , 抢占失败 , 进入【步骤1】;抢占成功 , 进入下一步 。
    4. 检查match , 是否有线程来交换数据 , 如果有 , 交换数据 , 结束;如果没有 , 进入下一步 。
    5. 检查spin是否大于0 , 如果不大于0 , 进入下一步;如果大于0 , 检查hash是否小于0 , 并且spin减半或为0 , 如果不是 , 进入【步骤4】;如果是 , 让出CPU时间 , 过一会儿 , 进入【步骤4】
    6. 检查是否中断 , m达到最小值 , 是否超时 , 如果没有中断 , 没有超时 , 并且m达到最小值 , 阻塞 , 过一会儿进入【步骤4】;否则 , 下一步 。
    7. 没有线程来交换数据 , 尝试丢弃原有的槽位重新开始 , 丢弃失败 , 进入【步骤4】;否则 , 下一步 。


      稿源:(未知)

      【傻大方】网址:http://www.shadafang.com/c/111T3142H020.html

      标题:高并发进阶 Exchanger 双方栅栏源码深度解析( 五 )


    上一篇:预算不够想换新?千元机考虑这三款,性价比高配置强

    下一篇:百元蓝牙耳机的又一选择:长续航酷狗M3Pro能量圈颈带式耳机