java并发:线程协作机制之CyclicBarrier


一、初识CyclicBarrier

二、示例

示例一

应用场景:

在某种需求中,比如一个大型的任务,常常需要分配很多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候就可以选择CyclicBarrier了。

示例:

package com.test;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo{
    
    public static void main(String args[]) throws Exception{
        
        CyclicBarrier barrier = new CyclicBarrier(3,new TotalTask());
        
        BillTask worker1 = new BillTask("111",barrier);
        BillTask worker2 = new BillTask("222",barrier);
        BillTask worker3 = new BillTask("333",barrier);
        worker1.start();
        worker2.start();
        worker3.start();
        System.out.println("Main thread end!");
    }
    
    static class TotalTask extends Thread {
        public void run() {
            System.out.println("所有子任务都执行完了,就开始执行主任务了。");
        }
    }
    
    static class BillTask extends Thread {
        private String billName;
        private CyclicBarrier barrier;
public BillTask(String workerName,CyclicBarrier barrier) { this.billName = workerName; this.barrier = barrier; }
@Override
public void run() { try { System.out.println("市区:"+billName +"运算开始:"); Thread.sleep(1000L);//模仿第一次运算; System.out.println("市区:"+billName +"运算完成,等待中..."); barrier.await();//假设一次运算不完,第二次要依赖第一次的运算结果。都到达这个节点之后后面才会继续执行; System.out.println("全部都结束,市区"+billName +"才开始后面的工作。"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }

上述程序运行结果如下:

市区:111运算开始:
市区:333运算开始:
Main thread end!
市区:222运算开始:
市区:333运算完成,等待中...
市区:222运算完成,等待中...
市区:111运算完成,等待中...
所有子任务都执行完了,就开始执行主任务了。//这句话是最后到达wait()方法的那个线程执行的
全部都结束,市区111才开始后面的工作。
全部都结束,市区222才开始后面的工作。
全部都结束,市区333才开始后面的工作。

解说:

A、在这个示例中,构造CyclicBarrier时,传入了内部类TotalTask(TotalTask继承了Thread,是Runnable的实现)的实例对象,其意义在于:当所有的线程都执行到wait()方法时,它们会一起返回继续自己的工作,但是最后一个到达wait()方法的线程会执行TotalTask的run()方法;

B、如果在构造CyclicBarrier时没有传入Runnable的实现对象作为构造参数,则当所有的线程都执行到wait()方法时会直接一起返回继续自己的工作。

示例二

此处展示另一个比较有意思的示例,即如何串行执行step1->step2->step3:

解读:

上述示例中的线程A以及其它线程在第一次调用await处相互等待,即当所有线程都执行该完step1后它们才开始执行step2,然后在第二次调用await处相互等待,然后再一起开始执行step3

三、详解CyclicBarrier

CyclicBarrier相关类图如下:

CyclicBarrier中相关方法如下:

解读:

从上图中lock的定义可知CyclicBanier基于独占锁来实现的(其本质是 AQS)。

Note:

在 Generation 中有一个变量 broken,其用来记录当前屏障是否被打破;这里的 broken 并没有被声明为 volatile 的(因为该变量在锁内使用)。 

构造函数

相关方法定义如下:

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

解读:

变量parties 用来记录总的线程个数,表示多少线程调用 await后,所有线程才会冲破屏障继续往下行。

变量count 的初始值等于 parties,每当有线程调用 await方法其值就递减 1,当 count为 0时表示所有线程都到了屏障点。

问题:为何维护 parties 和 count 两个变量,只使用 count可以吗?

CycleBarier是可以被复用的,即当 count 的值变为 0 后,会将 parties 的值赋给 count 从而进行复用。

await方法

相关方法定义如下:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

解读:

(1)index == 0则说明所有线程都到了屏障点,于是执行初始化时传递的任务,此后调用了nextGeneration重置CyclicBarrier并唤醒所有等待的线程

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

(2)index != 0,如果当前线程调用的是无参数的 await()方法,则这里 timed=false,所以当前线程会被放入条件变量trip 的条件阻塞队列并释放lock;如果调用的是有参数的 await 方法,则这里 timed=true,当前线程同样会被放入条件变量 trip 的条件阻塞队列并释放lock,不同的是当前线程会在指定时间超时后自动被激活。

Note:

当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的其它线程中会竞争lock,获得锁的线程会执行与第一个线程同样的操作,如此往复,直到最后一个线程获取到 lock,最终index == 0。

四、分析总结

(1)CyclicBarrier与CountDownLatch的区别

A、CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待;
B、CountDownLatch的计数器无法被重置;而CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

(1)https://www.baeldung.com/java-cyclic-barrier

相关