java并发:线程协作机制之CyclicBarrier
一、初识CyclicBarrier
二、示例
示例一
应用场景:
在某种需求中,比如一个大型的任务,常常需要分配很多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候就可以选择CyclicBarrier了。
示例:
package com.test; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo{ public static void main(String args[]) throws Exception{ CyclicBarrier barrier = new CyclicBarrier(3,new TotalTask()); BillTask worker1 = new BillTask("111",barrier); BillTask worker2 = new BillTask("222",barrier); BillTask worker3 = new BillTask("333",barrier); worker1.start(); worker2.start(); worker3.start(); System.out.println("Main thread end!"); } static class TotalTask extends Thread { public void run() { System.out.println("所有子任务都执行完了,就开始执行主任务了。"); } } static class BillTask extends Thread { private String billName; private CyclicBarrier barrier;
public BillTask(String workerName,CyclicBarrier barrier) { this.billName = workerName; this.barrier = barrier; }
@Override public void run() { try { System.out.println("市区:"+billName +"运算开始:"); Thread.sleep(1000L);//模仿第一次运算; System.out.println("市区:"+billName +"运算完成,等待中..."); barrier.await();//假设一次运算不完,第二次要依赖第一次的运算结果。都到达这个节点之后后面才会继续执行; System.out.println("全部都结束,市区"+billName +"才开始后面的工作。"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
上述程序运行结果如下:
市区:111运算开始: 市区:333运算开始: Main thread end! 市区:222运算开始: 市区:333运算完成,等待中... 市区:222运算完成,等待中... 市区:111运算完成,等待中... 所有子任务都执行完了,就开始执行主任务了。//这句话是最后到达wait()方法的那个线程执行的 全部都结束,市区111才开始后面的工作。 全部都结束,市区222才开始后面的工作。 全部都结束,市区333才开始后面的工作。
解说:
A、在这个示例中,构造CyclicBarrier时,传入了内部类TotalTask(TotalTask继承了Thread,是Runnable的实现)的实例对象,其意义在于:当所有的线程都执行到wait()方法时,它们会一起返回继续自己的工作,但是最后一个到达wait()方法的线程会执行TotalTask的run()方法;
B、如果在构造CyclicBarrier时没有传入Runnable的实现对象作为构造参数,则当所有的线程都执行到wait()方法时会直接一起返回继续自己的工作。
示例二
此处展示另一个比较有意思的示例,即如何串行执行step1->step2->step3:
解读:
上述示例中的线程A以及其它线程在第一次调用await处相互等待,即当所有线程都执行该完step1后它们才开始执行step2,然后在第二次调用await处相互等待,然后再一起开始执行step3
三、详解CyclicBarrier
CyclicBarrier相关类图如下:
CyclicBarrier中相关方法如下:
解读:
从上图中lock的定义可知CyclicBanier基于独占锁来实现的(其本质是 AQS)。
Note:
在 Generation 中有一个变量 broken,其用来记录当前屏障是否被打破;这里的 broken 并没有被声明为 volatile 的(因为该变量在锁内使用)。
构造函数
相关方法定义如下:
/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); }
解读:
变量parties 用来记录总的线程个数,表示多少线程调用 await后,所有线程才会冲破屏障继续往下行。
变量count 的初始值等于 parties,每当有线程调用 await方法其值就递减 1,当 count为 0时表示所有线程都到了屏障点。
问题:为何维护 parties 和 count 两个变量,只使用 count可以吗?
CycleBarier是可以被复用的,即当 count 的值变为 0 后,会将 parties 的值赋给 count 从而进行复用。
await方法
相关方法定义如下:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
/** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
解读:
(1)index == 0则说明所有线程都到了屏障点,于是执行初始化时传递的任务,此后调用了nextGeneration重置CyclicBarrier并唤醒所有等待的线程
/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
(2)index != 0,如果当前线程调用的是无参数的 await()方法,则这里 timed=false,所以当前线程会被放入条件变量trip 的条件阻塞队列并释放lock;如果调用的是有参数的 await 方法,则这里 timed=true,当前线程同样会被放入条件变量 trip 的条件阻塞队列并释放lock,不同的是当前线程会在指定时间超时后自动被激活。
Note:
当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的其它线程中会竞争lock,获得锁的线程会执行与第一个线程同样的操作,如此往复,直到最后一个线程获取到 lock,最终index == 0。
四、分析总结
(1)CyclicBarrier与CountDownLatch的区别
A、CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待;
B、CountDownLatch的计数器无法被重置;而CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
(1)https://www.baeldung.com/java-cyclic-barrier