java并发:并发控制机制之Semaphore


一、初识Semaphore

小结:

A、可以将信号量可视化为一个计数器,它可以递增或递减。

B、从概念上讲,信号量维护了一个许可集合,Semaphore对可用的许可进行计数。

C、当计数器的值为0时,它能够使线程等待。

D、Semaphore 的计数器是不可以自动重置的。

二、示例

The three steps you must follow when you use a semaphore to implement a critical section and protect the access to a shared resource:

  • First, you acquire the semaphore, with the acquire() method.
  • Then, you do the necessary operations with the shared resource.
  • Finally, release the semaphore with the release() method.

示例一

场景:

假设一个服务器资源有限,任意某一时刻只允许3个人同时访问,这时一共来了10个人

package com.test;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo{
    
    public static void main(String args[]) throws Exception{
        
        final Semaphore semaphore = new Semaphore(3);//一次只允许3个人进行访问
        
        for(int i=0;i<10;i++) {
            final int no = i;

            Runnable thread = new Runnable() {
                public void run (){
                    try {
                        System.out.println("用户"+no+"连接上了:");
                        Thread.sleep(300L);
                        semaphore.acquire();//获取执行的许可
                        System.out.println("用户"+no+"开始访问后台程序...");
                        Thread.sleep(1000L);//模仿用户访问服务过程
                        semaphore.release();//释放,允许下一个线程访问后台
                        System.out.println("用户"+no+"访问结束。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };

            new Thread(thread).start();
        }
        
        System.out.println("Main thread end!");
    }
}

上述代码运行结果如下:

用户1连接上了:
用户3连接上了:
用户4连接上了:
用户2连接上了:
用户0连接上了:
用户5连接上了:
用户7连接上了:
Main thread end!
用户6连接上了:
用户8连接上了:
用户9连接上了:
用户3开始访问后台程序...
用户4开始访问后台程序...
用户2开始访问后台程序...
用户4访问结束。
用户3访问结束。
用户7开始访问后台程序...
用户0开始访问后台程序...
用户8开始访问后台程序...
用户2访问结束。
用户5开始访问后台程序...
用户0访问结束。
用户7访问结束。
用户1开始访问后台程序...
用户8访问结束。
用户6开始访问后台程序...
用户1访问结束。
用户9开始访问后台程序...
用户5访问结束。
用户6访问结束。
用户9访问结束。

从结果上可以看出来,10个人同时进来,但是只能同时3个人访问资源,释放一个允许进来一个

Note:

When a thread has finished the use of the shared resource, it must release the semaphore so that the other threads can access the shared resource.

That operation increases the internal counter of the semaphore.

示例二

三、详解Semaphore

其类图如下:

解读:

由上图可知,Semaphore的内部类Sync继承了AbstractQueuedSynchronizer(进而可以Semaphore的底层是AQS),且Sync有两个子类NonfairSync和FairSync

Semaphore中相关方法如下图所示:

构造函数

对应定义如下:

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

解读:

仅指定permits的情况下,Semaphore默认采用非公平策略。

Sync、NonfairSync和FairSync

    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

解读:

通过NonfairSync和FairSync的定义可知,通过构造函数传递的信号量个数 permits被赋给了 AQS 的 state状态变量。

acquire方法

当前线程调用该方法的目的是希望获取一个信号量资源:

  • 如果当前信号量个数大于 0,则当前信号量的计数会减 1,然后该方法直接返回;
  • 如果当前信号量个数等于 0,则当前线程会被放入 AQS 的阻塞队列。

当其他线程调用了当前线程的 interrupt ()方法中断了当前线程时,则当前线程会抛出 InterruptedException 异常返回。

对应代码如下:

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * 

Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. * *

If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: *

    *
  • Some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit; or *
  • Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. *
* *

If the current thread: *

    *
  • has its interrupted status set on entry to this method; or *
  • is {@linkplain Thread#interrupt interrupted} while waiting * for a permit, *
* then {
@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

解读:

上述方法调用了父类AbstractQueuedSynchronizer的acquireSharedInterruptibly方法,代码如下:

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

解读:

根据 AQS 的原理,子类需要实现tryAcquireShared方法,此处即NonfairSync和FairSync实现了tryAcquireShared方法(请查看本文前面相关类的定义)。

Note:

公平策略是查看当前线程节点的前驱节点是否在等待获取该资源,如果是则当前线程放弃争夺并被放入 AQS 阻塞队列,否则争夺资源。

release方法

release方法的作用是把当前 Semaphore对象的信号量值增加 1,如果当前有线程因为调用 aquire方法被阻塞而被放入了 AQS 的阻塞队列,则会根据策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。

对应代码如下:

    /**
     * Releases a permit, returning it to the semaphore.
     *
     * 

Releases a permit, increasing the number of available permits by * one. If any threads are trying to acquire a permit, then one is * selected and given the permit that was just released. That thread * is (re)enabled for thread scheduling purposes. * *

There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. * Correct usage of a semaphore is established by programming convention * in the application. */ public void release() { sync.releaseShared(1); }

解读:

上述方法调用了父类AbstractQueuedSynchronizer的releaseShared方法,代码如下:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

解读:

根据 AQS 的原理,子类需要实现tryReleaseShared方法,此处即Sync实现了tryReleaseShared方法(请查看本文前面相关类的定义)。

Note:

带参数的 release方法会在原来信号量值的基础上增加 permits。

四、参考资料

(1)https://howtodoinjava.com/java/multi-threading/binary-semaphore-tutorial-and-example/

(2)https://howtodoinjava.com/java/multi-threading/control-concurrent-access-to-multiple-copies-of-a-resource-using-semaphore/

相关