并发编程之阻塞队列
一、阻塞队列介绍1 队列是限定在一端进行插入另一端进行删除的特殊线性表。先进先出(FIFO)线性表。允许出队的一端称为队头允许入队的一端称为队尾。数据结构演示网站https://www.cs.usfca.edu/~galles/visualization/Algorithms.htmlQueue接口public interface QueueE extends CollectionE { //添加一个元素添加成功返回true, 如果队列满了就会抛出异常 boolean add(E e); //添加一个元素添加成功返回true, 如果队列满了返回false boolean offer(E e); //返回并删除队首元素队列为空则抛出异常 E remove(); //返回并删除队首元素队列为空则返回null E poll(); //返回队首元素但不移除队列为空则抛出异常 E element(); //获取队首元素但不移除队列为空则返回null E peek(); }2 阻塞队列阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构BlockingQueue提供了线程安全的队列访问方式当阻塞队列插入数据时如果队列已满线程将会阻塞等待直到队列非满从阻塞队列取数据时如果队列已空线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。2.1 BlockingQueue接口方法抛出异常返回特定值阻塞阻塞特定时间入队add(e)offer(e)put(e)offer(e, time, unit)出队remove()poll()take()poll(time, unit)获取队首元素element()peek()不支持不支持2.2 应用场景阻塞队列在实际应用中有很多场景以下是一些常见的应用场景线程池线程池中的任务队列通常是一个阻塞队列。当任务数超过线程池的容量时新提交的任务将被放入任务队列中等待执行。线程池中的工作线程从任务队列中取出任务进行处理如果队列为空则工作线程会被阻塞直到队列中有新的任务被提交。生产者-消费者模型在生产者-消费者模型中生产者向队列中添加元素消费者从队列中取出元素进行处理。阻塞队列可以很好地解决生产者和消费者之间的并发问题避免线程间的竞争和冲突。消息队列消息队列使用阻塞队列来存储消息生产者将消息放入队列中消费者从队列中取出消息进行处理。消息队列可以实现异步通信提高系统的吞吐量和响应性能同时还可以将不同的组件解耦提高系统的可维护性和可扩展性。缓存系统缓存系统使用阻塞队列来存储缓存数据当缓存数据被更新时它会被放入队列中其他线程可以从队列中取出最新的数据进行使用。使用阻塞队列可以避免并发更新缓存数据时的竞争和冲突。并发任务处理在并发任务处理中可以将待处理的任务放入阻塞队列中多个工作线程可以从队列中取出任务进行处理。使用阻塞队列可以避免多个线程同时处理同一个任务的问题并且可以将任务的提交和执行解耦提高系统的可维护性和可扩展性。总之阻塞队列在实际应用中有很多场景它可以帮助我们解决并发问题提高程序的性能和可靠性。3 JUC包下的阻塞队列BlockingQueue 接口的实现类都被放在了 juc 包中它们的区别主要体现在存储结构上或对元素操作上的不同但是对于take与put操作的原理却是类似的。队列描述ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列LinkedBlockingQueue基于链表结构实现的一个无界阻塞队列指定容量为有界阻塞队列PriorityBlockingQueue支持按优先级排序的无界阻塞队列DelayQueue基于优先级队列PriorityBlockingQueue实现的无界阻塞队列SynchronousQueue不存储元素的阻塞队列LinkedTransferQueue基于链表结构实现的一个无界阻塞队列LinkedBlockingDeque基于链表结构实现的一个双端阻塞队列技术栈二、ArrayBlockingQueueArrayBlockingQueue是最典型的有界阻塞队列其内部是用数组存储元素的初始化时需要指定容量大小利用 ReentrantLock 实现线程安全。ArrayBlockingQueue可以用于实现数据缓存、限流、生产者-消费者模式等各种应用。在生产者-消费者模型中使用时如果生产速度和消费速度基本匹配的情况下使用ArrayBlockingQueue是个不错选择当如果生产速度远远大于消费速度则会导致队列填满大量生产线程被阻塞。1 ArrayBlockingQueue使用BlockingQueue queue new ArrayBlockingQueue(1024); queue.put(1); //向队列中添加元素 Object object queue.take(); //从队列中取出元素2 ArrayBlockingQueue的原理ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全入队和出队操作使用同一个锁对象也就是只能有一个线程可以进行入队或者出队操作这也就意味着生产者和消费者无法并行操作在高并发场景下会成为性能瓶颈。2.1 数据结构利用了Lock锁的Condition通知机制进行阻塞控制。2.2 核心一把锁两个条件//数据元素数组 final Object[] items; //下一个待取出元素索引 int takeIndex; //下一个待添加元素索引 int putIndex; //元素个数 int count; //内部锁 final ReentrantLock lock; //消费者 private final Condition notEmpty; //生产者 private final Condition notFull; public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { ... lock new ReentrantLock(fair); //公平非公平 notEmpty lock.newCondition(); notFull lock.newCondition(); }2.3 入队put方法public void put(E e) throws InterruptedException { //检查是否为空 checkNotNull(e); final ReentrantLock lock this.lock; //加锁如果线程中断抛出异常 lock.lockInterruptibly(); try { //阻塞队列已满则将生产者挂起等待消费者唤醒 //设计注意点 用while不用if是为了防止虚假唤醒 while (count items.length) notFull.await(); //队列满了使用notFull等待生产者阻塞 // 入队 enqueue(e); } finally { lock.unlock(); // 唤醒消费者线程 } } private void enqueue(E x) { final Object[] items this.items; //入队 使用的putIndex items[putIndex] x; if (putIndex items.length) putIndex 0; //设计的精髓 环形数组putIndex指针到数组尽头了返回头部 count; //notEmpty条件队列转同步队列准备唤醒消费者线程因为入队了一个元素肯定不为空了 notEmpty.signal(); }2.4 为什么ArrayBlockingQueue对数组操作要设计成双指针使用双指针的好处在于可以避免数组的复制操作。如果使用单指针每次删除元素时需要将后面的元素全部向前移动这样会导致时间复杂度为 O(n)。而使用双指针我们可以直接将 takeIndex 指向下一个元素而不需要将其前面的元素全部向前移动。同样地插入新的元素时我们可以直接将新元素插入到 putIndex 所指向的位置而不需要将其后面的元素全部向后移动。这样可以使得插入和删除的时间复杂度都是 O(1) 级别提高了队列的性能。2.5 出队take方法public E take() throws InterruptedException { final ReentrantLock lock this.lock; //加锁如果线程中断抛出异常 lock.lockInterruptibly(); try { //如果队列为空则消费者挂起 while (count 0) notEmpty.await(); //出队 return dequeue(); } finally { lock.unlock();// 唤醒生产者线程 } } private E dequeue() { final Object[] items this.items; SuppressWarnings(unchecked) E x (E) items[takeIndex]; //取出takeIndex位置的元素 items[takeIndex] null; if (takeIndex items.length) takeIndex 0; //设计的精髓 环形数组takeIndex 指针到数组尽头了返回头部 count--; if (itrs ! null) itrs.elementDequeued(); //notFull条件队列转同步队列准备唤醒生产者线程此时队列有空位 notFull.signal(); return x; }三、LinkedBlockingQueueLinkedBlockingQueue是一个基于链表实现的阻塞队列默认情况下该阻塞队列的大小为Integer.MAX_VALUE由于这个数值特别大所以LinkedBlockingQueue 也被称作无界队列代表它几乎没有界限队列可以随着元素的添加而动态增长但是如果没有剩余内存则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现在使用的时候建议手动传一个队列的大小。1 LinkedBlockingQueue使用//指定队列的大小创建有界队列 BlockingQueueInteger boundedQueue new LinkedBlockingQueue(100); //无界队列 BlockingQueueInteger unboundedQueue new LinkedBlockingQueue();2 LinkedBlockingQueue原理LinkedBlockingQueue内部由单链表实现只能从head取元素从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞添加元素和获取元素都有独立的锁也就是说LinkedBlockingQueue是读写分离的读写操作可以并行执行。2.1 数据结构// 容量,指定容量就是有界队列 private final int capacity; // 元素数量 private final AtomicInteger count new AtomicInteger(); // 链表头 本身是不存储任何元素的初始化时item指向null transient NodeE head; // 链表尾 private transient NodeE last; // take锁 锁分离提高效率 private final ReentrantLock takeLock new ReentrantLock(); // notEmpty条件 // 当队列无元素时take锁会阻塞在notEmpty条件上等待其它线程唤醒 private final Condition notEmpty takeLock.newCondition(); // put锁 private final ReentrantLock putLock new ReentrantLock(); // notFull条件 // 当队列满了时put锁会会阻塞在notFull上等待其它线程唤醒 private final Condition notFull putLock.newCondition(); //典型的单链表结构 static class NodeE { E item; //存储元素 NodeE next; //后继节点 单链表结构 Node(E x) { item x; } }2.2 构造器public LinkedBlockingQueue() { // 如果没传容量就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity 0) throw new IllegalArgumentException(); this.capacity capacity; // 初始化head和last指针为空值节点 last head new NodeE(null); }2.3 入队put方法public void put(E e) throws InterruptedException { // 不允许null元素 if (e null) throw new NullPointerException(); int c -1; // 新建一个节点 NodeE node new NodeE(e); final ReentrantLock putLock this.putLock; final AtomicInteger count this.count; // 使用put锁加锁 putLock.lockInterruptibly(); try { // 如果队列满了就阻塞在notFull上等待被其它线程唤醒阻塞生产者线程 while (count.get() capacity) { notFull.await(); } // 队列不满就入队 enqueue(node); c count.getAndIncrement();// 队列长度加1返回原值 // 如果现队列长度小于容量notFull条件队列转同步队列准备唤醒一个阻塞在notFull条件上的线程(可以继续入队) // 这里为啥要唤醒一下呢 // 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒 if (c 1 capacity) notFull.signal(); } finally { putLock.unlock(); // 真正唤醒生产者线程 } // 如果原队列长度为0现在加了一个元素后立即唤醒阻塞在notEmpty上的线程 if (c 0) signalNotEmpty(); } private void enqueue(NodeE node) { // 直接加到last后面,last指向入队元素 last last.next node; } private void signalNotEmpty() { final ReentrantLock takeLock this.takeLock; takeLock.lock();// 加take锁 try { notEmpty.signal();// notEmpty条件队列转同步队列准备唤醒阻塞在notEmpty上的线程 } finally { takeLock.unlock(); // 真正唤醒消费者线程 } }3.4 出队take方法public E take() throws InterruptedException { E x; int c -1; final AtomicInteger count this.count; final ReentrantLock takeLock this.takeLock; // 使用takeLock加锁 takeLock.lockInterruptibly(); try { // 如果队列无元素则阻塞在notEmpty条件上消费者线程阻塞 while (count.get() 0) { notEmpty.await(); } // 否则出队 x dequeue(); c count.getAndDecrement();//长度-1返回原值 if (c 1)// 如果取之前队列长度大于1notEmpty条件队列转同步队列准备唤醒阻塞在notEmpty上的线程原因与入队同理 notEmpty.signal(); } finally { takeLock.unlock(); // 真正唤醒消费者线程 } // 为什么队列是满的才唤醒阻塞在notFull上的线程呢 // 因为唤醒是需要加putLock的这是为了减少锁的次数,所以这里索性在放完元素就检测一下未满就唤醒其它notFull上的线程, // 这也是锁分离带来的代价 // 如果取之前队列长度等于容量已满则唤醒阻塞在notFull的线程 if (c capacity) signalNotFull(); return x; } private E dequeue() { // head节点本身是不存储任何元素的 // 这里把head删除并把head下一个节点作为新的值 // 并把其值置空返回原来的值 NodeE h head; NodeE first h.next; h.next h; // 方便GC head first; E x first.item; first.item null; return x; } private void signalNotFull() { final ReentrantLock putLock this.putLock; putLock.lock(); try { notFull.signal();// notFull条件队列转同步队列准备唤醒阻塞在notFull上的线程 } finally { putLock.unlock(); // 解锁这才会真正的唤醒生产者线程 } }3 与ArrayBlockingQueue对比LinkedBlockingQueue是一个阻塞队列内部由两个ReentrantLock来实现出入队列的线程安全由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于队列大小有所不同ArrayBlockingQueue是有界的初始化必须指定大小而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE)对于后者而言当添加速度大于移除速度时在无界的情况下可能会造成内存溢出等问题。数据存储容器不同ArrayBlockingQueue采用的是数组作为数据存储容器而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。由于ArrayBlockingQueue采用的是数组的存储容器因此在插入或删除元素时不会产生或销毁任何额外的对象实例而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时对于GC可能存在较大影响。两者的实现队列添加或移除的锁不一样ArrayBlockingQueue实现的队列中的锁是没有分离的即添加操作和移除操作采用的同一个ReenterLock锁而LinkedBlockingQueue实现的队列中的锁是分离的其添加采用的是putLock移除采用的则是takeLock这样能大大提高队列的吞吐量也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据以此来提高整个队列的并发性能。四、SynchronousQueueSynchronousQueue是一个没有数据缓冲的BlockingQueue生产者线程对其的插入操作put必须等待消费者的移除操作take。如图所示SynchronousQueue 最大的不同之处在于它的容量为 0所以没有一个地方来暂存元素导致每次取数据都要先阻塞直到有数据被放入同理每次放数据的时候也会阻塞直到有消费者来取。需要注意的是SynchronousQueue 的容量不是 1 而是 0因为 SynchronousQueue 不需要去持有元素它所做的就是直接传递direct handoff。由于每当需要传递的时候SynchronousQueue 会把元素直接从生产者传给消费者在此期间并不需要做存储所以如果运用得当它的效率是很高的。1 应用场景SynchronousQueue非常适合传递性场景做交换工作生产者的线程和消费者的线程同步传递某些信息、事件或者任务。SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量但是这些请求需要很快的处理掉那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue这个线程池根据需要新任务到来时创建新的线程如果有空闲线程则会重复使用线程空闲了60秒后会被回收。2 SynchronousQueue使用#构建同步队列 SynchronousQueueInteger queue new SynchronousQueue();需要注意的是SynchronousQueue 的使用需要谨慎因为它非常容易导致死锁如果没有恰当地设计和同步生产者和消费者线程可能会造成程序无法继续执行。因此在使用 SynchronousQueue 时要注意线程同步和错误处理。2.1 死锁场景示例public class SynchronousQueueDeadlockDemo { public static void main(String[] args) { final SynchronousQueueInteger queue new SynchronousQueue(); Thread thread1 new Thread(() - { try { // 线程1尝试将数据放入队列 int data 42; queue.put(data); System.out.println(线程1放入数据 data); // 接着线程1尝试从队列中获取数据但此时没有其他线程来获取 int result queue.take(); System.out.println(线程1获取数据 result); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread thread2 new Thread(() - { try { // 接着线程2尝试将数据放入队列但此时没有其他线程来获取 int result 100; queue.put(result); System.out.println(线程2放入数据 result); // 线程2尝试从队列中获取数据但此时没有数据可用 int data queue.take(); System.out.println(线程2获取数据 data); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); thread2.start(); } }五、PriorityBlockingQueuePriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列数组的默认长度是11虽然指定了数组的长度但是可以无限的扩充直到资源消耗尽为止每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。优先级队列PriorityQueue 队列中每个元素都有一个优先级出队的时候优先级最高的先出。1 应用场景电商抢购活动会员级别高的用户优先抢购到商品银行办理业务vip客户插队2 PriorityBlockingQueue使用//创建优先级阻塞队列 Comparator为null,自然排序 PriorityBlockingQueueInteger queuenew PriorityBlockingQueueInteger(5); //自定义Comparator PriorityBlockingQueue queuenew PriorityBlockingQueueInteger( 5, new ComparatorInteger() { Override public int compare(Integer o1, Integer o2) { return o2-o1; } });思考如何实现一个优先级队列3 如何构造优先级队列使用普通线性数组(无序)来表示优先级队列执行插入操作时直接将元素插入到数组末端需要的成本为O(1),获取优先级最高元素我们需要遍历整个线性队列匹配出优先级最高元素需要的成本为o(n)删除优先级最高元素我们需要两个步骤第一找出优先级最高元素第二步删除优先级最高元素然后将后面的元素依次迁移填补空缺需要的成本为O(n)O(n)O(n)使用一个按顺序排列的有序向量实现优先级队列获取优先级最高元素O(1)删除优先级最高元素O(1)插入一个元素需要两个步骤第一步我们需要找出要插的位置这里我们可以使用二分查找成本为O(logn)第二步是插入元素之后将其所有后继进行后移操作成本为O(n)所有总成本为O(logn)O(n)O(n)二叉堆完全二叉树除了最后一行其他行都满的二叉树而且最后一行所有叶子节点都从左向右开始排序。二叉堆完全二叉树的基础上加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同二叉堆又可以分为两个类型大顶堆和小顶堆。大顶堆最大堆父节点的键值总是大于或等于任何一个子节点的键值小顶堆最小堆父节点的键值总是小于或等于任何一个子节点的键值。最小堆演示https://www.cs.usfca.edu/~galles/visualization/Heap.html4、如何快速从1亿数据取出最大的前100大厂高频TopK面试题如何快速从1 亿个数据中取出最大的前 100 个六、DelayQueueDelayQueue 是一个支持延时获取元素的阻塞队列 内部采用优先队列 PriorityQueue 存储元素同时元素必须实现 Delayed 接口在创建元素时可以指定多久才可以从队列中获取当前元素只有在延迟期满时才能从队列中提取元素。延迟队列的特点是不是先进先出而是会按照延迟时间的长短来排序下一个即将执行的任务会排到队列的最前面。它是无界队列放入的元素必须实现 Delayed 接口而 Delayed 接口又继承了 Comparable 接口所以自然就拥有了比较和排序的能力代码如下public interface Delayed extends ComparableDelayed { //getDelay 方法返回的是“还剩下多长的延迟时间才会被执行” //如果返回 0 或者负数则代表任务已过期。 //元素会根据延迟时间的长短被放到队列的不同位置越靠近队列头代表越早过期。 long getDelay(TimeUnit unit); }1 DelayQueue使用DelayQueue 实现延迟订单在实现一个延迟订单的场景中我们可以定义一个 Order 类其中包含订单的基本信息例如订单编号、订单金额、订单创建时间等。同时我们可以让 Order 类实现 Delayed 接口重写 getDelay 和 compareTo 方法。在 getDelay 方法中我们可以计算订单的剩余延迟时间而在 compareTo 方法中我们可以根据订单的延迟时间进行比较。下面是一个简单的示例代码演示了如何使用 DelayQueue 来实现一个延迟订单的场景public class DelayQueueExample { public static void main(String[] args) throws InterruptedException { DelayQueueOrder delayQueue new DelayQueue(); // 添加三个订单分别延迟 5 秒、2 秒和 3 秒 delayQueue.put(new Order(order1, System.currentTimeMillis(), 5000)); delayQueue.put(new Order(order2, System.currentTimeMillis(), 2000)); delayQueue.put(new Order(order3, System.currentTimeMillis(), 3000)); // 循环取出订单直到所有订单都被处理完毕 while (!delayQueue.isEmpty()) { Order order delayQueue.take(); System.out.println(处理订单 order.getOrderId()); } } static class Order implements Delayed{ private String orderId; private long createTime; private long delayTime; public Order(String orderId, long createTime, long delayTime) { this.orderId orderId; this.createTime createTime; this.delayTime delayTime; } public String getOrderId() { return orderId; } Override public long getDelay(TimeUnit unit) { long diff createTime delayTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } Override public int compareTo(Delayed o) { long diff this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return Long.compare(diff, 0); } } }由于每个订单都有不同的延迟时间因此它们将会按照延迟时间的顺序被取出。当延迟时间到达时对应的订单对象将会被从队列中取出并被处理。2 DelayQueue原理2.1 数据结构//用于保证队列操作的线程安全 private final transient ReentrantLock lock new ReentrantLock(); // 优先级队列,存储元素用于保证延迟低的优先执行 private final PriorityQueueE q new PriorityQueueE(); // 用于标记当前是否有线程在排队仅用于取元素时 leader 指向的是第一个从队列获取元素阻塞的线程 private Thread leader null; // 条件用于表示现在是否有可取的元素 当新元素到达或新线程可能需要成为leader时被通知 private final Condition available lock.newCondition(); public DelayQueue() {} public DelayQueue(Collection? extends E c) { this.addAll(c); }2.2 入队put方法public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock this.lock; lock.lock(); try { // 入队 q.offer(e); if (q.peek() e) { // 若入队的元素位于队列头部说明当前元素延迟最小 // 将 leader 置空 leader null; // available条件队列转同步队列,准备唤醒阻塞在available上的线程 available.signal(); } return true; } finally { lock.unlock(); // 解锁真正唤醒阻塞的线程 } }2.3 出队take方法public E take() throws InterruptedException { final ReentrantLock lock this.lock; lock.lockInterruptibly(); try { for (;;) { E first q.peek();// 取出堆顶元素( 最早过期的元素但是不弹出对象) if (first null)// 如果堆顶元素为空说明队列中还没有元素直接阻塞等待 available.await();//当前线程无限期等待直到被唤醒并且释放锁。 else { long delay first.getDelay(NANOSECONDS);// 堆顶元素的到期时间 if (delay 0)// 如果小于0说明已到期直接调用poll()方法弹出堆顶元素 return q.poll(); // 如果delay大于0 则下面要阻塞了 // 将first置为空方便gc first null; // 如果有线程争抢的Leader线程则进行无限期等待。 if (leader ! null) available.await(); else { // 如果leader为null把当前线程赋值给它 Thread thisThread Thread.currentThread(); leader thisThread; try { // 等待剩余等待时间 available.awaitNanos(delay); } finally { // 如果leader还是当前线程就把它置为空让其它线程有机会获取元素 if (leader thisThread) leader null; } } } } } finally { // 成功出队后如果leader为空且堆顶还有元素就唤醒下一个等待的线程 if (leader null q.peek() ! null) // available条件队列转同步队列,准备唤醒阻塞在available上的线程 available.signal(); // 解锁真正唤醒阻塞的线程 lock.unlock(); } }当获取元素时先获取到锁对象。获取最早过期的元素但是并不从队列中弹出元素。最早过期元素是否为空如果为空则直接让当前线程无限期等待状态并且让出当前锁对象。如果最早过期的元素不为空获取最早过期元素的剩余过期时间如果已经过期则直接返回当前元素如果没有过期也就是说剩余时间还存在则先获取Leader对象如果Leader已经有线程在处理则当前线程进行无限期等待如果Leader为空则首先将Leader设置为当前线程并且让当前线程等待剩余时间。最后将Leader线程设置为空如果Leader已经为空并且队列有内容则唤醒一个等待的队列。七、如何选择适合的阻塞队列1 选择策略通常我们可以从以下 5 个角度考虑来选择合适的阻塞队列功能第 1 个需要考虑的就是功能层面比如是否需要阻塞队列帮我们排序如优先级排序、延迟执行等。如果有这个需要我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。容量第 2 个需要考虑的是容量或者说是否有存储的要求还是只需要“直接传递”。在考虑这一点的时候我们知道前面介绍的那几种阻塞队列有的是容量固定的如 ArrayBlockingQueue有的默认是容量无限的如 LinkedBlockingQueue而有的里面没有任何容量如 SynchronousQueue而对于 DelayQueue 而言它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的我们需要根据任务数量来推算出合适的容量从而去选取合适的 BlockingQueue。能否扩容第 3 个需要考虑的是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小因为业务可能有高峰期、低谷期。如果一开始就固定一个容量可能无法应对所有的情况也是不合适的有可能需要动态扩容。如果我们需要动态扩容的话那么就不能选择 ArrayBlockingQueue 因为它的容量在创建时就确定了无法扩容。相反PriorityBlockingQueue 即使在指定了初始容量之后后续如果有需要也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。内存结构第 4 个需要考虑的点就是内存结构。我们分析过 ArrayBlockingQueue 的源码看到了它的内部结构是“数组”的形式。和它不同的是LinkedBlockingQueue 的内部是用链表实现的所以这里就需要我们考虑到ArrayBlockingQueue 没有链表所需要的“节点”空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。性能第 5 点就是从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁它的操作粒度更细在并发程度高的时候相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外SynchronousQueue 性能往往优于其他实现因为它只需要“直接传递”而不需要存储的过程。如果我们的场景需要直接传递的话可以优先考虑 SynchronousQueue。2 线程池对于阻塞队列的选择线程池有很多种不同种类的线程池会根据自己的特点来选择适合自己的阻塞队列。Executors类下的线程池类型FixedThreadPoolSingleThreadExecutor 同理选取的是 LinkedBlockingQueueCachedThreadPool 选取的是 SynchronousQueueScheduledThreadPoolSingleThreadScheduledExecutor同理选取的是延迟队列