Java阻塞队列学习及LinkedBlockingQueue源码分析

Java阻塞队列

最近在学习Java的多线程编程,对并发编程的认识也从之前用python写简单多线程程序里对并发代码块上锁,到生产者消费者模型,wait(),notify(),阻塞队列这些概念,那么这里要介绍的阻塞队列是什么呢?

考虑生产者消费者模型这种情况,在从队列中取元素与拿元素时加锁,但在这种场景下仅加锁是不够的,因为生产者往里面放元素的时候,还要考虑队列有没有满,如果已经满了的话就要wait一直到队列不满的时候再往里面放,消费者也是一样的,从队列里面拿元素的时候还要考虑队列是不是空的,如果是空的就要一直wait到非空的时候。注意这里我们就从加锁操作引出了wait(), notify()两个概念,在并发编程中结合这两个概念可以实现更多更精巧复杂的多线程场景。

好像扯得有点远,还没有说到我们要介绍的阻塞队列的概念,其实阻塞队列就源自于生产者消费者模型中所涉及到的场景,在多线程下很多场景都要有一些临界资源,这些资源可能被多个线程同时访问,所以就要进行加锁啊,wait(),notify()等操作,其实个人觉得加锁还好,就是将一个对临界资源操作的语句块圈起来就好,但要实现wait,notify操作就有些麻烦了,放东西的时候需要一直判断是不是满了,满了还要wait,然后操作完了后还要记得唤醒各种线程防止死锁等等,如果都自己实现的话就有点太麻烦了,更重要的是容易出错,所以这里就引入了阻塞队列的概念,不是临界资源的操作需要加锁,判满,判空等等各种操作嘛,那好,临界资源你直接用我给你写的类,我在类里面已经对所有需要处理的多线程场景进行了处理,使其变得线程安全了,使用这种类的话,自己完全不需要考虑多线程的线程安全问题,而这种类就是我们要介绍的阻塞队列。

LinkedBlockingQueue源码分析

还是蛮幸运的,划分分析方向的时候我正好被分到去分析这个类,这个类用的场景还是挺多的,最经典的就是消费者生产者问题,所以在源码分析中我们可以接着结合生产者消费者这个场景来理解

对LinkedBlockingQueue简单的介绍

以下摘自Javadoc中的描述

An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
linked nodes.
This queue orders elements FIFO (first-in-first-out).
The head of the queue is that element that has been on the
queue the longest time.
The tail of the queue is that element that has been on the
queue the shortest time. New elements
are inserted at the tail of the queue, and the queue retrieval
operations obtain elements at the head of the queue.
Linked queues typically have higher throughput than array-based queues but
less predictable performance in most concurrent applications.

The optional capacity bound constructor argument serves as a
way to prevent excessive queue expansion. The capacity, if unspecified,
is equal to {@link Integer#MAX_VALUE}. Linked nodes are
dynamically created upon each insertion unless this would bring the
queue above capacity.

简单翻译一下:

这是基于链式表的一个任意长度的队列。这个队列中元素的顺序基于FIFO先进先出的规则,也就是队列里的head元素是最早入队列的元素,队列的tail元素是最晚入队列的元素。新的元素会被插到队列的队尾,并且从队列中取元素的时候取得是队列的head。链式队列相比基于数组的队列具有更好的吞吐量但是在很多并发应用中缺少可预见的性能。

构造函数的参数中提供可选择的队列容量,则可以阻止队列无限的扩充。关于这个容量参数,如果没有特殊指定的话,就按照Integer类里面的MAX_VALUE值来指定(特意去看了下,那个上限值是0x7fffffff)。链表的节点在插入的时候都是动态生成的除非队列的容量达到了之前指定的上限值

这种链表队列与我们之前使用的链表有什么不同?

其实基本都是差不多的啦,我们在链表里能够使用的方法在这个类里面也基本都实现了,只是在实现中需要考虑到如何实现线程安全这一部分的代码。从类的声明中我们可以看到这个类的就是从AbstractQueue<E>中继承的方法,同时实现了阻塞队列BlockingQueue所规定的一些接口

1
2
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

LinkeBlockingQueue的成员变量以及一些私有方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
//链表的节点对象定义,每个节点有一个存储元素的变量item,还引用节点类型的变量
//next用于指向下一个节点。
//构造方法可以为此节点存储的元素item赋值
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//用于保存链表队列的最大长度,之前的简介里也已经提到过了,如果没有特殊指定的话,就用
//Integer.MAX_VALUE,也就是0x7fffffff
private final int capacity;
//count代表当前链表中元素的数量。需要特别指出来的是,这里并不是一个简单的int类型,
//而是用的AtomicIntegerler类型,这是种原子类型,所有的操作都是连续完成的,
//不会受到干扰,如果不清楚的话,只要知道这个对这个类型的操作是线程安全的就行了。
//为什么要用这种类型呢,直接用int难道不行?是这样的,LinkedBlockingQueue支持同时
//往一个队列里面放元素与取元素,这一点马上就会在下面的分析中得到佐证。所以如果能
//同时存取的话,count值就有可能被同时操作,所以我们要把它设置成原子类型以保证
//count这个临界资源的线程安全
private final AtomicInteger count = new AtomicInteger();
//链表的头,也就是取出数据的位置,head节点的item变量是恒为null的,next则指向下一个节点
transient Node<E> head;
//链表的尾节点,也就是添加数据的位置,last节点的next变量是恒为null的,
//item则存放着最后一个元素,也就是最新进入队列的值
private transient Node<E> last;
//此对象在运行take,poll方法的时候需要加的锁,也就是取锁,防止另一个取进程同时进行操作
private final ReentrantLock takeLock = new ReentrantLock();
//属于取锁takeLock的变量,有await()与signal()方法,用于实现取东西的时候为空所要等待
//以及取完后唤醒其他取东西进程的两种操作
private final Condition notEmpty = takeLock.newCondition();
//此对象在运行put,offer方法的时候要加的锁,也就是放东西的时候要加上的锁,
//防止另一个放东西进程同时进行操作这里应该就很明显能够看出来了,在队列中拿东西与
//放东西使用的是两个不同的锁,所以是支持这两个进程同时操作的
private final ReentrantLock putLock = new ReentrantLock();
//属于放东西锁的变量,与notEmpty差不多用途,用于实现放东西时队列为满则等待
//以及放完后再唤醒下其他放东西进程的这两种操作
private final Condition notFull = putLock.newCondition();
//其实就相当于notEmpty.signal(),用于在放东西完成后调用来唤醒取东西进程,这里
//专门写成函数只是因为在调用notEmpty.signal()的时候是要进行takeLock的
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
//同理于signalNotEmpty()函数,是在取东西完成后用于唤醒放东西线程用的
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
//最基础的入队列过程,相信在数据结构里面这种操作已经写过无数遍了,因此此处不再叙述
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//最基础的出队列过程,同样不再叙述细节
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//这个函数比较有意思,是把拿东西取东西两个都禁止掉,适用于一种什么场景呢?
//在LinkedBlockedQueue类中还提供了链表中的搜索函数————contains(),
//移除节点函数————remove(),这些函数在执行的时候,出入队列很显然都是不被允许的,
//因此就需要加上这种锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//对应上面的fullyLock()函数
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

LinkedBlockingQueue提供的public接口

生产者消费者模型中最重要的就是放东西,取东西这两种操作,里面涉及到了wait,notify各种行为,所以我们先来看这两类接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//这是LinkedBlockingQueue中提供的放东西接口,其实放东西取东西的操作都是
//差不多的,所以我会把这一个叙述的详细点,其他的函数也就触类旁通了
public void put(E e) throws InterruptedException {
//如果传入的元素是null的话,直接返回空指针异常
if (e == null) throw new NullPointerException();
//这个c命名为-1,是有自己的意义的,之后的函数也都很常见这个c=-1,那么我
//就在这里做统一的描述吧。考虑这样一种场景,在try块里面对c赋值的语句(本函数
//就是c = count.getAndIncrement();)前面运行出现错误了,也就是不会执行到
//对c赋值这个点,那么在if (c == 0) signalNotEmpty();这个语句里就不会唤醒
//那些等待取东西的线程,这就是在最开始将c置为-1的意义。另外推荐去自行分析下
//public boolean offer(E e, long timeout, TimeUnit unit)这个函数里面
//对c赋值为-1的目的场景,是个很有意思的情况。
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可以看到开始加放东西锁了,如果成功等到加上了此锁,接下来的过程中就不会再有
//放东西的线程同时执行了
putLock.lockInterruptibly();
try {
//在count为满时此线程进入等待状态,一直到有拿东西线程从队列中取出来东西
//然后唤醒它为止
while (count.get() == capacity) {
notFull.await();
}
//执行入队列操作
enqueue(node);
//更新c的值,让c+1
c = count.getAndIncrement();
//在成功执行完成放东西进程后就是惯例的唤醒操作啦,尽量唤醒各种线程,
//如果唤醒的不全或者忘记唤醒的话就很容易造成死锁,本来只是加锁的话wait与
//notify操作都是自动Java自动完成的,但如果你自己使用等待、唤醒函数的话,
//就要想着自己唤醒了,不想着的话就可能有的线程一直处于wait状态而产生死锁。
//这里我们有两个线程会可能产生wait状态,一个拿线程,一个放东西线程,
//所以就需要唤醒两次,下面这一次就是为了唤醒放东西线程,
//而之后的signalNotEmpty()就是为了唤醒取东西线程
if (c + 1 < capacity)
notFull.signal();
}
//使用finally操作的意义就是无论try块是顺序执行还是中途哪里出现异常,
//都会执行这里面的解锁操作。
finally {
putLock.unlock();
}
//唤醒取东西线程
if (c == 0)
signalNotEmpty();
}
//拿东西线程,其实和取东西线程都是大同小异的,我们可以通过这个函数再复习一遍
//多线程操作的基本结构
public E take() throws InterruptedException {
//首先定义返回值,用于存储取出来的元素
E x;
//还是c=-1,用于一些特殊场景
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//然后开始正式操作前加锁
takeLock.lockInterruptibly();
try {
//如果没有东西可取则等待,一直到被唤醒后接着往下运行
while (count.get() == 0) {
notEmpty.await();
}
//进行经典的出队列操作
x = dequeue();
c = count.getAndDecrement();
//然后开始各种唤醒,这里唤醒的是取东西线程
if (c > 1)
notEmpty.signal();
} finally {
//解锁操作要放在finally块里面
takeLock.unlock();
}
//这里唤醒的是放东西线程
if (c == capacity)
signalNotFull();
return x;
}

然后除了上面两个最常用的存放操作外,LinkedBlockingQueue对象还提供了几个其他的存放操作用于一些特殊的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//也是用于入队列操作,与put不同的如果需要等待的话,就直接返回false,如果
//不需要等待就可以直接操作的话就进行操作,返回true。
//但其实仔细分析的话,可以发现在一些场景下仍然可能存在等待时间,
//满时返回false的操作也并一定都是在if
//(count.get() == capacity) return false;中完成的,看下面的具体实现分析
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果满的话,就不进行插入操作,返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
//这里进行加锁操作,但要注意的是,因为从判断非空到这里的加锁并不是一个原子操作,
//中间可能插入各种进程,因此可能刚前面判断还没满可以放东西,到这里就发现有线程
//正在处理它而需要等待加锁,所以在这种情况下仍然可能出现等待的时间,只是
//可能时间非常短罢了
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//这里可以看到,为了保证我们的函数能够很快的返回操作能不能插入,所以这里
//就不用满时候的wait操作,如果满了的话就直接跳过入队列进程。留意一点,如果
//没有执行插入操作的话c的值就不会改变,仍然是-1,这点马上我们会用到
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//这里是唤醒操作
if (c == 0)
signalNotEmpty();
//如果执行了入队操作的话这里的c就不会是空,所以返回true,如果没有执行入队操作
//的话,这里的c仍然是-1,返回false,也就是我们提到的返回false的第二种情况
return c >= 0;
}
//这个函数可以提供等待的上限时间,在等待时间超时的时候就丢弃本次入队操作而返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//将指定的时间长度转换为毫秒来进行处理
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//如果等待的剩余时间小于等于0,那么直接返回
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
//下面的这两个函数就是在取函数上的翻版,所以不再详细介绍了
/* 在取操作的时候可以指定等待时间,超过上限的话就丢弃本次操作,返回null */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/* 在取操作的时候看看需不需要等待,如果需要等待的话就丢弃本次操作,返回null */
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

LinkedBlockingQueue提供的搜索函数,移除函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//这两个函数都比较简单,所以不做具体实现分析,只阐述个大概吧
//首先因为搜索与移除都是要对整个链表进行的遍历操作,因此在锁上是要把
//入队锁出队锁都加上的,这时候就禁止了任何涉及到对队列修改的操作了
//然后因为在执行的时候已经加上锁了就只有这一个线程对队列进行操作,所以
//不会有竞争,也就不需要等待、唤醒之类的操作
//之后就是简单的对链表进行遍历的操作了
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}

一个简单的测试

我们模仿下库房的出库与入库操作吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest {
//我们的队列linkedBLockingQueue,使用阻塞队列对象
private LinkedBlockingQueue<String> linkedBlockingQueue;
public LinkedBlockingQueueTest() {
linkedBlockingQueue = new LinkedBlockingQueue<String>();
}
//一个从库房中拿东西的内部类,我们设置其每1s从传动带上拿一次东西
class BuyThread extends Thread{
@Override
public void run() {
String thing;
while(true){
try {
thing = linkedBlockingQueue.take();
System.out.println("取出了" + thing);
sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
//一个往库房中放东西的内部类,我们设置其每0.5s往里面放一个东西
class PutThread extends Thread{
@Override
public void run() {
int thing = 1;
while(true){
try {
linkedBlockingQueue.put(Integer.toString(thing));
System.out.println("放入了" + thing);
thing += 1;
sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//main函数里开始执行,首先初始化这个对象,然后分别开启取线程与拿线程
public static void main(String[] args) {
LinkedBlockingQueueTest linkedBlockingQueueTest = new LinkedBlockingQueueTest();
linkedBlockingQueueTest.new BuyThread().start();
linkedBlockingQueueTest.new PutThread().start();
}
}

OUTPUT:

取出了1
放入了1
放入了2
取出了2
放入了3
放入了4
取出了3
放入了5
放入了6
取出了4
放入了7
放入了8
取出了5
放入了9

可以看到,开始放入了1,然后随即唤醒取线程拿出了1,然后0.5s后放东西线程接着放2….

这里的显示结果与我们想的有些不同的原因是,从读取操作到输出操作并非一个原子操作,所以输出结果可能与内部存放的步骤有些差异。