class="MsoNormal" style="text-indent: 21.0pt;">最近在一次压测过程中暴露出notify client的一个死锁问题,发生死锁的场景是消息的可靠异步发送,具体过程是:
(生产者)消息发送线程拿到队列锁,当队列未满的时候写入消息,释放锁,当队列满的时候,释放锁,等待队列空条件。
(消费者)刷盘线程拿到队列锁,当队列有数据的时候,取数据清空队列,释放锁,再把取出来的消息数据刷盘持久化;没数据的时候,释放锁,等待队列非空条件。
这是一个典型的多生产者-单消费者的问题。起初我们通过review代码来看,都觉得不会发生死锁,因为在临界区域里面只用到了一把锁,不会出现deadly embrace类型的死锁。
后来进一步了解到用户对notify client的使用方式,发现他们的用法比较特殊,用户会把N ms之内没有完成消息发送的任务,强行cancel掉。也就是说生产者可能会在某个时刻检测到interrupt标记位,响应interrupt,是否会产生死锁必须把interrupt的因素也给考虑进去。
一般来说,当捕获到InterruptedException之后,比较规范的做法是把InterruptedException抛给上层调用者;或者调用Thread.currentThread().interrupt(),重新把线程中断标记置为true(因为阻塞方法在抛出InterruptedException,会清除中断标记位),暂不处理中断,把中断留给线程后续处理。基本原则就是要让任务能够优雅地在一个合适的时机响应中断,而不能对中断毫不作为。
在这个案例里面,生产者选择了后者,暂不处理,通过Thread.currentThread().interrupt()重新设置中断标记。
在大部分情况下,这么做是不会有问题的,但是在这种情况下,问题很大。因为在enqueue里面,会响应中断的代码是this.notEmpty.await(),并且是在一个循环里,this.notEmpty.await()会在方法入口处检测是否有中断标记,如果有那么就抛InterruptedException,这样一来一旦抛出第一个InterruptedException,在catch方法块里执行Thread.currentThread().interrupt(),会导致在下一次循环里继续抛出InterruptedException。如果运气好的话,可能在某个时刻this.nextWriteBatch != null条件不成立,跳出循环。如果运气不好的话,可能就是一个死循环。
在这次死锁案例中,是属于运气不好的情况,因为InterruptedException是在this.notEmpty.await()(Condition.await()会在执行过程中释放关联的锁)释放锁enqueueLock之前发生的,也就是说生产者在释放锁之前陷入中断循环。唯一能让this.nextWriteBatch != null不成立的线程是消费者线程,消费者线程没拿到锁,没机会执行this.nextWriteBatch = null。这样一来这个中断循环就成了死循环了,他永远不会释放锁,其他线程会一直阻塞等待锁。
?
锁定义,以下代码都在同一个实例里
privatefinal Lock enqueueLock = new ReentrantLock();
privatefinal Condition notEmpty = this.enqueueLock.newCondition();
privatefinal Condition empty = this.enqueueLock.newCondition();
生产者代码
private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException {
??????? WriteBatch result = null;
??????? this.enqueueLock.lock();
??????? try {
??????????? // 如果没有启动,则先启动appender线程
??????????? this.startAppendThreadIfNessary();
??????????? if (this.nextWriteBatch == null) {
??????????????? result = this.newWriteBatch(writeCommand);
??????????????? this.empty.signalAll();
??????????? }
??????????? else {
??????????????? if (this.nextWriteBatch.canAppend(writeCommand)) {
??????????????????? this.nextWriteBatch.append(writeCommand);
??????????????????? result = this.nextWriteBatch;
??????????????? }
??????????????? else {
??????????????????? while (this.nextWriteBatch != null) {
???????????????? ???????try {
??????????????????????????? this.notEmpty.await();
??????????????????????? }
??????????????????????? catch (InterruptedException e) {
??????????????????????????? Thread.currentThread().interrupt();
??????????????????????? }
?????????????????? ?}
??????????????????? result = this.newWriteBatch(writeCommand);
??????????????????? this.empty.signalAll();
??????????????? }
??????????? }
??????????? if (!sync) {
??????????????? InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);
??????????????? switch (writeCommand.opItem.op) {
??????????????? case OpItem.OP_ADD:
??????????????????? if (inflyWriteData == null) {
??????????????????????? this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data));
???????????????????}
??????????????????? else {
??????????????????????? // update and increase reference count;
??????????????????????? inflyWriteData.data = writeCommand.data;
??????????????????????? inflyWriteData.count++;
??????????????????? }
???????? ???????????break;
??????????????? case OpItem.OP_DEL:
??????????????????? // 无条件删除
??????????????????? if (inflyWriteData != null) {
??????????????????????? this.inflyWrites.remove(writeCommand.bytesKey);
??????????????????? }
??????????????? }
?????????? ?}
??????????? return result;
??????? }
??????? finally {
??????????? this.enqueueLock.unlock();
??????? }
}
消费者代码
??? publicvoid processQueue() {
??????? while (true) {
??????????? WriteBatch batch = null;
??????????? this.enqueueLock.lock();
?????????? ?try {
??????????????? while (true) {
??????????????????? if (this.nextWriteBatch != null) {
??????????????????????? batch = this.nextWriteBatch;
??????????????????????? this.nextWriteBatch = null;
??????????????????????? break;
??????????????????? }
???? ???????????????if (this.shutdown) {
??????????????????????? return;
??????????????????? }
??????????????????? try {
??????????????????????? this.empty.await();
??????????????????? }
??????????????????? catch (InterruptedException e) {
???????????????????? ???break;
??????????????????? }
??????????????? }
??????????????? this.notEmpty.signalAll();
??????????? }
??????????? finally {
??????????????? this.enqueueLock.unlock();
??????????? }
??????????? if (batch != null) {
??????????????? final DataFile dataFile = batch.dataFile;
??????????????? final LogFile logFile = batch.logFile;
??????????????? final List<WriteCommand> cmdList = batch.cmdList;
??????????????? try {
??????????????????? this.writeDataAndLog(batch, dataFile, logFile, cmdList);
??????????????????? this.processRemove(batch, dataFile, logFile);
??????????????? }
??????????????? finally {
??????????????????? batch.latch.countDown();
??????????????? }
??????????? }
??????? }
}
?
其实在java技术规范里面,并不推崇,也不提供简单粗暴的任务中断机制,强制要求一个任务立马停止。因为如果一个任务在执行过程中,被非正常取消的话,有可能会导致数据结构被破坏,数据不一致的情况发生。所以java推崇的是通过协作的方式来终止一个任务,一个线程可以向另外一个线程发起终止信号,但是具体如何终止,应该由被终止的线程来决定。被终止的线程可以通过检查终止信号,在一个合适的时机优雅退出。
比如一个任务可以设置一个volatile的cancel标记,当要终止这个任务的时候,我们把cancel标记设置为true,告诉任务我们要终止了。任务在某个时候检查到这个终止标记,知道要终止的,把数据结构维护好,在合适的时机退出。
尽管可以通过设置用户自定义的cancel标记来取消任务,但是也有可能任务调用了一些阻塞方法,比如Condition.await(),一旦阻塞可能就没机会去检测用户自定义的cancel标记,这样一来任务也就不能及时响应用户的取消操作。因此jdk提供了线程内置的interrupt标记,可以通过Thread.currentThread().interrupt()来设置,并且大部分的类库阻塞方法都能会检查这个中断标记位,在中断的时候抛出异常,以便让任务及时响应用户中断。
当然也有一种可能是任务没有使用自定义cancel标记,也没有调用能够抛出InterruptedException的方法,如果对这类任务调用Thread.currentThread().interrupt(),是不会产生预期效果的。因此调用方不应该随意调用Thread.currentThread().interrupt()来取消任务,除非他知道这个任务的中断策略。而作为任务代码编写者,要保证程序健壮,应该考虑一个合适的中断策略,能够在被中断的时候,尽可能及时响应中断,优雅的退出。
重新回到上面的例子,结合业务场景,当用户调用中断的时候,是想取消发送消息任务。任务代码在this.notEmpty.await()检查到线程中断标记,抛出InterruptedException。因为数据结构还没有被破坏,数据状态是一致的,所以无需捕获异常,直接往上层抛出InterruptedException,释放锁,以消息发送失败告终;
private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException, InterruptedException {
??????? WriteBatch result = null;
??????? this.enqueueLock.lock();
??????? try {
??????????? // 如果没有启动,则先启动appender线程
??????????? this.startAppendThreadIfNessary();
??????????? if (this.nextWriteBatch == null) {
??????????????? result = this.newWriteBatch(writeCommand);
??????????????? this.empty.signalAll();
??????????? }
??????????? else {
??????? ????????if (this.nextWriteBatch.canAppend(writeCommand)) {
??????????????????? this.nextWriteBatch.append(writeCommand);
??????????????????? result = this.nextWriteBatch;
??????????????? }
??????????????? else {
??????????????????? while (this.nextWriteBatch != null) {
??????????????????????????? this.notEmpty.await();
??????????????????? }
??????????????????? result = this.newWriteBatch(writeCommand);
??????????????????? this.empty.signalAll();
??????????????? }
??????????? }
??????????? if (!sync) {
??????????????? InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);
??????????????? switch (writeCommand.opItem.op) {
??????????????? case OpItem.OP_ADD:
??????????????????? if (inflyWriteData == null) {
???????????????? ???????this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data));
??????????????????? }
??????????????????? else {
??????????????????????? // update and increase reference count;
??????????????????????? inflyWriteData.data = writeCommand.data;
??????????????????????? inflyWriteData.count++;
??????????????????? }
??????????????????? break;
??????????????? case OpItem.OP_DEL:
??????????????????? // 无条件删除
??????????????????? if (inflyWriteData != null) {
?????????????????????? ?this.inflyWrites.remove(writeCommand.bytesKey);
??????????????????? }
??????????????? }
??????????? }
??????????? return result;
??????? }
??????? finally {
??????????? this.enqueueLock.unlock();
??????? }
}
如果在抛出中断异常的时候,数据结构处于不一致的状态,那么可以先把中断状态保存下来,等数据结构维护好之后再退出。比如下面的任务,子任务A和子任务B必须都执行才能保证数据结构不被破坏。
策略1: process声明中断异常,上层调用者必须考虑如何处理中断异常。Process在执行完A之后被中断,直接撤销A,抛出中断异常,让调用者来处理中断。
??? publicvoid process() throws InterruptedException{
????????????? doA();
????????????? try {
???????????????????? Thread.sleep(1000);
????????????? } catch (InterruptedException e) {
???????????????????? undoA();
???????????????????? throw e;
????????????? }
????????????? doB();
?????? }
策略2:process没有声明中断异常,当中断发生时,选择一不做二不休的策略。先保存中断状态,把A、B都做完了。然后重新标记中断,把中断信号传递下去,让线程后面的逻辑来响应中断。
??? publicvoid process() {
????????????? boolean cancel=false;
????????????? doA();
????????????? try {
???????????????????? Thread.sleep(1000);
????????????? } catch (InterruptedException e) {
???????????????????? cancel=true;
????????????? }
????????????? doB();
????????????? if (cancel) {
???????????????????? Thread.currentThread().interrupt();
????????????? }
?
?????? }