抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

java并发工具类-队列

PriorityBlockingQueue

PriorityBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

PriorityBlockingQueue是一个无限的并发队列。它使用与java.util.PriorityQueue类相同的排序规则。你不能将null插入此队列。

​ 插入java.util.PriorityQueue的所有元素必须实现java.lang.Comparable接口。因此,元素根据你在Comparable 中的实现进行优先级排序。

​ 注意,对于具有相同优先级的元素(compare()== 0),不会强制执行任何特定行为。

​ 另请注意,如果你从PriorityBlockingQueue得到一个IteratorIterator不保证按优先级顺序迭代元素。

示例

以下是使用PriorityBlockingQueue的示例:

1
2
3
4
5
6
BlockingQueue<String> queue = new PriorityBlockingQueue<String>();

//String implements java.lang.Comparable
queue.put("Value");

String value = queue.take();

源码

PriorityBlockingQueue内部使用了一个以数组为基础的二叉堆,所有的公共操作使用一个锁来进行保护。当对数组进行扩容时,放弃主锁,使用一个简单的自旋锁进行扩容,这样做是为了让扩容和提取元素同步进行。

成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 默认初始化大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
* 数组可分配的最大容量。
* 一些虚拟机在数组中分配了对象头,尝试分配更大的容量
* 可能会导致OOM,请求的数组容量超过了允许的上限。
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* 优先级队列使用Comparator进行排序,或者通过元素的自然顺序,
* 即实现了Comparable接口。如果没有比较器:对于在堆中的每个结点n,
* 以及它的后代 d,n <= d。
*/
private transient Object[] queue;

// 队列元素数量
private transient int size;

// 比较器,为null代表使用自然顺序排序
private transient Comparator<? super E> comparator;

private final ReentrantLock lock;

private final Condition notEmpty;

/**
* 分配时的自旋锁,通过CAS获得
*/
private transient volatile int allocationSpinLock;

/**
* 只为序列化操作使用
*/
private PriorityQueue<E> q;
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

/**
* 如果给定的集合是 SortedSet或者 PriorityQueue, 这个优先级
* 队列根据同样的顺序排序。
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true 如果不知道二叉堆的顺序
boolean screen = true; // true 如果必须检查null

// 针对 SortedSet和 PriorityQueue处理
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}

Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
增加操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 因为PriorityBlockingQueue本身拒绝插入null,所以offer也需要抛出NPE,
// 复用offer方法即可
public boolean add(E e) {
return offer(e);
}

// 此队列是无界的,不会返回false
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 将元素插入二叉堆中
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 唤醒等待获取元素的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}

private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 获取自旋锁
if (allocationSpinLock == 0 &&
ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
try {
// 计算新容量。如果当前容量很小,那么直接扩容一倍多一点,
// 因为此时容量可能会迅速增长,否则扩容50%即可
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 如果新容量大于最大容量,那么计算当前最小容量(+1),
// 如果依然大于最大容量,抛出OOM
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
// CAS竞争自旋锁失败,调度此线程
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 加锁,拷贝元素
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}



// 此队列是无界的,所以永远不会被阻塞,复用offer方法即可
public void put(E e) {
offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
删除操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
int n = size - 1;
// 队列为空返回null
if (n < 0)
return null;
else {
// 获取头元素
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
// 整理二叉堆
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
访问操作
1
2
3
4
5
6
7
8
9
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
迭代器

PriorityBlockingQueue类中的迭代器和DelayQueue中的迭代器一样,都不会与原组件保证一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 调用toArray()方法获取当前的二叉堆
public Iterator<E> iterator() {
return new Itr(toArray());
}

/**
* Snapshot iterator that works off copy of underlying q array.
*/
final class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such

Itr(Object[] array) {
lastRet = -1;
this.array = array;
}

public boolean hasNext() {
return cursor < array.length;
}

public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
return (E)array[lastRet = cursor++];
}

public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
核心要点
  1. 必须提供要Comparator接口或者队列元素实现Comparable接口。
  2. 可以同时进行扩容和提取元素的操作,不过只能有一个线程进行扩容
  3. 数组大小小于64时,进行双倍容量的扩展,否则扩容1.5倍
  4. 使用迭代器访问元素的顺序不会按指定的比较器顺序
  5. 迭代器不会与原数组保持一致性

LinkedBlockingDeque

LinkedBlockingDeque类实现了BlockingDeque接口。阅读BlockingDeque文本以获取有关的更多信息。

Deque来自“双端队列” 这个词。Deque是一个队列,你可以在插入和删除队列两端的元素。

LinkedBlockingDeque是一个Deque,如果一个线程试图从中获取一个元素,而队列空的,不管线程从哪一端试图获取元素,都会被阻塞。

示例

以下是实例化和使用LinkedBlockingDeque的例子:

1
2
3
4
5
6
7
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();

源码

整体介绍

LinkedBlockingDequeLinkedBlockingQueue的实现大体上类似,区别在于LinkedBlockingDeque提供的操作更多。并且LinkedBlockingQueue内置两个锁分别用于put和take操作,而LinkedBlockingDeque只使用一个锁控制所有操作。因为队列能够同时在头尾进行put和take操作,所以使用两个锁也需要将两个锁同时加锁才能保证操作的同步性,不如只使用一个锁的性能好

同步节点相比LinkedBlockingQueue多了一个prev字段。

1
2
3
4
5
6
7
8
9
10
11
static final class Node<E> {
E item;

Node<E> prev;

Node<E> next;

Node(E x) {
item = x;
}
}
增加操作

增加操作相比LinkedBlockingQueue只能在队列尾部增加,它能在队列的头尾两端都进行增加操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public void addFirst(E e) {
// 复用offer方法
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}

public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
// 构造节点
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到队列头部
return linkFirst(node);
} finally {
lock.unlock();
}
}

private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();

// 如果队列已满,返回false
if (count >= capacity)
return false;
// 获取头节点,将自己的 next字段指向头节点,然后设置自己为头节点
Node<E> f = first;
node.next = f;
first = node;
// 如果队列为空,尾节点也指向自己
if (last == null)
last = node;
else
f.prev = node;
++count;
// 唤醒等待获取元素的线程
notEmpty.signal();
return true;
}

public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到队列尾部
return linkLast(node);
} finally {
lock.unlock();
}
}

private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();

// 如果队列已满,返回false
if (count >= capacity)
return false;
// 将自己设置为尾节点
Node<E> l = last;
node.prev = l;
last = node;
// 如果队列为空,头节点也指向自己
if (first == null)
first = node;
else
l.next = node;
++count;
// 唤醒等待获取元素的线程
notEmpty.signal();
return true;
}

public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列已满,等待
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}

public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列已满,等待
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}

public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
// 计算超时时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满,超时等待
while (!linkFirst(node)) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}

public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkLast(node)) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
删除操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
public E removeFirst() {
// 复用poll操作
E x = pollFirst();
if (x == null) throw new NoSuchElementException();
return x;
}

public E removeLast() {
E x = pollLast();
if (x == null) throw new NoSuchElementException();
return x;
}

public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取头节点的值,并删除它
return unlinkFirst();
} finally {
lock.unlock();
}
}

private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();

// 如果队列为空,返回null
Node<E> f = first;
if (f == null)
return null;
// 重置头节点
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
// 唤醒等待插入的线程
notFull.signal();
return item;
}

public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}

private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
// 队列为空,返回null
if (l == null)
return null;
// 更新尾节点
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}

public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 如果队列为空,等待
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 如果队列为空,等待
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}

public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkLast()) == null) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}

访问操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public E getFirst() {
// 复用peek方法
E x = peekFirst();
if (x == null) throw new NoSuchElementException();
return x;
}

public E getLast() {
E x = peekLast();
if (x == null) throw new NoSuchElementException();
return x;
}

public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列不为空,返回头元素
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}

public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列不为空,返回尾元素
return (last == null) ? null : last.item;
} finally {
lock.unlock();
}
}
BlockingQueue 方法

由于BlockingDeque继承自BlockingQueue接口,所以需要实现BlockingQueue中的方法,具体只需要复用前面提到的方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public boolean add(E e) {
addLast(e);
return true;
}

public boolean offer(E e) {
return offerLast(e);
}

public void put(E e) throws InterruptedException {
putLast(e);
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}

public E remove() {
return removeFirst();
}

public E poll() {
return pollFirst();
}

public E take() throws InterruptedException {
return takeFirst();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}

public E element() {
return getFirst();
}

public E peek() {
return peekFirst();
}

核心要点
  1. 内部使用一个双向链表
  2. 可以在链表两头同时进行put和take操作,只能使用一个锁
  3. 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;take线程同理。
  4. 迭代器与内部的双向链表保持弱一致性,调用remove(T)方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
  5. 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作
  6. forEach(Consumer<? super E> action)removeIfremoveAllretainAll都是64个元素为一批进行操作

SynchronousQueue

SynchronousQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

SynchronousQueue是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

​ 将这个类称为队列有点夸大其词。这更像是一个点。

源码

SynchronousQueue的内部实现了两个类,一个是TransferStack类,使用LIFO顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用FIFI顺序存储元素,这个类用于公平模式。这两个类继承自”Nonblocking Concurrent Objects with Condition Synchronization”算法,此算法是由W. N. Scherer III 和 M. L. Scott提出的,关于此算法的理论内容在这个网站中:http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html。两个类的性能差不多,FIFO通常用于在竞争下支持更高的吞吐量,而LIFO在一般的应用中保证更高的线程局部性。

​ 队列(或者栈)的节点在任何时间要么是”date”模式 —— 通过put操作提供的元素的模式,要么是”request”模式 —— 通过take操作取出元素的模式,要么为空。还有一个模式是”fulfill”模式,当队列有一个data节点时,请求从队列中获取一个元素就会构造一个”fulfill”模式的节点,反之亦然。这个类最有趣的特性在于任何操作都能够计算出现在队列头节点处于什么模式,然后根据它进行操作而无需使用锁。

​ 队列和栈都继承了抽象类Transferer,这个类只定义了一个方法transfer,此方法可以既可以执行put也可以执行take操作。这两个操作被统一到了一个方法中,因为在dual数据结构中,put和take操作是对称的,所以相近的所有结点都可以被结合。使用transfer方法是从长远来看的,它相比分为两个几乎重复的部分来说更加容易理解。

​ 队列和栈数据结构在概念上有许多相似性,但是在真正的实现细节上却几乎没有什么相似的地方。为了简单起见,它们保持清晰,这样在以后它们能以不同的方法扩展。

​ 在SynchronousQueue中使用的队列和栈的算法和”Nonblocking Concurrent Objects with Condition Synchronization”算法相比是不同的版本,包括对取消的处理。主要的差别如下:

  1. 最初的算法使用了位标记指针,但是此类在结点中使用了模式位,这导致了很多深入的改变。

  2. SynchronousQueue必须阻塞线程,直到变为fulfilled模式。

  3. 支持取消操作,通过超时和中断方式,包括清除被取消的结点/线程,以避免无法进行垃圾回收和无用的内存消耗。

​ 阻塞主要通过LockSupportpark/unpark方法完成,除了下一个结点将要变为fulfilled模式的情况,这时会在多处理器机器中使用自旋等待。在非常忙碌的SynchronousQueue中,自旋可以显著改变吞吐量。而在不忙碌的情况下,自旋的次数就会变的足够小,不会影响性能。

​ 清除操作在队列和栈中以不同的方式完成。对于队列,当结点被取消时,我们总是可以在O(1)时间立刻删除它。但是如果它被固定在队尾,它就必须等待直到其他取消操作完成。对于栈来说,我们需要以O(n)时间遍历来确保能够删除这个结点,不过这个操作可以和其他访问栈的线程同时进行。

队列和栈的父类为Transferer。它只定义了一个通用方法。

1
2
3
4
5
6
7
8
9
10
11
12
abstract static class Transferer<E> {
/**
* 执行put或者take操作/
* 如果参数e非空,这个元素将被交给一个消费线程;如果为null,
* 则请求返回一个被生产者提交的元素。
* 如果返回的结果非空,那么元素被提交了或被接受了;如果为null,
* 这个操作可能因为超时或者中断失败了。调用者可以通过检查
* Thread.interrupted来区分到底是因为什么元素失败。
*/
abstract E transfer(E e, boolean timed, long nanos);
}

TransferStack

这个类继承自Scherer-Scott的 dual stack 算法,但不完全相同,它使用结点而不是位标记指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class TransferStack<E> extends Transferer<E> {

/* Modes for SNodes, ORed together in node fields */
/** 表示一个未满足的消费者 */
static final int REQUEST = 0;
/** 表示一个未满足的生产者 */
static final int DATA = 1;
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;

static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

/** Node class for TransferStacks. */
static final class SNode {
volatile SNode next; // 栈中的下一个结点
volatile SNode match; // 匹配此结点的结点
volatile Thread waiter; // 控制 park/unpark
Object item; // 数据
int mode;

核心算法 transfer

使用put操作时参数e不为空,而使用take操作时参数e为null,而timednanos指定是否使用超时。

  1. 如果头节点为空或者已经包含了相同模式的结点,那么尝试将结点
    增加到栈中并且等待匹配。如果被取消,返回null

  2. 如果头节点是一个模式不同的结点,尝试将一个fulfilling结点加入到栈中,匹配相应的等待结点,然后一起从栈中弹出,并且返回匹配的元素。匹配和弹出操作可能无法进行,由于其他线程正在执行操作3

  3. 如果栈顶已经有了一个fulfilling结点,帮助它完成它的匹配和弹出操作,然后继续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
E transfer(E e, boolean timed, long nanos) {
/*
* 基础算法,循环尝试下面三种操作中的一个:
*
* 1. 如果头节点为空或者已经包含了相同模式的结点,尝试将结点
* 增加到栈中并且等待匹配。如果被取消,返回null
*
* 2. 如果头节点是一个模式不同的结点,尝试将一个`fulfilling`结点加入
* 到栈中,匹配相应的等待结点,然后一起从栈中弹出,
* 并且返回匹配的元素。匹配和弹出操作可能无法进行,
* 由于其他线程正在执行操作3
*
* 3. 如果栈顶已经有了一个`fulfilling`结点,帮助它完成
* 它的匹配和弹出操作,然后继续。
*/

SNode s = null; // constructed/reused as needed
// 传入参数为null代表请求获取一个元素,否则表示插入元素
int mode = (e == null) ? REQUEST : DATA;

for (;;) {
SNode h = head;
// 如果头节点为空或者和当前模式相同
if (h == null || h.mode == mode) { // empty or same-mode
// 设置超时时间为 0,立刻返回
if (timed && nanos <= 0L) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
// 构造一个结点并且设为头节点
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 等待满足
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 检查头节点是否为FULFILLIING
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
// 更新头节点为自己
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 循环直到匹配成功
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
// 帮助满足的结点匹配
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}

​ 当一个结点插入到栈中,它要么能和其他结点匹配然后一起出栈,否则就需要等待一个匹配的结点到来。在等待的过程中,一般使用自旋等待代替阻塞(在多处理器环境下),因为很有可能会有相应结点到来。如果自旋结束还没有匹配,那么就设置waiter然后阻塞自己,在阻塞自己之前还会再检查至少一次是否有匹配的结点。

如果等待的过程中由于超时到期或者中断,那么需要取消此节点,方法是将match字段指向自己,然后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = shouldSpin(s)
? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
: 0;
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0) {
Thread.onSpinWait();
spins = shouldSpin(s) ? (spins - 1) : 0;
}
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanos);
}
}

使用TransferStack即SynchronousQueue的非公平模式时,先put再take结点变化如下(注意DATA节点是插入线程构造的,而REQUEST是提取元素的线程的模式,此节点在构造时会变为FULFILLING节点,此处依然使用REQUEST以指代是take线程):

​ 如果先take再put时,插入线程则会构建一个模式为[11]的结点,而11 & FULFILLING != 0, 所以isFulfilling(h.mode)方法会返回true。

清除

在最坏的情况我们需要遍历整个栈来删除节点s。如果有多个线程并发调用clean方法,我们不会知道其他线程可能已经删除了此节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread

/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
* it. But we can stop when we see any node known to
* follow s. We use s.next unless it too is cancelled, in
* which case we try the node one past. We don't check any
* further because we don't want to doubly traverse just to
* find sentinel.
*/

SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;

// 删除头部被取消的节点
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);

// 移除中间的节点
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
TransferQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static final class TransferQueue<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual queue algorithm, differing,
* among other ways, by using modes within nodes rather than
* marked pointers. The algorithm is a little simpler than
* that for stacks because fulfillers do not need explicit
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/

/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;

transfer方法
  1. 如果队列为空或者头节点模式和自己的模式相同,尝试将自己增加到队列的等待者中,等待被满足或者被取消
  2. 如果队列包含了在等待的节点,并且本次调用是与之模式匹配的调用,尝试通过CAS修改等待节点item字段然后将其出队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/

QNode s = null; // constructed/reused as needed
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin

// 如果队列为空或者模式与头节点相同
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 如果有其他线程修改了tail,进入下一循环重读
if (t != tail) // inconsistent read
continue;
// 如果有其他线程修改了tail,尝试cas更新尾节点,进入下一循环重读
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
// 超时返回
if (timed && nanos <= 0L) // can't wait
return null;
// 构建一个新节点
if (s == null)
s = new QNode(e, isData);
// 尝试CAS设置尾节点的next字段指向自己
// 如果失败,重试
if (!t.casNext(null, s)) // failed to link in
continue;

// cas设置当前节点为尾节点
advanceTail(t, s); // swing tail and wait
// 等待匹配的节点
Object x = awaitFulfill(s, e, timed, nanos);
// 如果被取消,删除自己,返回null
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}

// 如果此节点没有被模式匹配的线程出队
// 那么自己进行出队操作
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;

} else { // complementary-mode
QNode m = h.next; // node to fulfill
// 数据不一致,重读
if (t != tail || m == null || h != head)
continue; // inconsistent read

Object x = m.item;
if (isData == (x != null) || // m already fulfilled m已经匹配成功了
x == m || // m cancelled m被取消了
!m.casItem(x, e)) { // lost CAS CAS竞争失败
// 上面三个条件无论哪一个满足,都证明m已经失效无用了,
// 需要将其出队
advanceHead(h, m); // dequeue and retry
continue;
}

// 成功匹配,依然需要将节点出队
advanceHead(h, m); // successfully fulfilled
// 唤醒匹配节点,如果它被阻塞了
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (head.next == s)
? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
: 0;
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
// item被修改后返回
// 如果put操作在此等待,item会被更新为null
// 如果take操作再次等待,item会由null变为一个值
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0) {
--spins;
Thread.onSpinWait();
}
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanos);
}
}

使用TransferQueue即公平模式插入节点,队列的变化如下:

注意匹配的时候item的变化。

public operations

SynchronousQueue类的公共操作都是依赖于transfer方法完成的,注意不同的方法调用transfer方法时提供的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}

------------------------------------------------------------------

public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}

public E poll() {
return transferer.transfer(null, true, 0);
}
核心要点
  1. 可以指定锁的公平性
  2. 队列内部不会存储元素,所以尽量避免使用add,offer此类立即返回的方法,除非有特殊需求

评论