最近一直被队列的消费业务所困扰,先大致说下业务状况。
模块A产生数据通过队列传递给模块B处理,但是数据来自于定时任务,经常是瞬时上万条或者更多,而且模块B的消费有限速控制并且能力有限(消费业务使用的线程池),肯定需要时间消化。
那么带来的一个问题就是线程池的拒绝策略选哪种?
首先说下线程池的四种拒绝策略:
最初直观感受就是不能丢消息,用2吧,主线程跟着做业务。
久了,发现一个问题,主线程(暂且这么说吧,准确说是启动线程池的线程)一旦运行任务,即使线程池里的线程跑完任务都不会再进任务,饿着呢。直到主线程跑完一次业务,才能继续消费,分配给线程池任务。
问题很明显,业务流程比较耗时,主线程被占住了,线程池的一旦干完活,啥都干不了,都等着主线程消费队列的数据给新任务呢。
忍不了,但是分析其他三个策略,AbortPolicy直接抛异常,抛了能咋样,还是不知道要干啥;DiscardOldestPolicy丢弃老任务,丢消息,否了;DiscardPolicy丢弃,肯定否了。
于是查看拒绝策略源码,发现拒绝策略的这几个类真是够简洁了(只有两个方法),统一实现RejectedExecutionHandler接口,实现rejectedExecution方法(代码几行……),还有自己的构造方法(空的)。
然后分析自己的业务需求,总结:线程池的阻塞队列满了后,主线程啥都不干,就等着阻塞队列不满的时候,把任务扔给线程池。
看源码发现阻塞队列正好有个remainingCapacity接口,看看字面意思就知道是啥意思了,不过本着咱们猿们严谨的态度,继续深入看接口源码(这里不带着看了,确认是队列空余个数)。然后主线程只要不断获取空余个数,是0就继续获取,直到不是0为止。代码如下:
class="language-java" style="margin: 0px; padding: 0px; border-radius: 3px; font-family: Menlo, Monaco, Consolas, 'Andale Mono', 'lucida console', 'Courier New', monospace;">public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
while (e.getQueue().remainingCapacity() == 0);
e.execute(r);
}
}
Runnable r :主线程
ThreadPoolExecutor e:线程池
建议先看看其他四种策略的实现。
原以为这个问题比较复杂,结果只用几行代码搞定,不知道有没有坑……
ps:如果有问题,请指正交流。如果有更好的方案,欢迎指导。