JUC 入门篇


参考文献:https://www.bilibili.com/video/BV1Kw411Z7dF?share_source=copy_web ,编写工具:Typora0.9.98

1 JUC概述

  • **JUC:在 Java 5.0 提供了 java.util.concurrent (简称JUC )包,在此包中增加了在并发编程中很常用的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步 IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等;

  • 进程:进程是计算机分配资源的基本单位,它是一个具有独立功能的程序,例如QQ运行起来就是一个大的进程,这可以在任务管理器中看到;

  • 线程:线程是CPU调度的最小单位,一个进程里面可以有1个或多个线程,这些线程共享进程中的资源;

  • 线程状态

    public enum Thread.State {
        NEW, //新建
        RUNNABLE, //准备就绪
        BLOCKED, //阻塞
        WAITING, //不见不散
        TIMED_WAITING, //过时不候
        TERMINATED; //终结
    }
    
  • wait和sleep

    (1) sleep() 方法是线程类(Thread)的静态方法,让调用线程进入睡眠状态,让出执行机会给其他线程,等到休眠时间结束后,线程进入就绪状态和其他线程一起竞争cpu的执行时间。因为sleep() 是static静态的方法,他不能改变对象的机锁,当一个synchronized块中调用了sleep() 方法,线程虽然进入休眠,但是对象的机锁没有被释放,其他线程依然无法访问这个对象;

    (2) wait()是Object类的方法,当一个线程执行到wait方法时,它就进入到一个和该对象相关的等待池,同时释放对象的机锁,使得其他线程能够访问,可以通过notify,notifyAll方法来唤醒等待的线程;

    (3) 他们都可以被interrupted方法中断;

  • 并发和并行

    并发:同一时刻多个线程在访问同一资源。

    并行:仅仅是多项工作一起执行。

  • 管程

    Monitor(监视器)是一种同步机制,保证同一时间只有一个线程访问被保护的数据或代码。就是java平时所说的锁。

    JVM的同步操作基于进入和退出管程对象Monitor来实现的,进入加锁,退出解锁;

  • 用户线程和守护线程

    守护线程是一种特殊的线程,在后台默默地完成一些系统性的服务,比如垃圾回收线程、JIT线程都是守护线程。与之对应的是用户线程,用户线程可以理解为是系统的工作线程,它会完成这个程序需要完成的业务操作。如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了。所以当系统只剩下守护进程的时候,java虚拟机会自动退出。

    java线程分为用户线程和守护线程,线程的daemon属性为true表示是守护线程,false表示是用户线程。

    //主线程结束了,用户线程还在运行,jvm还处于存活状态
    public class DaemonTest {
        public static void main(String[] args) {
            Thread thread = new Thread(()->{
                System.out.println( Thread.currentThread().getName() +" is Daemon:"+ Thread.currentThread().isDaemon() );
                while (true) {}
            },"thread-a");
            thread.start();
            System.out.println( Thread.currentThread().getName() + "over!" );
        }
    }
    console输出如下:
    mainover!    #主线程结束了,用户线程还在运行,jvm还处于存活状态
    thread-a is Daemon:false
    
    //没有用户线程了,都是守护线程,jvm结束
    public class DaemonTest {
        public static void main(String[] args) {
            Thread thread = new Thread(()->{
                System.out.println( Thread.currentThread().getName() +" is Daemon:"+ Thread.currentThread().isDaemon() );
                while (true) {}
            },"thread-a");
            thread.setDaemon(true);
            thread.start();
            System.out.println( Thread.currentThread().getName() + "over!" );
        }
    }
    console输出如下:
    mainover!    #没有用户线程了,都是守护线程,jvm结束
    thread-a is Daemon:false
    

2 LOCK接口

2.1 synchronized关键字

synchronized 是 Java 中的关键字,是一种同步锁(对方法或者代码块中存在共享数据的操作),具体修饰的对象有3种方式

3 线程间通信

package study.juc.atguigu.jucstudy.sync;

/**
 * 线程通讯:多线程交替执行案例
 * 实现:两个线程交替+1/-1操作
 */
public class ThreadMessage {
    public static void main(String[] args) {
        Share share = new Share();
        new Thread( ()->{
            for (int i=0; i<10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread-A").start();
        new Thread( ()->{
            for (int i=0; i<10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread-B").start();
    }
}

//资源类:内部方法一个实现+1,一个实现-1。
class Share {
    private int number = 0;

    public synchronized void incr() throws InterruptedException {
        if (number!=0) {
            this.wait();
        }
        number++;
        System.out.println( Thread.currentThread().getName() +":"+ number );
        this.notifyAll(); //通知其他线程
    }

    public synchronized void decr() throws InterruptedException {
        if (number!=1) {
            this.wait();
        }
        number--;
        System.out.println( Thread.currentThread().getName() +":"+ number );
        this.notifyAll(); //通知其他线程
    }
}

3.2 虚假唤醒问题

借用上一案例的资源类,用4个线程同时处理“加一减一”操作,如下

/**
 * 案例:多线程 ”虚假唤醒问题“
 */
public class ThreadMessage2 {
    public static void main(String[] args) {
        Share share = new Share();
        new Thread( ()->{
            for (int i=0; i<10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread-A").start();
        new Thread( ()->{
            for (int i=0; i<10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread-B").start();
        new Thread( ()->{
            for (int i=0; i<10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread-C").start();
        new Thread( ()->{
            for (int i=0; i<10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread-D").start();
    }
}
//执行结果如下:并没有安装预期要求交替执行
thread-A:1 thread-B:0 thread-A:1 thread-B:0 thread-A:1 thread-B:0 thread-A:1 thread-B:0 thread-A:1 thread-B:0 thread-A:1 thread-B:0 thread-C:1 thread-B:0 thread-A:1 thread-B:0 thread-C:1 thread-B:0 thread-A:1 thread-B:0 thread-C:1 thread-A:2 thread-C:3 thread-A:4 thread-C:5

查看JDK文档,关于wait()的描述:

public final void wait() throws InterruptedException
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等待。换句话说,此方法的行为就好像它仅执行 wait(0) 调用一样。
当前线程必须拥有此对象监视器。该线程发布对此监视器的所有权并等待,直到其他线程通过调用 notify 方法,或 notifyAll 方法通知在此对象的监视器上等待的线程醒来。然后该线程将等到重新获得对监视器的所有权后才能继续执行。
对于某一个参数的版本,实现中断和虚假唤醒是可能的,而且此方法应始终在循环中使用:
synchronized (obj) {
while ()
obj.wait();
... // Perform action appropriate to condition
     }
此方法只应由作为此对象监视器的所有者的线程来调用。有关线程能够成为监视器所有者的方法的描述,请参阅 notify 方法。

wait方法有一个特点:在哪里睡就在哪里醒,在此案例中由于再次被唤醒后if判断便没生效,参照API所述,将if改为while即可解决此问题。

3.3 Lock接口实现线程间通讯

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

方法摘要 java.util.concurrent.locks.Condition接口
void await() 造成当前线程在接到信号或被中断之前一直处于等待状态。
boolean await(long time, TimeUnit unit) 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout) 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
void awaitUninterruptibly() 造成当前线程在接到信号之前一直处于等待状态。
boolean awaitUntil(Date deadline) 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
void signal() 唤醒一个等待线程。
void signalAll() 唤醒所有等待线程

案例如下:

package study.juc.atguigu.jucstudy.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Lock接口实现线程间通讯
 */
public class ThreadMessage3 {
    public static void main(String[] args) {
        Share share = new Share();
        new Thread(()->{
            for (int i=0; i<=10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-A").start();
        new Thread(()->{
            for (int i=0; i<=10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-B").start();
        new Thread(()->{
            for (int i=0; i<=10; i++) {
                try {
                    share.incr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-C").start();
        new Thread(()->{
            for (int i=0; i<=10; i++) {
                try {
                    share.decr();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-D").start();
    }
}

class Share {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void incr() throws InterruptedException {
        lock.lock();
        try {
            while (number!=0) {
                condition.await();
            }
            number++;
            System.out.println( Thread.currentThread().getName() +":"+ number );
            condition.signalAll();
        }finally {
            lock.unlock();
        }
    }

    public synchronized void decr() throws InterruptedException {
        lock.lock();
        try {
            if (number!=1) {
                condition.await();
            }
            number--;
            System.out.println( Thread.currentThread().getName() +":"+ number );
            condition.signalAll();
        }finally {
            lock.unlock();
        }
    }
}

4 线程间定制化通信

4.1

5 集合的线程安全

5.1 演示集合的线程不安全

/**
 * 演示集合的不安全
 */
public class CollectsUnsafe {
    public static void main(String[] args) {
        List list = new ArrayList<>();
        for (int i=0; i<10; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID());
                System.out.println( list );
            }, String.valueOf(i)).start();
        }
    }
}
输出如下:
[aaad43a3-b323-405a-8035-6c47f03fccd3, fd7c4489-0c10-47b5-8992-4fcffdad9760, 6cb1f773-bc91-4525-80f3-2b7b51da1b56]
[aaad43a3-b323-405a-8035-6c47f03fccd3, fd7c4489-0c10-47b5-8992-4fcffdad9760, 6cb1f773-bc91-4525-80f3-2b7b51da1b56, 1f60f147-64fa-4e6e-b28f-21fdc50ba3c8, b6f6fe40-bfe4-431b-8b16-7b8f4d79acd1]
[aaad43a3-b323-405a-8035-6c47f03fccd3, fd7c4489-0c10-47b5-8992-4fcffdad9760, 6cb1f773-bc91-4525-80f3-2b7b51da1b56, 1f60f147-64fa-4e6e-b28f-21fdc50ba3c8, b6f6fe40-bfe4-431b-8b16-7b8f4d79acd1, c0bc3f41-ef5c-4d97-94d3-e0afdb7396a3, 27d61ea7-81e8-45fa-9170-cb9e27f344e0, a6cfbdea-b67d-4744-9d81-a2744945e232, b9224f3c-7da9-4244-bd15-689c10ddef24, 0bd10943-b8b6-4a79-93de-35f7a518741a]
Exception in thread "1" Exception in thread "2" Exception in thread "5" Exception in thread "3" Exception in thread "6" Exception in thread "7" Exception in thread "9" java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
	at java.util.ArrayList$Itr.next(ArrayList.java:859)
	at java.util.AbstractCollection.toString(AbstractCollection.java:461)
	at java.lang.String.valueOf(String.java:2994)
	at java.io.PrintStream.println(PrintStream.java:821)
	at study.juc.atguigu.jucstudy.threadsecurity.CollectsUnsafe.lambda$main$0(CollectsUnsafe.java:16)
	at java.lang.Thread.run(Thread.java:748)
java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)

如上可知,https://blog.csdn.net/weixin_40807247/article/details/88413347

5.2 集合线程安全方案-Vector

如下为Vector部分源码,通过加 synchronized 来保证线程安全,不过这种方式比较古老,已不推荐使用。

/**
     * Appends the specified element to the end of this Vector.
     * @param e element to be appended to this Vector
     * @return {@code true} (as specified by {@link Collection#add})
     * @since 1.2
     */
public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}

5.3 集合线程安全方案-Collections

节选部分java.util.Collections的方法摘要如下,此方式使用也较少:

方法摘要
static Collection synchronizedCollection(Collection c) 返回指定 collection 支持的同步(线程安全的)collection
static List synchronizedList(List list) 返回指定列表支持的同步(线程安全的)列表
static Map synchronizedMap(Map m) 返回由指定映射支持的同步(线程安全的)映射
static Set synchronizedSet(Set s) 返回指定 set 支持的同步(线程安全的)set
static SortedMap synchronizedSortedMap(SortedMap m) 返回指定有序映射支持的同步(线程安全的)有序映射
static SortedSet synchronizedSortedSet(SortedSet s) 返回指定有序 set 支持的同步(线程安全的)有序 set

5.4 集合线程安全方案-CopyOnWriteArrayList(重点)

java.util.concurrent.CopyOnWriteArrayList 是推荐使用的方法,它使用了写时复制技术(计算机优化的一种通用思想)。

  • 写时复制技术:conpyOnWrite容器即写时复制容器.往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements, 然后往新的容器newElements里添加元素, 添加完元素后,再讲元容器的引用指向新的容器 setArray(newElements). 这样做的好处是可以对copyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素.所以copyOnWrite也是一种读写分离的思想,读和写不同的容器

    ? 简单来说,不同进程访问同一资源的时候,1. 查询都不需要加锁随便访问,应为没有锁,所以多个线程可以同时访问;2. 在写入/删除的时候加锁,会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本,改操作的同时,读操作不会被阻塞,而是继续 读取旧的数据。这点要跟区分一下。

    优点:对于一些 读多写少的数据,这种做法的确很不错,例如配置、黑名单、物流地址等变化非常少的数据,这是一种 无锁的实现。可以帮我们实现程序更高的并发。

    缺点:这种实现只是保证数据的最终一致性,在添加到拷贝数据而还没进行替换的时候,读到的仍然是旧数据。如果对象比较大,频繁地进行替换会消耗内存从而引发Java的GC问题,这个时候,我们应该考虑其他的容器,例如 ConcurrentHashMap

//节选 java.util.concurrent.CopyOnWriteArrayList 部分源码
public void add(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (index > len || index < 0)
            throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);
        Object[] newElements;
        int numMoved = len - index;
        if (numMoved == 0)
            newElements = Arrays.copyOf(elements, len + 1);
        else {
            newElements = new Object[len + 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index + 1, numMoved);
        }
        newElements[index] = element;
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}

6 多线程锁

6.1 synchronized锁的8种情况

7 Callable接口

  • 目前我们学习了两种创建线程的方法:继承Thread类,实现Runable接口。但Runable缺少一种功能:当线程结束(即run()完成时)无法使线程返回结果,基于此java提供了Callable接口。
  • Callable 可以返回结果并且可以抛出异常的任务。

8 JUC的辅助类

8.1 减少计数CountDownLatch

java.util.concurrent.CountDownLatch 是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

方法摘要
构造 CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch
void await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
boolean await(long timeout, TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
long getCount() 返回当前计数。
String toString() 返回标识此锁存器及其状态的字符串。

案例如下:

/**
 * 演示:CountDownLatch
 * 案例:10个同学陆续离开教室后班长负责最后锁门
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        /** 错误做法:此法 "main班长锁门了" 可能会在同学未全离开教室前执行
        for (int i=0; i<10; i++) {
            new Thread(()->{
                System.out.println( Thread.currentThread().getName()+"号同学离开教室了" );
            }, String.valueOf(i)).start();
        }
        System.out.println( Thread.currentThread().getName()+"班长锁门了" );
         **/
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i=0; i<10; i++) {
            new Thread(()->{
                System.out.println( Thread.currentThread().getName()+"号同学离开教室了" );
                countDownLatch.countDown(); //计数-1
            }, String.valueOf(i)).start();
        }
        countDownLatch.await(); //等待
        System.out.println( Thread.currentThread().getName()+"班长锁门了" );

    }
}

8.2 循环栅栏CyclicBarrier

java.util.concurrent.CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

方法摘要
构造 CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作
构造 CyclicBarrier(int parties, Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
int await() 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit) 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting() 返回当前在屏障处等待的参与者数目。
int getParties() 返回要求启动此 barrier 的参与者数目。
boolean isBroken() 查询此屏障是否处于损坏状态。
void reset() 将屏障重置为其初始状态。

案例如下:

/**
 * 演示:CyclicBarrier
 * 案例:集齐7克龙珠召唤神龙
 */
public class CyclicBarrierDemo {
    private static final int number = 7;
    public static void main(String[] args) {
        //创建 CyclicBarrier
        CyclicBarrier cyclicBarrier = new CyclicBarrier(number, ()->{
            System.out.println( "******集齐7颗龙珠召唤神龙" );
        });
        //收集龙珠过程
        for (int i=1; i<=number; i++) {
            new Thread(()->{
                try {
                    System.out.println( Thread.currentThread().getName() +"星龙珠被收集到了" );
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

8.3 信号灯Semaphore

java.util.concurrent.Semaphore:一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

方法摘要
构造 Semaphore(int permits) 创建具有给定的许可数和非公平的公平设置的 Semaphore。
构造 Semaphore(int permits, boolean fair) 创建具有给定的许可数和给定的公平设置的 Semaphore。
void acquire() 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire(int permits) 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquireUninterruptibly() 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly(int permits) 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
int availablePermits() 返回此信号量中当前可用的许可数。
int drainPermits() 获取并返回立即可用的所有许可。
protected Collection getQueuedThreads() 返回一个 collection,包含可能等待获取的线程。
int getQueueLength() 返回正在等待获取的线程的估计数目。
boolean hasQueuedThreads() 查询是否有线程正在等待获取。
boolean isFair() 如果此信号量的公平设置为 true,则返回 true。
protected void reducePermits(int reduction) 根据指定的缩减量减小可用许可的数目。
void release() 释放一个许可,将其返回给信号量。
void release(int permits) 释放给定数目的许可,将其返回到信号量。
String toString() 返回标识此信号量的字符串,以及信号量的状态。
boolean tryAcquire() 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire(int permits) 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(long timeout, TimeUnit unit) 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。

案例如下:

/**
 * 演示:Semaphore
 * 案例:6辆车抢占3个车位
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        //创建Semaphore,设置许可数量
        Semaphore semaphore = new Semaphore(3);
        for (int i=1; i<=6; i++) {
            new Thread(()->{
                try {
                    //获取一个许可
                    semaphore.acquire();
                    System.out.println( Thread.currentThread().getName()+"抢到了车位" );
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //释放一个许可,将其返回给信号量
                    semaphore.release();
                    System.out.println( Thread.currentThread().getName()+"---离开了车库" );
                }

            }, String.valueOf(i)).start();
        }
    }
}

9 ReentrantReadWriteLock读写锁

9.1悲观锁和乐观锁

9.1.1悲观锁

  • 悲观锁(Pessimistic Lock):顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。
  • 悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作。
  • Java synchronized 就属于悲观锁的一种实现,每次线程要修改数据时都先获得锁,保证同一时刻只有一个线程能操作数据,其他线程则会被block。

9.1.2 乐观锁

  • 乐观锁(Optimistic Lock):顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在提交更新的时候会判断一下在此期间别人有没有去更新这个数据。乐观锁适用于读多写少的应用场景,这样可以提高吞吐量。

  • 乐观锁:假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性。

  • 乐观锁一般来说有以下2种方式:

    1. 使用数据版本(Version)记录机制实现,这是乐观锁最常用的一种实现方式。何谓数据版本?即为数据增加一个版本标识,一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则认为是过期数据。

    2. 使用时间戳(timestamp)。乐观锁定的第二种实现方式和第一种差不多,同样是在需要乐观锁控制的table中增加一个字段,名称无所谓,字段类型使用时间戳(timestamp), 和上面的version类似,也是在更新提交的时候检查当前数据库中数据的时间戳和自己更新前取到的时间戳进行对比,如果一致则OK,否则就是版本冲突。

9.1.3 乐观锁与悲观锁案例演示

//假设有如下一张商品表
create table tb_product(
	id int not null auto_increment primary key,
    stock int not null 
)ENGINE=InnoDB DEFAULT CHARSET=utf8

/**
* 更新库存(多线程并发的情况下可能会出现超卖)
*/
public boolean updateStockRaw(Long productId){
    ProductStock product = query("SELECT * FROM tb_product WHERE id=#{productId}", productId);
    if (product.getNumber() > 0) {
        int updateCnt = update("UPDATE tb_product SET stock=stock-1 WHERE id=#{productId}", productId);
        if(updateCnt > 0){ //更新库存成功
            return true;
        }
    }
    return false;
}

/**
* 更新库存(使用悲观锁)
*/
public boolean updateStock(Long productId){
    ProductStock product = query("SELECT * FROM tb_product WHERE id=#{productId} FOR UPDATE", productId); //先锁定商品库存记录
    if (product.getNumber() > 0) {
        int updateCnt = update("UPDATE tb_product SET stock=stock-1 WHERE id=#{productId}", productId);
        if(updateCnt > 0){ //更新库存成功
            return true;
        }
    }
    return false;
}

/**
* 下单减库存(使用乐观锁)
*/
public boolean updateStock(Long productId){
	int updateCnt = 0;
    while (updateCnt == 0) {
        ProductStock product = query("SELECT * FROM tb_product WHERE product_id=#{productId}", productId);
        if (product.getNumber() > 0) {
            updateCnt = update("UPDATE tb_product SET stock=stock-1 WHERE product_id=#{productId} AND number=#{number}", 
            			productId, product.getNumber());
            if(updateCnt > 0){ //更新库存成功
                return true;
            }
        }else{    
			return false; //卖完啦
        }
	}
    return false;
}

9.2 读写锁

10 BlockingQueue阻塞队列

11 ThreadPool线程池

11.1 线程池概述

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务时创建和销毁线程的代价。线程池不仅能保证内核的充分利用,还能防止过分调度。

线程池优势:

11.2 线程池架构

11.3 线程池使用方式

11.4 线程池底层原理

11.5 线程池的7个参数

11.6 线程池的底层工作流程

11.7 自定义线程池

12 Fork/Join分支合并框架

13 CompletableFuture异步回调

相关