抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

JAVA中的死锁

什么是死锁

​ 在多线程环境中,多个进程可以竞争有限数量的资源。当一个进程申请资源时,如果这时没有可用资源,那么这个进程进入等待状态。有时,如果所申请的资源被其他等待进程占有,那么该等待进程有可能再也无法改变状态。这种情况称为死锁

​ 在Java中使用多线程,就会有可能导致死锁问题。死锁会让程序一直住,不再程序往下执行。我们只能通过中止并重启的方式来让程序重新执行。

造成死锁的原因

  • 当前线程拥有其他线程需要的资源
  • 当前线程等待其他线程已拥有的资源
  • 都不放弃自己拥有的资源

死锁的必要条件

互斥

​ 进程要求对所分配的资源(如打印机)进行排他性控制,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。

不可剥夺

​ 进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。

请求与保持

​ 进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。

循环等待

​ 是指进程发生死锁后,必然存在一个进程–资源之间的环形链,通俗讲就是你等我的资源,我等你的资源,大家一直等。

死锁的分类

静态顺序型死锁

线程之间形成相互等待资源的环时,就会形成顺序死锁lock-ordering deadlock,多个线程试图以不同的顺序来获取相同的锁时,容易形成顺序死锁,如果所有线程以固定的顺序来获取锁,就不会出现顺序死锁问题

​ 经典案例是LeftRightDeadlock,两个方法,分别是leftRigth、rightLeft。如果一个线程调用leftRight,另一个线程调用rightLeft,且两个线程是交替执行的,就会发生死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class LeftRightDeadLock {

//左边锁
private static Object left = new Object();
//右边锁
private static Object right = new Object();

/**
* 现持有左边的锁,然后获取右边的锁
*/
public static void leftRigth() {
synchronized (left) {
System.out.println("leftRigth: left lock,threadId:" + Thread.currentThread().getId());
//休眠增加死锁产生的概率
sleep(100);
synchronized (right) {
System.out.println("leftRigth: right lock,threadId:" + Thread.currentThread().getId());
}
}
}

/**
* 现持有右边的锁,然后获取左边的锁
*/
public static void rightLeft() {
synchronized (right) {
System.out.println("rightLeft: right lock,threadId:" + Thread.currentThread().getId());
//休眠增加死锁产生的概率
sleep(100);
synchronized (left) {
System.out.println("rightLeft: left lock,threadId:" + Thread.currentThread().getId());
}
}
}

/**
* 休眠
*
* @param time
*/
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
//创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(() -> leftRigth());
executorService.execute(() -> rightLeft());
executorService.shutdown();
}
}

输出

1
2
leftRigth: left lock,threadId:12
rightLeft: right lock,threadId:13

我们发现,12号线程锁住了左边要向右边获取锁,13号锁住了右边,要向左边获取锁,因为两边都不释放自己的锁,互不相让,就产生了死锁。

解决方案

固定加锁的顺序(针对锁顺序死锁)

只要交换下锁的顺序,让线程来了之后先获取同一把锁,获取不到就等待,等待上一个线程释放锁再获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void leftRigth() {
synchronized (left) {
...
synchronized (right) {
...
}
}
}

public static void rightLeft() {
synchronized (left) {
...
synchronized (right) {
...
}
}
}

动态锁顺序型死锁

由于方法入参由外部传递而来,方法内部虽然对两个参数按照固定顺序进行加锁,但是由于外部传递时顺序的不可控,而产生锁顺序造成的死锁,即动态锁顺序死锁。

​ 上例告诉我们,交替的获取锁会导致死锁,且锁是固定的。有时候并锁的执行顺序并不那么清晰,参数导致不同的执行顺序。经典案例是银行账户转账,from账户向to账户转账,在转账之前先获取两个账户的锁,然后开始转账,如果这是to账户向from账户转账,角色互换,也会导致锁顺序死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 动态顺序型死锁
* 转账业务
*/
public class TransferMoneyDeadlock {

public static void transfer(Account from, Account to, int amount) {
//先锁住转账的账户
synchronized (from) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
//休眠增加死锁产生的概率
sleep(100);
//在锁住目标账户
synchronized (to) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
if (from.balance < amount) {
System.out.println("余额不足");
return;
} else {
from.debit(amount);
to.credit(amount);
System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
}
}
}
}

private static class Account {
String name;
int balance;

public Account(String name, int balance) {
this.name = name;
this.balance = balance;
}

void debit(int amount) {
this.balance = balance - amount;
}

void credit(int amount) {
this.balance = balance + amount;
}
}


/**
* 休眠
*
* @param time
*/
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建账户A
Account A = new Account("A", 100);
//创建账户B
Account B = new Account("B", 200);
//A -> B 的转账
executorService.execute(() -> transfer(A, B, 5));
//B -> A 的转账
executorService.execute(() -> transfer(B, A, 10));
executorService.shutdown();
}
}

输出

1
2
线程【12】获取【A】账户锁成功
线程【13】获取【B】账户锁成功

然后就没有然后了,产生了死锁,我们发现 因为对象的调用关系,产生了互相锁住资源的问题。

解决方案

​ 根据传入对象的hashCode硬性确定加锁顺序,消除可变性,避免死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.test.thread.deadlock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 动态顺序型死锁解决方案
*/
public class TransferMoneyDeadlock {
/**
* 监视器,第三把锁,为了方式HASH冲突
*/
private static Object lock = new Object();

/**
* 我们经过上一次得失败,明白了不能依赖参数名称简单的确定锁的顺序,因为参数是
* 具有动态性的,所以,我们改变一下思路,直接根据传入对象的hashCode()大小来
* 对锁定顺序进行排序(这里要明白的是如何排序不是关键,有序才是关键)。
*
* @param from
* @param to
* @param amount
*/
public static void transfer(Account from, Account to, int amount) {
/**
* 这里需要说明一下为什么不使用HashCode()因为HashCode方法可以被重写,
* 所以,我们无法简单的使用父类或者当前类提供的简单的hashCode()方法,
* 所以,我们就使用系统提供的identityHashCode()方法,该方法保证无论
* 你是否重写了hashCode方法,都会在虚拟机层面上调用一个名为JVM_IHashCode
* 的方法来根据对象的存储地址来获取该对象的hashCode(),HashCode如果不重写
* 的话,其实也是通过这个虚拟机层面上的方法,JVM_IHashCode()方法实现的
* 这个方法是用C++实现的。
*/
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash > toHash) {
//先锁住转账的账户
synchronized (from) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
//休眠增加死锁产生的概率
sleep(100);
//在锁住目标账户
synchronized (to) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
if (from.balance < amount) {
System.out.println("余额不足");
return;
} else {
from.debit(amount);
to.credit(amount);
System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
}
}
}
} else if (fromHash < toHash) {
//先锁住转账的账户
synchronized (to) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
//休眠增加死锁产生的概率
sleep(100);
//在锁住目标账户
synchronized (from) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
if (from.balance < amount) {
System.out.println("余额不足");
return;
} else {
from.debit(amount);
to.credit(amount);
System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
}
}
}
} else {
//如果传入对象的Hash值相同,那就加让加第三层锁
synchronized (lock) {
//先锁住转账的账户
synchronized (from) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + from.name + "】账户锁成功");
//休眠增加死锁产生的概率
sleep(100);
//在锁住目标账户
synchronized (to) {
System.out.println("线程【" + Thread.currentThread().getId() + "】获取【" + to.name + "】账户锁成功");
if (from.balance < amount) {
System.out.println("余额不足");
return;
} else {
from.debit(amount);
to.credit(amount);
System.out.println("线程【" + Thread.currentThread().getId() + "】从【" + from.name + "】账户转账到【" + to.name + "】账户【" + amount + "】元钱成功");
}
}
}
}
}

}

private static class Account {
String name;
int balance;

public Account(String name, int balance) {
this.name = name;
this.balance = balance;
}

void debit(int amount) {
this.balance = balance - amount;
}

void credit(int amount) {
this.balance = balance + amount;
}
}


/**
* 休眠
*
* @param time
*/
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建账户A
Account A = new Account("A", 100);
//创建账户B
Account B = new Account("B", 200);
//A -> B 的转账
executorService.execute(() -> transfer(A, B, 5));
//B -> A 的转账
executorService.execute(() -> transfer(B, A, 10));
executorService.shutdown();
}
}

输出

1
2
3
4
5
6
线程【12】获取【A】账户锁成功
线程【12】获取【B】账户锁成功
线程【12】从【A】账户转账到【B】账户【5】元钱成功
线程【13】获取【B】账户锁成功
线程【13】获取【A】账户锁成功
线程【13】从【B】账户转账到【A】账户【10】元钱成功

协作对象间的死锁

在协作对象之间可能存在多个锁获取的情况,但是这些获取多个锁的操作并不像在LeftRightDeadLock或transferMoney中那么明显,这两个锁并不一定必须在同一个方法中被获取。如果在持有锁时调用某个外部方法,那么这就需要警惕死锁问题,因为在这个外部方法中可能会获取其他锁,或者阻塞时间过长,导致其他线程无法及时获取当前被持有的锁。

​ 上述两例中,在同一个方法中获取两个锁。实际上,锁并不一定在同一方法中被获取。经典案例,如出租车调度系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* 协作对象间的死锁
*/
public class CoordinateDeadlock {
/**
* Taxi 类
*/
static class Taxi {
private String location;
private String destination;
private Dispatcher dispatcher;

public Taxi(Dispatcher dispatcher, String destination) {
this.dispatcher = dispatcher;
this.destination = destination;
}

public synchronized String getLocation() {
return this.location;
}

/**
* 该方法先获取Taxi的this对象锁后,然后调用Dispatcher类的方法时,又需要获取
* Dispatcher类的this方法。
*
* @param location
*/
public synchronized void setLocation(String location) {
this.location = location;
System.out.println(Thread.currentThread().getName() + " taxi set location:" + location);
if (this.location.equals(destination)) {
dispatcher.notifyAvailable(this);
}
}
}

/**
* 调度类
*/
static class Dispatcher {
private Set<Taxi> taxis;
private Set<Taxi> availableTaxis;

public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}

public synchronized void notifyAvailable(Taxi taxi) {
System.out.println(Thread.currentThread().getName() + " notifyAvailable.");
availableTaxis.add(taxi);
}

/**
* 打印当前位置:有死锁风险
* 持有当前锁的时候,同时调用Taxi的getLocation这个外部方法;而这个外部方法也是需要加锁的
* reportLocation的锁的顺序与Taxi的setLocation锁的顺序完全相反
*/
public synchronized void reportLocation() {
System.out.println(Thread.currentThread().getName() + " report location.");
for (Taxi t : taxis) {
t.getLocation();
}
}

public void addTaxi(Taxi taxi) {
taxis.add(taxi);
}
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
final Dispatcher dispatcher = new Dispatcher();
final Taxi taxi = new Taxi(dispatcher, "软件园");
dispatcher.addTaxi(taxi);
//先获取dispatcher锁,然后是taxi的锁
executorService.execute(() -> dispatcher.reportLocation());
//先获取taxi锁,然后是dispatcher的锁
executorService.execute(() -> taxi.setLocation("软件园"));
executorService.shutdown();
}
}
解决方案

使用开放调用,开放调用指调用该方法不需要持有锁。

​ 开放调用,是指在调用某个方法时不需要持有锁。开放调用可以避免死锁,这种代码更容易编写。上述调度算法完全可以修改为开发调用,修改同步代码块的范围,使其仅用于保护那些涉及共享状态的操作,避免在同步代码块中执行方法调用。修改Dispatcher的reportLocation方法:

setLocation方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 开放调用,不持有锁期间进行外部方法调用
*
* @param location
*/
public void setLocation(String location) {
synchronized (this) {
this.location = location;
}
System.out.println(Thread.currentThread().getName() + " taxi set location:" + location);
if (this.location.equals(destination)) {
dispatcher.notifyAvailable(this);
}
}
reportLocation 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 同步块只包含对共享状态的操作代码
*/
public synchronized void reportLocation() {
System.out.println(Thread.currentThread().getName() + " report location.");
Set<Taxi> taxisCopy;
synchronized (this) {
taxisCopy = new HashSet<Taxi>(taxis);
}
for (Taxi t : taxisCopy) {
t.getLocation();
}
}

数据库死锁

​ 可能会发生死锁的更复杂的情况是数据库事务。一个数据库事务可能包含许多SQL更新请求,在事务期间更新记录时,将锁定该记录防止其他事务更新,直到这个事务完成。因此,同一事务中的每个更新请求可能会锁定数据库中的某些记录。

​ 如果多个事务同时执行,并且需要更新相同记录,则存在最终陷入死锁的风险。例如:

1
2
3
4
Transaction 1, request record 1, locks record 1 for update
Transaction 2, request record 2, locks record 2 for update
Transaction 1, request record 2, tries to lock record 2 for update.
Transaction 2, request record 1, tries to lock record 1 for update.

​ 由于加锁是在不同的请求中进行的,并且无法提前知道给定事务所需的全部锁,因此很难检测或防止数据库事务中的死锁。

预防死锁

在某些情况下,可以防止死锁。我将在本文中描述三种技术:

  1. Lock Ordering
  2. Lock Timeout
  3. Deadlock Detection

顺序锁

当多个线程需要相同的一些锁但以不同的顺序获取它们时,会发生死锁。

​ 如果确保所有锁始终由线程以相同的顺序获取,则不会发生死锁。看看这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
Thread 1:
lock A
lock B

Thread 2:
wait for A
lock C (when A locked)

Thread 3:
wait for A
wait for B
wait for C

​ 如果线程(如线程3)需要多个锁,则必须按照确定的顺序获取它们。在获得前面的锁之前,它不能获得在锁序列中后面的锁。

​ 例如,线程2或线程3都不能锁定C,直到它们先锁定A. 由于线程1锁定A,因此线程2和3必须先等待锁定A解锁。然后他们必须成功锁定A,然后才能尝试锁定B或C.

​ 顺序锁是一种简单而有效的死锁预防机制。但是,只有在获取任何锁之前先了解所需要的所有锁时才能使用它,而情况并非总是如此。

超时锁

​ 另一种死锁预防机制是对尝试获取锁设置超时,这意味着尝试获取锁的线程只会在放弃之前尝试给定的时间。如果一个线程在给定的超时内没有成功获取所有必要的锁,它将回滚,释放所有锁,等待一段随机的时间,然后重试。等待随机数量的时间使得其他线程拥有获取所需要的锁的机会,从而让应用程序继续运行而不被锁定。

下面是两个线程尝试以不同顺序获取相同的两个锁的示例,其中线程回滚并重试:

1
2
3
4
5
6
7
8
9
10
11
12
13
Thread 1 locks A
Thread 2 locks B

Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked

Thread 1's lock attempt on B times out
Thread 1 backs up and releases A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.

Thread 2's lock attempt on A times out
Thread 2 backs up and releases B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.

​ 在上面的示例中,线程2将在线程1重新获取锁之前尝试大约200毫秒去获取锁,因此可能成功获取两个锁。然后线程1将在重新尝试获取锁A时继续等待。当线程2完成工作解锁后,线程1也将能够同时获取两个锁(除非线程2或另一个线程获取其中的锁)。

​ 要记住的一个问题是,仅仅因为超时获取锁失败,并不一定意味着线程已经发生死锁。它也可能只意味着持有锁的线程(导致另一个线程超时)需要很长时间才能完成其任务。

​ 此外,如果有足够多的线程竞争相同的资源,即使发生了超时和回滚,他们仍然有可能一次又一次地尝试同时获取资源。在重试之前,等待0到500毫秒的2个线程可能不会发生这种情况,但是对于10或20个线程来说,情况就不同了,这比发生两个线程在重试之前等待相同时间(或者足够接近导致发生此问题)的可能性要高得多。

​ 锁超时机制的一个问题是无法为进入Java中的同步块设置超时,您必须创建自定义锁类或使用Java 5 java.util.concurrency包中的并发结构之一。编写自定义锁并不困难,但它超出了本文的范围。

下面使用j.u.c包中的Lock构造一个获取多个锁的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import util.SleepUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
* @author shallowinggg
*/
public class MultiLock {
/**
* 请求获取的锁列表
*/
private List<Lock> locks = new ArrayList<>();

/**
* 已经获取到的锁
*/
private List<Lock> acquiredLocks = new ArrayList<>();

/**
* 锁获取超时时间,单位为mills
*/
private long time = DEFAULT_TIME;

/**
* 获取锁失败最长等待重试时间,默认为500ms
*/
private int maxWaitTime = DEFAULT_MAX_WAIT_TIME;

/**
* 生成随机数
*/
private Random random = new Random();

private static final int DEFAULT_MAX_WAIT_TIME = 500;

private static final long DEFAULT_TIME = 100;

public MultiLock(List<Lock> locks) {
this.locks = locks;
}

public MultiLock(Lock... locks) {
Collections.addAll(this.locks, locks);
}

/**
* 获取所需要的所有锁
* 如果无法一次成功获取所有锁,则释放所有已经获取的锁,并且等待一段时间然后重试
*/
public void lock() {
final List<Lock> locks = this.locks;
boolean acquireLock = false;

retry:
while (!acquireLock) {
for (Lock lock : locks) {
try {
if (lock.tryLock(time, TimeUnit.MILLISECONDS)) {
acquiredLocks.add(lock);
} else {
rollback();
long waitTime = random.nextInt(maxWaitTime);
SleepUtils.mills(waitTime);

//此处为了在测试时观察重试,实际应用可以改为记录日志
System.out.println(Thread.currentThread().getName() + " retry");
continue retry;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//如果获取锁的过程中线程被打断,则回滚释放之前获取到的锁
if(Thread.currentThread().isInterrupted()) {
rollback();
}
}
}

//成功获取所有锁,退出循环
if (locks.size() == acquiredLocks.size()) {
acquireLock = true;
}
}
}

/**
* 释放获取的所有锁
* 如果尚未成功获取锁,调用unlock()将抛出IllegalMonitorStateException
*/
public void unlock() {
if (locks.size() != acquiredLocks.size()) {
throw new IllegalMonitorStateException("Not acquire all need lock!");
}
acquiredLocks.forEach(Lock::unlock);
acquiredLocks.clear();
}

/**
* 尝试一次获取所有锁
* @return true 如果获取成功,否则返回false
*/
public boolean tryLock() {
final List<Lock> locks = this.locks;

for (Lock lock : locks) {
try {
if (lock.tryLock(time, TimeUnit.NANOSECONDS)) {
acquiredLocks.add(lock);
} else {
rollback();
return false;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(Thread.currentThread().isInterrupted()) {
rollback();
}
}
}
return true;
}

/**
* 失败回滚,释放之前获取的所有锁
*/
private void rollback() {
acquiredLocks.forEach(Lock::unlock);
acquiredLocks.clear();
}

public void setTime(long time) {
this.time = time;
}

public long getTime() {
return time;
}

public int getMaxWaitTime() {
return maxWaitTime;
}

public void setMaxWaitTime(int maxWaitTime) {
this.maxWaitTime = maxWaitTime;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.junit.Test;
import p5.MultiLock;
import util.SleepUtils;

import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MultiLockTest {
@Test
public void test() throws Exception {
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();
Lock lock3 = new ReentrantLock();
Lock lock4 = new ReentrantLock();
Lock lock5 = new ReentrantLock();
Lock lock6 = new ReentrantLock();
Lock lock7 = new ReentrantLock();
Lock lock8 = new ReentrantLock();

MultiLock locks1 = new MultiLock(lock1, lock2, lock3, lock4, lock5, lock6, lock7);
MultiLock locks2 = new MultiLock(lock2, lock3, lock4, lock5, lock6, lock7, lock8);

Thread thread1 = new Thread(() -> {
System.out.println("begin acquire locks1 @ " + new Date());
locks1.lock();
try {
System.out.println("acquire locks1 successfully @ " + new Date());
SleepUtils.sleep(2);
} finally {
locks1.unlock();
}
}, "Thread-1");
Thread thread2 = new Thread(() -> {
System.out.println("begin acquire locks2 @ " + new Date());
locks2.lock();
try {
System.out.println("acquire locks2 successfully @ " + new Date());
SleepUtils.sleep(2);
} finally {
locks2.unlock();
}
}, "Thread-2");

thread1.start();
thread2.start();

thread1.join();
thread2.join();
}
}

输出如下:

1
2
3
4
5
6
7
8
9
begin acquire locks1 @ Tue Feb 26 22:50:40 CST 2019
begin acquire locks2 @ Tue Feb 26 22:50:40 CST 2019
acquire locks1 successfully @ Tue Feb 26 22:50:40 CST 2019
Thread-2 retry
Thread-2 retry
Thread-2 retry
Thread-2 retry
Thread-2 retry
acquire locks2 successfully @ Tue Feb 26 22:50:42 CST 2019

死锁检测

锁检测是一种较重(开销较大)的死锁防止机制,是针对无法进行顺序锁定,并且锁超时不可行的情况开发出的一种方法。

​ 每次线程获取锁时,都会在线程和锁的数据结构(map,graph等)中标明。另外,每当线程请求锁时,也在该数据结构中被记录。

​ 当线程请求锁但请求被拒绝时,线程可以遍历锁图以检查是否发生了死锁。例如,如果线程A请求锁7,但线程B持有锁7,则线程A可以检查线程B此时是否在请求线程A已持有的任何锁(如果有)。如果线程B请求了,则发生了死锁(线程A已持有锁1,请求锁7,线程B已持有锁7,请求锁1)

​ 当然,死锁场景可能比两个持有彼此锁的线程复杂得多。线程A可能等待线程B,线程B等待线程C,线程C等待线程D,线程D等待线程A。为了使线程A检测到死锁,它必须通过线程B传递检查所有请求的锁。线程A将遍历线程B请求的锁,到线程C,然后到线程D,从中找到是否有线程A本身持有的锁之一,然后它才知道是否发生了死锁。

下面是4个线程(A,B,C和D)获取和请求的锁图。像这样的数据结构,可用于检测死锁。

那么如果检测到死锁,线程会做什么?

​ 一种可能的操作是释放所有锁,回滚,等待一段随机时间,然后重试。这类似于更简单的锁超时机制,除了线程仅在实际发生死锁时进行回滚,而不是因为他们的锁请求超时。但是,如果许多线程竞争相同的锁,即使它们回滚并重试,它们也可能反复陷入死锁。

​ 更好的选择是确定或分配线程的优先级,以便只有一个(或几个)线程回滚。其余的线程继续获取他们需要的锁,就像没有发生死锁一样。如果分配给线程的优先级是固定的,则相同的线程将始终具有更高的优先级。为避免这种情况,您可以在检测到死锁时随机分配优先级。

死锁问题排查

​ 拿动态顺序型死锁举例,其他的都一样

1
2
3
4
5
6
7
8
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Account A = new Account("A", 100);
Account B = new Account("B", 200);
executorService.execute(() -> transfer(A, B, 5));
executorService.execute(() -> transfer(B, A, 10));
executorService.shutdown();
}

死锁的现象

系统越来越卡,没有任何报错信息,随机性比较高

排查死锁

使用 jps + jstack
  1. 在 window或linux中使用jps + jstack命令

  1. 找到可能发生死锁的类对应的PID

    我们对应的类是TransferMoneyDeadlock PID是 13964

使用jstack -l PID

执行 jstack -l 13964 命令

我们观察BLOCKED 就表示阻塞状态

  • pool-1-thread-2 等待锁 <0x00000000d673baa8>并且已经获取了锁 <0x00000000d673baf0>
  • pool-1-thread-1 等待锁 <0x00000000d673baf0> 并且已经获取了锁<0x00000000d673baa8>

我们发现他们互相持有各自的锁,并且想获取对方的锁,这就是明显的死锁。

使用jconsole

使用命令打开jconsole

打开jconsole界面工具选择我们需要检测的类

选择检查死锁

点击检查死锁

评论