并发控制工具相位器Phaser实现原理( 二 )

接着看相位器的register注册方法 , 核心是通过自旋来实现无锁更新state , 需要按照不同位范围进行状态更新 。 其中counts是state的低32位 , 然后通过偏移和掩码能够得到参与者数量和未到达数量 , 而state右移32位则得到当前阶段数 。 然后通过 counts != EMPTY 来判断相位器是否为空 , 为空时则设置参与者数量为1 , 然后通过compareAndSet设置state变量 , 成功则退出自旋 。 不为空时则分两种情况:一种是如果未到达数量等于0则调用internalAwaitAdvance方法等待下一个阶段 , 其中Thread.onSpinWait()是JVM提供的等待方法 。 第二种情况则将参与者数量和未到达数量都加1 , 然后通过compareAndSet方法设置state变量 。
public int register() {long adjust = ((long) 1 << PARTIES_SHIFT) | 1;int phase;for (;;) {int counts = (int) state;int parties = counts >>> PARTIES_SHIFT;int unarrived = countsphase = (int) (state >>> PHASE_SHIFT);if (counts != EMPTY) {if (unarrived == 0)this.internalAwaitAdvance(phase);else if (STATE.compareAndSet(this, state, state + adjust))break;} else {long next = ((long) phase << PHASE_SHIFT) | adjust;if (STATE.compareAndSet(this, state, next))break;}}return phase; } private int internalAwaitAdvance(int phase) {int p;while ((p = (int) (state >>> PHASE_SHIFT)) == phase)Thread.onSpinWait();return p; }继续看arriveAndAwaitAdvance方法 , 核心逻辑是通过自旋将未到达数量减一并等待其它参与者到来后一起进阶 。 通过偏移得到当前阶段数和未到达数 , 然后将未到达数减一并通过compareAndSet设置state 。 成功设置后判断如果未到达数量大于1则调用internalAwaitAdvance等待其它参与者一起进阶 , 然后调用onAdvance方法判断是否要终止相位器 , 记得前面我们的onAdvance例子吗?就是这里触发的 。 根据情况分别设置终止位和未到达状态 , 主要通过或运算 。 最后将当前阶段数进行加一 , 并尝试通过compareAndSet设置state , 只有一个线程能够设置成功 , 最终返回值为新的当前阶段数 。
public int arriveAndAwaitAdvance() {for (;;) {long s = state;int phase = (int) (s >>> PHASE_SHIFT);int counts = (int) s;int unarrived = (counts == EMPTY) ? 0 : (countsif (STATE.compareAndSet(this, s, s -= 1)) {if (unarrived > 1)return this.internalAwaitAdvance(phase);long n = sint nextUnarrived = (int) n >>> PARTIES_SHIFT;if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1)n |= (long) nextPhase << PHASE_SHIFT;if (!STATE.compareAndSet(this, s, n))return (int) (state >>> PHASE_SHIFT);return nextPhase;}} }arriveAndDeregister方法核心逻辑是通过自旋将未到达数量和参与者数量都减一 , 注意此方法不进行阻塞等待 。 通过偏移即掩码得到当前阶段数、未到达数 , 然后将state的参与者数量和未到达数量分别减一 , 并通过compareAndSet设置state 。 如果未到达数量等于1 , 则说明该线程是最后一个到达的参与者 , 此时会调用onAdvance方法判断是否要终止相位器 。 最后通过或运算设置终止位、未到达数、当前阶段数 , 并通过compareAndSet设置state 。
public int arriveAndDeregister() {for (;;) {long s = state;int phase = (int) (s >>> PHASE_SHIFT);if (phase < 0)return phase;int counts = (int) s;int unarrived = (counts == EMPTY) ? 0 : (countsif (STATE.compareAndSet(this, s, s -= (1 | 1 << PARTIES_SHIFT))) {if (unarrived == 1) {long n = sint nextUnarrived = (int) n >>> PARTIES_SHIFT;if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1)n |= (long) nextPhase