Java高并发,阻塞队列BlockingQueue


BlockingQueue简介

BlockingQueue:不是新的东西,父接口java.util.Queue

当队列为空时,从队列中获取元素将阻塞。
当队列为满时,从队列中添加元素将阻塞。
因为是队列,所以我们理应想到先进先出。

BlockingQueue常用API

boolean add(E e) 将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, true在成功后返回 IllegalStateException如果当前没有可用空间,则抛出IllegalStateException。
boolean contains(Object o) 如果此队列包含指定的元素,则返回 true
int drainTo(Collection<? super E> c) 从该队列中删除所有可用的元素,并将它们添加到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements) 最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。
boolean offer(E e) 将指定的元素插入到此队列中,如果可以立即执行此操作,而不会违反容量限制, true在成功时 false如果当前没有可用空间,则返回false。
boolean offer(E e, long timeout, TimeUnit unit) 将指定的元素插入到此队列中,等待指定的等待时间(如有必要)才能使空间变得可用。
E poll(long timeout, TimeUnit unit) 检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。
void put(E e) 将指定的元素插入到此队列中,等待空格可用。
int remainingCapacity() 返回该队列最好可以(在没有存储器或资源约束)接受而不会阻塞,或附加的元素的数量 Integer.MAX_VALUE如果没有固有的限制。
boolean remove(Object o) 从该队列中删除指定元素的单个实例(如果存在)。
E take() 检索并删除此队列的头,如有必要,等待元素可用。 
  • Methods inherited from interface java.util.Queue

    element, peek, poll, remove
  • Methods inherited from interface java.util.Collection

    addAll, clear, containsAll, equals, hashCode, isEmpty, iterator, parallelStream, removeAll, removeIf, retainAll, size, spliterator, stream, toArray, toArray
  • Methods inherited from interface java.lang.Iterable

    forEach

将上述API总结为三组:分别是插入、移除、检查

抛出异常组API

add(抛出异常):

/**
 * @author Cocowwy
 * @create 2020-05-05-14:53
 */
public class BlockingQueueDemo {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.add("x"));
    }
}

结果如下:

true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
    at juc.BlockingQueueDemo.main(BlockingQueueDemo.java:16)

remove(抛出异常):

public class BlockingQueueDemo {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.add("x"));
        System.out.println(blockingQueue.remove()); //返回值是E(即对象) 因为是队列 所以是a
        System.out.println(blockingQueue.remove()); 
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
    }
}

这里需要注意的是remove的返回类型是E(Entity对象)
结果如下:

true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
    at java.util.AbstractQueue.remove(AbstractQueue.java:117)
    at juc.BlockingQueueDemo.main(BlockingQueueDemo.java:20)

Process finished with exit code 1

我们可以看到如果添加的元素大于3之后,即当队列空了之后再删会抛异常:java.util.NoSuchElementException

element(抛出异常):

public class BlockingQueueDemo {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.element());
    }
}

结果如下:

true
true
true
a

作用:检查队首元素是什么

特殊值组API

offer(特殊值):

public class BlockingQueueDemo {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("d"));
    }
}

结果如下:

true
true
true
false

poll(特殊值):

public class BlockingQueueDemo {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));

        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }
}

结果如下:

true
true
true
a
b
c
null

peek(特殊值):

public class BlockingQueueDemo {
    public static void main(String[] args) {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));

        System.out.println(blockingQueue.peek());
    }
}

结果如下:

true
true
true
a

阻塞组API

put(阻塞):

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
    BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
    blockingQueue.put("a");
    blockingQueue.put("b");
    blockingQueue.put("c");
    blockingQueue.put("d");
    }
}

结果如下:

1.

就是啥都没有一片空白,因为此时一直阻塞,等待

take(阻塞):

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        System.out.println(blockingQueue.take());

        blockingQueue.put("e");
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
    }
}

结果如下:

a
b
c
e

与上面的put对比一下,这里abc添加进去后如果想加e是加不进去的,会阻塞,而take进行取出后就能加入e了

超时组API

offer(超时):

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("d", 3L, TimeUnit.SECONDS));
    }
}

结果如下:

true
true
true
false  //等待了3s

让其等待3s,返回值为boolean

poll(超时):

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS));
    }
}

结果如下:

true
true
true
a
b
c
null //等待了3s
JUC