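1 Single-threaded model: the echo flow

Client
When the request is larger than the server's receive buffer, the server throws an insufficient-buffer exception.
Client execution flow: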
AbstractPollingIoConnector.connect0(SocketAddress, SocketAddress, IoSessionInitializer<? extends ConnectFuture>)
-> NioSocketConnector.connect(SocketChannel, SocketAddress)
-> a ThreadPoolExecutor runs AbstractPollingIoConnector.Connector<T, H>
-> the Connector performs readiness selection via NioSocketConnector.select(int)
-> NioSocketConnector.selectedHandles() yields the selection keys and, through them, the associated socket channels
-> the socket channels are handed to AbstractPollingIoConnector.processConnections(Iterator<H>)
-> NioSocketConnector.newSession(IoProcessor<NioSession>, SocketChannel) wraps each socket channel in a NioSession and associates it with an IoProcessor (a SimpleIoProcessorPool)
-> IoProcessor.add(IoSession session) adds the session to the SimpleIoProcessorPool
-> SimpleIoProcessorPool.getProcessor(T session)
-> AbstractPollingIoProcessor.add(T session)
-> AbstractPollingIoProcessor.startupProcessor()
-> a ThreadPoolExecutor starts AbstractPollingIoProcessor$Processor, which is the reactor
-> AbstractPollingIoProcessor.select(long timeout): the reactor polls for readiness, with the read event registered on the selector
-> when a read event is ready, AbstractPollingIoProcessor.process(T session) runs.
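The last two steps of the chain (poll the selector, then process whatever is read-ready) can be illustrated with plain java.nio. This is only a minimal sketch and not MINA's code: the class name ReadinessDemo and the method roundTrip are hypothetical, and a java.nio.channels.Pipe stands in for the socket so that the example needs no network.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

public class ReadinessDemo {

    /** Writes msg into a pipe, waits for read readiness, and returns what was read. */
    public static String roundTrip(String msg) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            // Like the reactor described above, register interest in read events only.
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            // Simulate the peer sending data.
            sink.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));

            StringBuilder received = new StringBuilder();
            // One pass of the polling loop: select with a timeout, then handle ready keys.
            if (selector.select(1000) > 0) {
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(2048);
                        ((Pipe.SourceChannel) key.channel()).read(buf);
                        buf.flip();
                        received.append(StandardCharsets.UTF_8.decode(buf));
                    }
                }
                selector.selectedKeys().clear();
            }
            return received.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello server"));
    }
}
```

A real reactor repeats the select pass in a loop on its own thread; the sketch runs a single pass to keep the control flow visible.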
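Client startup code:

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;

    import org.apache.mina.core.service.IoConnector;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.LineDelimiter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;

    public class Client {
        public static void main(String[] args) throws Exception {
            IoConnector connector = new NioSocketConnector();

            // Build the request: "hello server-" is 13 chars, counted here as
            // 26 bytes (2 bytes per char, Java's in-memory UTF-16 size).
            StringBuilder req = new StringBuilder();
            int length = 2048 / 26;
            for (int i = 0; i < length; i++) {
                req.append("hello server-");
            }
            System.out.println(req.length() * 2 + " bytes");

            connector.setConnectTimeoutMillis(30000);
            // Text-line codec: UTF-8, Windows line delimiter for encoding and decoding.
            connector.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory(
                            Charset.forName("UTF-8"),
                            LineDelimiter.WINDOWS.getValue(),
                            LineDelimiter.WINDOWS.getValue())));
            connector.setHandler(new ClientHandler(req.toString()));
            connector.connect(new InetSocketAddress("localhost", 9123));

            // Keep the main thread alive.
            while (true) {
                Thread.sleep(10000);
            }
        }
    }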
Client handler:

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ClientHandler extends IoHandlerAdapter {
        private static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
        private final String values;

        public ClientHandler(String values) {
            this.values = values;
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String str = message.toString();
            log.info("The response message received length is [" + str.length() * 2 + "bytes ]");
            log.info("The response message received is [" + str + "]");
        }

        @Override
        public void sessionOpened(IoSession session) {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
            // Send the request as soon as the session is open.
            session.write(values);
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            System.out.println("IDLE " + session.getIdleCount(status));
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
        }
    }
A variant of the client handler that also prints the elapsed time once totalNum responses have arrived:

    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ClientHandler extends IoHandlerAdapter {
        private static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
        private final String values;
        private final int totalNum;
        private final AtomicInteger finishNum = new AtomicInteger();
        // Written and read by different I/O threads, hence volatile.
        private volatile long start;

        public ClientHandler(String values, int totalNum) {
            this.values = values;
            this.totalNum = totalNum;
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String str = message.toString();
            log.info("The response message received length is [" + str.length() * 2 + "bytes ]");
            log.info("The response message received is [" + str + "]");
            // Print the total elapsed time when the last response arrives.
            if (finishNum.incrementAndGet() == totalNum) {
                System.out.println(System.currentTimeMillis() - start);
            }
        }

        @Override
        public void sessionOpened(IoSession session) {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
            session.write(values);
            // (Re)start the clock as long as no response has arrived yet.
            if (finishNum.get() == 0) {
                start = System.currentTimeMillis();
            }
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            System.out.println("IDLE " + session.getIdleCount(status));
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
        }
    }
Server
Server execution flow:
AbstractIoAcceptor.bind(SocketAddress... addresses)
-> AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress> localAddresses)
-> AbstractPollingIoAcceptor.startupAcceptor()
-> AbstractIoService.executeWorker(Runnable worker) uses a thread pool to start the acceptor, AbstractPollingIoAcceptor.Acceptor
-> the acceptor polls, performing readiness selection on accept events
-> when a connection comes in, the ServerSocketChannel is taken from the selection keys returned by NioSocketAcceptor.selectedHandles()
-> AbstractPollingIoAcceptor.accept(IoProcessor<T> processor, H handle)
-> ServerSocketChannel.accept() yields the SocketChannel
-> an IoSession is constructed as a NioSocketSession
-> SimpleIoProcessorPool.add(T session) picks a NioProcessor from the processor pool
-> AbstractPollingIoProcessor.add(T session)
-> AbstractPollingIoProcessor.startupProcessor() starts the reactor
-> NioProcessor.select(long timeout) polls for read readiness
-> a read-ready event triggers AbstractPollingIoProcessor.process(T session)
-> AbstractPollingIoProcessor.read(T session)
-> IoFilterChain.fireMessageReceived(Object message) enters the filter chain, which was already assembled when the acceptor was constructed.

(The server first creates a single acceptor. Once read/write events occur, the acceptor takes an AbstractPollingIoProcessor from the SimpleIoProcessorPool and creates the single AbstractPollingIoProcessor.Processor associated with it. The pool usually holds CPU cores + 1 AbstractPollingIoProcessor instances, which is the optimal thread count when business processing never blocks. Without an ExecutorFilter, the reactor Processor and the business processing run on the same thread, which is the single-threaded model.)

-> when the server receives a request, it is read in MyIoHandler.messageReceived(IoSession session, Object message), which sends the response with IoSession.write(Object message)
-> the write goes straight into the filter chain: IoFilterChain.fireFilterWrite(WriteRequest writeRequest)
-> IoFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
-> the codec: ProtocolCodecFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
-> the last link of the chain, where the actual write handling starts: DefaultIoFilterChain.HeadFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
-> the request is put on the write queue: WriteRequestQueue.offer(IoSession session, WriteRequest writeRequest)
-> the session looks up its processor: SimpleIoProcessorPool.getProcessor(T session)
-> AbstractPollingIoProcessor.flush(T session); this flush does not perform the network write itself, it only wakes the selector with Selector.wakeup()
-> AbstractPollingIoProcessor.flush(long currentTime)
-> AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
-> NioProcessor.write(NioSession session, IoBuffer buf, int length), which finally calls WritableByteChannel.write(ByteBuffer src) to do the actual network I/O.

(A network write is not started by waiting for the selector's write event. We have to create the condition ourselves, for example by having the reactor check the write queue to decide whether to write. The selector's write event only comes into play as a consequence of a write we initiated ourselves, typically when that write could not send everything at once. The network layers of MINA and ZooKeeper are good references.)
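The split described above, where write() only enqueues and the reactor later drains the queue and does the real channel write, can be sketched with the JDK alone. This is an illustration under assumptions, not MINA's implementation: WriteQueueDemo, write, flush, and pending are hypothetical names, and a ByteArrayOutputStream wrapped in a channel stands in for the socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WriteQueueDemo {
    // Hypothetical stand-in for a session's write request queue.
    private final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();
    private final WritableByteChannel channel;

    public WriteQueueDemo(WritableByteChannel channel) {
        this.channel = channel;
    }

    /** Called by business code: only enqueues the message, performs no I/O. */
    public void write(String message) {
        writeQueue.offer(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
        // A real reactor would now be woken up, e.g. with Selector.wakeup().
    }

    /** Called by the reactor loop: drains the queue and performs the channel writes. */
    public int flush() throws IOException {
        int written = 0;
        ByteBuffer buf;
        while ((buf = writeQueue.poll()) != null) {
            while (buf.hasRemaining()) {
                written += channel.write(buf);
            }
        }
        return written;
    }

    /** Number of messages queued but not yet written. */
    public int pending() {
        return writeQueue.size();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        WriteQueueDemo session = new WriteQueueDemo(Channels.newChannel(out));
        session.write("hello");
        System.out.println("bytes on the wire before flush: " + out.size());
        session.flush();
        System.out.println("bytes on the wire after flush: " + out.size());
    }
}
```

The point of the design is that business threads never touch the channel; the decision to write is driven by the queue's contents rather than by a write-readiness event.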
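Server startup code:

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;

    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.LineDelimiter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

    public class Server {
        public static void main(String[] args) throws Exception {
            IoAcceptor acceptor = new NioSocketAcceptor();
            acceptor.getSessionConfig().setReadBufferSize(2048);
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            // Text-line codec: UTF-8, Windows line delimiter for encoding and decoding.
            acceptor.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory(
                            Charset.forName("UTF-8"),
                            LineDelimiter.WINDOWS.getValue(),
                            LineDelimiter.WINDOWS.getValue())));
            acceptor.setHandler(new MyIoHandler());
            acceptor.bind(new InetSocketAddress(9123));

            // Keep the main thread alive.
            while (true) {
                Thread.sleep(10000);
            }
        }
    }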
Server handler:

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyIoHandler extends IoHandlerAdapter {
        private static final Logger log = LoggerFactory.getLogger(MyIoHandler.class);

        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            cause.printStackTrace();
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String str = message.toString();
            log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
            log.info("The request message received is [" + str + "]");
            // "quit" closes the session immediately; anything else is echoed back.
            if (str.endsWith("quit")) {
                session.close(true);
                return;
            }
            session.write(message);
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            // System.out.println("IDLE " + session.getIdleCount(status));
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
        }
    }
Server-side network I/O state transitions: the acceptor's accept event becomes ready -> sessionCreated -> sessionOpened -> flushingSessions is empty -> the read event becomes ready in AbstractPollingIoProcessor.Processor -> messageReceived -> IoSession.write(Object message) -> flushingSessions.add(session), the session is removed from flushingSessions, session.getChannel().write(buf.buf()), and the client's messageReceived fires -> the write event becomes ready in AbstractPollingIoProcessor.Processor -> flushingSessions.add(session), the session is removed from flushingSessions, AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime), fireMessageSent -> messageSent once the message has been sent successfully.
flushingSessions acts as a synchronizer for write operations and decides when write-related work runs. fireMessageSent is only triggered once the following condition holds:

    if (!buf.hasRemaining() ||
            !hasFragmentation && localWrittenBytes != 0) {
        // Buffer has been sent, clear the current request.
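To make the first half of that condition concrete, here is a small sketch of a buffer being written in passes of at most maxLength bytes, with the "sent" event firing only when nothing remains. It is a simplified illustration, not the MINA source: FragmentedWriteDemo, writePass, and send are hypothetical names, a second ByteBuffer stands in for the channel, and the hasFragmentation branch of the real condition is left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FragmentedWriteDemo {

    /**
     * Copies at most maxLength bytes from buf into sink.
     * Returns true only when buf has been written completely, which is the
     * point at which a messageSent-style event may fire.
     */
    public static boolean writePass(ByteBuffer buf, ByteBuffer sink, int maxLength) {
        int length = Math.min(buf.remaining(), maxLength);
        ByteBuffer slice = buf.duplicate();
        slice.limit(slice.position() + length);
        sink.put(slice);
        buf.position(buf.position() + length);
        return !buf.hasRemaining();
    }

    /** Runs write passes until the message is drained, recording one event per pass. */
    public static List<String> send(byte[] message, int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive");
        }
        List<String> events = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(message);
        ByteBuffer sink = ByteBuffer.allocate(message.length);
        boolean done = false;
        while (!done) {
            done = writePass(buf, sink, maxLength);
            events.add(done ? "messageSent" : "partial");
        }
        return events;
    }

    public static void main(String[] args) {
        // A 10-byte message written 4 bytes at a time needs three passes.
        System.out.println(send(new byte[10], 4));
    }
}
```

In the real processor the partial passes are spread over several selector wakeups rather than a tight loop; the loop here only compresses that timeline.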
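The client works in much the same way. The only difference is that one side uses a connector and the other an acceptor to establish the TCP connection; AbstractPollingIoProcessor.Processor handles the reads and writes once the connection has been established.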

2 Multi-threaded model

Client
Each successful connect creates one connection session.
    // connector and threadNum are defined by the surrounding client code.
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < threadNum; i++) {
        threadPool.execute(new ClientConnection(connector));
    }

    // Opens one connection (and therefore one session) per run.
    class ClientConnection implements Runnable {
        private final IoConnector connector;

        public ClientConnection(IoConnector connector) {
            this.connector = connector;
        }

        @Override
        public void run() {
            // PORT is a constant defined elsewhere in the client.
            connector.connect(new InetSocketAddress("localhost", PORT));
        }
    }
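
Server
If the business logic never blocks, the default of n + 1 AbstractPollingIoProcessor.Processor threads (n = number of CPUs) is optimal, with I/O and business logic handled on the same thread.
If the business logic blocks, the multi-threaded model should be used instead.
On a single machine, with the client issuing 10 concurrent requests and the server spending 100 ms computing and 1000 ms blocked per request, processing 100 requests took 48797 ms in total.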
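Why blocking handlers hurt the single-threaded model can be reproduced without MINA. The sketch below is an assumption-laden stand-in for the benchmark above, not a rerun of it: BlockingHandlerDemo and run are hypothetical names, each "request" is just a Thread.sleep, and the thread pool plays the role of the threads executing the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingHandlerDemo {

    /** Runs `requests` handlers that each block for blockMillis; returns the elapsed milliseconds. */
    public static long run(int threads, int requests, long blockMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                tasks.add(() -> {
                    Thread.sleep(blockMillis); // simulated blocking business call
                    return null;
                });
            }
            long start = System.nanoTime();
            pool.invokeAll(tasks); // returns once every handler has finished
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 thread:  " + run(1, 4, 100) + " ms");
        System.out.println("4 threads: " + run(4, 4, 100) + " ms");
    }
}
```

With one thread the blocked periods add up one after another; with several threads they overlap, which is the effect the multi-threaded model relies on.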
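Under ideal conditions, the optimal thread count is given by the formula (blocking time / compute time + 1) * number of CPU cores. For the case above, about 20 service threads is a suitable choice. In the same environment, with the server switched to the multi-threaded model, processing 100 requests took 15593 ms in total.
(Other measurements: 5 threads, 22984 ms; 7, 22266 ms; 10, 17906 ms; 15, 16984 ms; 20, 15593 ms; 22, 16906 ms; 25, 15969 ms; 30, 17782 ms; 35, 17891 ms. The environment was not ideal, so the numbers fluctuate and are for reference only.)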
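As a worked example of the formula, the helper below evaluates (blocking time / compute time + 1) * cores. ThreadPoolSizing and optimalThreads are hypothetical names introduced for illustration. The core count of the test machine is not stated in the text, so the two-core case is an assumption; with 1000 ms blocked and 100 ms computing the formula gives 11 threads per core, i.e. 22 on two cores, which is in the same range as the roughly 20 threads chosen above.

```java
public class ThreadPoolSizing {

    /** Evaluates (blockMillis / computeMillis + 1) * cores, rounded to the nearest integer. */
    public static int optimalThreads(long blockMillis, long computeMillis, int cores) {
        if (computeMillis <= 0 || cores <= 0) {
            throw new IllegalArgumentException("computeMillis and cores must be positive");
        }
        double ratio = (double) blockMillis / computeMillis;
        return (int) Math.round((ratio + 1) * cores);
    }

    public static void main(String[] args) {
        // Assumed two-core machine; the source does not say how many cores were used.
        System.out.println("2 cores: " + optimalThreads(1000, 100, 2));
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(cores + " cores (this machine): " + optimalThreads(1000, 100, cores));
    }
}
```

The formula is an idealized upper bound; as the measurements show, nearby thread counts perform similarly in practice.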
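The server configuration is as follows:

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;

    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.LineDelimiter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.executor.ExecutorFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

    public class Server {
        public static void main(String[] args) throws Exception {
            IoAcceptor acceptor = new NioSocketAcceptor();
            acceptor.getSessionConfig().setReadBufferSize(2048);
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            // Hand events to a pool of 20 threads so that blocking business
            // logic no longer runs on the I/O processor thread.
            acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(20, 20));
            acceptor.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new TextLineCodecFactory(
                            Charset.forName("UTF-8"),
                            LineDelimiter.WINDOWS.getValue(),
                            LineDelimiter.WINDOWS.getValue())));
            acceptor.setHandler(new MyIoHandler());
            acceptor.bind(new InetSocketAddress(9123));

            // Keep the main thread alive.
            while (true) {
                Thread.sleep(10000);
            }
        }
    }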

Business handler

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyIoHandler extends IoHandlerAdapter {
        private static final Logger log = LoggerFactory.getLogger(MyIoHandler.class);

        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            cause.printStackTrace();
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            String str = message.toString();
            log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
            log.info("The request message received is [" + str + "]");
            if (str.endsWith("quit")) {
                session.close(true);
                return;
            }
            cptTime(100);   // simulated compute time
            blkTime(1000);  // simulated blocking time
            session.write(message);
        }

        // Busy loop standing in for CPU work; roughly 100 ms for time = 100
        // on the author's machine.
        private void cptTime(long time) {
            long num = 1000000 * time;
            int temp = 0;
            // long start = System.currentTimeMillis();
            for (long i = 0; i < num; i++) {
                temp++;
            }
            // System.out.println(System.currentTimeMillis() - start);
        }

        // Sleep standing in for a blocking call.
        private void blkTime(long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                e.printStackTrace();
            }
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            // System.out.println("IDLE " + session.getIdleCount(status));
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
        }
    }
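
Adding acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(22, 22)) is enough to implement the SEDA model. Each phase of business processing is abstracted as a stage; every stage has its own thread pool sized to its optimal thread count, and the stages are connected to one another by event queues. ExecutorFilter is the key piece of this implementation.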
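The stage idea can be sketched with the JDK's executors, where each fixed thread pool's internal work queue serves as the stage's event queue. This is an illustrative sketch only and says nothing about how ExecutorFilter is implemented internally: SedaDemo, Stage, and run are hypothetical names, and the two stages (trim, then echo) are made-up stand-ins for decoding and business logic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

public class SedaDemo {

    /** One stage: a thread pool whose work queue acts as the stage's event queue. */
    static final class Stage<I, O> {
        private final ExecutorService pool;
        private final Function<I, O> work;
        private final Consumer<O> next;

        Stage(int threads, Function<I, O> work, Consumer<O> next) {
            this.pool = Executors.newFixedThreadPool(threads);
            this.work = work;
            this.next = next;
        }

        /** Enqueues an event; a stage thread processes it and hands the result to the next stage. */
        void submit(I event) {
            pool.execute(() -> next.accept(work.apply(event)));
        }

        /** Stops accepting events and waits for the queued ones to finish. */
        void shutdownAndWait() throws InterruptedException {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    /** Pushes each request through a "decode" stage and then a "business" stage. */
    public static List<String> run(List<String> requests) throws InterruptedException {
        List<String> responses = new CopyOnWriteArrayList<>();
        Stage<String, String> business = new Stage<>(4, s -> "echo:" + s, responses::add);
        Stage<String, String> decode = new Stage<>(2, String::trim, business::submit);
        for (String r : requests) {
            decode.submit(r);
        }
        decode.shutdownAndWait();   // every decoded event is now queued in the business stage
        business.shutdownAndWait();
        return responses;           // order is not guaranteed
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList(" a ", " b ", " c ")));
    }
}
```

Because each stage owns its pool, the thread count of a blocking stage can be tuned without affecting the others, which is the property the text attributes to SEDA.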