class="java" name="code">
package com.robustel.rlink.device.service.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

import com.robustel.iot.data.share.entity.DeviceModuleData;
import com.robustel.pl.util.constant.DeviceConstantTemplate;
import com.robustel.pl.util.thread.QunarThreadPoolExecutor;
import com.robustel.pl.util.utils.AppUtils;
import com.robustel.pl.util.utils.JsonUtil;
import com.robustel.rlink.control.entity.DeviceCommandRequest;
import com.robustel.rlink.control.enums.CommandType;
import com.robustel.rlink.device.entity.Device;
import com.robustel.rlink.device.enums.DeviceStatusEnum;
import com.robustel.rlink.device.service.RegisterResponseSenderService;
import com.robustel.rlink.device.vo.ModelData;

/**
 * Scheduled job that pushes pending module-data configuration to devices.
 *
 * <p>Each run reads the records whose {@code flag} is {@code -1} from the real-time
 * module-data collection in batches of {@link #betch}, and hands every batch to a
 * worker. A worker publishes one {@code ConfigModuleData} command per record and then
 * sets the record's {@code flag} to {@code 1}.
 *
 * <p>The static {@link #handler} reference is published from {@link #init()} so that the
 * static helpers can reach the injected collaborators.
 *
 * @author jfn
 */
public class DeviceOverTimeHandler {

    /** Instance published by {@link #init()}; used by the static helpers. */
    public static DeviceOverTimeHandler handler;

    static Logger logger = LoggerFactory.getLogger(DeviceOverTimeHandler.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RegisterResponseSenderService registerResponseSenderService;

    /**
     * Publishes this instance through {@link #handler}. The injected fields need no
     * copying: {@code handler} and {@code this} are the same object.
     */
    @PostConstruct
    public void init() {
        handler = this;
    }

    /** Name of the real-time module-data collection. */
    static String mdRealTime = DeviceConstantTemplate.module_data_real_time_set;

    /** Number of records fetched per page and handed to one worker. */
    static Integer betch = 50;

    static Integer threadNum = 30;

    public static final String CliTopicPerfix = "sys_cli.";

    /** Prefix of the per-device command topic; the device serial number is appended. */
    public static final String CmdTopicPerfix = "sys_ctrl.";

    private static List<ModelData> operaSet = Collections.synchronizedList(new ArrayList<ModelData>(betch));

    static CountDownLatch startSignal = new CountDownLatch(1);

    static CountDownLatch doneSignal = new CountDownLatch(5);

    private static ExecutorService pool = Executors.newFixedThreadPool(1024);

    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

    private QunarThreadPoolExecutor qunarThreadPoolExecutor =
            new QunarThreadPoolExecutor(50, 200, 5, TimeUnit.MINUTES, queue);

    /** Number of pending records counted at the start of the current run. */
    private static Long deviceCount = 0L;

    /** Most recent paging query; also read by {@link #addTasks()}. */
    private static Query query = null;

    /** Index of the next page to fetch within the current run. */
    private static volatile Integer num = 0;

    /** Devices whose latest activity is older than this (10 minutes) are taken offline. */
    private static final long OFFLINE_TIMEOUT_MILLIS = 10 * 60 * 1000L;

    /** Entry point of the scheduled job: logs the start and runs one dispatch pass. */
    public void overTimeProcess() {
        logger.info("模块数据下发定时任务启动..." + handler.mongoTemplate);
        makeThreadPools();
    }

    /**
     * Runs one dispatch pass: counts the pending records, reads them page by page and
     * submits one {@link Worker} per non-empty page.
     *
     * <p>The page counter is reset on every call; it is static, so without the reset a
     * later run would resume at the offset where the previous run stopped. Every page is
     * read with the same {@code flag == -1} filter that produced the count. All pages are
     * read before the first worker is submitted, because workers set {@code flag} to
     * {@code 1} and would otherwise shift the skip/limit window while it is being walked.
     */
    public void makeThreadPools() {
        logger.info("线程池初始化...");
        num = 0;
        deviceCount = handler.mongoTemplate.count(pendingPage(0), ModelData.class, mdRealTime);

        List<List<ModelData>> batches = new ArrayList<List<ModelData>>();
        while (deviceCount - betch * num > 0) {
            query = pendingPage(num);
            List<ModelData> data = handler.mongoTemplate.find(query, ModelData.class, mdRealTime);
            logger.info("查询数据 ...{}", data);
            if (AppUtils.isNotBlank(data)) {
                batches.add(data);
            }
            num++;
        }

        for (List<ModelData> batch : batches) {
            qunarThreadPoolExecutor.execute(new Worker(batch));
        }
    }

    /**
     * Builds the query for one page of pending ({@code flag == -1}) records.
     *
     * @param page zero-based page index
     * @return a query limited to {@link #betch} records starting at {@code betch * page}
     */
    private static Query pendingPage(int page) {
        Query pageQuery = new Query();
        pageQuery.addCriteria(Criteria.where("flag").is(-1));
        return pageQuery.skip(betch * page).limit(betch);
    }

    /**
     * Refills the shared {@code operaSet} from the next page(s) of the last paging query
     * while the set is empty and uncounted records remain. An empty page shuts down the
     * static pool.
     */
    public static void addTasks() {
        while (operaSet.size() <= 0 && deviceCount - betch * num > 0) {
            query = query.skip(betch * num).limit(betch);
            List<ModelData> data = handler.mongoTemplate.find(query, ModelData.class, mdRealTime);
            if (AppUtils.isBlank(data)) {
                pool.shutdown();
                logger.info("线程池退出....");
            } else {
                // Only a non-blank page is added; adding a null result would throw.
                // NOTE(review): assumes AppUtils.isBlank reports null as blank - confirm.
                operaSet.addAll(data);
            }
            num++;
        }
    }

    /**
     * Sends every record of a batch to its device.
     *
     * <p>A runtime failure on one record is logged and the loop continues, so a single
     * bad record or a failed publish does not drop the rest of the batch.
     *
     * @param operaSet records to push; a blank batch is logged and ignored
     */
    public static void doWork(List<ModelData> operaSet) {
        logger.info("working......");
        if (AppUtils.isBlank(operaSet) || operaSet.size() <= 0) {
            logger.info("待处理数据为空,线程退出...");
            return;
        }
        logger.info("待处理模块数据 {}", operaSet);
        for (ModelData md : operaSet) {
            logger.info("dowork object {}", md);
            try {
                sendModuleData(md);
            } catch (RuntimeException e) {
                logger.error("failed to push module data {}", md, e);
            }
        }
    }

    /**
     * Publishes one record as a {@code ConfigModuleData} command and marks it as sent.
     *
     * <p>Nothing is sent when the device or its real-time record is missing, when the
     * record's flag is already {@code 1}, or when the device is offline. After the publish,
     * {@code flag} and {@code sendTime} are updated in the real-time collection and in the
     * module-data collection entry with the same {@code commId}.
     *
     * @param md record to push
     */
    private static void sendModuleData(ModelData md) {
        logger.info("modelData is {}", md);
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(md.getSn()));
        Device dev = handler.mongoTemplate.findOne(query, Device.class,
                DeviceConstantTemplate.device_list_collection_name);
        ModelData data = handler.mongoTemplate.findOne(query, ModelData.class,
                DeviceConstantTemplate.module_data_real_time_set);
        if (data == null || data.getFlag().equals(1) || dev == null
                || dev.getDeviceOnLineStatus().equals(DeviceStatusEnum.OFF_LINE.value())) {
            return;
        }

        DeviceCommandRequest cmd = new DeviceCommandRequest();
        cmd.setSn(md.getSn());
        cmd.setTime(new Date().getTime());
        cmd.setId(md.getCommId());
        cmd.setCmd(CommandType.ConfigModuleData.getType());
        cmd.setData(md.getPropers());
        Destination topic = new ActiveMQTopic(CmdTopicPerfix + md.getSn());
        handler.registerResponseSenderService.sendMQTT(topic, JsonUtil.javaObjToJson(cmd));

        Update update = new Update();
        update.set("flag", 1);
        update.set("sendTime", new Date().getTime());
        handler.mongoTemplate.updateFirst(query, update, ModelData.class, mdRealTime);
        // Narrow the same query by command id before updating the module-data collection.
        query.addCriteria(Criteria.where("commId").is(md.getCommId()));
        handler.mongoTemplate.updateFirst(query, update, ModelData.class,
                DeviceConstantTemplate.module_data_set);
    }

    /** Task that pushes one batch of records through {@link #doWork(List)}. */
    static class Worker implements Runnable {

        List<ModelData> operaSet;

        public Worker(List<ModelData> operaSet) {
            this.operaSet = operaSet;
        }

        @Override
        public void run() {
            logger.info("worker run function ...");
            doWork(operaSet);
        }
    }

    /**
     * Checks whether a device should be marked offline, based on the time of its newest
     * module-data record or, when it has none, on its own last update time.
     *
     * @param dev device to check; blank devices are ignored
     */
    private void deviceOffline(Device dev) {
        if (AppUtils.isBlank(dev)) {
            return;
        }
        Query query2 = new Query();
        Criteria cnd2 = Criteria.where("sn").is(dev.getSn());
        query2.addCriteria(cnd2).with(new Sort(new Order(Direction.DESC, "time")));
        DeviceModuleData module = handler.mongoTemplate.findOne(query2, DeviceModuleData.class,
                DeviceConstantTemplate.module_data_real_time);
        if (AppUtils.isBlank(module)) {
            // No uploaded data: a device silent for more than 10 minutes is taken offline.
            upateStatus(module, dev, dev.getDeviceUpdateTime());
        } else {
            upateStatus(module, dev, module.getTime());
        }
    }

    /**
     * Marks an online device as offline when {@code time} is more than 10 minutes old.
     *
     * @param module newest module-data record of the device (not read here)
     * @param dev    device to update
     * @param time   epoch milliseconds of the device's latest activity; {@code null} skips the check
     */
    private void upateStatus(DeviceModuleData module, Device dev, Long time) {
        if (time == null) {
            return;
        }
        long now = new Date().getTime();
        boolean timedOut = (now - time) > OFFLINE_TIMEOUT_MILLIS;
        if (timedOut && dev.getDeviceOnLineStatus().equals(DeviceStatusEnum.ON_LINE.value())) {
            Update update = new Update();
            update.set("deviceOnLineStatus", DeviceStatusEnum.OFF_LINE.value());
            update.set("deviceLastLoginTime", dev.getOnTime());
            update.set("deviceUpdateTime", now);
            update.set("offTime", now);
            if (AppUtils.isNotBlank(dev.getOffTime())) {
                update.set("deviceLastOffLineTime", dev.getOffTime());
            }
            Query qUpte = new Query();
            qUpte.addCriteria(Criteria.where("_id").is(dev.getSn()));
            handler.mongoTemplate.updateFirst(qUpte, update, Device.class,
                    DeviceConstantTemplate.device_list_collection_name);
        }
    }

    public MongoTemplate getMongoTemplate() {
        return mongoTemplate;
    }

    public void setMongoTemplate(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    public RegisterResponseSenderService getRegisterResponseSenderService() {
        return registerResponseSenderService;
    }

    public void setRegisterResponseSenderService(
            RegisterResponseSenderService registerResponseSenderService) {
        this.registerResponseSenderService = registerResponseSenderService;
    }
}
package com.robustel.pl.util.thread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * * @author jingfangnan * */ public class QunarThreadPoolExecutor extends ThreadPoolExecutor { // 记录每个线程执行任务开始时间 private ThreadLocal<Long> start = new ThreadLocal<Long>(); // 记录所有任务完成使用的时间 private AtomicLong totals = new AtomicLong(); // 记录线程池完成的任务数 private AtomicInteger tasks = new AtomicInteger(); public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } /** * 每个线程在调用run方法之前调用该方法 * */ protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); start.set(System.currentTimeMillis()); } /** * 每个线程在执行完run方法后调用该方法 * */ protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); tasks.incrementAndGet(); 
totals.addAndGet(System.currentTimeMillis() - start.get()); } @Override protected void terminated() { super.terminated(); System.out.println("完成"+ tasks.get() +"个任务,平均耗时: [" + totals.get() / tasks.get() + "] ms"); } }