一个因中断产生的死锁分析_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 一个因中断产生的死锁分析

一个因中断产生的死锁分析

 2013/11/3 18:25:48  hill007299  程序员俱乐部  我要评论(0)
  • 摘要:最近在一次压测过程中暴露出notifyclient的一个死锁问题,发生死锁的场景是消息的可靠异步发送,具体过程是:(生产者)消息发送线程拿到队列锁,当队列未满的时候写入消息,释放锁,当队列满的时候,释放锁,等待队列空条件。(消费者)刷盘线程拿到队列锁,当队列有数据的时候,取数据清空队列,释放锁,再把取出来的消息数据刷盘持久化;没数据的时候,释放锁,等待队列非空条件。这是一个典型的多生产者-单消费者的问题。起初我们通过review代码来看,都觉得不会发生死锁,因为在临界区域里面只用到了一把锁
  • 标签:一个 分析

class="MsoNormal" style="text-indent: 21.0pt;">最近在一次压测过程中暴露出notify client的一个死锁问题,发生死锁的场景是消息的可靠异步发送,具体过程是:

(生产者)消息发送线程拿到队列锁,当队列未满的时候写入消息,释放锁,当队列满的时候,释放锁,等待队列空条件。

消费者)刷盘线程拿到队列锁,当队列有数据的时候,取数据清空队列,释放锁,再把取出来的消息数据刷盘持久化;没数据的时候,释放锁,等待队列非空条件。

这是一个典型的多生产者-单消费者的问题。起初我们通过review代码来看,都觉得不会发生死锁,因为在临界区域里面只用到了一把锁,不会出现deadly embrace类型的死锁。

后来进一步了解到用户对notify client的使用方式,发现他们的用法比较特殊,用户会把N ms之内没有完成消息发送的任务,强行cancel掉。也就是说生产者可能会在某个时刻检测到interrupt标记位,响应interrupt,是否会产生死锁必须把interrupt的因素也给考虑进去。

一般来说,当捕获到InterruptedException之后,比较规范的做法是把InterruptedException抛给上层调用者;或者调用Thread.currentThread().interrupt(),重新把线程中断标记置为true(因为阻塞方法在抛出InterruptedException,会清除中断标记位),暂不处理中断,把中断留给线程后续处理。基本原则就是要让任务能够优雅地在一个合适的时机响应中断,而不能对中断毫不作为。

在这个案例里面,生产者选择了后者,暂不处理,通过Thread.currentThread().interrupt()重新设置中断标记。

在大部分情况下,这么做是不会有问题的,但是在这种情况下,问题很大。因为在enqueue里面,会响应中断的代码是this.notEmpty.await(),并且是在一个循环里,this.notEmpty.await()会在方法入口处检测是否有中断标记,如果有那么就抛InterruptedException,这样一来一旦抛出第一个InterruptedException,在catch方法块里执行Thread.currentThread().interrupt(),会导致在下一次循环里继续抛出InterruptedException。如果运气好的话,可能在某个时刻this.nextWriteBatch != null条件不成立,跳出循环。如果运气不好的话,可能就是一个死循环。

在这次死锁案例中,是属于运气不好的情况,因为InterruptedException是在this.notEmpty.await()Condition.await()会在执行过程中释放关联的锁)释放锁enqueueLock之前发生的也就是说生产者在释放锁之前陷入中断循环。唯一能让this.nextWriteBatch != null不成立的线程是消费者线程,消费者线程没拿到锁,没机会执行this.nextWriteBatch = null。这样一来这个中断循环就成了死循环了,他永远不会释放锁,其他线程会一直阻塞等待锁。

?

锁定义,以下代码都在同一个实例里

privatefinal Lock enqueueLock = new ReentrantLock();

privatefinal Condition notEmpty = this.enqueueLock.newCondition();

privatefinal Condition empty = this.enqueueLock.newCondition();

生产者代码

private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException {

??????? WriteBatch result = null;

??????? this.enqueueLock.lock();

??????? try {

??????????? // 如果没有启动,则先启动appender线程

??????????? this.startAppendThreadIfNessary();

??????????? if (this.nextWriteBatch == null) {

??????????????? result = this.newWriteBatch(writeCommand);

??????????????? this.empty.signalAll();

??????????? }

??????????? else {

??????????????? if (this.nextWriteBatch.canAppend(writeCommand)) {

??????????????????? this.nextWriteBatch.append(writeCommand);

??????????????????? result = this.nextWriteBatch;

??????????????? }

??????????????? else {

??????????????????? while (this.nextWriteBatch != null) {

???????????????? ???????try {

??????????????????????????? this.notEmpty.await();

??????????????????????? }

??????????????????????? catch (InterruptedException e) {

??????????????????????????? Thread.currentThread().interrupt();

??????????????????????? }

?????????????????? ?}

??????????????????? result = this.newWriteBatch(writeCommand);

??????????????????? this.empty.signalAll();

??????????????? }

??????????? }

??????????? if (!sync) {

??????????????? InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);

??????????????? switch (writeCommand.opItem.op) {

??????????????? case OpItem.OP_ADD:

??????????????????? if (inflyWriteData == null) {

??????????????????????? this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data));

???????????????????}

??????????????????? else {

??????????????????????? // update and increase reference count;

??????????????????????? inflyWriteData.data = writeCommand.data;

??????????????????????? inflyWriteData.count++;

??????????????????? }

???????? ???????????break;

??????????????? case OpItem.OP_DEL:

??????????????????? // 无条件删除

??????????????????? if (inflyWriteData != null) {

??????????????????????? this.inflyWrites.remove(writeCommand.bytesKey);

??????????????????? }

??????????????? }

?????????? ?}

??????????? return result;

??????? }

??????? finally {

??????????? this.enqueueLock.unlock();

??????? }

}

消费者代码

??? publicvoid processQueue() {

??????? while (true) {

??????????? WriteBatch batch = null;

??????????? this.enqueueLock.lock();

?????????? ?try {

??????????????? while (true) {

??????????????????? if (this.nextWriteBatch != null) {

??????????????????????? batch = this.nextWriteBatch;

??????????????????????? this.nextWriteBatch = null;

??????????????????????? break;

??????????????????? }

???? ???????????????if (this.shutdown) {

??????????????????????? return;

??????????????????? }

??????????????????? try {

??????????????????????? this.empty.await();

??????????????????? }

??????????????????? catch (InterruptedException e) {

???????????????????? ???break;

??????????????????? }

??????????????? }

??????????????? this.notEmpty.signalAll();

??????????? }

??????????? finally {

??????????????? this.enqueueLock.unlock();

??????????? }

??????????? if (batch != null) {

??????????????? final DataFile dataFile = batch.dataFile;

??????????????? final LogFile logFile = batch.logFile;

??????????????? final List<WriteCommand> cmdList = batch.cmdList;

??????????????? try {

??????????????????? this.writeDataAndLog(batch, dataFile, logFile, cmdList);

??????????????????? this.processRemove(batch, dataFile, logFile);

??????????????? }

??????????????? finally {

??????????????????? batch.latch.countDown();

??????????????? }

??????????? }

??????? }

}

?

其实在java技术规范里面,并不推崇,也不提供简单粗暴的任务中断机制,强制要求一个任务立马停止。因为如果一个任务在执行过程中,被非正常取消的话,有可能会导致数据结构被破坏,数据不一致的情况发生。所以java推崇的是通过协作的方式来终止一个任务,一个线程可以向另外一个线程发起终止信号,但是具体如何终止,应该由被终止的线程来决定。被终止的线程可以通过检查终止信号,在一个合适的时机优雅退出。

比如一个任务可以设置一个volatilecancel标记,当要终止这个任务的时候,我们把cancel标记设置为true,告诉任务我们要终止了。任务在某个时候检查到这个终止标记,知道要终止的,把数据结构维护好,在合适的时机退出。

尽管可以通过设置用户自定义cancel标记来取消任务,但是也有可能任务调用了一些阻塞方法,比如Condition.await(),一旦阻塞可能就没机会去检测用户自定义的cancel标记,这样一来任务也就不能及时响应用户的取消操作。因此jdk提供了线程内置的interrupt标记,可以通过Thread.currentThread().interrupt()来设置,并且大部分的类库阻塞方法都能会检查这个中断标记位,在中断的时候抛出异常,以便让任务及时响应用户中断。

当然也有一种可能是任务没有使用自定义cancel标记,也没有调用能够抛出InterruptedException的方法,如果对这类任务调用Thread.currentThread().interrupt(),是不会产生预期效果的。因此调用方不应该随意调用Thread.currentThread().interrupt()来取消任务,除非他知道这个任务的中断策略。而作为任务代码编写者,要保证程序健壮,应该考虑一个合适的中断策略,能够在被中断的时候,尽可能及时响应中断,优雅的退出。

重新回到上面的例子结合业务场景,当用户调用中断的时候,是想取消发送消息任务。任务代码在this.notEmpty.await()检查到线程中断标记,抛出InterruptedException。因为数据结构还没有被破坏,数据状态是一致的,所以无需捕获异常,直接往上层抛出InterruptedException,释放锁,以消息发送失败告终;

private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException, InterruptedException {

??????? WriteBatch result = null;

??????? this.enqueueLock.lock();

??????? try {

??????????? // 如果没有启动,则先启动appender线程

??????????? this.startAppendThreadIfNessary();

??????????? if (this.nextWriteBatch == null) {

??????????????? result = this.newWriteBatch(writeCommand);

??????????????? this.empty.signalAll();

??????????? }

??????????? else {

??????? ????????if (this.nextWriteBatch.canAppend(writeCommand)) {

??????????????????? this.nextWriteBatch.append(writeCommand);

??????????????????? result = this.nextWriteBatch;

??????????????? }

??????????????? else {

??????????????????? while (this.nextWriteBatch != null) {

??????????????????????????? this.notEmpty.await();

??????????????????? }

??????????????????? result = this.newWriteBatch(writeCommand);

??????????????????? this.empty.signalAll();

??????????????? }

??????????? }

??????????? if (!sync) {

??????????????? InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);

??????????????? switch (writeCommand.opItem.op) {

??????????????? case OpItem.OP_ADD:

??????????????????? if (inflyWriteData == null) {

???????????????? ???????this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data));

??????????????????? }

??????????????????? else {

??????????????????????? // update and increase reference count;

??????????????????????? inflyWriteData.data = writeCommand.data;

??????????????????????? inflyWriteData.count++;

??????????????????? }

??????????????????? break;

??????????????? case OpItem.OP_DEL:

??????????????????? // 无条件删除

??????????????????? if (inflyWriteData != null) {

?????????????????????? ?this.inflyWrites.remove(writeCommand.bytesKey);

??????????????????? }

??????????????? }

??????????? }

??????????? return result;

??????? }

??????? finally {

??????????? this.enqueueLock.unlock();

??????? }

}

如果在抛出中断异常的时候,数据结构处于不一致的状态,那么可以先把中断状态保存下来,等数据结构维护好之后再退出。比如下面的任务,子任务A和子任务B必须都执行才能保证数据结构不被破坏。

策略1: process声明中断异常,上层调用者必须考虑如何处理中断异常。Process执行完A之后被中断,直接撤销A,抛出中断异常,让调用者来处理中断。

??? publicvoid process() throws InterruptedException{

????????????? doA();

????????????? try {

???????????????????? Thread.sleep(1000);

????????????? } catch (InterruptedException e) {

???????????????????? undoA();

???????????????????? throw e;

????????????? }

????????????? doB();

?????? }

策略2process没有声明中断异常,当中断发生时,选择一不做二不休的策略。先保存中断状态,把AB都做完了。然后重新标记中断,把中断信号传递下去,让线程后面的逻辑来响应中断。

??? publicvoid process() {

????????????? boolean cancel=false;

????????????? doA();

????????????? try {

???????????????????? Thread.sleep(1000);

????????????? } catch (InterruptedException e) {

???????????????????? cancel=true;

????????????? }

????????????? doB();

????????????? if (cancel) {

???????????????????? Thread.currentThread().interrupt();

????????????? }

?

?????? }

发表评论
用户名: 匿名