4.线程间的通信


目录
  • 4.1 什么是等待通知机制
  • 4.2 等待/通知机制的实现
  • 4.3 wait()
    • wait(long)
    • wait与sleep区别
  • 4.4 notify()
    • interrupt()方法会中断 wait()
    • notifyAll()
  • 4.5 wait与notify总结
  • 4.6 通知过早
  • 4.7 多线程实现生产消费者模式(wait、notify实现)

4.1 什么是等待通知机制

在单线程编程中,要执行的操作需要满足一定的条件才能执行,可以把这个操作放在 if 语句块中。在多线程编程中,可能 A 线程的条件没有满足只是暂时的,稍后其他的线程 B 可能会更新条件使得 A 线程的条件得到满足。可以将 A 线程暂停,直到它的条件得到满足后再将 A 线程唤醒。

4.2 等待/通知机制的实现

Object 类中的 wait() 方法可以使执行当前代码的线程等待,暂停执行,直到接到通知或被中断为止。

注意:

wait()方法的调用必须放在synchronized方法或synchronized块中由锁对象调用

锁对象调用 wait() 方法,当前线程会释放锁

伪代码如下:

//在调用 wait()方法前获得对象的内部锁
synchronized( 锁对象 ) {
    while( 条件不成立 ) {
    	// 通过锁对象调用 wait() 方法暂停线程,会释放锁对象
        锁对象.wait();
    }
    // 线程的条件满足了继续向下执行
}

Object 类的 notify() 可以唤醒线程,该方法也必须在同步代码块中由 锁对象 调用;没有使用锁对象调用 wait()、notify() 会抛出java.lang.IllegalMonitorStateException 异常。如果有多个等待的线程,notify()方法只能唤醒其中的一个,在同步代码块中调用 notify()方法后,并不会立即释放锁对象,需要等当前同步代码块执行完后才会释放锁对象,一般将 notify()方法放在同步代码块的最后。

4.3 wait()

/**
 * 没有在同步代码块中执行(没有使用锁对象调用)抛IllegalMonitorStateException异常
 */
public class MyWaitTest1 {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("线程开始执行====");
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程结束执行====");
            }
        }).start();
    }
}

/**
 * 在同步代码块中锁对象调用wait()方法,线程处于等待状态 WAITING
 */
public class MyWaitTest2 {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (this) {
                    System.out.println("线程" + Thread.currentThread().getName() + "开始执行====");
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程结束执行====");
                }
            }
        }).start();
    }
}

import lombok.extern.slf4j.Slf4j;

/**
 * 
 * 程序目的:观察线程的WAITING状态
 * 模拟:只有一个售票窗口的售票厅,有两个粉丝都想买票。
 * 如果没有票,他们就继续等待、如果有票,则买票、然后离开售票厅。
 * 其中,工作人员会补票,补票之后,粉丝就可以买到票了。
 * 
* created at 2020-06-26 19:09 * * @author lerry */ @Slf4j public class ThreadWaitingStateDemo { public static void main(String[] args) throws InterruptedException { Ticket ticket = new Ticket(); Thread threadA = new Thread(() -> { synchronized (ticket) { while (ticket.getNum() == 0) { try { ticket.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } ticket.buy(); } }, "粉丝A"); Thread threadB = new Thread(() -> { synchronized (ticket) { while (ticket.getNum() == 0) { try { ticket.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } ticket.buy(); } }, "粉丝B"); threadA.start(); threadB.start(); // 确保A和B线程都运行起来 Thread.sleep(10); log.info("粉丝A线程的状态为:{}", threadA.getState()); log.info("粉丝B线程的状态为:{}", threadB.getState()); Thread employeeThread = new Thread(() -> { synchronized (ticket) { if (ticket.getNum() == 0) { ticket.addTickt(); ticket.notifyAll(); } } }, "补票员"); employeeThread.start(); } } @Slf4j class Ticket { /** * 票的张数 */ private int num = 0; public int getNum() { return num; } public void addTickt() { try { Thread.sleep(2_000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("补充票"); this.num += 2; } /** * 停顿10毫秒、模拟方法执行耗时 */ public void buy() { log.info("[{}]:购买了一张票", Thread.currentThread().getName()); log.info("[{}]:退出售票厅", Thread.currentThread().getName()); } }
2020-06-26 21:26:37.938 [main    ] INFO  com.hua.threadtest.state.ThreadWaitingStateDemo - 粉丝A线程的状态为:WAITING
2020-06-26 21:26:37.945 [main    ] INFO  com.hua.threadtest.state.ThreadWaitingStateDemo - 粉丝B线程的状态为:WAITING
2020-06-26 21:26:39.948 [补票员     ] INFO  com.hua.threadtest.state.Ticket - 补充票
2020-06-26 21:26:39.949 [粉丝B     ] INFO  com.hua.threadtest.state.Ticket - [粉丝B]:购买了一张票
2020-06-26 21:26:39.949 [粉丝B     ] INFO  com.hua.threadtest.state.Ticket - [粉丝B]:退出售票厅
2020-06-26 21:26:39.949 [粉丝A     ] INFO  com.hua.threadtest.state.Ticket - [粉丝A]:购买了一张票
2020-06-26 21:26:39.949 [粉丝A     ] INFO  com.hua.threadtest.state.Ticket - [粉丝A]:退出售票厅

当修改ticket.wait();ticket.wait(10);后,输出结果如下:

2020-06-26 21:27:10.704 [main    ] INFO  com.hua.threadtest.state.ThreadWaitingStateDemo - 粉丝A线程的状态为:TIMED_WAITING
2020-06-26 21:27:10.709 [main    ] INFO  com.hua.threadtest.state.ThreadWaitingStateDemo - 粉丝B线程的状态为:TIMED_WAITING
2020-06-26 21:27:12.714 [补票员     ] INFO  com.hua.threadtest.state.Ticket - 补充票
2020-06-26 21:27:12.714 [粉丝B     ] INFO  com.hua.threadtest.state.Ticket - [粉丝B]:购买了一张票
2020-06-26 21:27:12.714 [粉丝B     ] INFO  com.hua.threadtest.state.Ticket - [粉丝B]:退出售票厅
2020-06-26 21:27:12.715 [粉丝A     ] INFO  com.hua.threadtest.state.Ticket - [粉丝A]:购买了一张票
2020-06-26 21:27:12.715 [粉丝A     ] INFO  com.hua.threadtest.state.Ticket - [粉丝A]:退出售票厅

关于wait()放在while循环的疑问

为什么ticket.wait();要放在while (ticket.getNum() == 0)代码块中呢?既然这行代码时让线程等待着,那使用if不就行了?

我们设想一下,如果使用if,则在线程被唤醒后,会继续往下执行,不再判断条件是否符合,这时还是没有票,粉丝也就购买不到票了。

我们看一下Object.wait()的官方doc说明:

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:
           synchronized (obj) {
               while ()
                   obj.wait();
               ... // Perform action appropriate to condition
           }

在一个参数版本中(wait方法),中断和虚假的唤醒是可能的,这个方法应该总是在循环中使用。

我们再继续看Object.wait(long timeout)的文档说明:

线程也可以在没有通知、中断或超时的情况下被唤醒,这就是所谓的假唤醒。虽然这种情况在实践中很少发生,但应用程序必须通过测试导致线程被唤醒的条件来防止这种情况发生,如果条件不满足,则继续等待。换句话说,等待应该总是在循环中发生

所以,为了避免很少发生的假唤醒出现时程序发生不可预知的错误,建议把wait()调用放在循环语句中。这样就算被假唤醒,也有条件语句的限制。

这也是为何wait要放在循环语句中的一个原因。

wait(long)

wait(long)带有 long 类型参数的 wait()等待,如果在参数指定的时间内没有被唤醒,超时后会自动唤醒。

public class Test07 {
    public static void main(String[] args) {
        final Object obj = new Object();
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (obj) {
                    try {
                        System.out.println("thread begin wait");
                        obj.wait(5000); //如果 5000 毫秒内没有被唤醒 ,会自动唤醒
                        System.out.println("end wait....");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}

wait与sleep区别

1.对于sleep()方法,我们首先要知道该方法是属于Thread类中的。而wait()方法,则是属于Object类中的。

2.sleep()方法导致了程序暂停执行指定的时间,让出cpu给其他线程,但是他的监控状态依然保持着,当指定的时间到了又会自动恢复运行状态。

3.在调用sleep()方法的过程中,线程不会释放对象锁。

4.当调用wait()方法的时候,线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后,本线程才进入对象锁定池准备获取对象锁进入运行状态。

4.4 notify()

notify()方法后不会立即释放锁对象

interrupt()方法会中断 wait()

当线程处于 wait()等待状态时,调用线程对象的 interrupt()方法会中断线程的等待状态,会产生InterruptedException异常。

/**
 * Interrupt()会中断线程的 wait()等待
 */
public class Test05 {
    public static void main(String[] args) throws InterruptedException {
        SubThread t = new SubThread();
        t.start();
        Thread.sleep(2000); //主线程睡眠 2 秒, 确保子线程处于 Wait 等待状态
        t.interrupt();
    }

    private static final Object LOCK = new Object(); //定义常量作为锁对象

    static class SubThread extends Thread {
        @Override
        public void run() {
            synchronized (LOCK) {

                try {
                    System.out.println("begin wait...");
                    LOCK.wait();
                    System.out.println("end wait..");
                } catch (InterruptedException e) {
                    System.out.println("wait 等待被中断了****");
                    e.printStackTrace();
                }
            }
        }
    }
}

notifyAll()

notify()一次只能唤醒一个线程,如果有多个等待的线程,只能随机唤醒其中的某一个;想要唤醒所有等待线程,需要调用 notifyAll()。

/**
 * notify()与 notifyAll()
 */
public class Test06 {
    public static void main(String[] args) throws InterruptedException {
        //定义一个对象作为子线程的锁对象
        Object lock = new Object();

        SubThread t1 = new SubThread(lock);
        SubThread t2 = new SubThread(lock);
        SubThread t3 = new SubThread(lock);
        t1.setName("t1");
        t2.setName("t2");
        t3.setName("t3");
        t1.start();
        t2.start();
        t3.start();
        Thread.sleep(2000);
        //调用 notify()唤醒 子线程
        synchronized (lock) {
//            lock.notify();

            /*
             * 调用一次 notify()只能唤醒其中的一个线程,
             * 其他等待的线程依然处于等待状态,
             * 对于处于等待状态的线程来说,错过了通知信号,
             * 这种现象也称为信号丢失
             * */
            lock.notifyAll(); // 唤醒所有的线程
        }
    }

    static class SubThread extends Thread {
        // 定义实例变量作为锁对象
        private Object lock;

        public SubThread(Object lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println(Thread.currentThread().getName() + " -- begin wait...");
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() + " -- end wait...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4.5 wait与notify总结

为什么wait和notify方法放在Object?

因为wait和notify需要使用锁对象来调用,而任何对象都可以作为锁,所以放在Object类中.

1、wait()、notify/notifyAll() 方法是Object的本地final方法,无法被重写。

2、wait()使当前线程阻塞,前提是 必须先获得锁,一般配合synchronized 关键字使用,
即,一般在synchronized 同步代码块里使用 wait()、notify/notifyAll() 方法。

3、由于 wait()、notify/notifyAll() 在synchronized 代码块执行,说明当前线程一定是获取了锁的。
当线程执行wait()方法时候,会释放当前的锁,然后让出CPU,进入等待状态。只有当 notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。也就是说,notify/notifyAll() 的执行只是唤醒沉睡的线程,而不会立即释放锁,锁的释放要看代码块的具体执行情况。所以在编程中,尽量在使用了notify/notifyAll() 后立即退出临界区,以唤醒其他线程让其获得锁

4、wait() 需要被try catch包围,以便发生异常中断也可以使wait等待的线程唤醒。

5、notify 和 wait 的顺序不能错,如果A线程先执行notify方法,B线程在执行wait方法,那么B线程是无法被唤醒的。

6、notify 和 notifyAll的区别

notify方法只唤醒一个等待(对象的)线程并使该线程开始执行。所以如果有多个线程等待一个对象,这个方法只会唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。

notifyAll 会唤醒所有等待(对象的)线程,尽管哪一个线程将会第一个处理取决于操作系统的实现。如果当前情况下有多个线程需要被唤醒,推荐使用notifyAll 方法。

4.6 通知过早

线程 wait()等待后,可以调用 notify()唤醒线程,如果 notify()唤醒的过早,在等待之前就调用了 notify()可能会打乱程序正常的运行逻辑。

/**
 * notify()通知过早
 */
public class Test08 {
    public static void main(String[] args) {
        final Object Lock = new Object(); //定义对象作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock) {
                    try {
                        System.out.println("begin wait");
                        Lock.wait();
                        System.out.println("wait end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock) {
                    System.out.println("begin notify");
                    Lock.notify();
                    ;
                    System.out.println("end nofity");
                }
            }
        });
        //如果先开启 t1,再开启 t2 线程,大多数情况下, t1 先等待,t1 再把 t1 唤醒
        // t1.start();
        // t2.start();
        //如果先开启 t2 通知线程,再开启 t1 等待线程,可能会出现 t1 线程等待没有收到通知的情况,
        t2.start();
        t1.start();
    }
}
/**
 * notify()通知过早, 就不让线程等待了
 */
public class Test09 {
    static boolean isFirst = true; //定义静态变量作为是否第一个运行的线程标志

    public static void main(String[] args) {
        final Object Lock = new Object(); //定义对象作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock) {
                    while (isFirst) { //当线程是第一个开启的线程就等待
                        try {
                            System.out.println("begin wait");
                            Lock.wait();
                            System.out.println("wait end...");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock) {
                    System.out.println("begin notify");
                    Lock.notify();
                    ;
                    System.out.println("end nofity");
                    isFirst = false; //通知后,就把第一个线程标志修改为 false
                }
            }
        });
        //如果先开启 t1,再开启 t2 线程,大多数情况下, t1 先等待,t1 再把 t1 唤醒
        // t1.start();
        // t2.start();
        //如果先开启 t2 通知线程,再开启 t1 等待线程,可能会出现 t1 线程等待没有收到通知的情况,
        t2.start();
        t1.start();
        //实际上,调用 start()就是告诉线程调度器,当前线程准备就绪,线程调度器在什么时候开启这
        // 个线程不确定,即调用 start()方法的顺序,并不一定就是线程实际开启的顺序.
        //在当前示例中,t1 等待后让 t2 线程唤醒 , 如果 t2 线程先唤醒了,就不让 t1 线程等待了
    }
}

4.7 多线程实现生产消费者模式(wait、notify实现)

生产消费模式概念

生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。

生产者消费者模式的好处:
1. 它简化的开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁
2. 生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样
3. 生产者和消费者可以以不同的速度执行
4. 分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码

经典 生产消费者面试题 (wait、notify 实现)
* 生产线加工生产汽车
* 4S店 存放生产的汽车
* 消费者在4S店购买汽车

设计思路

代码实现

/**
    经典 生产消费者面试题 (wait、notify 实现)
 * 4S店存放生产的汽车
 * 生产者线程生产汽车
 * 消费者线程购买汽车
 *
 * 4S店最多只能存放4台车,如果达到4台车生产者停止生产
 * 如果4S店没有库存车,消费需等待有库存后,方可购买
 */
public class D2_线程通信_生产消费4S店案例 {
    static final int CAR_STORE_NUM = 4;
    static  int CAR_NUM = 1;
    public static void main(String[] args) {
        Car4S car4S = new Car4S();
        Thread product1 = new Thread(new ProductCar(car4S));
        Thread consumer1 = new Thread(new ConsumerCar(car4S));
        product1.start();
        consumer1.start();
    }
    // 生产者 生产汽车的任务
    static class ProductCar implements Runnable{
        Car4S car4S;
        public ProductCar(Car4S car4S) {//通过构造器传入4s店对象
            this.car4S = car4S;
        }
        @Override
        public void run() {
            while (true){
                synchronized (car4S){
                    if(car4S.cars.size()>=CAR_STORE_NUM){
                        System.out.println("当前库存已满,生产者休息");
                        try {
                            car4S.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }else {
                        Car car = new Car();
                        car.setName(CAR_NUM+"");
                        car4S.cars.push(car);
                        System.out.println("生产汽车==>"+car.getName());
                        CAR_NUM++;
                        car4S.notify();
                    }
                }
            }
        }
    }
    // 消费者 购买汽车的任务
    static class ConsumerCar implements Runnable{
        Car4S car4S;
        public ConsumerCar(Car4S car4S) {//通过构造器传入4s店对象
            this.car4S = car4S;
        }
        @Override
        public void run() {
            while (true){
                synchronized (car4S){
                    if(car4S.cars.size()==0){
                        System.out.println("当前4S店无库存,消费者休息");
                        try {
                            car4S.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }else {
                        Car poll = car4S.cars.poll();
                        System.out.println("购买到汽车==>"+poll.getName());
                        car4S.notify();
                    }
                }
            }
        }
    }
    static class Car4S {
        LinkedList cars = new LinkedList();
    }
    // 车实体类
    static class Car{
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }
}