JUC-并发容器-3-队列

除了集合,还有一种数据结构就是队列,前面在学习线程池的时候已经接触过了,用队列可以在线程之间传递数据,最常见的就是生产者和消费者模式,队列又分为阻塞队列和非阻塞队列

阻塞队列和非阻塞队列

image

队列是一个逻辑结构,对应的物理结构可能是由数组或链表实现的

那么阻塞队列和非阻塞队列有什么区别

举个例子,现在有一个队列,长度是10,现在已经满了,当第11个数据放进去的时候,阻塞队列可以设置一个超时时间,如果在这个时间内,队列中有数据出队了,那他就能放进去了,如果超时就放弃,非阻塞队列直接就丢失了,不会等

对于出队来说,也就是从队列中获取数据,如果是非阻塞队列,队列中没数据的时候取出来的是null,而阻塞队列会等,等到有数据了取出来

阻塞队列

阻塞队列的实现有 ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue, 阻塞队列是线程安全的,阻塞队列又分为有界无界,无界队列的容量是 Integer.MAX_VALUE

常用方法

  • take(): 获取并移除队列头节点,如果队列中没有数据,就阻塞,直到队列中有数据了
  • put(): 向队列中插入元素,如果满了,就阻塞
  • add(): 向队列中添加元素,如果满了就抛异常
  • remove(): 移除元素,如果是空也抛异常
  • element(): 获取头结点,如果是空也会抛异常
  • offer(): 向队列添加元素,如果满了就返回false
  • poll(): 取出元素,如果空就返回null,取出的同时会删除这个元素
  • peek(): 取出元素,如果空就返回null,取出的同时不会删除这个元素

ArrayBlockingQueue

ArrayBlockingQueue 是一个有界阻塞队列,可以指定容量,同时可以指定公平与非公平,如果是保证公平的话,那么等待了最长时间的线程会被优先处理,不过这会同时带来一定的性能损耗.

案例

新建一个长度为3的队列,然后生产者线程往里放,消费者线程往出取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
public class ArrayBlockingQueueTest {

public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
new Thread(new Provider(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
}
}

@AllArgsConstructor
@Slf4j
class Provider implements Runnable {

BlockingQueue<String> queue;

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.put("Candidate:" + (i + 1));
log.info("放入了第" + (i + 1) + "个数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 放一个停止的标记
try {
queue.put("over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@AllArgsConstructor
@Slf4j
class Consumer implements Runnable {

BlockingQueue<String> queue;

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 从队列中取东西
String msg = null;
try {
while(!"over".equals((msg = queue.take()))){
log.info("从队列中取出内容:{}",msg);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("没有数据了...");

}
}

LinkedBlockingQueue

LinkedBlockingQueue 是一个无界的阻塞队列,容量是 Integer.MAX_VALUE ,数据是一个链表结构的,看一下他的源码

首先是一个内部类 Node ,用来存放数据

image

这里就可以看出来他是一个链表结构的

然后是两个重要的属性

image

这里可以看到, put 和 take 是两把锁,所以放入和取出互不干扰,最后再看看 put 方法.

image

具体用法跟上面的 ArrayBlockingQueue 类似

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的阻塞队列,可以自定义顺序,也是一个无界队列, PriorityQueue 的线程安全版本

SynchronousQueue

SynchronousQueue 这个队列的容量为0,它不会取持有元素,做的事情就是直接投递数据,这个队列没有 peek() 等函数,因为它没有头节点,我们之前用的可缓存线程池就是用的这个队列

延迟队列

队列对应的还有延迟队列, Dueue ,根据延迟时间排序.元素需要实现 Delayed 接口

非阻塞并发队列

非阻塞并发队列只有一个 ConcurrentLinkedQueue ,链表数据结构,使用CAS非阻塞算法实现线程安全,适合在性能要求较高的并发场景,用的相对较少