当我们创建线程池并且提交任务失败时,线程池会回调RejectedExecutionHandler接口的rejectedExecution(Runnable task, ThreadPoolExecutor executor)方法来处理线程池处理失败的任务,其中task 是用户提交的任务,而executor是当前执行的任务的线程池。可以通过代码的方式来验证。
1、线程池工厂:
?
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76class="java keyword" style="overflow: visible !important; font-size: 1em !important; height: auto !important; font-family: Consolas, 'Bitstream Vera Sans Mono', 'Courier New', Courier, monospace !important; width: auto !important; vertical-align: baseline !important; float: none !important; font-weight: bold !important; color: #006699 !important; font-style: normal !important; text-align: left !important; margin: 0px; line-height: 1.1em !important; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; border: 0px; padding: 0px !important;">package
com.threadpool;
?
import
java.util.List;
import
java.util.concurrent.ThreadPoolExecutor;
?
/**
?
* 线程池工厂方法
?
* @author
?
*
?
*/
public
class
ThreadPoolFactory {
????
?
????
//线程池
????
private
static
ThreadPoolExecutor? pool;???
????
//自身对象
????
private
static
ThreadPoolFactory factory;
????
?
????
/**
?????
* 私有构造函数
?????
*/
????
private
ThreadPoolFactory(){??? }
????
?
????
/**
?????
* 获取工厂对象
?????
* @param config
?????
* @return
?????
*/
????
public
static
ThreadPoolFactory getInstance(ThreadPoolConfig config){
????????
if
(factory ==
null
){
????????????
factory =
new
ThreadPoolFactory();
????????
}
????????
?
????????
if
(pool ==
null
){
????????????
?
????????????
if
(config.getHandler() ==
null
){
????????????????
pool =
new
ThreadPoolExecutor(config.getCorePoolSize(),
????????????????????????
config.getMaximumPoolSize(),config.getKeepAliveTime(),
????????????????????????
config.getUnit(),config.getWorkQueue());
????????????
}
else
{
????????????????
pool =
new
ThreadPoolExecutor(config.getCorePoolSize(),
????????????????????????
config.getMaximumPoolSize(),config.getKeepAliveTime(),
????????????????????????
config.getUnit(),config.getWorkQueue(),config.getHandler());
????????????
}
????????
}??????
????????
System.out.println(
"pool? create= "
+pool.toString());
????????
return
factory;
????
}
????
?
????
/**
?????
* 添加线程池任务
?????
* @param run
?????
*/
????
public
synchronized
void
addTask(Runnable run){
????????
pool.execute(run);
????
}
????
?
????
/**
?????
* 添加线程池任务
?????
* @param runs
?????
*/
????
public
synchronized
void
addTask(List<Runnable> runs){
????????
if
(runs !=
null
){
????????????
for
(Runnable r:runs){
????????????????
this
.addTask(r);
????????????
}
????????
}
????
}
????
?
????
/**
?????
* 关闭线程池
?????
*/
????
public
void
closePool(){
????????
pool.shutdown();
????
}
????
?
}
2、线程池配置文件类:
?
?
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80package
com.threadpool;
?
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.RejectedExecutionHandler;
import
java.util.concurrent.TimeUnit;
?
/**
?
* 线程池配置类
?
* @author
?
*
?
*/
public
class
ThreadPoolConfig {
????
//池中所保存的线程数,包括空闲线程。
????
private
int
corePoolSize;
????
//池中允许的最大线程数。
????
private
int
maximumPoolSize;
????
//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
????
private
long
keepAliveTime;
????
//参数的时间单位。
????
private
TimeUnit unit;
????
//执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
????
private
BlockingQueue<Runnable> workQueue;
????
//由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
????
private
RejectedExecutionHandler handler;
????
//配置文件自身对象
????
private
static
ThreadPoolConfig config;
????
/**
?????
* 单例模式
?????
*/
????
private
ThreadPoolConfig(){
????????
?
????
}
????
?
????
/**
?????
* 获取配置文件对象
?????
* @return
?????
*/
????
public
static
ThreadPoolConfig getInstance(){
????????
if
(config ==
null
){
????????????
config =
new
ThreadPoolConfig();
????????
}??????
????????
return
config;
????
}??
????
public
int
getCorePoolSize() {
????????
return
corePoolSize;
????
}
????
public
void
setCorePoolSize(
int
corePoolSize) {
????????
this
.corePoolSize = corePoolSize;
????
}
????
public
int
getMaximumPoolSize() {
????????
return
maximumPoolSize;
????
}
????
public
void
setMaximumPoolSize(
int
maximumPoolSize) {
????????
this
.maximumPoolSize = maximumPoolSize;
????
}
????
public
long
getKeepAliveTime() {
????????
return
keepAliveTime;
????
}
????
public
void
setKeepAliveTime(
long
keepAliveTime) {
????????
this
.keepAliveTime = keepAliveTime;
????
}
????
public
TimeUnit getUnit() {
????????
return
unit;
????
}
????
public
void
setUnit(TimeUnit unit) {
????????
this
.unit = unit;
????
}
????
public
BlockingQueue<Runnable> getWorkQueue() {
????????
return
workQueue;
????
}
????
public
void
setWorkQueue(BlockingQueue<Runnable> workQueue) {
????????
this
.workQueue = workQueue;
????
}
????
public
RejectedExecutionHandler getHandler() {
????????
return
handler;
????
}
????
public
void
setHandler(RejectedExecutionHandler handler) {
????????
this
.handler = handler;
????
}??
}
3、简单任务类:
?
?
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28package
com.test;
?
/**
?
* 任务线程
?
* @author
?
*
?
*/
public
class
ThreadTask
extends
Thread {
????
?
????
public
ThreadTask(String name){
????????
super
(name);
????
}
????
?
????
@SuppressWarnings
(
"static-access"
)
????
@Override
????
public
void
run() {
????????
// TODO Auto-generated method stub
????????
System.out.println(
this
.getName().toString() +
", will sleep 0 s"
);
????????
try
{
????????????
this
.sleep(
1
*
10
);
????????
}
catch
(InterruptedException e) {
????????????
// TODO Auto-generated catch block
????????????
e.printStackTrace();
????????
}
????????
System.out.println(
this
.getName().toString() +
", I am wakeup now "
);
????
}
?
}
4、异常处理接口实现类:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22package
com.threadpool;
?
import
java.util.concurrent.RejectedExecutionHandler;
import
java.util.concurrent.ThreadPoolExecutor;
?
/**
?
* 线程池异常处理类
?
* @author
?
*
?
*/
public
class
MyRejectedExecutionHandler
implements
RejectedExecutionHandler {
?
????
@Override
????
public
void
rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
????????
// TODO Auto-generated method stub
????????
System.out.println(
"Begin exception handler-----------"
);
????????
//执行失败任务
????????
new
Thread(task,
"exception by pool"
).start();
????????
//打印线程池的对象
????????
System.out.println(
"The pool RejectedExecutionHandler = "
+executor.toString());
????
}
}
5、测试主函数:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38package
com.test;
?
import
java.util.concurrent.ArrayBlockingQueue;
import
java.util.concurrent.TimeUnit;
?
import
com.threadpool.MyRejectedExecutionHandler;
import
com.threadpool.ThreadPoolConfig;
import
com.threadpool.ThreadPoolFactory;
?
/**
?
* @author
?
*
?
*/
public
class
TestThreadPoolMain {
?
????
/**
?????
* @param args
?????
*/
????
public
static
void
main(String[] args) {
?
????????
//设置配置
????????
ThreadPoolConfig config = ThreadPoolConfig.getInstance();
????????
config.setCorePoolSize(
2
);
????????
config.setMaximumPoolSize(
3
);
????????
config.setKeepAliveTime(
5
);
????????
config.setUnit(TimeUnit.SECONDS);
????????
//将队列设小,会抛异常
????????
config.setWorkQueue(
new
ArrayBlockingQueue<Runnable>(
10
));
????????
config.setHandler(
new
MyRejectedExecutionHandler());
????????
//线程池工厂
????????
ThreadPoolFactory factory = ThreadPoolFactory.getInstance(config);
????????
?
????????
for
(
int
i =
0
;i<
100
;i++){
????????????
factory.addTask(
new
ThreadTask(i+
"-i"
));
????????
}
????????
System.out.println(
"i add is over!-------------------"
);
????
}
}
6、测试比较:
可以看出创建的线程池对象和调用传递的线程池对象是相同的。
pool create = java.util.concurrent.ThreadPoolExecutor@de6f34
0-i, will sleep 0 s
Begin exception handler-----------
12-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34
Begin exception handler-----------
1-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34