大家好,又见面了,我是你们的朋友全栈君。
目录
从初学者的角度,认真地学习Java中队列的使用和设计。
参考
Queue
继承Collection
接口
Deque
一个支持两端插入和删除的线性集合,此接口支持容量受限和不受限的双端队列(大多数实现容量不受限)。
该接口定义了访问两端元素的方法,主要是插入、删除、检查元素方法。这些方法主要有两种形式,一种在操作失败时引发异常,一种在操作失败时返回特殊值(null
或者false
)。这里着重提一下插入操作,只有当队列容量受限时,插入操作才可能失败。
12个方法如下
该接口扩展了Queue接口。 当双端队列被用作队列时,将导致FIFO(先进先出)行为。 元素在双端队列的末尾添加,并从开头删除。 从Queue接口继承的方法与Deque方法完全等效,如下表所示:
双端队列也可以用作LIFO(后进先出)堆栈。 此接口应优先于旧版Stack类使用。 当双端队列被用作堆栈时,元素从双端队列的开始处被压入并弹出。 堆栈方法等同于Deque方法如下表所示:
强烈建议不要在队列中插入null
,因为null
是队列中某些方法的返回值,具有特殊意义,比如队列中没有元素了。
BlockingQueue
BlockingQueue支持队列访问时阻塞
(如果检索队列时队列已空,等待其有元素后再返回;如果存放元素时队列已满,等待队列有空间存放元素后再返回)。
接口内部的方法主要可以分为以下4类
该接口的方法是线程安全的
,因为实现类内部都使用了锁机制.
ArrayBlockingQueue
由数组
支持的有界阻塞队列
。该队列对元素FIFO(先进先出)进行排序。队列的开头是已在队列中停留最长时间的元素。队列的尾部是最短时间位于队列中的元素。新元素插入到队列的尾部,并且队列检索操作在队列的开头获取元素。
这是经典的“有界缓冲区”,其中固定大小的数组包含由生产者插入并由消费者提取的元素。创建后,容量将无法更改。试图将一个元素放入一个完整的队列将导致操作阻塞(put方法
)。试图从空队列中取出一个元素的尝试也会类似地阻塞(take方法
)。
此类支持给予等待的生产者和使用者线程一个可选的公平性策略。默认情况下,不保证此排序(公平性策略为false
)。但是,将公平性设置为true构造的队列将按FIFO顺序授予线程访问权限。公平通常会降低吞吐量,但会减少可变性并避免饥饿
。
分析一段代码
public void test() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<String>(4);
new Thread(new Runnable() {
@Override
public void run() {
try {
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("3");
blockingQueue.put("4");
blockingQueue.put("5");
blockingQueue.put("6");
}catch (Exception e){
}
}
},"thread1").start();
Thread.sleep(2*1000);
//使thread1执行到put(5)时阻塞住
//此时数组元素为[1,2,3,4]
new Thread(new Runnable() {
@Override
public void run() {
try {
//移除[1,2,3,4]中的1,2
blockingQueue.take();
blockingQueue.take();
//接着thread1执行put(5),put(6)
//数组元素此时为[5,6,3,4]
}catch (Exception e){
}
}
},"thread2").start();
while (true){
}
}
内部维护了一个putIndex
, takeIndex
,表示此次put(take)元素的位置(从0开始), 到达数组元素size后,重置为0,重新来算。
所以FIFO,元素出队列后,并没有随后的元素都往前移动一位,而是将take的指针takeIndex往后移动一位,逻辑上等价于元素都往前移动一位。
LinkedBlockingQueue
链表组成的有界阻塞队列,内部使用两个可冲入锁, put操作可重入锁, take操作可冲入锁, 以及两个锁上相应的Condition
。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//队列已满,阻塞当前
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//队列不满,唤醒put操作线程
//这里是必要的,因为假设上面的线程A notFull.await释放了put锁,
// 其他的线程B也进入了while死循环,当线程C执行take()后
// A线程获取到put锁,跳出while循环, count原子性+1,
// 线程C仍处在while循环中
// A signal唤醒C并且A释放put锁后,C才有可能从while中恢复
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
..................................................
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
PriorityBlockingQueue
无界数组结构优先级队列(无界是doc上描述的,实际上数组长度长度不得超过Integer.MAX_VALUE-8)
使用了最小堆,put元素时,维护一个最小堆;take元素时,移除队首元素,则是堆顶元素(最小值),优先级最高(或者最低,具体看comparable接口方法的实现)的元素首先被移除
有几个关键方法,put元素构造最小堆的siftUpZComparable
方法,如下:
//将元素x置于k位置(k从0算起)
//方法执行完后,维护着一个数组结构的最小堆
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
//表示当前元素>=父节点元素,不做堆调整
if (key.compareTo((T) e) >= 0)
break;
//当前元素 < 父节点元素,要做调整
array[k] = e;
k = parent;
} //直到不比父节点小,才跳出循环
//将入参元素置于节点k中
//1. 若一进来,x元素就>=父节点,则k=入参中的k
//2. 若while循环执行过,则k就是入参k的某级父节点,将入参元素x设置到其位置上
array[k] = key;
}
take元素移除首节点(移除优先级最低的元素), siftDownComparable
//将x元素插入到k位置(k从0算起),向下调整
//执行take时,最终会调用到siftDownComparable(0,x,array, size-1)
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
//得到叶子节点的上一层下标
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
//c表示k节点左右孩子节点中较小的那一个元素
//child表示k节点左右孩子中较小的元素的下标
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
举例:
移除队首元素前,队列是最小堆结构,如下:
take方法执行后,内部方法siftDownComparable执行前,结构如下:
执行siftDownComparable(0, 5, [2,7,3,8,9,4], 6)方法,
while循环第一次执行完后,结构如下,此时half=3, array[0]=3,k=2,
while循环第二次执行后,结构如下,此时half=3, array[2]=4, k = 5
while第三次执行时,while条件不满足,跳出,执行
array[5]=5,结束,结构如下:
至此,一个新的最小堆又生成了。
移除下标i的元素,总是等价于将数组最后一个下标元素值A取出来,但是下标引用置为null, 将A插入到i位置,但是这个插入的位置涉及到堆的向下调整,这是本质。
PriorityBlockingQueue构造方法支持入参传递一个集合对象(集合对象自然要实现Comparable接口),则构造方法内部就需要立刻调整元素,形成最小堆,如下
private void heapify() {
Object[] array = queue;
int n = size;
int half = (n >>> 1) - 1;
//可以看出,从叶子节点的上一层开始,向下调整形成局部最小堆
// i--,同样的调整操作直至操作到顶部节点,就调整生成了全局的最小堆
Comparator<? super E> cmp = comparator;
if (cmp == null) {
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
DelayQueue
无界延迟性阻塞队列,jdk中的java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
就是一个实现DelayQueue接口的子类
参考 DelayQueue详解
测试用例
@Test
public void testScheduleTask() throws InterruptedException {
ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(5);
for (int i = 0; i < 10; i++) {
int finalI = i;
service.schedule(new Runnable() {
@Override
public void run() {
//十个任务,每个任务执行时间相差1s
System.out.println("i=" + finalI +
", and thread=" +Thread.currentThread().getName() +
",time=" + TestUtils.getTime());
}
}, i * 1000, TimeUnit.MILLISECONDS);
}
Thread.sleep(20*1000);
}
SynchronousQueue
参考 Java 并发 — 阻塞队列之SynchronousQueue源码分析, 非常好的一篇从源码角度分析SynchronousQueue
的博客。
LinkedTransferQueue
无界阻塞队列
Java 并发 — 阻塞队列之LinkedTransferQueue源码分析
LinkedBlockingQueue
链表结构的双向阻塞队列
Java 并发 — 阻塞队列之LinkedBlockingDeque源码分析
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/156208.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...