MINA 消息推送核心代码_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > MINA 消息推送核心代码

MINA 消息推送核心代码

 2013/10/21 18:11:39  mengqingyu  程序员俱乐部  我要评论(0)
  • 摘要:import net.sf.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session…
  • 标签:代码
import net.sf.json.JSONObject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.webframe.web.util.WebFrameUtils;

/**
 * Core MINA handler of the message push server.
 *
 * <p>Incoming JSON messages are dispatched to the {@code IProcessor} bean named by
 * their {@code "ty"} field. Sessions that close, fail or go idle are removed from the
 * {@code SocketManager} online registry.
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Created: 2013-7-30 03:35:40 PM
 */
public class ServerHandler extends IoHandlerAdapter {

	private static final Log LOG = LogFactory.getLog(ServerHandler.class);

	/**
	 * Idle timeout, in seconds, applied to both the read and the write direction.
	 *
	 * NOTE(review): the original inline comment claimed 1800 seconds, but the code
	 * has always passed 1. The runtime value is kept unchanged here; confirm which
	 * of the two was intended, since {@link #sessionIdle} closes idle sessions.
	 */
	private static final int BOTH_IDLE_SECONDS = 1;

	/**
	 * Handles one inbound message.
	 *
	 * <p>The message is converted to text and trimmed. Text starting with "{" is
	 * parsed as JSON and handed to the bean whose name is the value of the JSON
	 * field {@code "ty"}; any other text is only logged.
	 *
	 * @param session session the message arrived on
	 * @param message decoded message object
	 * @throws Exception if JSON parsing, bean lookup or the processor fails
	 */
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		String msg = message.toString().trim();
		// Log before dispatching so the message is recorded even when processing fails.
		LOG.info("<-------------------- Received Message: " + msg + "--------------------->");
		if (!msg.startsWith("{")) {
			return;
		}
		JSONObject jsonMessage = JSONObject.fromObject(msg);
		String processorName = jsonMessage.getString("ty");
		// The bean name comes from the client, so verify the type instead of casting blindly.
		Object bean = WebFrameUtils.getBean(processorName);
		if (!(bean instanceof IProcessor)) {
			LOG.warn("Session Id: " + session.getId() + " no IProcessor bean named '" + processorName + "'");
			return;
		}
		((IProcessor) bean).process(jsonMessage, session);
	}

	/**
	 * Logs that a new connection has been created.
	 */
	@Override
	public void sessionCreated(IoSession session) throws Exception {
		super.sessionCreated(session);
		LOG.info("<--------------------MINA Server Connection Created--------------------->");
	}

	/**
	 * Configures the idle timeout of a freshly opened session.
	 */
	@Override
	public void sessionOpened(IoSession session) throws Exception {
		super.sessionOpened(session);
		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, BOTH_IDLE_SECONDS);
		LOG.info("<--------------------MINA Server Connection Opened--------------------->");
	}

	/**
	 * Drops the closed session from the online registry so that no stale entry
	 * is left behind after a normal disconnect.
	 */
	@Override
	public void sessionClosed(IoSession session) throws Exception {
		// The session is already closed at this point; only the registry needs cleaning.
		this.removeUserBySessionId(session);
		LOG.info("<--------------------MINA Server Connection Closed--------------------->");
	}

	/**
	 * Logs the failure with its stack trace, then unregisters and closes the session.
	 */
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		LOG.error("Session Id: " + session.getId() + " " + cause.getMessage(), cause);
		this.removeUserBySessionId(session);
		session.close(true);
	}

	/**
	 * Invoked when the session exceeds its idle time; unregisters and closes it.
	 */
	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		this.removeUserBySessionId(session);
		session.close(true);
		LOG.info("<--------------------Session Idle--------------------->");
	}

	/**
	 * Removes the user bound to the given session from the online registry.
	 *
	 * @param session session whose id identifies the user to remove
	 * @author mengqingyu 2013-7-30 03:35:22 PM
	 */
	private void removeUserBySessionId(IoSession session) {
		SocketManager.getInstance().removeUserBySessionId(session.getId());
	}

}


import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.session.IoSession;

/**
 * Registry of online users and their MINA sessions.
 *
 * <p>A single shared instance is obtained through {@link #getInstance()}. The
 * registry is backed by a concurrent map because the instance is shared by every
 * caller and therefore may be touched from several threads.
 *
 * @author <a href="mailto:taibin.tang@berheley.com">tangtaibin </a>
 * @version $Id: SocketManager.java,v 1.18 2011/01/26 08:18:49 tangtaibin Exp $
 *          Created: 2010-10-19 02:29:31 PM
 */
public class SocketManager {

	private static final Log LOG = LogFactory.getLog(SocketManager.class);

	/** Session attribute key holding the remote host address. */
	public static final String USER_HOST = "host";

	/** Session attribute key holding the formatted login time. */
	public static final String USER_LOGIN_TIME = "logintime";

	/** Session attribute key holding the MINA session id. */
	public static final String SESSION_ID = "sessionId";

	/** Terminator appended to every message written by {@link #sendToUser}. */
	public static final String MESSAGE_END = "@$$@";

	private static final SocketManager manager = new SocketManager();

	/** User name to session; concurrent because the singleton is shared between threads. */
	private final Map<String, IoSession> online = new ConcurrentHashMap<String, IoSession>();

	private SocketManager() {
	}

	/**
	 * @return the shared manager instance
	 */
	public static SocketManager getInstance() {
		return manager;
	}

	/**
	 * @return the live (not copied) map of user name to session
	 */
	public Map<String, IoSession> getOnline() {
		return online;
	}

	/**
	 * Registers a user as online and stamps bookkeeping attributes on the session
	 * (session id, remote host, login time and user id).
	 *
	 * @param username user name used as the registry key
	 * @param session session to associate with the user
	 * @param moduleName currently unused
	 * @author mengqingyu 2013-7-30 03:38:47 PM
	 */
	public void addUserToOnlineMap(String username, IoSession session, String moduleName) {
		InetSocketAddress remoteAddress = (InetSocketAddress) session.getRemoteAddress();
		session.setAttribute(SESSION_ID, session.getId());
		// Skip the host attribute when no remote address is available instead of failing.
		if (remoteAddress != null) {
			session.setAttribute(USER_HOST, remoteAddress.getAddress().getHostAddress());
		}
		session.setAttribute(USER_LOGIN_TIME, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
		session.setAttribute("userId", username);
		this.online.put(username, session);
	}

	/**
	 * Removes a user from the registry and closes the associated session.
	 * Does nothing when the user is not registered.
	 *
	 * @param username user to remove
	 * @author mengqingyu 2013-7-30 03:38:38 PM
	 */
	public void removeUserFromOnlineMap(String username) {
		if (username == null) {
			return;
		}
		IoSession session = this.online.remove(username);
		if (session != null) {
			session.close(true);
		}
	}

	/**
	 * Removes the first user whose session has the given id.
	 *
	 * @param sessionId MINA session id to look for
	 * @author mengqingyu 2013-7-30 03:38:10 PM
	 */
	public void removeUserBySessionId(long sessionId) {
		for (Entry<String, IoSession> entry : online.entrySet()) {
			String username = entry.getKey();
			IoSession session = entry.getValue();
			// Compare against the session's own id; the SESSION_ID attribute is a copy of
			// it and may be missing for sessions placed in the map by other means.
			if (sessionId == session.getId()) {
				LOG.info("Exception from " + sessionId + " <--> " + username + "<--> is connected:" + session.isConnected());
				this.removeUserFromOnlineMap(username);
				break;
			}
		}
	}

	/**
	 * Returns a snapshot of the names of all registered users.
	 *
	 * @return new list holding the current user names
	 * @author tangtaibin 2010-10-19 02:31:06 PM
	 */
	public List<String> getOnlineUsers() {
		return new ArrayList<String>(this.online.keySet());
	}

	/**
	 * Looks up the session registered for a user.
	 *
	 * @param username user name to look up
	 * @return the session, or {@code null} when the user is not registered
	 * @author liujuan 2011-1-7 09:08:16 AM
	 */
	public IoSession getUserSession(String username) {
		// The concurrent map rejects null keys; keep the "unknown user -> null" contract.
		return username == null ? null : this.online.get(username);
	}

	/**
	 * Writes a message, followed by {@link #MESSAGE_END}, to a user's session.
	 * Logs a warning and returns when the user is not registered.
	 *
	 * @param username recipient
	 * @param msg message text
	 * @author tangtaibin 2010-10-19 02:31:25 PM
	 */
	public synchronized void sendToUser(String username, String msg) {
		IoSession session = getUserSession(username);
		LOG.info("MSG To: " + username);
		if (session == null) {
			LOG.warn("User " + username + " is not online, message dropped");
			return;
		}
		try {
			session.write(msg + MESSAGE_END);
			LOG.info("MSG: " + msg);
		} catch (Exception e) {
			LOG.error("Failed to send message to " + username, e);
		}
	}
}

import java.util.List;
import java.util.Map;

import net.sf.json.JSONObject;

import org.apache.mina.core.session.IoSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.webframe.web.util.WebFrameUtils;

import com.berheley.bi.basic.exp.BusinessException;
import com.berheley.bi.basic.timer.ITimerJob;
import com.berheley.oa.nio.AbstractProcessor;
import com.berheley.yq.common.IMonitorService;

/**
 * 
 * 类功能描述:用户登陆处理类
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-7-30 下午03:41:22
 */
@Component("webMessage_login")
public class LoginProcessor extends AbstractProcessor {
	
	@Autowired
	IMonitorService monitorService;
	
	@Override
	public void process(JSONObject jsonMessage, IoSession session) {
		
		// 用户登陆
		final String username = jsonMessage.getString("sd");
		log.info("【用户已登录username:】" + username);
		IoSession ioSession = manager.getUserSession(username);
		// 将用户加入在线用户列表
		manager.addUserToOnlineMap(username, session, "webMessge");
		if(ioSession == null){
			List<Map<String,Object>> beanNames = monitorService.getBeanNames();
			for(Map<String,Object> beanName:beanNames){
				ITimerJob timerJob = (ITimerJob)WebFrameUtils.getBean(beanName.get("beanName").toString());
				beanName.put("currentUser", username);
				try {
					timerJob.jtaTimerExecute(null, beanName);
				} catch (BusinessException e) {
					e.printStackTrace();
					log.error("登陆时消息提醒错误:"+e.getMessage());
				}
			}
		}
	}
}

import net.sf.json.JSONObject;

import org.apache.mina.core.session.IoSession;
import org.springframework.stereotype.Component;

import com.berheley.oa.nio.AbstractProcessor;

/**
 * Processor for page reminder messages.
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Created: 2013-7-3 04:04:50 PM
 */
@Component("webMessage")
public class WebMessageProcessor extends AbstractProcessor {

	/**
	 * Writes the message to the session registered for the user named in its
	 * {@code "sender"} field. Does nothing when the message is {@code null} or
	 * no session is registered for that user.
	 *
	 * @param jsonMessage reminder message, may be {@code null}
	 * @param session session the message arrived on (not used)
	 */
	@Override
	public void process(JSONObject jsonMessage, IoSession session) {
		if (jsonMessage == null) {
			return;
		}
		final String recipient = jsonMessage.getString("sender");
		final IoSession target = manager.getUserSession(recipient);
		if (target == null) {
			return;
		}
		target.write(jsonMessage);
		log.info("***********************【页面提醒】:" + jsonMessage);
	}
}
发表评论
用户名: 匿名