当我们创建线程池并且提交任务失败时,线程池会回调RejectedExecutionHandler接口的rejectedExecution(Runnable task, ThreadPoolExecutor executor)方法来处理线程池处理失败的任务,其中task 是用户提交的任务,而executor是当前执行的任务的线程池。可以通过代码的方式来验证。
1、线程池工厂:
?
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76class="java keyword" style="overflow: visible !important; font-size: 1em !important; height: auto !important; font-family: Consolas, 'Bitstream Vera Sans Mono', 'Courier New', Courier, monospace !important; width: auto !important; vertical-align: baseline !important; float: none !important; font-weight: bold !important; color: #006699 !important; font-style: normal !important; text-align: left !important; margin: 0px; line-height: 1.1em !important; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; border: 0px; padding: 0px !important;">package com.threadpool;
?
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
?
/**
?* 线程池工厂方法
?* @author
?*
?*/
public class ThreadPoolFactory {
?????
????//线程池
????private static ThreadPoolExecutor? pool;???
????//自身对象
????private static ThreadPoolFactory factory;
?????
????/**
?????* 私有构造函数
?????*/
????private ThreadPoolFactory(){??? }
?????
????/**
?????* 获取工厂对象
?????* @param config
?????* @return
?????*/
????public static ThreadPoolFactory getInstance(ThreadPoolConfig config){
????????if(factory == null){
????????????factory = new ThreadPoolFactory();
????????}
?????????
????????if(pool == null){
?????????????
????????????if(config.getHandler() == null){
????????????????pool = new ThreadPoolExecutor(config.getCorePoolSize(),
????????????????????????config.getMaximumPoolSize(),config.getKeepAliveTime(),
????????????????????????config.getUnit(),config.getWorkQueue());
????????????}else{
????????????????pool = new ThreadPoolExecutor(config.getCorePoolSize(),
????????????????????????config.getMaximumPoolSize(),config.getKeepAliveTime(),
????????????????????????config.getUnit(),config.getWorkQueue(),config.getHandler());
????????????}
????????}??????
????????System.out.println("pool? create= "+pool.toString());
????????return factory;
????}
?????
????/**
?????* 添加线程池任务
?????* @param run
?????*/
????public synchronized void addTask(Runnable run){
????????pool.execute(run);
????}
?????
????/**
?????* 添加线程池任务
?????* @param runs
?????*/
????public synchronized void addTask(List<Runnable> runs){
????????if(runs != null){
????????????for(Runnable r:runs){
????????????????this.addTask(r);
????????????}
????????}
????}
?????
????/**
?????* 关闭线程池
?????*/
????public void closePool(){
????????pool.shutdown();
????}
?????
}
2、线程池配置文件类:
?
?
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80package com.threadpool;
?
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
?
/**
?* 线程池配置类
?* @author
?*
?*/
public class ThreadPoolConfig {
????//池中所保存的线程数,包括空闲线程。
????private int corePoolSize;
????//池中允许的最大线程数。
????private int maximumPoolSize;
????//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
????private long keepAliveTime;
????//参数的时间单位。
????private TimeUnit unit;
????//执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
????private BlockingQueue<Runnable> workQueue;
????//由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
????private RejectedExecutionHandler handler;
????//配置文件自身对象
????private static ThreadPoolConfig config;
????/**
?????* 单例模式
?????*/
????private ThreadPoolConfig(){
?????????
????}
?????
????/**
?????* 获取配置文件对象
?????* @return
?????*/
????public static ThreadPoolConfig getInstance(){
????????if(config == null){
????????????config = new ThreadPoolConfig();
????????}??????
????????return config;
????}??
????public int getCorePoolSize() {
????????return corePoolSize;
????}
????public void setCorePoolSize(int corePoolSize) {
????????this.corePoolSize = corePoolSize;
????}
????public int getMaximumPoolSize() {
????????return maximumPoolSize;
????}
????public void setMaximumPoolSize(int maximumPoolSize) {
????????this.maximumPoolSize = maximumPoolSize;
????}
????public long getKeepAliveTime() {
????????return keepAliveTime;
????}
????public void setKeepAliveTime(long keepAliveTime) {
????????this.keepAliveTime = keepAliveTime;
????}
????public TimeUnit getUnit() {
????????return unit;
????}
????public void setUnit(TimeUnit unit) {
????????this.unit = unit;
????}
????public BlockingQueue<Runnable> getWorkQueue() {
????????return workQueue;
????}
????public void setWorkQueue(BlockingQueue<Runnable> workQueue) {
????????this.workQueue = workQueue;
????}
????public RejectedExecutionHandler getHandler() {
????????return handler;
????}
????public void setHandler(RejectedExecutionHandler handler) {
????????this.handler = handler;
????}??
}
3、简单任务类:
?
?
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28package com.test;
?
/**
?* 任务线程
?* @author
?*
?*/
public class ThreadTask extends Thread {
?????
????public ThreadTask(String name){
????????super(name);
????}
?????
????@SuppressWarnings("static-access")
????@Override
????public void run() {
????????// TODO Auto-generated method stub
????????System.out.println(this.getName().toString() + ", will sleep 0 s");
????????try {
????????????this.sleep(1*10);
????????} catch (InterruptedException e) {
????????????// TODO Auto-generated catch block
????????????e.printStackTrace();
????????}
????????System.out.println(this.getName().toString() + ", I am wakeup now ");
????}
?
}
4、异常处理接口实现类:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22package com.threadpool;
?
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
?
/**
?* 线程池异常处理类
?* @author
?*
?*/
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
?
????@Override
????public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
????????// TODO Auto-generated method stub
????????System.out.println("Begin exception handler-----------");
????????//执行失败任务
????????new Thread(task,"exception by pool").start();
????????//打印线程池的对象
????????System.out.println("The pool RejectedExecutionHandler = "+executor.toString());
????}
}
5、测试主函数:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38package com.test;
?
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
?
import com.threadpool.MyRejectedExecutionHandler;
import com.threadpool.ThreadPoolConfig;
import com.threadpool.ThreadPoolFactory;
?
/**
?* @author
?*
?*/
public class TestThreadPoolMain {
?
????/**
?????* @param args
?????*/
????public static void main(String[] args) {
?
????????//设置配置
????????ThreadPoolConfig config = ThreadPoolConfig.getInstance();
????????config.setCorePoolSize(2);
????????config.setMaximumPoolSize(3);
????????config.setKeepAliveTime(5);
????????config.setUnit(TimeUnit.SECONDS);
????????//将队列设小,会抛异常
????????config.setWorkQueue(new ArrayBlockingQueue<Runnable>(10));
????????config.setHandler(new MyRejectedExecutionHandler());
????????//线程池工厂
????????ThreadPoolFactory factory = ThreadPoolFactory.getInstance(config);
?????????
????????for(int i = 0;i<100;i++){
????????????factory.addTask(new ThreadTask(i+"-i"));
????????}
????????System.out.println("i add is over!-------------------");
????}
}
6、测试比较:
可以看出创建的线程池对象和调用传递的线程池对象是相同的。
pool create = java.util.concurrent.ThreadPoolExecutor@de6f34
0-i, will sleep 0 s
Begin exception handler-----------
12-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34
Begin exception handler-----------
1-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34