class="java">import net.sf.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.webframe.web.util.WebFrameUtils; /** * * 类功能描述:消息推送核心类 * * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a> * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $ * Create: 2013-7-30 下午03:35:40 */ public class ServerHandler extends IoHandlerAdapter { private Log log = LogFactory.getLog(ServerHandler.class); /** * 接受消息,登陆时执行此方法 */ public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString().trim(); if (msg.startsWith("{")) { JSONObject jsonMessage = JSONObject.fromObject(msg); IProcessor processor = (IProcessor) WebFrameUtils.getBean(jsonMessage.getString("ty")); processor.process(jsonMessage, session); } log.info("<-------------------- Received Message: " + msg + "--------------------->"); } @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); log.info("<--------------------MINA Server Connection Created--------------------->"); } @Override public void sessionOpened(IoSession session) throws Exception { super.sessionOpened(session); session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1); //读写 通道均在1800 秒内无任何操作就进入空闲状态 log.info("<--------------------MINA Server Connection Opened--------------------->"); } @Override public void sessionClosed(IoSession session) throws Exception { // this.removeUserBySessionId(session); session.close(true); log.info("<--------------------MINA Server Connection Closed--------------------->"); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { this.removeUserBySessionId(session); session.close(true); log.info("Session Id: " + session.getId() + "" + cause.getMessage()); log.error(cause); } /** * 超出空闲时间执行 */ @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { this.removeUserBySessionId(session); session.close(true); log.info("<--------------------Session Idle--------------------->"); } /** * * @function:移除在线列表,关闭session * @param session * @author: mengqingyu 2013-7-30 下午03:35:22 */ private void removeUserBySessionId(IoSession session) { SocketManager.getInstance().removeUserBySessionId(session.getId()); } } import java.net.InetSocketAddress; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.session.IoSession; /** * * 类功能描述:在线用户管理工具类 * * @author <a href="mailto:taibin.tang@berheley.com">tangtaibin </a> * @version $Id: SocketManager.java,v 1.18 2011/01/26 08:18:49 tangtaibin Exp $ * Create: 2010-10-19 下午02:29:31 */ public class SocketManager { private Log log = LogFactory.getLog(SocketManager.class); public static final String USER_HOST = "host"; public static final String USER_LOGIN_TIME = "logintime"; public static final String SESSION_ID = "sessionId"; public static final String MESSAGE_END = "@$$@"; private static final SocketManager manager = new SocketManager(); private final Map<String, IoSession> online = new HashMap<String, IoSession>(); private SocketManager() { } public static SocketManager getInstance() { return manager; } public Map<String, IoSession> getOnline() { return online; } /** * * @function:添加用户信息到在线用户列表 * @param username * @param session * @param moduleName * @author: mengqingyu 2013-7-30 下午03:38:47 */ public void addUserToOnlineMap(String username, IoSession session, String moduleName) { InetSocketAddress remoteAddress = (InetSocketAddress) session.getRemoteAddress(); session.setAttribute(SESSION_ID, session.getId()); session.setAttribute(USER_HOST, remoteAddress.getAddress().getHostAddress()); session.setAttribute(USER_LOGIN_TIME, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); session.setAttribute("userId", username); this.online.put(username, session); } /** * * @function:从在线用户列表删除用户信息 * @param username * @author: mengqingyu 2013-7-30 下午03:38:38 */ public void removeUserFromOnlineMap(String username) { IoSession session = (IoSession) this.online.get(username); this.online.remove(username); session.close(true); } /** * * @function:根据sessionId删除用户 * @param sessionId * @author: mengqingyu 2013-7-30 下午03:38:10 */ public void removeUserBySessionId(long sessionId) { for(Entry<String, IoSession> entry:online.entrySet()){ String username = entry.getKey(); IoSession session = entry.getValue(); if (sessionId == (Long)session.getAttribute(SESSION_ID)) { log.info("Exception from " + sessionId + " <--> " + username + "<--> is connected:" + session.isConnected()); this.removeUserFromOnlineMap(username); break; } } } /** * 获取在线用户列表 * * @function: * @return * @author: tangtaibin 2010-10-19 下午02:31:06 */ public List<String> getOnlineUsers() { Set<String> keySet = this.online.keySet(); return new ArrayList<String>(keySet); } /** * 获取用户Session * * @function: * @param userid * @return * @author: liujuan 2011-1-7 上午09:08:16 */ public IoSession getUserSession(String username) { return this.online.get(username); } /** * 给某个用户发送消息 * * @function: * @param userid * @param msg * @author: tangtaibin 2010-10-19 下午02:31:25 */ public synchronized void sendToUser(String username, String msg) { IoSession session = this.online.get(username); log.info("MSG To: " + msg); try { session.write(msg + MESSAGE_END); log.info("MSG: " + msg); } catch (Exception e) { log.error(e); } } } import java.util.List; import java.util.Map; import net.sf.json.JSONObject; import org.apache.mina.core.session.IoSession; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.webframe.web.util.WebFrameUtils; import com.berheley.bi.basic.exp.BusinessException; import com.berheley.bi.basic.timer.ITimerJob; import com.berheley.oa.nio.AbstractProcessor; import com.berheley.yq.common.IMonitorService; /** * * 类功能描述:用户登陆处理类 * * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a> * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $ * Create: 2013-7-30 下午03:41:22 */ @Component("webMessage_login") public class LoginProcessor extends AbstractProcessor { @Autowired IMonitorService monitorService; @Override public void process(JSONObject jsonMessage, IoSession session) { // 用户登陆 final String username = jsonMessage.getString("sd"); log.info("【用户已登录username:】" + username); IoSession ioSession = manager.getUserSession(username); // 将用户加入在线用户列表 manager.addUserToOnlineMap(username, session, "webMessge"); if(ioSession == null){ List<Map<String,Object>> beanNames = monitorService.getBeanNames(); for(Map<String,Object> beanName:beanNames){ ITimerJob timerJob = (ITimerJob)WebFrameUtils.getBean(beanName.get("beanName").toString()); beanName.put("currentUser", username); try { timerJob.jtaTimerExecute(null, beanName); } catch (BusinessException e) { e.printStackTrace(); log.error("登陆时消息提醒错误:"+e.getMessage()); } } } } } import net.sf.json.JSONObject; import org.apache.mina.core.session.IoSession; import org.springframework.stereotype.Component; import com.berheley.oa.nio.AbstractProcessor; /** * * 类功能描述:消息提醒处理器 * * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a> * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp $ * Create: 2013-7-3 下午04:04:50 */ @Component("webMessage") public class WebMessageProcessor extends AbstractProcessor { @Override public void process(JSONObject jsonMessage, IoSession session) { if (jsonMessage != null) { IoSession userSession = manager.getUserSession(jsonMessage.getString("sender")); if (userSession != null) { userSession.write(jsonMessage); log.info("***********************【页面提醒】:" + jsonMessage); } } } }