Java并发编程 - Phaser类的使用( 三 )


9、forceTermination():强制终止 , 此后Phaser对象将不可用 , 即register等将不再有效 。 此方法将会导致Queue中所有的waiter线程被唤醒 。
10、register():新注册一个party , 导致Phaser内部registerPaties数量加1;如果此时onAdvance方法正在执行 , 此方法将会等待它执行完毕后才会返回 。 此方法返回当前的phase周期数 , 如果Phaser已经中断 , 将会返回负数 。
11、bulkRegister(int parties):批量注册多个parties数组 , 规则同10、 。
12、getArrivedParties():获取已经到达的parties个数 。
13、getPhase():获取当前phase周期数 。 如果Phaser已经中断 , 则返回负值 。
14、getRegisteredParties():获取已经注册的parties个数 。
15、getUnarrivedParties():获取尚未到达的parties个数 。
16、onAdvance(int phase,int registeredParties):这个方法比较特殊 , 表示当进入下一个phase时可以进行的事件处理 , 如果返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination , 即isTermination()将返回true 。 ) , 否则可以继续进行 。 phase参数表示当前周期数 , registeredParties表示当前已经注册的parties个数 。
默认实现为:return registeredParties == 0;在很多情况下 , 开发者可以通过重写此方法 , 来实现自定义的advance时间处理机制 。
protected boolean onAdvance(int phase, int registeredParties) {return registeredParties == 0;}内部原理 , 比较简单(简述):
1)两个计数器:分别表示parties个数和当前phase 。 register和deregister会触发parties变更(CAS) , 全部parties到达(arrive)会触发phase变更 。
2)一个主要的阻塞队列:非AQS实现 , 对于arriveAndWait的线程 , 会被添加到队列中并被park阻塞 , 直到当前phase中最后一个party到达后触发唤醒 。
四、父子关系【Java并发编程 - Phaser类的使用】Phaser通过status字段来实现同步逻辑 , status是一个64位的long变量 , 它有包含了四个维度的语义:
1、第0-15位 , 当前未到达的parties , 调用arriveXXX时 , 该值-1 , 调用register时+1;
2、第16-31位 , 当前总parties , 调用register时+1 , deRegister时-1;
3、第32-62位 , phase , 即Phaser的年龄 , 当未到达的parties减到0(即所有parties已到达)时 , phase自动加1 , 并且把16-31位的parties数复制到0-15位 , 从而该Phaser可以继续复用;
Java并发编程 - Phaser类的使用文章插图
当Phaser的parties数比较大的高并发场景下 , Phaser的status变量的竞争会非常激烈 , register、arrive等操作发起的CAS操作预测将会大概率失败导致大量CAS操作被重复调用 , 增加CPU开销 。 可以通过构造Phaser分层树的方式来分离竞争 , 子Phaser第一次register时 , 把该子Phaser注册到父Phaser , 当子Phaser所有parties都已经arrive时 , 把它从父Phaser中反注册 。
Java并发编程 - Phaser类的使用文章插图
当根Phaser的所有子Phaser的parties都已经arrive时 , 整个Phaser树升级phase递增 , 通过这种方式 , 所有的arrive、register操作在子Phaser进行就可以 , 根Phaser只需负责Phaser的升级 , 这样可以把部分对status的访问修改分离到子Phaser中 , 通过分散竞争点提高Phaser的吞吐量 。
Java并发编程 - Phaser类的使用文章插图
五、使用案例1、例子1(基本使用)
使用默认的onAdvance , 线程自己注册party和注销party , 只要registeredParties为0 , phaser将会终止 。
private static void test1() {Phaser phaser = new Phaser() {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println(Thread.currentThread().getName() + ": onAdvance , registeredParties=" + getRegisteredParties() + ", phase=" + getPhase() + ", isTerminated=" + isTerminated() );return super.onAdvance(phase, registeredParties);}};System.out.println(Thread.currentThread().getName() + ": 主线程开始执行异步任务 , registeredParties=" + phaser.getRegisteredParties() + ", phase=" + phaser.getPhase() + ", isTerminated=" + phaser.isTerminated());phaser.register();for (int i = 0; i < 5; i++) {phaser.register();System.out.println(Thread.currentThread().getName() + ": 注册一个屏障 , registeredParties=" + phaser.getRegisteredParties() + ", phase=" + phaser.getPhase() + ", isTerminated=" + phaser.isTerminated());int sleep = i;new Thread(() -> {try {TimeUnit.SECONDS.sleep(sleep);System.out.println(Thread.currentThread().getName() + ": 到达屏障 , 等待其他线程 , " + sleep + ", registeredParties=" + phaser.getRegisteredParties() + ", phase=" + phaser.getPhase() + ", isTerminated=" + phaser.isTerminated());phaser.arriveAndAwaitAdvance();TimeUnit.SECONDS.sleep(sleep);System.out.println(Thread.currentThread().getName() + ": 屏障打开 , 开始执行剩下任务 , " + sleep + ", registeredParties=" + phaser.getRegisteredParties() + ", phase=" + phaser.getPhase() + ", isTerminated=" + phaser.isTerminated());phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}},"thread-" + i).start();}phaser.arriveAndDeregister();System.out.println(Thread.currentThread().getName() + ": 主线程执行完毕 , registeredParties=" + phaser.getRegisteredParties() + ", phase=" + phaser.getPhase() + ", isTerminated=" + phaser.isTerminated() );}