LZ最近离职,闲着也是闲着,打算梳理下
公司做的是电商,CTO打算把2.0系统用java 语言开发,LZ目前不打算做java,所以 选择离职。离职前,在公司负责的最后一个项目 供应链系统。
系统分为 3套子系统:
1 供应链工作平台(即用户操作平台):采用CS架构,Sqlite做缓存。
2 消息中心: 后台程序,采用mina.net,scoket 长连接 保证服务消息的 推送,后台消息的提醒,和 系统对最新订单的缓存。
3 WindowsService 监控消息中心,保证消息中心 随系统的开启而启动
mina 简介:
Apache Mina Server 是一个网络通信应用框架,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架,Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是Java NIO 作为底层支持)操作的编程模型。
mina.net 是Apache Mina Server 的.net 版本 主要用于系统的长连接通信
安装:
PM> Install-Package Mina
mina.net 主要对象:
1.AsyncSocketConnector: 发起链接
2.IoSession:mina 链接创建成功之后 客户端,服务的的数据传送
3.IoHandlerAdapter:适配器类。可以扩展
4.DemuxingProtocolCodecFactory:构建协议编码工厂
适配器主要方法:
1 MessageReceived:收到消息时触发
2.MessageSent 发送消息后触发
3.SessionClosed 关闭Session时 触发
4.SessionCreated 创建Session时 触发
5.ExceptionCaught 发生异常时 触发
6.SessionIdleSession 空闲时 触发
实现思路:
创建mina链接:
class="brush:csharp;gutter:true;">public void StartProcess(LoginContext config) { AsyncSocketConnector connector = new Mina.Transport.Socket.AsyncSocketConnector(); //注册协议编解码器工厂 connector.FilterChain.AddLast("encoding", new ProtocolCodecFilter(new MyMinaCodecFactory())); //指定服务端IP 和端口号 connector.DefaultRemoteEndPoint = new IPEndPoint(IPAddress.Parse(MinaConfig.Ip), MinaConfig.Port); //初始化 消息处理类 var headerDic = CreateHeader(); //继承IoHandlerAdapter构建适配器 MinaMessageHandler headler = new MinaMessageHandler(config, connector, headerDic); connector.Handler = headler; while (true) { try { //ClientHandler //建立链接 session = connector.Connect().Await().Session; break; } catch (Exception ex) { _Log.Error(ex.Message, ex); Thread.Sleep(1000); } } }
链接建立成功之后,触发mina.net 内部机制的SessionCreated 方法,登录用户
public override void SessionCreated(Mina.Core.Session.IoSession session) { try { MyBaseMessage message = new LoginRequestMessage(ClientConfig.ClientAddr,ClientConfig.SharedSecret); (message as LoginRequestMessage).SetAutherString(); session.Write(message); } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.SessionCreated(session); } }
重写MessageReceived方法,收到服务器消息之后,处理相应事件
/// <summary> /// 收到消息时 触发--处理消息,给服务器发送处理结果 /// </summary> /// <param name="session"></param> /// <param name="message"></param> public override void MessageReceived(Mina.Core.Session.IoSession session, object message) { try { if (message is MyBaseMessage) { var m = message as MyBaseMessage; if (HeaderDic.Keys.Any(p=>p==m.GetCommandType())) { var messageHeader = HeaderDic[m.GetCommandType()]; messageHeader.Handle(session,m); } } } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.MessageReceived(session, message); } }
重写 SessionClosed 事件,关闭session时,通知服务器,客户端已关闭链接
/// <summary> /// 关闭Session时 触发-发送关闭消息 /// </summary> /// <param name="session"></param> public override void SessionClosed(Mina.Core.Session.IoSession session) { try { while (true) { try { if (Connector != null) { if (!Connector.Disposed) { session = Connector.Connect().Await().Session; break; } else { break; } } } catch (Exception ex) { Thread.Sleep(1000); } } } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.SessionClosed(session); } }
重写 ExceptionCaught 方法,发生异常时,关闭链接
/// <summary> /// 发生异常时 触发,关闭session 重新登录 /// </summary> /// <param name="session"></param> /// <param name="cause"></param> public override void ExceptionCaught(Mina.Core.Session.IoSession session, Exception cause) { try { session.Close(true); _Log.Error(cause.Message, cause); } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.ExceptionCaught(session, cause); } }
重写 SessionIdle 方法,session空闲时,测试心跳
/// <summary> /// Session 空闲时 发生 /// </summary> /// <param name="session"></param> /// <param name="status"></param> public override void SessionIdle(Mina.Core.Session.IoSession session, Mina.Core.Session.IdleStatus status) { try { MyBaseMessage message = new DetectionMessage(); session.Write(message); } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.SessionIdle(session, status); } }
构建协议编解码器工厂
public class MyMinaCodecFactory : DemuxingProtocolCodecFactory { public MyMinaCodecFactory() { AddMessageEncoder(new MyMinaEncoder()); AddMessageDecoder(new MyMinaDecoder()); } }
编码器工厂,将对象 序列号成 bytes 数据
public class MyMinaEncoder : IMessageEncoder<MyBaseMessage> { public void Encode(IoSession session, MyBaseMessage message, IProtocolEncoderOutput output) { IoBuffer buf = IoBuffer.Allocate(12); buf.AutoExpand = true; var messageBytes = message.EncodeMessage(); buf.Put(messageBytes); buf.Flip(); session.Write(buf); } public void Encode(IoSession session, object message, IProtocolEncoderOutput output) { IoBuffer buf = IoBuffer.Allocate(12); buf.AutoExpand = true; if (message is MyBaseMessage) { var m = message as MyBaseMessage; var messageBytes = m.EncodeMessage(); buf.Put(messageBytes); buf.Flip(); } session.Write(buf); } }
解码器工厂,将字节转换为对象
public class MyMinaDecoder : IMessageDecoder { public ILog _Log = LogManager.GetLogger("MessageHandler"); public MessageDecoderResult Decodable(IoSession session,IoBuffer input) { try { if (input.Remaining < CommandConfig.messageHeaderLength) { return MessageDecoderResult.NeedData; } var headerBytes = new byte[CommandConfig.messageHeaderLength]; for (int i = 0; i < CommandConfig.messageHeaderLength; i++) { headerBytes[i] = input.Get(i); } var lengthBytes = new byte[4]; var commandIdBytes = new byte[4]; var sequenceBytes = new byte[4]; Array.Copy(headerBytes, 0, lengthBytes, 0, 4); Array.Copy(headerBytes, 4, commandIdBytes, 0, 4); Array.Copy(headerBytes, 8, sequenceBytes, 0, 4); var messageLength = lengthBytes.ByteToUint();//Convert.ToInt32(Encoding.Default.GetString(headerBytes, 0, 4)); var messageCommand = commandIdBytes.ByteToUint();//(uint)Convert.ToInt32(Encoding.Default.GetString(headerBytes, 4, 4)); if (messageCommand==CommandConfig.connect || messageCommand == CommandConfig.connectResp || messageCommand == CommandConfig.terminate || messageCommand == CommandConfig.terminateResp || messageCommand == CommandConfig.notify || messageCommand == CommandConfig.notifyResp || messageCommand == CommandConfig.cmppActiveTest || messageCommand == CommandConfig.cmppActiveTestResp) { return MessageDecoderResult.OK; } return MessageDecoderResult.NotOK; } catch (Exception ex) { _Log.Error(ex.Message, ex); return MessageDecoderResult.NeedData; } } }
结语:
博客写的不多,不喜勿碰,谢谢
欢迎指点和纠正