当你把任务发送给 executor 后,你可以选择取消这个任务的执行。使用 submit() 方法发送一个 Runnable 对象给一个executor,submit() 方法将返回一个实现了 Future 这个接口类的对象。你可以通过该类的 cancel() 方法取消任务的执行。cancel() 方法接收一个 boolean 值作为参数。如果参数为 true,且executor正在执行这个任务,那么执行任务的线程将被中断。
cancel() 方法返回一个boolean值显示任务是否被取消。
?
ThreadPoolExecutor类是接口 Executor 和 ExecutorService的基本实现类。同时Java提供了一个扩展类用以实现任务的调度 - ScheduledThreadPoolExecutor类,通过此类你可以:
你可以通过继承现有的类 (ThreadPoolExecutor 或者 ScheduledThreadPoolExecutor)来实现自定义的executor。如果你继承了 ThreadPoolExecutor 类,你可以重载以下方法:
如果你继承了ScheduledThreadPoolExecutor类,你可以重载decorateTask()方法。这个方法就像上面的newTaskFor(),但它是针对调度任务的。它允许你重载被executor执行的任务。
?
你也可以修改一些参数来改变executor的行为。其中包括:
在第二章中,我们实现了客户端 / 服务器端应用程序。在这个例子中我们将对那个应用程序做如下拓展:
?
?
class="java" name="code">// 这个类用来记录已经执行的任务总数以及这些任务的总耗时
// 两个变量都是原子变量,因为不同线程需要更新这两个参数的值
public class ExecutorStatistics {
private AtomicLong executionTime = new AtomicLong(0L);
private AtomicInteger numTasks = new AtomicInteger(0);
public void addExecutionTime(long time) {
executionTime.addAndGet(time);
}
public void addTask() {
numTasks.incrementAndGet();
}
@Override
public String toString() {
return "Executed Tasks: " + getNumTasks() +
". Execution Time: "+ getExecutionTime();
}
public AtomicLong getExecutionTime() {
return executionTime;
}
public AtomicInteger getNumTasks() {
return numTasks;
}
}
?
?
?
// 当executor被调用shutdown()或shutdownNow()后,executor会拒绝新提交的任务
// 这个类用来处理这种情况,这里它所做的是往输出流里打印出错信息
public class RejectedTaskController implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task,
ThreadPoolExecutor executor) {
ConcurrentCommand command = (ConcurrentCommand) task;
Socket clientSocket = command.getSocket();
try {
PrintWriter out = new
PrintWriter(clientSocket.getOutputStream(), true);
String message = "The server is shutting down."
+ " Your request can not be served."
+ " Shutting Down: "
+ executor.isShutdown()
+ ". Terminated: "
+ executor.isTerminated()
+ ". Terminating: "
+ executor.isTerminating();
out.println(message);
out.close();
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
?
?
?
/**
* 当一个Runnable对象提交给executor后,executor不是直接执行这个Runnable对象
* 它创建了一个FutureTask类的实例对象,该对象被executor里的线程所执行
* 这里我们继承了FutureTask类并实现了Comparable接口,这样每个提交给executor
* 的任务可以按照一定规则排序(如优先级)
*/
public class ServerTask<V> extends FutureTask<V> implements Comparable<ServerTask<V>> {
private ConcurrentCommand command;
public ServerTask(ConcurrentCommand command) {
super(command, null);
this.command=command;
}
public ConcurrentCommand getCommand() {
return command;
}
public void setCommand(ConcurrentCommand command) {
this.command = command;
}
@Override
public int compareTo(ServerTask<V> other) {
return command.compareTo(other.getCommand());
}
}
?
?
// 重载executor,以根据我们的需要修改它的行为
public class ServerExecutor extends ThreadPoolExecutor {
// 记录每个任务的执行时间,主键是ServerTask对象(即Runnable对象)
// 键值是对应的日期
private ConcurrentHashMap<Runnable, Date> startTimes;
// 这个变量记录每个用户的统计数据,主键是用户名
// 键值是ExecutorStatistics对象
private ConcurrentHashMap<String, ExecutorStatistics>
executionStatistics;
private static int CORE_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
private static int MAXIMUM_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
private static long KEEP_ALIVE_TIME = 10;
private static RejectedTaskController REJECTED_TASK_CONTROLLER
= new RejectedTaskController();
public ServerExecutor() {
super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,
TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
REJECTED_TASK_CONTROLLER);
startTimes = new ConcurrentHashMap<>();
executionStatistics = new ConcurrentHashMap<>();
}
// 每个任务被执行前调用,这里记录每个任务的开始时间
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTimes.put(r, new Date());
}
// 每个任务执行完后调用,这里我们计算执行当前任务的耗时,
// 并更新用户已被执行的任务总数和总耗时
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
ServerTask<?> task = (ServerTask<?>) r;
ConcurrentCommand command = task.getCommand();
if (t == null) {
if (!task.isCancelled()) {
// 首先从startTimes中删除此任务的运行开始时间
Date startDate = startTimes.remove(r);
Date endDate = new Date();
long executionTime = endDate.getTime() - startDate.getTime();
ExecutorStatistics statistics =
executionStatistics.computeIfAbsent
(command.getUsername(), n -> new ExecutorStatistics());
statistics.addExecutionTime(executionTime);
statistics.addTask();
// 从ConcurrentServer中维护的任务列表中删除此任务,因为它已经完成
ConcurrentServer.finishTask(command.getUsername(), command);
} else {
String message = "The task"
+ command.hashCode() + "of user"
+ command.getUsername() + "has been cancelled. ";
System.out.println(message);
}
} else {
String message = "The exception "
+ t.getMessage()
+ " has been thrown.";
System.out.println(message);
}
}
// 把发送给executor的Runnable对象包装成ServerTask对象,该对象才真正是被线程执行的
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable,
T value) {
return new ServerTask<T>(runnable);
}
public void writeStatistics() {
for (Map.Entry<String, ExecutorStatistics> entry : executionStatistics.entrySet()) {
String user = entry.getKey();
ExecutorStatistics stats = entry.getValue();
System.out.println(user + ":" + stats);
}
}
}
?
?
?
// 指令的抽象类
public abstract class Command {
protected String[] command;
public Command (String [] command) {
this.command=command;
}
public abstract String execute ();
}
/**
* 这是所有指令类的基础类,它包括了一些所有指令的公共行为
* 1. 调用每个指令类的具体逻辑实现
* 2. 把指令结果写会给客户端
* 3. 关闭所有通信中使用的资源
*/
public abstract class ConcurrentCommand extends Command implements Comparable<ConcurrentCommand>, Runnable {
private String username;
private byte priority;
private Socket socket;
public ConcurrentCommand(Socket socket, String[] command) {
super(command);
username=command[1];
priority=Byte.parseByte(command[2]);
this.socket=socket;
}
@Override
public abstract String execute();
@Override
public void run() {
String ret = execute();
try {
PrintWriter out = new
PrintWriter(socket.getOutputStream(),true);
out.println(ret);
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public int compareTo(ConcurrentCommand o) {
return Byte.compare(o.getPriority(), this.getPriority());
}
public byte getPriority() {
return priority;
}
public Socket getSocket() {
return socket;
}
public String getUsername() {
return username;
}
}
// 对应Query请求的指令类
public class ConcurrentQueryCommand extends ConcurrentCommand {
public ConcurrentQueryCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
WDIDAO dao=WDIDAO.getDAO();
if (command.length==5) {
return dao.query(command[3], command[4]);
} else if (command.length==6) {
try {
return dao.query(command[3], command[4],
Short.parseShort(command[5]));
} catch (NumberFormatException e) {
return "ERROR;Bad Command";
}
} else {
return "ERROR;Bad Command";
}
}
}
//对应Report请求的指令类
public class ConcurrentReportCommand extends ConcurrentCommand {
public ConcurrentReportCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
WDIDAO dao=WDIDAO.getDAO();
return dao.report(command[3]);
}
}
//对应Stop请求的指令类
public class ConcurrentStopCommand extends ConcurrentCommand {
public ConcurrentStopCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
ConcurrentServer.shutdown();
return "Server stopped";
}
}
//对应Cancel请求的指令类
public class ConcurrentCancelCommand extends ConcurrentCommand {
public ConcurrentCancelCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
ConcurrentServer.cancelTasks(getUsername());
return message;
}
}
//此类处理一些服务器不支持的请求
public class ConcurrentErrorCommand extends ConcurrentCommand {
public ConcurrentErrorCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
return "Unknown command: " + command[0];
}
}
//对应status请求的指令
public class ConcurrentStatusCommand extends ConcurrentCommand {
public ConcurrentStatusCommand (Socket socket, String[] command) {
super(socket, command);
}
@Override
public String execute() {
StringBuilder sb=new StringBuilder();
ThreadPoolExecutor executor = ConcurrentServer.getExecutor();
sb.append("Server Status;");
sb.append("Actived Threads: ");
sb.append(executor.getActiveCount());
sb.append(";");
sb.append("Maximum Pool Size: ");
sb.append(executor.getMaximumPoolSize());
sb.append(";");
sb.append("Core Pool Size: ");
sb.append(executor.getCorePoolSize());
sb.append(";");
sb.append("Pool Size: ");
sb.append(executor.getPoolSize());
sb.append(";");
sb.append("Largest Pool Size: ");
sb.append(executor.getLargestPoolSize());
sb.append(";");
sb.append("Completed Task Count: ");
sb.append(executor.getCompletedTaskCount());
sb.append(";");
sb.append("Task Count: ");
sb.append(executor.getTaskCount());
sb.append(";");
sb.append("Queue Size: ");
sb.append(executor.getQueue().size());
sb.append(";");
return sb.toString();
}
}
?
?
/**
* 在这个类中,我们启动了RequestTask这个线程,该线程读取由ConcurrentServer保存的客户端socket,
* 创建相应的指令并发给executor执行。
* 这样做的目的是让被线程执行的每个任务只包含和请求相关的代码,其它操作可以在executor外处理
*/
public class ConcurrentServer {
private static volatile boolean stopped = false;
// 用来保存发送消息给服务器的客户的sockets
private static LinkedBlockingQueue<Socket> pendingConnections;
// 保存每一个在executor里执行的任务所关联的Future对象,主键是用户名,
// 键值是另外一个ConcurrentMap (它的主键是ConcurrentCommand,键值是和任务关联的Future实例)
private static ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController;
// 执行RequestTask对象的Thread
private static Thread requestThread;
// 创建指令对象并发送给executor
private static RequestTask task;
private static ServerSocket serverSocket;
public static void main(String[] args) {
pendingConnections = new LinkedBlockingQueue<>();
taskController = new ConcurrentHashMap<String, ConcurrentHashMap<ConcurrentCommand, Future<?>>>();
// 启动RequestTask线程
task = new RequestTask(pendingConnections, taskController);
requestThread = new Thread(task);
requestThread.start();
System.out.println("Initialization completed.");
serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);
do {
try {
Socket clientSocket = serverSocket.accept();
pendingConnections.put(clientSocket);
} catch (Exception e) {
e.printStackTrace();
}
} while (!stopped);
finishServer();
}
// 该方法修改stopped变量为true并关闭serverSocket
public static void shutdown() {
stopped = true;
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 该方法停止executor并中断RequestTask线程
private static void finishServer() {
System.out.println("Shutting down the server...");
task.shutdown();
System.out.println("Shutting down Request task");
requestThread.interrupt();
System.out.println("Request task ok");
System.out.println("Closing socket");
System.out.println("Shutting down logger");
System.out.println("Logger ok");
System.out.println("Main server thread ended");
}
// 取消一个用户的请求
public static void cancelTasks(String username) {
ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username);
if (userTasks == null) {
return;
}
int taskNumber = 0;
Iterator<ServerTask<?>> it = userTasks.values().iterator();
while(it.hasNext()) {
ServerTask<?> task = it.next();
ConcurrentCommand command = task.getCommand();
if(!(command instanceof ConcurrentCancelCommand) &&
task.cancel(true)) {
taskNumber++;
it.remove();
}
}
}
// 当一个任务顺利完成后,我们需要把和这个任务关联的Future对象从保存的ConcurrentMap中删除
public static void finishTask(String username, ConcurrentCommand command) {
ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks
= taskController.get(username);
userTasks.remove(command);
}
}
public class RequestTask implements Runnable {
// 保存客户端的sockets
private LinkedBlockingQueue<Socket> pendingConnections;
// 用来并行处理用户的请求指令
private ServerExecutor executor = new ServerExecutor();
// 保存和任务相关联的Future对象
private ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController;
public RequestTask(LinkedBlockingQueue<Socket>
pendingConnections, ConcurrentHashMap<String,
ConcurrentHashMap<Integer, Future<?>>> taskController) {
this.pendingConnections = pendingConnections;
this.taskController = taskController;
}
public void run() {
try {
while (!Thread.currentThread().interrupted()) {
try {
Socket clientSocket = pendingConnections.take();
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String line = in.readLine();
ConcurrentCommand command;
String[] commandData = line.split(";");
System.out.println("Command: " + commandData[0]);
switch (commandData[0]) {
case "q":
System.out.println("Query");
command = new ConcurrentQueryCommand(clientSocket, commandData);
break;
case "r":
System.out.println("Report");
command = new ConcurrentReportCommand(clientSocket, commandData);
break;
case "s":
System.out.println("Status");
command = new ConcurrentStatusCommand(executor, clientSocket, commandData);
break;
case "z":
System.out.println("Stop");
command = new ConcurrentStopCommand(clientSocket, commandData);
break;
case "c":
System.out.println("Cancel");
command = new ConcurrentCancelCommand(clientSocket, commandData);
break;
default:
System.out.println("Error");
command = new ConcurrentErrorCommand(clientSocket, commandData);
break;
}
ServerTask<?> controller = (ServerTask<?>) executor.submit(command);
storeContoller(command.getUsername(), controller, command);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
// No Action Required
}
}
// 保存和用户请求关联的Future对象
private void storeContoller(String userName, ServerTask<?>controller, ConcurrentCommand command) {
taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap<ConcurrentCommand, ServerTask<?>>()).put(command, controller);
}
// 关闭executor
public void shutdown() {
String message = "Request Task: "
+ pendingConnections.size()
+ " pending connections.";
System.out.println(message);
executor.shutdown();
}
// 等待executor执行完所有正在执行的任务
public void terminate() {
try {
executor.awaitTermination(1, TimeUnit.DAYS);
executor.writeStatistics();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
?
你也可以重载以下Executor的方法:
ScheduledThreadPoolExecutor类有以下方法允许延迟执行任务,或周期性的任务:
?