Java信号量—Semaphore_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Java信号量—Semaphore

Java信号量—Semaphore

 2018/1/4 15:36:06  aoyouzi  程序员俱乐部  我要评论(0)
  • 摘要:Semaphore用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。主要通过控制一组虚拟的“许可”,当需要执行操作时首先申请获取许可,如果还有剩余的许可并且获取成功,就执行操作;如果剩余许可为0,就阻塞当前线程;操作执行完成后释放许可,排队的阻塞线程可以被唤醒重新获取许可继续执行。这里提到排队,其实就是利用AQS的队列进行排队。咋一看跟CountDownLatch有点类似,都维护了一个计数器。不同的是,CountDownLatch一开始就通过await阻塞线程
  • 标签:Map 信号量 Java SEM

class="MsoNormal">Semaphore用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。主要通过控制一组虚拟的“许可”,当需要执行操作时首先申请获取许可,如果还有剩余的许可 并且获取成功,就执行操作;如果剩余许可为0,就阻塞当前线程;操作执行完成后释放许可,排队的阻塞线程可以被唤醒重新获取许可继续执行。这里提到排队,其实就是利用AQS的队列进行排队。

?

咋一看跟CountDownLatch有点类似,都维护了一个计数器。不同的是,CountDownLatch一开始就通过await阻塞线程,其他操作不停的对计数器减1(也可以大于1),直到为0时唤醒所有线程;Semaphore是执行操作之前对计数器减1(也可以大于1),执行完成之后释放许可对计数器加1。不难看出CountDownLatch只能使用一次,计数器为0后就不能再次使用了,而Semaphore有进有出,可以一直使用。

?

但Semaphore本质上也是基于AQS实现的,只是在重写AQS的方法时稍有不同。在详细分析Semaphore具体实现之前,先看看Semaphore是如何使用的。这里依旧以游戏为例,总所周知的大型网络游戏“魔兽世界”,在高峰期登陆游戏都需要排队,为什么呢?因为服务器资源有限,如果不做限制 服务器负载达到极限就会崩溃。这里我们用Semaphore来模拟实现“魔兽世界”中的排队,这里假设同一个服务器同一时间只能同时允许10个人同时在线,但现在有20位玩家在排队上线

Java代码??收藏代码
  1. /**?
  2. ?*?Created?by?gantianxing?on?2018/1/3.?
  3. ?*/??
  4. public?class?SemaphoreTest?{??
  5. ???
  6. ????public?static?void?main(String[]?args)?{??
  7. ????????//假设服务器只能承受10个人同时在线??
  8. ????????Semaphore?semaphore?=?new?Semaphore(10,true);??
  9. ????????//模拟20个玩家线程??
  10. ????????ExecutorService?executorService?=?Executors.newFixedThreadPool(20);??
  11. ????????for?(int?i=0;i<20;i++){??
  12. ????????????executorService.submit(new?WowPlayer(semaphore,i+""));??
  13. ????????}??
  14. ????????executorService.shutdown();??
  15. ????}??
  16. }??
  17. ???
  18. class?WowPlayer?implements?Runnable{??
  19. ????private?Semaphore?semaphore;??
  20. ????private?String?name;??
  21. ???
  22. ????public?WowPlayer(Semaphore?semaphore,String?name)?{??
  23. ????????this.semaphore?=?semaphore;??
  24. ????????this.name?=?name;??
  25. ????}??
  26. ???
  27. ????@Override??
  28. ????public?void?run()?{??
  29. ????????System.out.println("玩家:"+name+"开始排队");??
  30. ????????try?{??
  31. ????????????semaphore.acquire();//获取许可??
  32. ????????????try?{??
  33. ????????????????System.out.println("玩家:"?+?name?+?"进入游戏");??
  34. ????????????????Thread.sleep(new?Random().nextInt(10000));//模拟每位玩家游戏时长?10秒钟以内??
  35. ????????????????System.out.println("玩家:"?+?name?+?"离开游戏");??
  36. ????????????}catch?(Exception?e){??
  37. ????????????????//业务异常??
  38. ????????????????e.printStackTrace();??
  39. ????????????}finally?{??
  40. ????????????????//释放许可,最好在finally中释放??
  41. ????????????????semaphore.release();??
  42. ????????????}??
  43. ????????}?catch?(Exception?e)?{??
  44. ????????????e.printStackTrace();??
  45. ????????}??
  46. ???
  47. ????}??
  48. }??
  49. ???

?

执行main方法,打印日志如下(日志比较长,省略了部分):

?

Java代码??收藏代码
  1. -------前10个玩家不需要排队时长为0,也就是不用排队----??
  2. 玩家:1开始排队??
  3. 玩家:1进入游戏??
  4. ------省略其他8个??
  5. 玩家:9开始排队??
  6. 玩家:9进入游戏??
  7. -----到这里10个许可用完,后面需要登陆的玩家需要排队??
  8. 玩家:14开始排队??
  9. 玩家:18开始排队??
  10. 玩家:3开始排队??
  11. 玩家:11开始排队??
  12. 玩家:15开始排队??
  13. 玩家:13开始排队??
  14. 玩家:17开始排队??
  15. 玩家:10开始排队??
  16. 玩家:7开始排队??
  17. 玩家:19开始排队??
  18. ???
  19. -------等到有玩家离开游戏,排队的玩家才能进入游戏??
  20. ???
  21. 玩家:9离开游戏??
  22. 玩家:14进入游戏??
  23. 玩家:8离开游戏??
  24. 玩家:18进入游戏??
  25. 玩家:0离开游戏??
  26. 玩家:3进入游戏??
  27. 玩家:6离开游戏??
  28. 玩家:11进入游戏??
  29. 玩家:1离开游戏??
  30. 玩家:15进入游戏??
  31. ----省略其他日志---??
  32. ???

?

可以发现前10个玩家可以直接获得“许可”,排队时间为0?登陆后直接进入游戏;后面加入的10个玩家开始排队,为了公平性这里使用了Semaphore的公平构造方法;待前10个玩家有人离开游戏后,排队的10个玩家依次进入游戏。基本用法讲解完毕,下面开始Semaphore实现原理分析:

?

Semaphore实现原理

?

前文已经提到Semaphore是基于AQS实现的,其核心内部类就是实现AQS的子类,在Semaphore中有包含了公平实现和非公平实现。前面示例中为了保证游戏的公平性,排队使用的公平队列。这里需要提一下的是“公平”固然是好事,但是会有性能损失,主要原因是:线程在排队阻塞和被唤醒时都有上下文切换开销;而非公平的的实现,在加入队列前先检查是否存在“许可”,如果有 直接获取,相对公平实现 减少部分开销。所以在不需要严格保证排队顺序的情况下,建议都使用非公平信号量

?

在Semaphore内部类实现AQS的过程中,为了保证部分方法复用首先定义了一个公共的实现类Sync,然后又分别创建了公平实现FairSync和非公平实现NonfairSync基础自Sync类。

?

首先看Sync类的实现:

Java代码??收藏代码
  1. abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{??
  2. ????private?static?final?long?serialVersionUID?=?1192457210091910933L;??
  3. ???
  4. ????Sync(int?permits)?{??
  5. ????????//构造方法,用“许可”个数初始化AQS的State字段值??
  6. ????????setState(permits);??
  7. ????}??
  8. ???
  9. ????final?int?getPermits()?{??
  10. ????????return?getState();??
  11. ????}??
  12. ???
  13. ????//非公平?共享获取?“资源”方法,参数为尝试获取的“资源”个数??
  14. ????final?int?nonfairTryAcquireShared(int?acquires)?{??
  15. ????????for?(;;)?{??
  16. ????????????int?available?=?getState();??
  17. ????????????int?remaining?=?available?-?acquires;??
  18. ???
  19. ????????????if?(remaining?<?0?||??
  20. ????????????????????//利用自旋,原子方式修改AQS的state值??
  21. ????????????????????compareAndSetState(available,?remaining))??
  22. ????????????????return?remaining;??
  23. ????????}??
  24. ????}??
  25. ???
  26. ????//共享方式?释放“资源”方法??
  27. ????protected?final?boolean?tryReleaseShared(int?releases)?{??
  28. ????????for?(;;)?{??
  29. ????????????int?current?=?getState();??
  30. ????????????int?next?=?current?+?releases;??
  31. ????????????if?(next?<?current)?//?overflow??
  32. ????????????????throw?new?Error("Maximum?permit?count?exceeded");??
  33. ????????????if?(compareAndSetState(current,?next))??
  34. ????????????????return?true;??
  35. ????????}??
  36. ????}??
  37. ???
  38. ????//动态调整“资源个数”??
  39. ????final?void?reducePermits(int?reductions)?{??
  40. ????????for?(;;)?{??
  41. ????????????int?current?=?getState();??
  42. ????????????int?next?=?current?-?reductions;??
  43. ????????????if?(next?>?current)?//?underflow??
  44. ????????????????throw?new?Error("Permit?count?underflow");??
  45. ????????????if?(compareAndSetState(current,?next))??
  46. ????????????????return;??
  47. ????????}??
  48. ????}??
  49. ???
  50. ????//动态清空?所有“许可”??
  51. ????final?int?drainPermits()?{??
  52. ????????for?(;;)?{??
  53. ????????????int?current?=?getState();??
  54. ????????????if?(current?==?0?||?compareAndSetState(current,?0))??
  55. ????????????????return?current;??
  56. ????????}??
  57. ????}??
  58. }??
  59. ???

?

主要方法实现都比较简单,结合给出的注释很好理解。下面接着来看非公平的实现NonfairSync,继承自上述讲的Sync类:

Java代码??收藏代码
  1. static?final?class?NonfairSync?extends?Sync?{??
  2. ????????private?static?final?long?serialVersionUID?=?-2694183684443567898L;??
  3. ???
  4. ????????//构造方法,没有添加任何新操作??
  5. ????????NonfairSync(int?permits)?{??
  6. ????????????super(permits);??
  7. ????????}??
  8. ????????//获取资源方法,直接调用Sync定义的?非公平共享获取方法??
  9. ????????protected?int?tryAcquireShared(int?acquires)?{??
  10. ????????????return?nonfairTryAcquireShared(acquires);??
  11. ????????}??
  12. ????}??
  13. ???

?

最后看下公平的实现FairSync,同样继承自Sync类:

Java代码??收藏代码
  1. static?final?class?FairSync?extends?Sync?{??
  2. ????????private?static?final?long?serialVersionUID?=?2014338818796000944L;??
  3. ???
  4. ????????FairSync(int?permits)?{??
  5. ????????????super(permits);??
  6. ????????}??
  7. ????????protected?int?tryAcquireShared(int?acquires)?{??
  8. ????????????for?(;;)?{??
  9. ????????????????if?(hasQueuedPredecessors())??
  10. ????????????????????return?-1;??
  11. ????????????????int?available?=?getState();??
  12. ????????????????int?remaining?=?available?-?acquires;??
  13. ????????????????if?(remaining?<?0?||??
  14. ????????????????????compareAndSetState(available,?remaining))??
  15. ????????????????????return?remaining;??
  16. ????????????}??
  17. ????????}??
  18. }??

?

tryAcquireShared与非公平的实现区别不大,只多了一个hasQueuedPredecessors方法调用,该方法是AQS中定义的方法,主要作用就是判断当前线程是否是头节点或者队列为空,如果不是就进行排队。非公平的实现里如果尝试获取到“许可”,就无需加入队列排队了,这就是根本区别,Doug Lea大神只用了一行代码就实现这个区别,不可谓不巧妙。

?

到这里Semaphore对AQS使用内部类实现讲解完毕,下面开始看下Semaphore的核心方法,这些方法就很简单了,基本都是对上述三个内部类的方法调用,这里只列出几个核心方法即可,其他方法可以自行查阅。

?

Semaphore默认构造方法:

Java代码??收藏代码
  1. public?Semaphore(int?permits)?{??
  2. ????????sync?=?new?NonfairSync(permits);??
  3. }??

?

可以看到默认是调用AQS的非公平实现,毕竟性能会好些。主要作用就是使用参数“permits”初始化AQS的state字段。

?

Semaphore带公平或非公平参数构造方法:

Java代码??收藏代码
  1. public?Semaphore(int?permits,?boolean?fair)?{??
  2. ????????sync?=?fair???new?FairSync(permits)?:?new?NonfairSync(permits);??
  3. }??

?

主要就是根据参数fair,判断是创建AQS的公平实现还是非公平实现。

?

Semaphore的获取许可方法和释放许可方法:

Java代码??收藏代码
  1. public?void?acquire()?throws?InterruptedException?{??
  2. ????????sync.acquireSharedInterruptibly(1);??
  3. }??
  4. ???
  5. public?void?release()?{??
  6. ????????sync.releaseShared(1);??
  7. }??

可以看到Semaphore的获取许可方法,是调用的AQS的“共享可中断获取方法”acquireSharedInterruptibly,之后会再回调Semaphore中的tryAcquireShared方法。说明当线程在使用Semaphore时被阻塞,是可以手动被中断的。

?

另外需要注意的是Semaphore的内部类对AQS的实现是采用的“共享”方式,因为如果有足够的多的“许可”被释放,可以同时唤醒多个线程,这时典型的共享锁的运用场景。

?

总结

?

简单的总结Semaphore,就是它可以用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。有点像限流的阀门,在有些场景下可以被固定的线程池代替,比如:Executors.newFixedThreadPool(xx),但它可以比线程池的控制更加细粒度。另外Semaphore可以理解为一种共享的可中断锁。

?

http://moon-walker.iteye.com/blog/2406557

发表评论
用户名: 匿名