12.JUC并发编程中的并发工具类


目录
  • 1.等待多线程完成的CountDownLatch(倒计数闭锁)
    • 1.1.目标
    • 1.2.基本使用
      • 介绍
      • api介绍
      • CountDownLatch 的使用
      • 示例需求
    • 1.3.CountDownLatch原理深入
      • 目标
      • 原理介绍
      • 原理1:计数器的初始化
      • 原理2:countDown() 计数-1的实现
      • 原理3:await() 阻塞等待计数为0
      • 小结
    • 1.4.CountDownLatch 的使用场景
      • 实现最大的并行性
      • 开始执行前等待 N 个线程完成各自任务
    • 1.5.小结
  • 2.同步屏障CyclicBarrier
    • 目标
    • 基本使用
      • 介绍
      • 方法api
      • 示例代码
      • 示例效果
    • CyclicBarrier的应用场景
    • 小结
  • 3.控制并发线程数的Semaphore
    • 目标
    • 介绍
    • 应用场景
    • api方法
    • 示例代码
    • 示例效果
    • Semaphore的特点
    • 小结
  • 4.线程间交换数据的Exchanger
    • 目标
    • 介绍
    • 交换数据原理
      • api方法
    • 示例代码
    • 示例效果
    • 应用场景
    • 小结
  • 5.总结

1.等待多线程完成的CountDownLatch(倒计数闭锁)

1.1.目标

1.掌握CountDownLatch使用(实现等待其他线程处理完才继续运行当前线程)

2.理解CountDownLatch实现原理

3.CountDownLatch实现闭锁比join的优势是什么

4.CountDownLatch的应用场景

1.2.基本使用

介绍
CountDownLatch是一个同步辅助类,也叫倒计数闭锁,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数初始化 CountDownLatch。这个辅助类可以进行计算递减,所以在当前计数到达零之前,可以让现场一直受阻塞。到达0之后,会释放所有等待的线程,执行后续操作。

最常见的使用场景: 等待其他线程处理完才继续当前线程。

api介绍
CountDownLatch类内方法api 说明
CountDownLatch(int count) 创建CountDownLatch 实例并设置预定计数次数
void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少 1
void await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。如果当前的计数为零,则此方法立即返回
CountDownLatch 的使用
CountDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。
每当一个线程完成了自己的任务后,计数器的值就会减 1。
当计数器值到达 0 时,表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

示例需求
假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,
等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法

传统join阻塞案例代码

public class CountDownLatchTest1_Join {

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(()->{
            System.out.println("parser1 finish");
        });
        Thread parser2 = new Thread(()->{
            System.out.println("parser2 finish");
        });
        parser1.start();
        parser2.start();
        parser1.join();  //join阻塞
        parser2.join();  //join阻塞
        System.out.println("join方式:all parser finish");
    }
}

join阻塞效果

分析

join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去,代码片段如下:

while (isAlive()) {
	wait(0);
}

直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里实现的,所以在JDK里看不到,大家可以查看JVM源码。

使用CountDownLatch实现阻塞优化代码

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest2_CountDownLatch {

    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(()->{
            System.out.println("parser1 finish");
            countDownLatch.countDown();  //计算递减1
        });
        Thread parser2 = new Thread(()->{
            System.out.println("parser2 finish");
            countDownLatch.countDown(); //计算递减1
        });
        parser1.start();
        parser2.start();
        countDownLatch.await();  //阻塞,计算为0释放阻塞,运行后面的代码

        System.out.println("CountDownLatch方式:all parser finish");
    }
}

使用CountDownLatch实现阻塞效果

1.3.CountDownLatch原理深入

目标

掌握CountDownLatch实现原理

原理介绍

同ReentrantLock一样,依然是借助AQS的双端队列,来实现原子的计数-1,线程阻塞和唤醒

AbstractQueuedSynchronizer (简称AQS)

AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题

await()方法使用AQS,一个FIFO的同步器队列表示排队等待锁的线程,队列头节点不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus

原理1:计数器的初始化

CountDownLatch内部实现了AQS,并覆盖了tryAcquireShared()tryReleaseShared()两个方法,下面说明干嘛用的

通过前面的使用,清楚了计数器的构造必须指定计数值,这个直接初始化了 AQS内部的state变量

Sync(int count) {
    setState(count);
}

后续的计数-1/判断是否可用都是基于state进行的

原理2:countDown() 计数-1的实现

源码

// 计数-1
public void countDown() {
    sync.releaseShared(1);
}


public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 首先尝试释放锁
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) //如果计数已经为0,则返回失败
            return false;
        int nextc = c-1;
        // 原子操作实现计数-1
        if (compareAndSetState(c, nextc)) 
            return nextc == 0;
    }
}

// 唤醒被阻塞的线程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 队列非空,表示有线程被阻塞
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { 
            // 头结点如果为SIGNAL,则唤醒头结点下个节点上关联的线程,并出队
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head) // 没有线程被阻塞,直接跳出
            break;
    }
}

上面截出计数减1的完整调用链

  1. 尝试释放锁tryReleaseShared,实现计数-1
  • 若计数已经小于0,则直接返回false
  • 否则执行计数(AQS的state)减一
  • 若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
  1. 释放并唤醒阻塞线程 doReleaseShared
  • 如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
  • 头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队
原理3:await() 阻塞等待计数为0

源码

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
    

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 若线程中端,直接抛异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


// 计数为0时,表示获取锁成功
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// 阻塞,并入队
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // 入队
    boolean failed = true;
    try {
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取锁成功,设置队列头为node节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) // 线程挂起
              && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed) {
            cancelAcquire(node);
        }
    }
}

阻塞的逻辑相对简单

  1. 判断state计数是否为0,为0,则直接放过执行后面的代码
  2. 大于0,则表示需要阻塞等待计数为0
  3. 当前线程封装Node对象,进入阻塞队列
  4. 然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码
小结

CountDownLatch原理

  • 使用AQS的state的值作为计算器
  • countDown()方法,计算器递减的时候,是state进行更新递减,采用CAS更新,如果递减为0,就会使用unsafe的unpark唤醒同步器队列等待的线程运行
  • await()方法,计算器不为0,将当前线程加入同步器队里进行阻塞,当计数器为0的时候进行运行

1.4.CountDownLatch 的使用场景

实现最大的并行性
  • 想同时启动多个线程,实现最大程度的并行性。
    • 例如,想测试一个单例类。如果创建一个初始计数器为 1 的 CountDownLatch,并让其他所有线程都在这个锁上等待,只需要调用一次 countDown() 方法就可以让其他所有等待的线程同时恢复执行。
开始执行前等待 N 个线程完成各自任务
  • 例如应用程序启动类要确保在处理用户请求前,所有 N 个外部系统都已经启动和运行了。

1.5.小结

  1. 掌握使用CountDownLatch实现等待其他线程处理完才继续运行当前线程

  1. 理解CountDownLatch实现原理

    通过初始化一个计算器,利用递减数字的方式控制其他线程运行完成,计算器为0最后运行当前的线程

    • 使用AQS的state的值作为计算器
    • countDown()方法,计算器递减的时候,是state进行更新递减,采用CAS更新,如果递减为0,就会使用unsafe的unpark唤醒同步器队列等待的线程运行
    • await()方法,计算器不为0,将当前线程加入同步器队里进行阻塞,当计数器为0的时候进行运行

3.CountDownLatch实现闭锁比join的优势是什么

  • CountDownLatch更加灵活,join使用比较复杂

    调用join方法需要等待thread执行完毕才能继续向下执行;

    而CountDownLatch只需要检查计数器的值为零就可以继续向下执行,相比之下,CountDownLatch更加灵活一些,可以实现一些更加复杂的业务场景;

  • 使用线程池时,join() 方法无法使用,CountDownLatch 依然可以实现功能。

4.countDownLatch实践应用

  • CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,不能再次被使用。
  • CountDownLatch 类主要使用的场景有明显的顺序要求。
    比如只有等跑完步才能计算排名,只有等所有记录都写入才能进行统计工作等等,因此 CountDownLatch 完善的是某种逻辑上的功能,使得线程按照正确的逻辑进行。

5.countDownLatch应用场景

  • 实现最大的并行运行
  • 当前线程开始执行前等待 N 个线程完成各自任务

2.同步屏障CyclicBarrier

目标

  1. 掌握CyclicBarrier的使用
  2. CyclicBarrier 和 CountdownLatch 的区别
  3. 说说 CyclicBarrier 原理

基本使用

介绍

CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。

  • 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。

  • CyclicBarrier(可重用屏障/栅栏) 类似于 CountDownLatch,功能一样;

    都有让多个线程等待同步然后再开始下一步动作

  • CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用 await() 方法,这个方法将阻塞直到所有线程都到达屏障位置。
    如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。

方法api
java.util.concurrent.CyclicBarrier
CyclicBarrier(int parties) 创建对象,参数表示屏障拦截的线程数量;初始化相互等待的线程数量
int await() 告诉CyclicBarrier自己已经到达了屏障,然后当前线程被阻塞
返回值int为达到屏障器的索引:
索引=未达到屏障线程数量-1,0表示最后一个达到屏障;
int getParties() 获取 CyclicBarrier 打开屏障的线程数量
void reset() 使 CyclicBarrier 回归初始状态,它做了两件事。
如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
将是否破损标志位 broken 置为 false。
boolean isBroken() 获取是否破损标志位 broken 的值,此值有以下几种情况。1.CyclicBarrier 初始化时,broken=false,表示屏障未破损。
2.如果正在等待的线程被中断,则broken=true,表示屏障破损。
3.如果正在等待的线程超时,则broken=true,表示屏障破损。
4.如果有线程调用 CyclicBarrier.reset() 方法,则 broken=false,表示屏障回到未破损状态。
int getNumberWaiting() 获取达到屏障阻塞等待的线程数
CyclicBarrier(int parties,Runnable barrierAction) 用于所有线程到达屏障时,优先执行barrierAction的线程
示例代码
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    public static void main(String[] args) {
        new Thread(()->{
            try {
                System.out.println("达到屏障阻塞线程数:"+c.getNumberWaiting());
                c.await(); //达到屏障阻塞,+1
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("运行结束1"); //不会运行
        }).start();

        new Thread(()->{
            try {
                System.out.println("达到屏障阻塞线程数:"+c.getNumberWaiting());
                c.await();//达到屏障阻塞,+1
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("运行结束2"); //不会运行
        }).start();

        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("主线程完成,拦截线程数:"+c.getParties()+",达到屏障阻塞线程数:"+c.getNumberWaiting()); //会运行,没有到达屏障,不会阻塞
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
示例效果

因为主线程和2个子线程的调度是由CPU决定的,两个子线程都有可能先执行,所以会产生两种
输出,第一种可能输出如下

由于所有线程都达到屏障,所有阻塞线程被释放,所以阻塞线程为0

第二种可能输出如下:

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

static CyclicBarrier c = new CyclicBarrier(3);

运行效果

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障恢复运行时,优先执行barrierAction,方便处理更复杂的业务场景

代码

//1.实例屏障工具类设置线程数量2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
    System.out.println("所有阻塞线程中优先执行的线程");//会运行在线程1与线程2运行结束之前
});

运行效果

CyclicBarrier的应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景

例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,
得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水

示例代码

import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class _04DemoTest implements Runnable {

    //1.创建同步屏障器,设置屏障线程数量为4,(4个表格sheet,每个线程统计一个sheet数据)
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);

    //2.创建并发容器存储每个表格的日均流水
    private ConcurrentHashMap concurrentHashMap= new ConcurrentHashMap<>();

    //3.运行4个线程,分别处理每个sheet表格的日均流水,将结果放入并发容器
    public void count(){
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                //将当前解析表格sheet的日均流水写入容器中
                concurrentHashMap.put(Thread.currentThread().getName(),1);

                //设置当前线程到达屏障位置
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    //4.使用barrierAction进行合并计算,统计出所有账户的日均流水
    @Override
    public void run() {

        //定义统计结果变量
        int total = 0;

        //遍历容器每个元素进行累加日均流水
        for(Map.Entry map : concurrentHashMap.entrySet()){
            total += map.getValue();
        }

        //输出打印结果
        System.out.println("统计所有账户的日均流水:"+total);
    }

    public static void main(String[] args) {
        //是当前类对象
        _04DemoTest demoTest = new _04DemoTest();
        //运行当前对象的count方法
        demoTest.count();
    }
}

运行效果

小结

  1. CyclicBarrier 和 CountdownLatch 有什么区别?

    CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思,但是

    • CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;
    • 而CyclicBarrier的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点。
  2. CyclicBarrier的应用场景

    屏障/栅栏在涉及固定大小的线程数,多线程之间偶尔互相等待的程序中很有用

    进行多线程计算数据,最后合并计算

  3. 说说 CyclicBarrier 原理

    使用ReentrantLock重入锁

    使用Condition等待/通知的线程通信

3.控制并发线程数的Semaphore

目标

  1. 掌握Semaphore是什么
  2. 掌握Semaphore的使用

介绍

Semaphore(信号量)限制着访问某些资源的线程数量,在到达限制的线程数量之前,线程能够继续进行资源的访问,一旦访问资源的数量到达限制的线程数量,这个时候线程就不能够再去获取资源,只有等待有线程退出资源的获取。

应用场景

比如模拟一个停车场停车信号,假设停车场只有两个车位,一开始两个车位都是空的。这时同时来了两辆车,看门人允许它们进入停车场,然后放下车拦。
以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。

api方法

信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者让其运行。
但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

所以一个Semaphore 信号量有且仅有 3 种操作,且它们全部是原子的。

  • 初始化许可集、增加许可、获取许可。
  • 增加许可,release()方法释放一个阻塞,增加一个许可。
  • 获取许可,acquire()方法获取许可,再获取许可前处于阻塞等待。
方法名 说明
Semaphore(int permits) permits是允许同时运行的线程数目,创建指定数据线程的信号量
Semaphore(int permits, boolean fair) permits是允许同时运行的线程数目,创建指定数据线程的信号量;fair指定是公平模式还是非公平模式,默认非公平模式
void acquire() 方法阻塞,直到申请获取到许可证才可以运行当前线程
void release() 释放当前线程一个阻塞的 acquire() 方法,方法增加一个许可证,这可能会
intavailablePermits() 返回此信号量中当前可用的许可证数
intgetQueueLength() 返回正在等待获取许可证的线程数
booleanhasQueuedThreads() 是否有线程正在等待获取许可证
void reducePermits(int reduction) 减少reduction个许可证,是个protected方法。
Collection getQueuedThreads() 返回所有等待获取许可证的线程集合,是个protected方

示例代码

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class _05DemoTest {

    public static void main(String[] args) {

        //1.实例并发线程数量控制工具类信号量,设置许可数5个(同时运行5个线程)
        Semaphore semaphore = new Semaphore(5);

        //2.循环运行15个线程(会看到每次只允许5个线程)
        for (int i = 0; i < 15; i++) {
            new Thread(()->{

                try {
                    //2.1 申请获取许可
                    semaphore.acquire();

                    //2.2 运行业务
                    System.out.println(Thread.currentThread().getName()+"车,进入停车场");
                    TimeUnit.SECONDS.sleep(3);//让当前线程休眠(让线程多运行一会,方便观察效果)
                    System.out.println(Thread.currentThread().getName()+"车,离开停车场");

                    //2.3 释放阻塞,增加一个许可(让下一个阻塞的线程运行)
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }
}

示例效果

Semaphore的特点

Semaphore 在计数器不为 0 的时候对线程就放行,一旦达到 0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,Semaphore 是不可重入的。

  • 每一次请求一个许可都会导致计数器减少 1,同样每次释放一个许可都会导致计数器增加 1,一旦达到 0,新的许可请求线程将被挂起。

Semaphore 有两种模式,公平模式非公平模式(默认使用)

  • 公平模式就是调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO。

    Semaphore semaphore = new Semaphore(许可数, true);//公平模式
    
  • 非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

    Semaphore semaphore = new Semaphore(许可数, false);//非公平模式,不设置默认也是使用非公平模式
    
  • Semaphore 是 JUC 包提供的一个典型的共享锁控制并发线程数

小结

  1. Semaphore 是什么?

    Semaphore(信号量)限制着访问某些资源的线程数量,在到达限制的线程数量之前,线程能够继续进行资源的访问,一旦访问资源的数量到达限制的线程数量,这个时候线程就不能够再去获取资源,只有等待有线程退出资源的获取。

  2. Semaphore 的使用

    使用3个操作:

    • 初始化许可集
    • 增加许可,release()方法释放一个阻塞,增加一个许可。
    • 获取许可,acquire()方法获取许可,再获取许可前处于阻塞等待。

4.线程间交换数据的Exchanger

目标

  1. 掌握Exchanger的使用
  2. 理解exchanger的应用场景
  3. 理解Exchanger的实现原理

介绍

Exchanger(交换者)是一个用于线程间协作的工具类,可以用于进行线程间的数据交换。

交换数据原理

  1. Exchanger提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
  2. 两个线程通过 exchange() 方法交换数据
  3. 第一个线程先执行 exchange() 方法,会一直等待第二个线程也执行exchange()到达同步点时,两个线程交换数据,将本线程生产出来的数据传递给对方。
  4. 使用 Exchanger 的重点是成对的线程使用 exchange() 方法。

api方法
Exchanger类方法
V exchange(V x) 用于进行线程间的数据交换
V exchange(V x, long timeout, TimeUnit unit) 设置交换数据并等待超时时间

示例代码

import java.util.concurrent.Exchanger;

public class _06DemoTest {

    public static void main(String[] args) {


        //1.定义交换数据工具类,并设置传输数据的类型
        Exchanger exchanger = new Exchanger<>();

        //2.启动2个线程进行交换数据
        //线程1
        new Thread(()->{

            //2.1 定义交换的数据
            String a = "鸡腿";
            System.out.println(Thread.currentThread().getName()+"说:我买了"+a);
            System.out.println(Thread.currentThread().getName()+"说:等待线程2交换数据");

            //2.2 将数据交换给线程2
            try {
                String b = exchanger.exchange(a);//注意:如果线程2没有到达同步点,当前线程会被阻塞一直等到
                //成功获取线程2的数据后
                System.out.println(Thread.currentThread().getName()+"说:我拿到了"+b);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        //线程2
        new Thread(()->{

            //2.3 定义交换的数据
            String b = "汉堡";
            System.out.println(Thread.currentThread().getName()+"说:我买了"+b);
            System.out.println(Thread.currentThread().getName()+"说:等待线程1交换数据");

            //2.4 将数据交换给线程1并拿到线程1的数据
            try {
                String a = exchanger.exchange(b);
                //成功获取线程1的数据后
                System.out.println(Thread.currentThread().getName()+"说:我拿到了"+a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    }
}

示例效果

注意

如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发
生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

应用场景

  • 场景1:遗传算法

    Exchanger可以用于遗传算法,遗传算法需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

  • 场景2:校对数据

    Exchanger也可以用于校对数据工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致

小结

  1. 说说 Exchanger 原理

    简单原理:

    ? Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据

    深入源码分析原理:

    ? 使用unsafe.park()/unsafe.unpark()实现线程挂起与唤醒交互数据线程

    ? 使用ThreadLocal存储线程数据,Node用于存储线程信息与交换数据

    ? 单组成对两个线程情况下使用单槽位交换,以及多组成对线程情况下使用多槽位交换;

    ? Exchanger类成员

    ? 执行逻辑

5.总结

并发工具类 应用场景
CountDownLatch 1.实现最大的并行性
2.开始执行前等待 N 个线程完成各自任务
CyclicBarrier CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景
Semaphore Semaphore(信号量)限制着访问某些资源的线程数量,比如停车场车位控制
Exchanger 1.遗传算法
2.数据校对