这是我第一次做java技术比较全面和复杂的系统,当时刚从事互联网开发,它与传统单机增删改查的Web应用差别很大,那只是业务复杂。当时除了学习很多工具技巧外包括maven/git使用都才入门,
线程处理的相关技术整合使用还不多,自己一个人一下做这个还是蛮有压力的,新的工作需要打响第一P。这时我只看过一部分dubbo与druid源码,有了些想法,最后一步步也顺利完成了。
开发一个系统其实就是组织一个工厂,安排各种人员,各司其职又相互协作,处理业务的过程。不过后来又看了些源码,比如rocketmq,
发现有些相似的思想,也有更多设计显的稚嫩。现在看的多了,更可以参考优秀的设计,无法结构/代码/细节功能都可以向高水平看齐。
两年前做的系统,有时介绍起来都想不起来细节了,因为很多地方都反复想了多个方案,比如是否按类型分线程池处理等。所以
打算总结一下,随便分析一下可以改进的地方。
# 1. 系统的功能与特点
? 这个系统有点类似于消息中间件的开发,调查
服务端(以下简称 SS)接收的消息是一个个背调请求,系统要持久化,每个请求要再要分解出一个个单项调查子任务,推送到各个调查客户端(以下简称 SC)进行消费,及时的调查返回结果,人工调查 SC只返回收到任务。
? 比如用户选择调查一个人的身份/
学历/处罚/不良信息。SS接收这个任务,拆解后分别发送给处理身份,学历,处罚,不良信息的四类SC,每类可能不只一个应用,也许有两个调用不良信息的SC都可以完成单独的任务。这个系统有以下几个特点:
- - SS与SC之间通过公司自己开发的基于netty的通讯组件进行,我参与了这个组件开发,在分析通讯协议设计的文章中介绍了我们的通讯组件。rocketmq中也是自己的通讯组件,自己的专用协议。
- - 这个过程中还要维护调查客户端的连接,状态,同类的客户端要进行选择性的推送。这个类似于nameServer的功能,有心跳功能,要监听通讯通道的状态,我们集成在SS中,更没有实现高可用。
- - SS要嵌入到一个web应用中,接收企业服务应用过来的http背调请求,未来内部使用还考虑也支持自己的remote组件,比如相应的企业服务中可以引用背调请求的生产者。当然对外部推广客户还是要http的方式的跨防火墙请求。
- - 每一个SC也会嵌入一个web应用中,可以查看接收的单项调查子任务的情况。SC要容纳各种子任务的实现,通过配置化只加载其中的一种。SC只处理push过来的子任务,没实现主动pull。
- - SS收到的消息要支持持久化,提供相应的接口,可以配置具体的方式。
- - 异步处理为主。比如SC完成的结果后通知SS,SS有监听器异步监听完成情况,进行更新。
- - 后期由于提出了人工干预同类SC的处理,以及有人工电话调查SC并准备使用dubbo技术,所以又改造了一下,增加了中间处理层(以下简称SM),同类的SC注册到SM中,而SM又注册到SS中。SM也有局部的nameServer的功能,也有业务功能。
# 2. 项目的组成
? 项目由一个总的pom工程,和三个jar工程组成,分别是survey-client,survey-middle,survey-server组成。
? 下面以survey-server为主,其它两个模块简单介绍。
# 3. 服务端的设计
## 3.1 总体设计
? 服务端有两大功能,分别是处理背调请求,维护客户端的状态。有点类似与rocketmq的broker与nameSvr的两大功能。
? 这两个功能都有核心类,因为因为附属功能都在其中,所以我当时喜欢叫Container,也许现在会学rocketmq叫
controller了。两个核心类不需要
远程通讯,但它们相互引用。
? TaskContainer管理任务,AppContainer管理注册的客户端,它们由一个inti类进行统一启动两个核心类,以及注入持久化实现类。同时init类还会启动一个守护进程,输出一些核心类中的重要日志信息给控制台。
## 3.2 通讯中间件使用
基于netty开发。
服务端事先配置可以连接的客户端的信息,包括appKey,code,appSecret等信息。如果客户端连接成功了,会有一个sessionId,由服务端保存。其内部有验证,重连等机制。
通讯层本身有连接心跳功能,但上层还需要一个业务心跳,传递业务执行情况与服务器状态变化 ,上层选择客户端时就可以基于多种策略了。
### 3.2.1 服务端发消息与处理
**服务端的启动:**
class="java">```java
//服务端启动,包括端口以及可允许连接的客户端信息。这些信息用于底层校验客户端
ServerInit.init(serverPort, clientAppList);
```
**服务端push消息给客户端:**
一般通讯层服务端会返回连接好的channel给使用者,使用者可以包装成自己的channel进行使用。这里并不提供channel,其内部持有。外部使用如下方式:
```java
//ServerPushHandler是通讯层的类,有静态方法发信息给客户端。
//根据客户端的sessionId发送一个【任务(名称)】以及数据,并设置好返回结果的回调对象来异步处理返回值。
//当然也可以当成dispatcher来用。
boolean bln = ServerPushHandler.pushBySessionId(subTaskInfo.getSurveyClient().getSessionId(), "assignTaskToClient", body, new ServerPushTaskCallback());
```
客户端会注册对应【任务(名称)】的处理类,来处理接收到的数据并返回值。
```java
//客户端配置对应的处理类。
PushReceiverCenter.registReceiver("assignTaskToClient", new PushReceiverFeedbackHandler(apiInvorkerInterface));
```
### 3.2.2 客户端发消息给与服务端处理
**客户端启动:**
```java
//设置状态监听类。底层netty监控到状态变化会通知这个类对象来处理
client = ClientCenter.getAClient(this.serverIP, serverPort, this.appkey, this.appSecret,clientStatusListener);
```
**发送与接收处理:**
```java
//客户端发送业务心跳数据的方式,消息本身包含【消息名称】
//同时还设置了服务端返回值的处理类,确认服务端已经收到了。
MiddleMsg msg = new MiddleMsg("getClientSystemInfo", body);
ResultObject res = client.sendMsg(msg, new ReportClientInfoFeedbackHandler());
//服务端注册消息的处理类,根据【消息名称】,选择对应的处理类
register.addMsgServiceHandler("getClientSystemInfo",RecvClientInfoHandler);
```
### 3.2.3 设计改进
? 在rocketmq中,通讯层与核心类不直接引用,中间有一个outApi的类隔离着,对核心类与其它类提供所有的通讯功能服务。
? 我的设计中,通讯层属于AppContainer,直接使用了,考虑整体通讯的统一性,应该被一个独立的类被引用使用,独立的类负责通讯,注册消息处理类等功能。
## 3.2 客户端管理核心类的功能
### 3.2.1 属性
? 客户端管理类,主要有配置的客户端与
在线的客户端。由于由服务端与客户端两层改为三层,并且原始的配置值被
意义被要求改变,比如appKey由一个客户端变成了一类客户端。还涉及到客户端配置管理服务不能及时变化造成的一定冗余。有些传参被要求变动,还涉及到通讯层的参数变动,所以属性中有些意义发生变化。
? ClientApp是通讯层要的配置客户端类,SurveyApp是本业务中的客户端。
```java
public class AppContainer {
private static final Logger logger =Logger.getLogger(AppContainer.class);
private String serverIp;
private String serverPort;
/**用户单例锁*/
private static Boolean lockSigleton = true;
private static AppContainer appContainer;//单例使用
/**配置的app,由于底层没有code,用appKey记录,所以监听底层的Client用AppKey确定。
//【appKey--(SurveyApp--SurveyClientList)】*/
public Map<String,SurveyApp> appHolder=new HashMap<String,SurveyApp>();
/**配置的app,任务用Code记录(在任务处理中,因为子任务中使用AppCode指明使用的App类型,这是从产品那边配置的,不太可能用AppKey这个不是很清晰的码)
* 【appCode--(SurveyApp--SurveyClientList)】
*/
public Map<String,SurveyApp> appCodeHolder=new HashMap<String,SurveyApp>();
/**配置的Client(ClientCode--SurveyClient)*/
public Map<String,SurveyClient> clientHolder=new HashMap<String,SurveyClient>();
/**可维护的Client在线列表。
* <key为appKey(代表一类客户端),内部一组客户端,有并发控制<sessionId,client>>
【appKey--(sessionId--onlineClient)】
*/
public volatile Map<String, ConcurrentHashMap<String, SurveyClient>> onlineClientMap=new HashMap<String, ConcurrentHashMap<String,SurveyClient>>();
/**根据配置的SurveyClient,生成ClientApp列表,仅用于启动中间件前给底层中间件传递*/
private List<ClientApp> clientAppList=new ArrayList<ClientApp>();
private TaskContainer taskContainer;//引用任务处理容器
public static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public boolean isMiddlewearStarted=false;
```
### 3.2.2 主要方法
- 启动通讯服务端,注册三个业务处理类:客户端状态监听,处理客户端业务心跳,处理客户端完成任务情况上报处理。
```java
public void initMiddlerWareStart(){
logger.info("【背调中心】注册客户心跳消息与任务完成消息的回调处理");
try {
// 获取应用client基本信息
MsgServiceHandlerRegister register = MsgServiceHandlerRegister.getRegister();
//注册事件处理类
MsgServiceHandlerRegister.setEventHandlerClass(ClientStatusListener.class);
register.addMsgServiceHandler("getClientSystemInfo",RecvClientInfoHandler.class);
register.addMsgServiceHandler("subTaskFinishInfo", RecvClientTaskHandler.class);
new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
try {
isMiddlewearStarted=true;
ServerInit.init(StringUtils.isEmpty(serverPort)?9166:Integer.parseInt(serverPort), clientAppList);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
isMiddlewearStarted=false;
}
}
}).start();
logger.debug("【背调中心】启动消息服务成功!端口:" + serverPort);
} catch (Exception e) {
isMiddlewearStarted=false;
e.printStackTrace();
logger.error("【背调中心】启动消息服务失败!异常:" + e.toString());
}
}
```
- ClientStatusListener主要监听客户端登录成功事件,以及客户端断开连接事件。登录成功将产生一个surveyClient,并以sessionId为key存在map中。surveyClient还有一部分业务信息,比如权重等,来自业务心跳给补充。
```java
//public class ClientStatusListener extends AbstractEventHandler中的方法。
//将产生一个surveyClient客户端。
@Override
public void loginSuccess(EventInfo res) {
// TODO Auto-generated method stub
super.loginSuccess(res);
ContainerInit containerInit = ContainerInit.getInstance();
if (containerInit != null) {
ClientApp clientApp = res.getAppinfo();
Map<String, SurveyClient> surveyClientMap = AppContainer.instance.onlineClientMap.get(clientApp.getAppKey());
if (surveyClientMap == null)
surveyClientMap = new ConcurrentHashMap<String, SurveyClient>();
SurveyApp surveyApp = AppContainer.appHolder.get(clientApp.getAppKey());
// 1.新建一个调查客户端,以sessionId为key,记录在app表下面。
// surveyClient中一部分来源上监听,另一部分信息要来源于业务心跳。
SurveyClient surveyClient = new SurveyClient(clientApp.getIp(), "", clientApp.getSessionId(), clientApp.getChannelId(), surveyApp);
surveyClientMap.put(clientApp.getSessionId(), surveyClient);
// 2.新建一个准备放置客户端下的子任务。
TaskContainer.clientSubTaskInfoMap.put(clientApp.getSessionId(), new ArrayList<SubTaskInfo>());// 新建此客户端下的子任务容器
logger.info("【中间件状态监听】此APP当前客户端总数:" + surveyClientMap.size());
}
}
//public class RecvClientInfoHandler implements MsgServiceHandler中的方法。
//将对surveyClient客户端信息进行补充。包括客户端的类型code,权重,更新时间等,未来增加其它性能数据。
@Override
public MiddleMsg handleMsgEvent(MsgEvent dm, MiddleMsg msg) {
String body = msg.getBody() + "";
String sessionId = msg.getHeader().getSessionID();
String returnCode = "";
SurveyResponse td = new SurveyResponse();
try {
ClientRealData clientRealData = JsonUtils.toBean(body, ClientRealData.class);
//将客户端的实时信息设置到在线客户端的属性中
Map<String, SurveyClient> onlineClientMap=(Map<String, SurveyClient>) AppContainer.onlineClientMap.get(clientRealData.getAppkey());
SurveyClient surveyClient=onlineClientMap.get(sessionId);
// logger.debug("【处理客户心跳】客户端【存在吗】?:"+surveyClient!=null);
if(surveyClient!=null){
logger.debug("【处理客户心跳】clientRealData:"+clientRealData.getClientCode()+"@"+clientRealData.getAppCode());
surveyClient.setClientCode(clientRealData.getClientCode());
surveyClient.setUpdateTime(new Date());
surveyClient.setWeight(clientRealData.getWeight() == null ? "60" : clientRealData.getWeight());
}
returnCode = "success";
} catch (Exception e) {
e.printStackTrace();
returnCode = "failure";
logger.error("【处理客户心跳】消息失败!异常:" + e.toString());
}
td.setCode(returnCode);
msg.setBody(td);
return msg;
}
```
根据业务的类型与企业客户的级别,选择一个可用的客户端,这部分改变比较大,包括构建treemap得到权重与概率与级别要求,同类型还要按已经分配的任务决定给最少任务的。如果正好又不能用了,再
递归找一个可用的。
```java
public static SurveyClient getClientByUserRankAndClinetLever(String appCode,int userRank) {
// 从appCode得到appKey,从而找到可用的在线客户端.让前端根据appKey来分子任务不可靠
SurveyApp surveyApp = appCodeHolder.get(appCode);
logger.debug("【策略2】未配置客户端,返回!appKey:"+appCode);
return getClientByUserRankAndClinetLeverByAppkey(surveyApp.getAppKey(),userRank);
}
private static SurveyClient getClientByUserRankAndClinetLeverByAppkey(String appKey,int userRank) {
// 从appCode得到appKey,从而找到可用的在线客户端.让前端根据appKey来分子任务不可靠
//SurveyApp surveyApp = appCodeHolder.get(appCode);
Map<String, SurveyClient> onlineSurveyClientMap = onlineClientMap.get(appKey);
if(onlineSurveyClientMap==null){
logger.info("【策略2】未配置客户端,返回!appKey:"+appKey);
return null;
}
int onlineClientNum=onlineSurveyClientMap.size();
logger.debug("【策略2】surveyApp(code|在线客户端数):"+appKey+"|" + onlineClientNum);
// 用于权重随机的参数对象
//logger.debug("【策略】可选用客户端的数:" + onlineClientNum);
Map<String, Integer> canUseClient = new HashMap<String, Integer>();//用于treemap排序。
List<SurveyClient> sameLeverClientList = new LinkedList<SurveyClient>();//用于同级别客户端存放
if (onlineClientNum == 0) {
logger.info("【策略2】备选客户端为0,返回!");
return null;
}else {
Iterator iter = onlineSurveyClientMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, SurveyClient> entry = (Map.Entry<String, SurveyClient>) iter.next();
Integer eachWeight = new Integer(entry.getValue().getWeight() == null ? Constants.CLIENT_BASE_LEVER+"" : entry.getValue().getWeight());
canUseClient.put(entry.getKey(), eachWeight);
logger.debug("【策略2】循环可选用客户端详细信息(sessionId|weight):" + entry.getKey() + "|" + eachWeight);
if(onlineClientNum==1) return entry.getValue();
}
}
//找出可用的客户端,产生一个map,再构建treemap,用策略得到一个值。
logger.info("【策略2】准备筛选的客户端个数:"+canUseClient.size());//canUseClient不含有重复的,所以得到选择的客户端值,还要再处理多个同值的情况。
RankAndLeverTreeMapSelect rankAndLeverTreeMapSelect = new RankAndLeverTreeMapSelect(canUseClient,0);
Integer chooseValue = rankAndLeverTreeMapSelect.chooseValue();
logger.debug("【策略2】选择出的客户端lever:"+chooseValue);
if(chooseValue==null) return null;
for(SurveyClient surveyClient:onlineSurveyClientMap.values()){
if(surveyClient.getWeight() == null) surveyClient.setWeight(Constants.CLIENT_BASE_LEVER+"");
logger.debug("【策略2】当前比对的surveyClient.getWeight(null=60):"+(surveyClient.getWeight()));
//if(StringUtils.isBlank(surveyClient.getWeight())) continue;//没有就是60分
if(chooseValue.intValue()==new Integer(surveyClient.getWeight()).intValue()) sameLeverClientList.add(surveyClient);
}
if(sameLeverClientList.size()==1) return sameLeverClientList.get(0);
//如果同一个值的客户端有多个,再排序,取任务最少的一个。
Collections.sort(sameLeverClientList,new Comparator<SurveyClient>() {
@Override
public int compare(SurveyClient surveyClient1, SurveyClient surveyClient2) {
//以下如果改变顺序则调换一下参数位置
return surveyClient1.getTaskCount()-(surveyClient2.getTaskCount());
}
});
SurveyClient surveyClient=sameLeverClientList.get(0);
sameLeverClientList.clear();
//如果找到的客户端不能用,就递归找一个,同时移除这个客户端
SecretManagement m = ServerGlobal.sessionWithAppKeys.get(surveyClient.getSessionId());
if(m!=null && m.getChannel()!=null && m.getChannel().isWritable()){
return surveyClient;
}
else{
Map<String, SurveyClient> surveyClientMap=AppContainer.onlineClientMap.get(appKey);
if(surveyClientMap.containsKey(surveyClient.getSessionId())){
logger.debug("【策略。推送失败移除客户端再递归获取】sessionId:"+surveyClient.getSessionId());
surveyClientMap.remove(surveyClient.getSessionId());// 根据通讯客户端,移除里面的调查客户端对象
}
return getClientByUserRankAndClinetLeverByAppkey(appKey,userRank);
}
}
```
```java
/**rankAndLeverTreeMapSelect.chooseValue();
* 策略:
* 1.如果有用户级别值,比如90分,那100、80、70、60、40、20分的客户端中,选择最近的80分的客户端。
* 2.如果用户没有级别,那80/70/60/40/20中,选择及格的最低的60,如果都不及格,选择最高的40。
* 3.选择了一个分值的客户端,如果这里面有多个,再随机选择一个(未来根据完成情况或者性能)
* <P></P>
* @return
*/
@Deprecated
public K choose() {
...
}
/**
* 考虑到重复情况,不能返回key了,只能返回特定value后再循环处理。
* @return
*/
public Integer chooseValue() {
if(treeMap.size()==0) return null;
if(treeMap.size()==1) return treeMap.firstEntry().getKey();
if(_userRank>0d){//如果有用户级别
logger.debug("有用户级别值,找接近最大的。_userRank:"+_userRank);
SortedMap<Integer, K> headMap = this.treeMap.headMap(_userRank, true);
logger.debug("_userRank & headMap.size:"+_userRank+"|"+headMap.size());
if(headMap.size()==0) return treeMap.firstEntry().getKey();//如果找不到,给最低的。
return headMap.lastKey();
}
else{//如果无用户级别
logger.debug("无用户级别值,找及格里最小的。_baseLever:"+_baseLever);
SortedMap<Integer, K> tailMap = this.treeMap.tailMap(_baseLever, true);
logger.debug("_baseLever & headMap.size:"+_userRank+"|"+tailMap.size());
if(tailMap.size()==0) return treeMap.lastEntry().getKey();//如果都生活及格,找一个最大的。
return tailMap.firstKey();
}
}
```
## 3.3 业务处理核心类的功能
### 3.3.1 TaskContainer主要的属性
包括了背调任务存放,失败处理
队列,子任务分发线程池,外部持久化接口实现,子任务完成监听类。
超时时间,尝试次数配置。
```java
/**
* 任务容器-管理背调任务与相关子任务
* @author liujun
* @date 2018年1月18日 上午10:33:01
*/
public class TaskContainer {
private static final Logger logger = Logger.getLogger(TaskContainer.class);
/** 实时总任务信息(taskid---TaskInfo(SubTaskInfoMap)) */
public volatile static Map<String, TaskInfo> taskInfoMap = new ConcurrentHashMap<String, TaskInfo>();
/** 任务在内存中允许的最大存放数 */
private static Integer maxTaskMapSize=Integer.MAX_VALUE;
/**
* 实时子任务信息(subtaskid---SubTaskInfo)
* 目的:子任务完成后,根据subTaskId从这里快速拿到对应的子任务。从上面的主任务不方便找。
* 不需要了,子任务中带有主任务ID,所以还是先拿主任务,再取子任务处理。
*/
// public volatile static Map<String, SubTaskInfo> subTaskInfoMap = new ConcurrentHashMap<String, SubTaskInfo>();
/**使用阻塞队列,放置所有要处理的失败子任务.失败的任务先会再回线程池,之后超时会触发返回*/
BlockingQueue<SubTaskInfo> subTaskInfoQueue = new LinkedBlockingQueue<SubTaskInfo>();
/**任务超时是否自动处理,此超时不是推送客端尝试多次,而是等待子任务完成*/
public boolean autoDealTimeoutSubtask = false;
/**
* 子任务推送的最多尝试次数
*/
public static int maxRePushSubTaskTimes=5;
/**
* 等待子任务完成的超时的时间
*/
// public static long maxTimeoutSubTaskDealTime=24*60*60000L;
/**线上子任务超时时间*/
public static long maxTimeoutOnlineSubTaskTime=60*1000L;
/**主任务超时时间*/
public static long maxTimeoutTaskTime=2*24*60*60*1000L;
/**
* 一个配置的client下的实时子任务信息(sessionId-List<SubTaskInfo>)
* 用sessionId方便应对底层的上下线变化。中间件的事件只能得到sessionId,没有ClientCode。
*/
public volatile static Map<String, List<SubTaskInfo>> clientSubTaskInfoMap = new ConcurrentHashMap<String, List<SubTaskInfo>>();
/** app,client配置容器类 */
private AppContainer appContainer;
/** 子任务分派用线程池 */
private static ExecutorService executor = Executors.newCachedThreadPool();
/** 监听器-子任务完成 */
public SubTaskListener subTaskListener = new SubTaskListener();
/** 监听器-客户端上下线事件 */
public ClientStatusListener clientStatusListener = new ClientStatusListener();
/** 任务池锁 */
private Object TaskLock = new Object();
/** 任务处理线程 */
public SubTaskRedo subTaskRedo = new SubTaskRedo();
/**
* 外部持久化任务接口
*/
public TaskPersistenceInterface taskPersistenceInterface;
```
### 3.3.2 主要方法
#### 接收背调任务及选择客户端后发出去
在initContainer中已经用TaskFactory,把请求参数处理成TaskInfo对象了,处理过程中已经持久化了。
```java
String queryJsonStr = jsonParam.toString();
logger.info("【发起任务】背调任务请求参数:" + queryJsonStr);
if (containerInit != null) {
logger.info("---------【用户发起背调了...】--------");
TaskInfo taskInfo = TaskFactory.creatTaskInfo(jsonParam);
taskContainer.createAndPutTaskPool(taskInfo);
} else {
logger.warn("【发起任务】背调中心没有启动");
}
```
taskContainer处理背调任务:
```java
/**
* <P>
* 根据提交请求,生成主任务子任务,放入登记的map,并放入子任务队列
* </P>
* @param paras
* @param appCode
* @throws SurveyException
*/
public boolean createAndPutTaskPool (TaskInfo taskInfo) throws SurveyException {
boolean createResult=false;
// TaskInfo taskInfo = TaskFactory.creatTaskInfo(paras);
logger.debug("【主任务Map添加】当前总数:" + taskInfoMap.size());
if (taskInfoMap.size() >= maxTaskMapSize) {
logger.error("【任务登记MAP】已满!");
throw new SurveyException("任务登记已经满");
// return false;
} else {
taskInfoMap.put(taskInfo.getTaskId(), taskInfo);
Iterator<Map.Entry<String,SubTaskInfo>> iter =taskInfo.getSubTaskMap().entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, SubTaskInfo> entry = (Map.Entry<String, SubTaskInfo>) iter.next();
try {
startExecutorCompletionService(entry.getValue());
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new SurveyException("任务提交线程池失败",e);
}
}
logger.debug("【主任务池添加】当前总数+1后:" + taskInfoMap.size());
//改阻塞队列不需要另加锁
// synchronized (TaskLock) {
// TaskLock.notifyAll();
// }
createResult = true;
}
logger.debug("【任务登记MAP】情况如下:");
Iterator<Map.Entry<String,TaskInfo>> iter =taskInfoMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, TaskInfo> entry = (Map.Entry<String, TaskInfo>) iter.next();
logger.debug("【任务登记MAP】主任务id|(总|推|执|完):"+entry.getValue().getTaskId()+"|"+entry.getValue().getTotalTask()+"|"+entry.getValue().getPushTask()+"|"+entry.getValue().getExecuteTask()+"|"+entry.getValue().getCompleteTask() );
}
return createResult;
}
```
上面的拆分后的子任务处理:startExecutorCompletionService(entry.getValue());
```java
/**
* <P>将子任务设置一个可用的客户端后,通过线程池执行发送</P>
*
* @param subTaskInfo
* @param taskContainer
* @throws InterruptedException
* @throws ExecutionException
*/
public void startExecutorCompletionService(SubTaskInfo subTaskInfo) throws InterruptedException, ExecutionException {
if (StringUtils.isBlank(subTaskInfo.getAppCode()) && StringUtils.isBlank(subTaskInfo.getAppKey())) {
logger.warn("【分派预处理】子任务未设置AppCode或者AppKey,子任务Id:" + subTaskInfo.getSubTaskId());
return;
}
if (!StringUtils.isBlank(subTaskInfo.getStatus())) {
//已经有状态的,都是从库里加载的,不处理了,只放池子里。等线下的回调,或者人工处理,或者超时了。
logger.warn("【分派预处理】子任务已经有状态,不再分配推 。子任务Id:" + subTaskInfo.getSubTaskId()+",状态:"+subTaskInfo.getStatus());
// if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType()))
// subTaskInfoQueue.put(subTaskInfo);//如果成功的,并且是线上的,进行超时处理。
return;
}
// 从在线客户端表中取一个
// SurveyClient surveyClient = AppContainer.getWeightRandomClient(subTaskInfo.getAppCode());
SurveyClient surveyClient =null;
if(StringUtils.isBlank(subTaskInfo.getAppKey())){
logger.debug("【老版本-按AppCode分配】-----------旧版getAppCode--"+subTaskInfo.getAppCode());
surveyClient=AppContainer.getClientByUserRankAndClinetLever(subTaskInfo.getAppCode(),0);
}else
{
logger.debug("【新版本-按Appkey分配】-----------新版getAppKey--"+subTaskInfo.getAppKey());
surveyClient=AppContainer.getClientByUserRankAndClinetLeverByAppkey(subTaskInfo.getAppKey(),0);
}
if (surveyClient == null || surveyClient.getClientCode() == null) {// 后面表示没有业务心跳补充属性
// logger.debug("");
logger.warn("【分派预处理】暂无可用的客户端");
logger.debug("");
subTaskInfo.setExeErrorCount(subTaskInfo.getExeErrorCount()+1);
// subTaskInfoQueue.put(subTaskInfo);//没客户处理,则回炉
logger.debug("【分派预处理】原来回炉再次尝试,现在直接返回任务推送失败,再推无意义");
// 自动处理,作为失败子任务返回
JSONObject finishJason = new JSONObject();
finishJason.put("code", "failure");
finishJason.put("msg", "任务推送失败");
JSONObject finishData = new JSONObject();
finishData.put("taskId", subTaskInfo.getTaskInfo().getTaskId());
finishData.put("subTaskId", subTaskInfo.getSubTaskId());
finishData.put("remark", "暂无可用的客户端");
finishJason.put("data", finishData);
logger.debug("【子任务队列消费】:【暂无可用的客户端");
//通用处理完成或者失败的子任务。推送试过了也就当子任务结束了。
updateSubTaskStatus(finishJason);
return;
}
//设置执行子任务的在线客户端,之前有设置过其它的,也会被替换成当前的
String sessionId=surveyClient.getSessionId();
logger.debug("【分派预处理】策略选出的客户端sessionId为:" +sessionId );
subTaskInfo.setSurveyClient(surveyClient);
subTaskInfo.setClientCode(surveyClient.getClientCode());//标识最后一次使用的客户端
//一个在线客户端下所有的任务中加入此任务。用于客户端下线后,更新下面的子任务
List<SubTaskInfo> subTaskInfoList =clientSubTaskInfoMap.get(sessionId);
if(subTaskInfoList==null) subTaskInfoList=new ArrayList<SubTaskInfo>();
subTaskInfoList.add(subTaskInfo);
// 交线程池执行分派子任务
Future<String> future = executor.submit(new AssignTask(subTaskInfo));
String exeResult = "";
try {
exeResult = future.get(15L, TimeUnit.SECONDS);
} catch (TimeoutException|ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.warn("【分派预处理】出错e:" + e.getMessage());
exeResult = "failure";
} finally {
logger.info("【分派预处理】子任务分派结果为:" + exeResult);
boolean isSubTaskSendOk = "success".equals(exeResult);
//推送后的维护
clientSubTaskInfoMap.get(sessionId).remove(subTaskInfo);//从当前客户端下移除子任务
subTaskInfo.setSurveyClient(null);//子任务清除客户端
if(!isSubTaskSendOk){
logger.info("【分派预处理】子任务分派不成功,重新回队列subTaskId:" + subTaskInfo.getSubTaskId());
subTaskInfo.setExeErrorCount(subTaskInfo.getExeErrorCount()+1);
subTaskInfoQueue.put(subTaskInfo);//不成功,则回炉
}else{
subTaskInfo.setAsignStatus("success");//表示分配成功,等结果了
if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType()))
subTaskInfoQueue.put(subTaskInfo);//如果成功的,并且是线上的,进行超时处理。
}
//对主任务进行状态标识
TaskInfo taskInfo = subTaskInfo.getTaskInfo();
synchronized (taskInfo) {
TaskInfo.modifyTaskInfoPush(taskInfo, isSubTaskSendOk);
}
}
}
```
Future<String> future = executor.submit(new AssignTask(subTaskInfo));中的线程池任务。
```java
/**
* <P>线程池执行发送任务</P>
*
* @author liujun
* @date 2018年1月15日 下午5:36:48
*/
static class AssignTask implements Callable<String> {
private SubTaskInfo subTaskInfo;
public AssignTask(SubTaskInfo subTaskInfo) {
this.subTaskInfo = subTaskInfo;
}
@Override
public String call() throws Exception {
// Thread.sleep(3000);
logger.info("【推送线程任务】任务执行...");
// if(true) return "success";//测试
// String body = subTaskInfo.getParaObj().toString();
//推送的子任务,只包括总任务ID,子任务ID,其它都是一个json中。
SubTaskData subTaskData=new SubTaskData();
subTaskData.setTaskId(subTaskInfo.getTaskInfo().getTaskId());
subTaskData.setSubTaskId(subTaskInfo.getSubTaskId());
subTaskData.setSubTaskType(subTaskInfo.getSubTaskType());
subTaskData.setQueryJsonStr(subTaskInfo.getParaObj().toString());
//这两个用于异步任务时,客户端按里面的IP,PORT发结果消息上来。
//不管是什么,都加上这个。如果多个中间层,那要按这个回复信息。
subTaskData.setServerIp(ContainerInit.getInstance().appContainer.getServerIp());
subTaskData.setServerPort(ContainerInit.getInstance().appContainer.getServerPort());
String body =(JsonUtils.toString(subTaskData));
logger.debug("----------------【推送子任务】--------------------body:"+body);
// String appCode = subTaskInfo.getAppCode();
if (subTaskInfo.getSurveyClient() == null) {
logger.info("【推送线程任务】任务未设置执行客户端:" + subTaskInfo.getDescription());
return "failure";
}
String clientSessionId=subTaskInfo.getSurveyClient().getSessionId();
logger.info("【推送任务任务】推送目标sissionId:" + clientSessionId);
boolean bln = ServerPushHandler.pushBySessionId(subTaskInfo.getSurveyClient().getSessionId(), "assignTaskToClient", body, new ServerPushTaskCallback());
logger.info("【推送任务任务】bln:"+bln);
// boolean bln =
// ServerPushHandler.pushByAppKey(subTaskInfo.getSurveyClient().getClientCode(),
// "assignTaskToClient", body);// 消息推送,推送所有服务器
//不管成功不成功,去除子任务与动态客户端的关联(成功就不要关联了,不成功也应该换其它的客户端了)
// List thisClientSubTaskList=clientSubTaskInfoMap.get(clientSessionId);
// if(thisClientSubTaskList==null) thisClientSubTaskList=new ArrayList<SubTaskInfo>();
// logger.info("【推送任务推送】当前session下的子任务数:" + thisClientSubTaskList.size());
// if (thisClientSubTaskList.size() > 0) {
// thisClientSubTaskList.remove(subTaskInfo);
// subTaskInfo.setSurveyClient(null);
// }
// Map<String, List<SubTaskInfo>>
//注意:【在返回值的furturn中处理移除或者再入队的操作】
if (bln) {
logger.info("【推送任务推送】子任务成功");
subTaskInfo.setAsignStatus("success");
return "success";
} else {// 推送失败
logger.error("【推送任务推送】子任务推送失败:" + subTaskInfo.getSubTaskId());
subTaskInfo.setAsignStatus("failure");
//按说客户端下线可以事件中移除,但偶尔出现没有移除,所以在这里将无法推送的,
//将这个不可用的channel的的客户端移除
//注意:【在返回值的furturn中处理移除或者再入队的操作】
Map<String, SurveyClient> surveyClientMap=AppContainer.onlineClientMap.get(subTaskInfo.getSurveyClient().getSurveyApp().getAppKey());
if(surveyClientMap.containsKey(subTaskInfo.getSurveyClient().getSessionId())){
logger.info("【推送失败移除客户端】sessionId:"+subTaskInfo.getSurveyClient().getSessionId());
surveyClientMap.remove(subTaskInfo.getSurveyClient().getSessionId());// 根据通讯客户端,移除里面的调查客户端对象
}
return "failure";
}
}
}
```
#### 接收客户端任务完成的handler
将完成情况告诉配置进来的监听器,监听器会发起更新子任务的相关操作。
```java
@Override
public MiddleMsg handleMsgEvent(MsgEvent dm, MiddleMsg msg) {
// TODO Auto-generated method stub
String body = msg.getBody() + "";
String code = "";
logger.debug("【得到Client子任务返回结果】API返回子任务结果:" + body);
SurveyResponse td = new SurveyResponse();
try {
JSONObject tdObject = JsonUtils.toJSONObject(body);
// 子任务失败原因
subTaskListener.onSubTaskFinished(tdObject);
code = "success";
} catch (Exception e) {
e.printStackTrace();
code = "failure";
logger.error("【得到Client子任务返回结果】背调中心消息处理任务完成消息失败!异常:" + e.toString());
}
td.setCode(code);
msg.setBody(td);
return msg;
}
```
监听后发起的更新操作:
```java
/**
* <P>
* 根据监听的子任务完成结果,更新任务的状态,都完成后有可能从总任务表中移除,并且调用外部接口,进行持久化操作。
* </P>
*
* @param finishJason
*/
public void updateSubTaskStatus(JSONObject finishJason) {
// 根据子任务完成情况,修改子任务的状态,以及主任务的状态
logger.info("【修改子任务的完成状态】:"+finishJason.toString());
try {
String code = finishJason.getString("code");
String msg = finishJason.getString("msg");
String subTaskId = finishJason.getJSONObject("data").getString("subTaskId");
String taskId = finishJason.getJSONObject("data").getString("taskId");
String remark = (finishJason.getJSONObject("data").containsKey("remark"))? finishJason.getJSONObject("data").getString("remark"):"";
// String clientCode = finishJason.getString("clientCode");
TaskInfo taskInfo=taskInfoMap.get(taskId);
if(taskInfo==null){
logger.info("【修改子任务的完成状态】内存中已经没有这个主任务了!taskId:"+taskId);
//如果需要,找不到主任务了,还可以直接入库
//如果非线上任务,有可能内存中没有了,因为非线上,内存中存在的时间太长了,占用比较大
//是否都完成,以及上报都在接口中实现
if(taskPersistenceInterface!=null) taskPersistenceInterface.modifySubTask2DB(finishJason);//子任务入库
return;
}
SubTaskInfo subTaskInfo = taskInfo.getSubTaskMap().get(subTaskId);
// if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType())){
// logger.info("【修改子任务的完成状态】线上子任务不能人工处理!subTaskInfo.getSubTaskType():"+subTaskInfo.getSubTaskType());
// //如果需要,找不到主任务了,还可以直接入库
// return;
// }
//1.【收到】
if("received".equals(code)) {
logger.info("【修改子任务的完成状态,并移除】子任务为异步的,对方已经收到!code:"+code);
subTaskInfo.setStatus(Constants.TASK_DOING);
subTaskInfo.setDescription("子应用已经收到子任务");
synchronized (taskInfo) {
taskInfo.setReceivedAsyncTask(taskInfo.getReceivedAsyncTask()+1);
}
//这里啥也不做。因为是异步的,只是对方已经收到了。
return;
}
if(Constants.TASK_FAILURE.equals(subTaskInfo.getStatus()) || Constants.TASK_DONE.equals(subTaskInfo.getStatus()) ){
//这里啥也不做。可能总任务检测时设置了结果
return;
}
subTaskInfo.setSurveyClient(null);
// subTaskInfo.setUpdateTime(new Date());//数据持久化时再设置
//2.【不成功】 默认成功。成功时...(一定是执行且成功的)
if(!"success".equals(code)) {
logger.info("【修改子任务的完成状态,并移除】子任务推送或者执行出错了!返回code:"+code);
subTaskInfo.setStatus(Constants.TASK_FAILURE);
subTaskInfo.setDescription(msg+remark);//中文失败与原因
//这里是否加入出错队列再处理?还是先入库,以后从库中加载呢?都可以,目前先入。如果推送失败的,已经推过多次了,如果执行失败的,先不再推送了。
// return;
}//3.【成功】
else{
logger.info("【修改子任务的完成状态,并移除】子任务执行成功!code:"+code);
String hasData = (finishJason.getJSONObject("data").containsKey("hasData"))? finishJason.getJSONObject("data").getString("hasData"):"";
Integer dataCount =0;
try {
dataCount = (finishJason.getJSONObject("data").containsKey("dataCount"))? finishJason.getJSONObject("data").getInt("dataCount"):0;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
subTaskInfo.setHasData(hasData);
subTaskInfo.setDataCount(dataCount);
subTaskInfo.setStatus(Constants.TASK_DONE);
}
if(taskPersistenceInterface!=null) taskPersistenceInterface.modifySubTask2DB(subTaskInfo);//子任务入库
synchronized (taskInfo) {//计算完成数
//执行数据+1(包括推失败的,对方收到的异步的),完成数要看成功才成。
TaskInfo.modifyTaskInfoFinish(taskInfo, "success".equals(code));
// logger.info("【修改子任务的完成状态,并移除】总任务【移除前】有:"+taskInfoMap.size());
// logger.info("【修改主任务的状态,并可能移除】当前主任务的-总|执|完|线:" + taskInfo.getTotalTask() + "|" + taskInfo.getExecuteTask()+ "|" + taskInfo.getCompleteTask()+ "|" + taskInfo.getOnlineTask());
}
//del--->当执行数与线上数一样时。线上都完成了。就持久化,但不移除。当与总数一样时,持久化并移除。
//都执行了就移除,并持久化。但如果不全是线上的,置的状态不一样
if (taskInfo.getExecuteTask().intValue() == taskInfo.getTotalTask().intValue()) {
//【移除的情况:】当线上数与总数一样的时候,全完成了。就从内存中移除,并且调外部接口类进行持久化。线下子任务没有超时机制,可能一直接没反馈,由主任务总超时处理。
// if (taskInfo.getOnlineTask().intValue() == taskInfo.getTotalTask().intValue() || taskInfo.getExecuteTask().intValue() == taskInfo.getTotalTask().intValue() ) {
//如果没有异步的任务
if (taskInfo.getReceivedAsyncTask()==0 ) {
//主任务状态为:
boolean isAllSubtaskOk=taskInfo.getCompleteTask().intValue()==taskInfo.getExecuteTask();
taskInfo.setStatus(isAllSubtaskOk?Constants.TASK_DONE:Constants.TASK_FAILURE);
// if(taskInfo.getCompleteTask().intValue() == taskInfo.getExecuteTask().intValue()) taskInfo.setStatus(Constants.TASK_FAILURE);
// taskInfoMap.remove(taskInfo.getTaskId());// 从任务记录中移除
// if(taskPersistenceInterface!=null) taskPersistenceInterface.modifyTask2DB(taskInfo);//主任务入库
logger.info("【修子任务的完成状态,全部完成并移除】移除任务id:" + taskInfo.getTaskId());
}else{
//【持久化】都移除,线下不适合在内存中长时间放。
taskInfo.setStatus(Constants.TASK_DOING);
logger.info("【修子任务的完成状态,只持久化线上部分】任务id:" + taskInfo.getTaskId()+"。此时收到线下任务回复数为:"+taskInfo.getReceivedAsyncTask());
}
taskInfoMap.remove(taskInfo.getTaskId());// 从任务记录中移除(线下的不适合长时间放在内存中)
logger.info("【修改任务的完成状态,全部完成并移除】有完成后移除。总任务【移除后】有:" + taskInfoMap.size());
if(taskPersistenceInterface!=null) taskPersistenceInterface.modifyTask2DB(taskInfo);//主任务入库
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.info("【修改子任务的状态,并移除】出错:"+e.toString());
}
}
```
#### 其它方法介绍
监听客户端下线时(客户端管理里也有去监听),将其它持有的子任务中标识的客户端置空,并移除重新选择客户端。
```
public synchronized void updateSubTaskInfoByOffline(String clientSessionId)
```
定时任务:清理、失败任务没超时,没超重试次数时的再次执行。
```java
/**
* <P>守护线程中定时会清理超时的主任务。(线上子任务等待结果超时在队列里处理,线下超时不处理,由主任务超时处理)</P>
*/
public void dealTimeoutTask(TaskInfo taskInfo)
/**
* <P>用于处理阻塞对列里的子任务,再扔进线程池。</P>
* 1.反复处理再分配的子任务,一定次数后超时。主要是原通讯没返回sessionId,监听移除有问题,所以多试几次。
* 每次试如果不能会真正移除不在线的客户端。
* 2.处理线上子任务,已经推送出去了,状态发生变化。反复检测是不是超时没有收到反馈。有反馈的会设置子任务状态,就不再入队了。
* @author liujun
* @date 2018年1月22日 上午9:59:10
*/
private class SubTaskRedo implements Runnable
```
此外,还有从持久层加载可以再执行的任务,提供人工控制的接口方法等。
# 4. 中间层的设计
## 4.1 作为客户端的核心处理类AsClientContainer
这个对接前面提到的SS。
```java
public class AsClientContainer {
private static final Logger logger = Logger.getLogger(AsClientContainer.class);
public static final long THEARTBEAT_INTERVAL = 3 * 1000L;
private static JSONArray REMORTSERVERS=new JSONArray();
public static Map<String,JSONObject> REMORTSERVERS_MAP =new HashMap<String,JSONObject>();
/*子任务存放*/
// public volatile static Map<String, JSONObject> subTaskInfoMap = new ConcurrentHashMap<String, JSONObject>();
/**使用阻塞队列,放置所有要处理的子任务*/
public volatile static BlockingQueue<JSONObject> subTaskInfoQueue = new LinkedBlockingQueue<JSONObject>();
/**
* 需要人工干预的子任务池
*/
public volatile static Map<String, JSONObject> subTaskInfoMap = new ConcurrentHashMap<String,JSONObject>();
/*同步对象存放*/
public volatile static Map<String, PushFuture<CommonReturnData>> syncKey = new ConcurrentHashMap<String, PushFuture<CommonReturnData>>();
/**用户单例线程锁*/
private static Boolean lockSigleton = true;
/**用户单例对象*/
private static AsClientContainer clientContainer;
public TaskConsumer taskConsumer = new TaskConsumer();
public TimeoutSubTask timeoutSubTask = new TimeoutSubTask();
public OfflineSendInterface offlineSendInterface;
public DateFormatInterface dateFormatInterface;
/** 失败的任务用线程池 */
private static ExecutorService executor = Executors.newCachedThreadPool();
private ScheduledExecutorService executorTimeout = Executors.newScheduledThreadPool(1);
public static AtomicLong subTaskNum = new AtomicLong();
```
任务消费,对重试任务,线上任务,人工任务都分别进行处理,人工任务的处理由外部提供处理类,真正实现由dubbo完成。
```java
private class TaskConsumer implements Runnable {
@Override
public void run() {
while (true) {
try {
//因为这个队列中都是出问题的子任务,所以要等待一下处理。
Thread.sleep(1500);// 调节频率,过快容易撑死~~
// logger.debug("【子任务队列】的任务数1:" + taskNum);
JSONObject subtaskInfo=subTaskInfoQueue.take();
logger.debug("【子任务队列消费】取出子任务JASON:"+subtaskInfo.toString());
String subTaskType=subtaskInfo.containsKey("subTaskType")?subtaskInfo.getString("subTaskType"):null;
//!!!!!!检查这个子任务是还否可以复制之前的结果,如果可以就复制出来,返回一个成功的结果。
//这里交冯实现的接口,由于工作变动,还没出来。
// logger.debug("【子任务队列】的任务数2:" + subTaskInfoQueue.size());
//如果重试了50次或者超时了5分钟,那么子任务失败吧
long reDoTime=new Date().getTime()-subtaskInfo.getLong("startDate");
logger.debug("reDoTime:"+reDoTime+"。testnum:"+subtaskInfo.getInt("testNum"));
if(subtaskInfo.getInt("testNum")>Integer.parseInt(MiddleConfig.getFailureSubTaskMaxRetryTimes()) || (reDoTime>Long.parseLong(MiddleConfig.getFailureSubTaskMaxRetryTimes())) ){
logger.debug("【子任务队列消费】子任务超时失败:"+subtaskInfo.getString("subTaskId"));
logger.debug("【子任务队列消费】子任务超时失败,尝试次数为:"+subtaskInfo.getInt("testNum"));
AsClientContainer.subTaskInfoMap.remove(subtaskInfo.getString("subTaskId"));
//通用处理完成或者失败的子任务
Thread.sleep(1000);// 调节频率,过快容易撑死~~
String isAsync=subtaskInfo.containsKey("isAsync")? subtaskInfo.getString("isAsync"):null;
//1.【推失败了,如果是异步的,就发消息给服务端】
if("async".equals(isAsync) || !Constants.TASK_TYPE_ONLINE.equals(subTaskType)){
// AsClientContainer.sendTaskAsyncResult2Server(asyncServerCode,finishJason);
//如果异步调用失败。
logger.debug("【推送异步任务】超时了,发消息给服务器");
JSONObject finishJason=new JSONObject();
finishJason.put("code", "failure");
finishJason.put("msg", "OFFLINE_RPC_FAIL中间层任务调用C端失败");
finishJason.put("data", subtaskInfo);//这里面有ip/port用于异步。
//如果是线下的推送或者调用失败了,只持久化到本地,再重试。或者超时。不可以迅速返回失败的。
sendTaskAsyncResult2Server(finishJason);
return;
}
//2.【如果是线上任务推送失败,设置同步等待对象。】
JSONObject finishJason=new JSONObject();
finishJason.put("code", "failure");
finishJason.put("msg", "中间层任务推送失败");
JSONObject finishData=new JSONObject();
finishData.put("taskId", subtaskInfo.getString("taskId"));
finishData.put("subTaskId", subtaskInfo.getString("subTaskId"));
finishData.put("remark", "重试了"+subtaskInfo.getInt("testNum")+"次,用时"+reDoTime+"ms");
finishJason.put("data", finishData);
PushFuture<CommonReturnData> responseFuture = AsClientContainer.syncKey.get(subtaskInfo.getString("subTaskId"));
if(responseFuture!=null){
CommonReturnData response=new CommonReturnData();
response.setCode("failure");
response.setMsg("任务超时失败");
response.setData(finishData);
responseFuture.setResponse(response);
logger.debug("【推送任务任务】超时了,设置同步对象的返回值");
}
else{
logger.debug("【推送任务任务】设置超时时,同步对象已经被移除。");
}
}
else//如果是正常处理子任务
{
//如果非线上任务,就走外部接口(注入的实现类)发出去(实现类会持久化,再发的)
logger.debug("subTaskType:"+subTaskType+"。offlineSendInterface:"+offlineSendInterface);
if(!Constants.TASK_TYPE_ONLINE.equals(subTaskType)){
try {
if(offlineSendInterface!=null){
offlineSendInterface.sendOfflineQuery(subtaskInfo);
}else{
logger.warn("【找不到外部(非线上子任务)调用的接口】");
throw new SurveyException("找不到外部(非线上子任务)调用的接口实现类");
}
logger.info("【推送任务任务】成功推送非线上任务到C端!");
} catch (SurveyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.warn("【推送任务任务】推送非线上任务到C端失败!");
ReDoTask reDoTask = new ReDoTask(subtaskInfo);
executor.submit(reDoTask);
}
}
else //如果是线上的,就推送出去。
{
SurveyClient surveyClient = AsServerContainer.getClientByUserRankAndClinetLever(0);
// 如果找到策略的客户端
if (surveyClient != null && surveyClient.getSessionId() != null) {
// 开始推送
String body = JsonUtils.toString(subtaskInfo);
String clientSessionId = surveyClient.getSessionId();
logger.info("【推送任务任务】推送目标sissionId:" + clientSessionId);
boolean bln = ServerPushHandler.pushBySessionId(clientSessionId, "assignTaskToClient", body, new MiddlePushClientTaskCallback());
logger.info("【推送任务任务】bln:" + bln);
if (bln) {
logger.info("【推送任务任务】成功!");
// 推成功了,但一直不返回,也是个问题。不过同步对象会被移除的。
} else {
logger.warn("【推送任务任务】推送失败!");
ReDoTask reDoTask = new ReDoTask(subtaskInfo);
executor.submit(reDoTask);
}
} else {// 没有可用的客户端
logger.info("【推送任务任务】bln:没有可用的客户端,直接失败返回。次数:" + subtaskInfo.getInt("testNum"));
ReDoTask reDoTask = new ReDoTask(subtaskInfo);
executor.submit(reDoTask);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
```
## 4.2 作为服务的核心处理类AsServerContainer
```java
public class AsServerContainer {
private static final Logger logger = Logger.getLogger(AsServerContainer.class);
/** 实时总任务信息(taskid---TaskInfo(SubTaskInfoMap)) */
public static volatile Map<String, SurveyClient> onlineClientMap=new ConcurrentHashMap<String,SurveyClient>();
public boolean isMiddlewearStarted=false;
/** 任务在内存中允许的最大存放数 */
// private static Integer maxTaskMapSize=Integer.MAX_VALUE;
/**
* 可接入的类型列表,目前一个中间层只支持一种类型的接口接入。(同一类型的app都一样,不同的不一样)
*/
private static List<ClientApp> clientAppList=new ArrayList<ClientApp>();
// static{
// //设置有效的客户端
//
// }
/**
* 失败子任务重复处理次数、与超时处理的时间
*/
public static int maxReDealSubTaskTimes=20;
/**
* 失败子任务重复处理次数、与超时处理的时间
*/
public static long maxTimeoutSubTaskTime=60000L;
/**主任务超时时间*/
public static long maxTimeoutTaskTime=150000L;
/**
* 一个配置的client下的实时子任务信息(sessionId-List<SubTaskInfo>)
* 用sessionId方便应对底层的上下线变化。中间件的事件只能得到sessionId,没有ClientCode。
*/
// public volatile static Map<String, List<SubTaskInfo>> clientSubTaskInfoMap = new ConcurrentHashMap<String, List<SubTaskInfo>>();
/**用户单例线程锁*/
private static Boolean lockSigleton = true;
private static AsServerContainer asServerContainer;
```
# 5. 客户端的设计
## 5.1 核心类的设计
属性如下,主要功能有子任务执行与客户端业务心跳。
```java
/**
* 背调中间层容器-管理通讯层并处理背调任务
* @author liujun
*/
public class ClientContainer {
private static final Logger logger = Logger.getLogger(ClientContainer.class);
public static final long THEARTBEAT_INTERVAL = 3 * 1000L;
/**appKey-同类的客户端相同*/
private String appkey;
/**appSec-同类的客户端相同*/
private String appSecret;
/**客户端标识Code*/
private String clientCode;
/**所连接服务器IP*/
private String serverIP;
/**此客户端的权重*/
private String weight;
/**通讯层客户端*/
Client client = null;
/**子任务处理接口对象*/
ApiInvorkerInterface apiInvorkerInterface;
/**监听器-监听通讯层客户端状态*/
ClientStatusListener clientStatusListener;
/**客户端上报状态传输对象*/
ClientRealData clientRealData;
/**客户端是否连接状态标识*/
private boolean isConnected = false;
private Object lock=new Object();
// ScheduledExecutorService service = Executors.newScheduledThreadPool(1);//不需要线程池,只要一个循环的守护线程
/**用户单例线程锁*/
private static Boolean lockSigleton = true;
/**用户单例对象*/
private static ClientContainer clientContainer;
```
## 5.2 背调任务执行
每一个包装的Web客户端,要实现这个接口来做具体任务。
```java
/**
* 调用第三方功能的接口,请各个第三方接口应用实现些接口
* <P></P>
* @author liujun
* @date 2018年1月12日 下午5:10:02
*/
public interface ApiInvorkerInterface {
/**
* 根据请求参数与所选择的一组app产生任务并处理
* <P></P>
* @param paras
* @param appCode
*/
public CommonReturnData dealSubTaskByApi(JSONObject taskData) throws SurveyException;
}
```
# 6. 其它
## 6.1 与Web应用的整合
外部Web应用提供数据持久化与具体任务执行等接口的实现。实现都是spring容器中的类,由一个统一管理的@Componet类@Autowired这些实现,在其afterPropetiesSet方法时启动核心业务组件,并按提供的接口注入实现类。当然也可以考虑都交给spring管理。
## 6.2 子任务相关Web工程
近20个客户端我做了一个
例子工程,后来交办出去发现在clone工程,于是我要求只用一个工程,各种实现类通过配置的不同进行加载。
## 6.3 反思
- 除了前面提到的通讯层更加独立外,细节问题不少,比如线程池使用不够规范。
-
- 有些参数不完全确定就用jason。一些参数要根据测试从外部配置进来。
-
- 内部系统的启动停止最好由smartCycle控制,这里用afterPropetiesSet只管启动,没有优雅的停止。
-
- 有些优化还需要相关应用与人员配合才能整体优化,比如通讯层,配置服务。
-
- 如果要高可用,还有非常多的工作要做。
虽然很多问题,但运行稳定,可以顺利的工作。由于手上还有其它任务,比如企业服务,调查发起,定单处理,被查人同意,根据结果费用核算等应用与功能处理。所以除了功能变更,没真正进行重构过。