ArrayBlockingQueue 源码分析
约 1347 字大约 4 分钟
ArrayBlockingQueue 源码分析
1. 概述
ArrayBlockingQueue 是 Java 中的一个阻塞队列实现,它基于数组实现,并提供了一个固定大小的容量。该队列在多线程环境下提供了线程安全的 put 和 take 操作,用于实现生产者-消费者模型。它的主要特点是通过数组来存储元素,并且支持阻塞操作,既可以在队列为空时等待,也可以在队列满时等待。
ArrayBlockingQueue 是一个 有界队列,它的容量在创建时就被固定,并且提供了如下操作:
- 阻塞的入队和出队操作:
put和take,当队列满时,put会阻塞,直到有空间;当队列为空时,take会阻塞,直到有元素。 - 非阻塞的入队和出队操作:
offer和poll,它们不会阻塞,返回操作的成功与否。
2. 主要成员变量
private final Object[] items;
private int takeIndex;
private int putIndex;
private int count;
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;items:存储队列元素的数组,队列的元素存储在该数组中。takeIndex:从队列中获取元素时的索引。putIndex:插入元素时的索引。count:当前队列中的元素数量。lock:ReentrantLock锁,用于确保线程安全。notEmpty:当队列为空时,消费者线程会被挂起,直到有元素可供消费。notFull:当队列已满时,生产者线程会被挂起,直到有空间可以插入元素。
3. 构造方法
public ArrayBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.items = new Object[capacity];
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}- 构造方法:
capacity是队列的最大容量,必须大于零。- 队列内部通过一个数组
items来存储元素,数组的大小由capacity决定。 - 使用
ReentrantLock来提供线程安全,notEmpty和notFull是两个条件变量,用于实现阻塞操作。
4. 核心方法分析
4.1 入队操作:put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (count == items.length) // 队列满,阻塞
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}- 步骤:
- 获取
lock锁,保证线程安全。 - 如果队列已满,当前线程会被阻塞,直到队列有空余空间(即
count < items.length)。 - 如果队列不满,调用
enqueue(e)将元素插入队列。 - 释放锁。
- 获取
- 内部方法
enqueue:
private void enqueue(E e) {
items[putIndex] = e; // 将元素放入数组
if (++putIndex == items.length) putIndex = 0; // 如果到达数组末尾,重置 putIndex
++count;
notEmpty.signal(); // 通知消费者线程队列不为空
}- 步骤:
- 将元素插入队列,并更新
putIndex和count。 - 如果
putIndex到达队列的末尾,重置为 0,形成循环队列。 - 调用
notEmpty.signal()唤醒等待的消费者线程。
- 将元素插入队列,并更新
4.2 出队操作:take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (count == 0) // 队列为空,阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}步骤:
- 获取
lock锁,保证线程安全。 - 如果队列为空,当前线程会被阻塞,直到队列有元素可供消费(即
count > 0)。 - 如果队列不为空,调用
dequeue()从队列中取出元素。 - 释放锁。
- 获取
内部方法
dequeue:
private E dequeue() {
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 获取队列头部元素
items[takeIndex] = null; // 清空该元素
if (++takeIndex == items.length) takeIndex = 0; // 如果到达数组末尾,重置 takeIndex
--count;
notFull.signal(); // 通知生产者线程队列不满
return x;
}- 步骤:
- 获取队列头部元素,更新
takeIndex和count。 - 如果
takeIndex到达队列的末尾,重置为 0,形成循环队列。 - 调用
notFull.signal()唤醒等待的生产者线程。
- 获取队列头部元素,更新
4.3 非阻塞操作:offer 和 poll
offer(E e):如果队列没有满,插入元素并返回true;如果队列已满,返回false。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) // 队列已满,返回 false
return false;
enqueue(e);
return true;
} finally {
lock.unlock();
}
}poll(long timeout, TimeUnit unit):尝试从队列中取出元素,如果队列为空则等待指定时间。如果超时仍未取到元素,返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0) {
if (!notEmpty.await(timeout, unit)) // 等待超时
return null;
}
return dequeue();
} finally {
lock.unlock();
}
}5. 性能特性
读操作:
take和put方法会在队列为空或已满时阻塞线程,采用的是基于ReentrantLock和条件变量的机制。这些操作的时间复杂度是 O(1),但需要考虑锁的竞争。写操作:
offer是非阻塞的,如果队列满,直接返回false;put是阻塞的,直到队列有空间。它们都涉及到锁和条件变量,因此写操作的性能受到锁竞争的影响。适用场景:
- 适用于生产者-消费者模型等需要固定大小缓冲区的场景。
- 多线程环境下需要安全的队列操作时,可以使用
ArrayBlockingQueue。
总结
ArrayBlockingQueue 提供了线程安全的阻塞队列实现,使用数组作为底层数据结构,并通过 ReentrantLock 和条件变量 (notEmpty、notFull) 来保证线程安全和高效的阻塞操作。它适用于那些需要处理大量生产和消费任务的场景,特别是当队列容量已知且固定时。在性能上,读操作非常高效,但写操作(如 put 和 take)可能会受到锁的竞争影响。
