1 Single-threaded model: the echo process
Client
When the request is larger than the server's receive buffer, an insufficient-buffer exception is thrown.
Client execution flow:
AbstractPollingIoConnector.connect0(SocketAddress, SocketAddress, IoSessionInitializer<? extends ConnectFuture>) ->
NioSocketConnector.connect(SocketChannel, SocketAddress) ->
a ThreadPoolExecutor runs AbstractPollingIoConnector.Connector<T, H> ->
the connector performs readiness selection in NioSocketConnector.select(int) ->
NioSocketConnector.selectedHandles() yields the selection keys, and from them the associated socket channels ->
the socket channels are handed to AbstractPollingIoConnector.processConnections(Iterator<H>) ->
NioSocketConnector.newSession(IoProcessor<NioSession>, SocketChannel) wraps the socket channel in a NioSession and associates it with an IoProcessor (the SimpleIoProcessorPool) ->
IoProcessor.add(IoSession session) adds the session to the SimpleIoProcessorPool ->
SimpleIoProcessorPool.getProcessor(T session) ->
AbstractPollingIoProcessor.add(T session) ->
AbstractPollingIoProcessor.startupProcessor() ->
a ThreadPoolExecutor starts AbstractPollingIoProcessor$Processor, which is the reactor ->
AbstractPollingIoProcessor.select(long timeout): the reactor polls for readiness, with read events registered on the selector ->
when a read event is ready, AbstractPollingIoProcessor.process(T session) is executed.
// Client bootstrap: build the request and connect to the echo server.
IoConnector connector = new NioSocketConnector();
StringBuilder req = new StringBuilder();
int length = 2048 / 26;
for (int i = 0; i < length; i++) {
    // 13 chars per append, counted here as 26 bytes (2 bytes per char)
    req.append("hello server-");
}
System.out.println(req.length() * 2 + " bytes");
connector.setConnectTimeoutMillis(30000);
connector.getFilterChain().addLast(
        "codec",
        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
                LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
connector.setHandler(new ClientHandler(req.toString()));
connector.connect(new InetSocketAddress("localhost", 9123));
// Keep the main thread alive so the I/O threads can keep working.
while (true) {
    Thread.sleep(10000);
}
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Client handler: sends the request once the session opens and logs the echoed response.
public class ClientHandler extends IoHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
    private final String values;

    public ClientHandler(String values) {
        this.values = values;
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The response message received length is [" + str.length() * 2 + " bytes]");
        log.info("The response message received is [" + str + "]");
    }

    @Override
    public void sessionOpened(IoSession session) {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
        session.write(values);
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
    }
}
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Variant of ClientHandler that also prints the elapsed time once totalNum responses have arrived.
public class ClientHandler extends IoHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
    private final String values;
    private final int totalNum;
    private final AtomicInteger finishedNum = new AtomicInteger();
    private long start;

    public ClientHandler(String values, int totalNum) {
        this.values = values;
        this.totalNum = totalNum;
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The response message received length is [" + str.length() * 2 + " bytes]");
        log.info("The response message received is [" + str + "]");
        // Print the total elapsed time in milliseconds when the last response arrives.
        if (finishedNum.incrementAndGet() == totalNum) {
            System.out.println(System.currentTimeMillis() - start);
        }
    }

    @Override
    public void sessionOpened(IoSession session) {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
        session.write(values);
        // Record the start time as long as no response has been received yet.
        if (finishedNum.get() == 0) {
            start = System.currentTimeMillis();
        }
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
    }
}
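Server
Server execution flow:
AbstractIoAcceptor.bind(SocketAddress... addresses) ->
AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress> localAddresses) ->
AbstractPollingIoAcceptor.startupAcceptor() starts the acceptor ->
AbstractIoService.executeWorker(Runnable worker) runs the acceptor, AbstractPollingIoAcceptor.Acceptor, on a thread pool ->
the acceptor polls, performing readiness selection for accept events ->
when a connection comes in, NioSocketAcceptor.selectedHandles() yields the selection key, and from it the ServerSocketChannel ->
AbstractPollingIoAcceptor.accept(IoProcessor<T> processor, H handle) ->
ServerSocketChannel.accept() returns the SocketChannel ->
the IoSession is constructed as a NioSocketSession ->
SimpleIoProcessorPool.add(T session) picks a NioProcessor from the processor pool ->
AbstractPollingIoProcessor.add(T session) ->
AbstractPollingIoProcessor.startupProcessor() starts the reactor ->
NioProcessor.select(long timeout) polls for read readiness ->
a read-ready event triggers AbstractPollingIoProcessor.process(T session) ->
AbstractPollingIoProcessor.read(T session) ->
IoFilterChain.fireMessageReceived(Object message) enters the filter chain, which was already assembled when the acceptor was constructed.

Note on threads: the server first creates a single acceptor. After a connection is accepted, the acceptor takes an AbstractPollingIoProcessor from the SimpleIoProcessorPool and creates the single AbstractPollingIoProcessor.Processor associated with it. The pool usually holds (number of CPU cores + 1) AbstractPollingIoProcessor instances, which is the optimal thread count when business processing never blocks. Without an ExecutorFilter, the reactor (Processor) and the business logic run on the same thread, which is the single-threaded model.

The response path continues as follows:
the server receives the request, reads it in MyIoHandler.messageReceived(IoSession session, Object message), and responds with IoSession.write(Object message) ->
the write goes straight into the filter chain via IoFilterChain.fireFilterWrite(WriteRequest writeRequest) ->
IoFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) ->
the codec, ProtocolCodecFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) ->
the last link, DefaultIoFilterChain.HeadFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest), where the real write begins ->
WriteRequestQueue.offer(IoSession session, WriteRequest writeRequest) puts the request on the write queue ->
the session looks up its processor through SimpleIoProcessorPool.getProcessor(T session) ->
the processor executes AbstractPollingIoProcessor.flush(T session); this flush does not perform any network write, it only wakes the selector with Selector.wakeup() ->
AbstractPollingIoProcessor.flush(long currentTime) ->
AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) ->
NioProcessor.write(NioSession session, IoBuffer buf, int length), which finally calls WritableByteChannel.write(ByteBuffer src) to perform the actual network I/O.

Note on write readiness: the selector's write-ready event is a consequence of our own writes, not the trigger for them. To start a network write we cannot wait for the selector to report a write event; we have to create the condition ourselves, for example by checking the write request queue inside the reactor loop to decide whether to write. Interest in the write event is typically registered only when a write could not be completed. The networking code of MINA and ZooKeeper are good references.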
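The write path described above (enqueue, wake the selector, let the reactor drain the queue) can be sketched with plain java.nio. This is a minimal illustration, not MINA's implementation: the class MiniProcessor, its methods, and the use of a Pipe in place of a socket are all assumptions made so the example runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical single-threaded reactor: reads, "handles" (echoes) and writes on one thread.
class MiniProcessor implements Runnable {
    private final Selector selector;
    private final Pipe.SourceChannel in;   // stands in for the socket's read side
    private final Pipe.SinkChannel out;    // stands in for the socket's write side
    private final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    MiniProcessor(Pipe.SourceChannel in, Pipe.SinkChannel out) throws IOException {
        this.selector = Selector.open();
        this.in = in;
        this.out = out;
        in.configureBlocking(false);
        in.register(selector, SelectionKey.OP_READ); // only read interest is registered
    }

    // Analogous to a session write: enqueue and wake the selector, no I/O happens here.
    void write(ByteBuffer message) {
        writeQueue.offer(message);
        selector.wakeup();
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try (Selector s = selector) {
            while (running) {
                s.select(1000);
                for (SelectionKey key : s.selectedKeys()) {
                    if (key.isValid() && key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(2048);
                        if (in.read(buf) < 0) {
                            key.cancel();          // peer closed
                            continue;
                        }
                        buf.flip();
                        write(buf);                // the echo "handler"
                    }
                }
                s.selectedKeys().clear();
                flushQueue();                      // the queue, not OP_WRITE, decides when to write
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void flushQueue() throws IOException {
        ByteBuffer buf;
        while ((buf = writeQueue.poll()) != null) {
            while (buf.hasRemaining()) {
                out.write(buf);                    // the sink is in blocking mode in this sketch
            }
        }
    }

    // Sends text through the reactor and returns what comes back.
    static String echoOnce(String text) throws Exception {
        Pipe request = Pipe.open();
        Pipe response = Pipe.open();
        MiniProcessor processor = new MiniProcessor(request.source(), response.sink());
        Thread thread = new Thread(processor, "processor");
        thread.start();
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        request.sink().write(ByteBuffer.wrap(bytes));
        ByteBuffer reply = ByteBuffer.allocate(bytes.length);
        while (reply.hasRemaining()) {
            response.source().read(reply);         // blocking read of the echoed bytes
        }
        processor.stop();
        thread.join();
        request.sink().close();
        request.source().close();
        response.sink().close();
        response.source().close();
        return new String(reply.array(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.print(echoOnce("hello server-\r\n"));
    }
}
```

A real processor would keep the output channel non-blocking and register write interest only when a write leaves bytes behind; the sketch skips that to stay short.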
// Server bootstrap for the single-threaded model (no ExecutorFilter in the chain).
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.getFilterChain().addLast(
        "codec",
        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
                LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
acceptor.setHandler(new MyIoHandler());
acceptor.bind(new InetSocketAddress(9123));
// Keep the main thread alive so the I/O threads can keep working.
while (true) {
    Thread.sleep(10000);
}
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Server handler: echoes every message back, and closes the session on "quit".
public class MyIoHandler extends IoHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        log.error("Unexpected exception", cause);
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The request message received length is [" + str.length() * 2 + " bytes]");
        log.info("The request message received is [" + str + "]");
        if (str.endsWith("quit")) {
            session.close(true);
            return;
        }
        session.write(message);
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        // System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
    }
}
Server-side network I/O state transitions:
the acceptor's accept event becomes ready ->
sessionCreated ->
sessionOpened ->
flushingSessions is empty ->
the read event of AbstractPollingIoProcessor.Processor becomes ready ->
messageReceived ->
IoSession.write(Object message) ->
flushingSessions.add(session), the session is removed from flushingSessions, session.getChannel().write(buf.buf()) is called, and the client's messageReceived fires ->
the write event of AbstractPollingIoProcessor.Processor becomes ready ->
flushingSessions.add(session), the session is removed from flushingSessions, AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) runs, then fireMessageSent ->
messageSent is called once the message has been sent successfully.
flushingSessions acts as a synchronizer for writes and decides when write-related work is executed. fireMessageSent is triggered only when the following condition holds:
if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
    // Buffer has been sent, clear the current request.
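To make the completion condition concrete, here is a small sketch that replays it against a channel accepting only a few bytes per call. The class MessageSentSketch, the ThrottledChannel helper, and the method passesUntilSent are hypothetical names for illustration; only the boolean condition itself is taken from the fragment above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class MessageSentSketch {
    // Hypothetical channel that accepts at most `limit` bytes per call,
    // imitating a socket whose send buffer is nearly full.
    static final class ThrottledChannel implements WritableByteChannel {
        private final int limit;

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            src.position(src.position() + n);   // pretend n bytes went out
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    // Returns how many write passes are needed before the request counts as sent.
    static int passesUntilSent(ByteBuffer buf, WritableByteChannel channel,
                               boolean hasFragmentation) throws IOException {
        int passes = 0;
        while (true) {
            passes++;
            int localWrittenBytes = channel.write(buf);
            if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
                return passes;   // request cleared; this is where messageSent would fire
            }
            // Otherwise the request stays queued and is retried on a later flush.
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(passesUntilSent(ByteBuffer.allocate(25), new ThrottledChannel(10), true));
        System.out.println(passesUntilSent(ByteBuffer.allocate(25), new ThrottledChannel(10), false));
    }
}
```

With fragmentation enabled (the stream case), a 25-byte buffer written 10 bytes at a time needs three passes before the request is cleared; without fragmentation a single non-empty write is enough.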
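The client side is similar. The only difference is that one side uses a connector and the other an acceptor to establish the TCP connection; after the connection is established, AbstractPollingIoProcessor.Processor handles the reads and writes on both sides.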
2 Multi-threaded model
Client
Every successful connect creates a new session.
// threadNum and PORT are defined elsewhere in the original program.
ExecutorService threadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < threadNum; i++) {
    threadPool.execute(new ClientConnection(connector));
}

// Opens one connection on the shared connector; each connection gets its own session.
class ClientConnection implements Runnable {
    private final IoConnector connector;

    public ClientConnection(IoConnector connector) {
        this.connector = connector;
    }

    @Override
    public void run() {
        connector.connect(new InetSocketAddress("localhost", PORT));
    }
}
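Server
If the business logic never blocks, the default configuration of n + 1 AbstractPollingIoProcessor.Processor threads (n being the number of CPUs) is optimal, with I/O and business logic handled on the same thread.
If the business logic blocks, the multi-threaded model should be used instead.
On a single machine, with the client issuing 10 concurrent requests and the server spending 100 ms computing and 1000 ms blocked per request, processing 100 requests took 48797 ms in total.
Under ideal conditions the optimal thread count is given by the formula (blocking time / compute time + 1) * number of CPU cores. For the case above, about 20 service threads is a reasonable choice. In the same environment, with the server switched to the multi-threaded model, processing 100 requests took 15593 ms in total.
(Other measurements: 5 threads, 22984 ms; 7 threads, 22266 ms; 10 threads, 17906 ms; 15 threads, 16984 ms; 20 threads, 15593 ms; 22 threads, 16906 ms; 25 threads, 15969 ms; 30 threads, 17782 ms; 35 threads, 17891 ms. The environment was not ideal, so the numbers are somewhat noisy and are for reference only.)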
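As a quick check of the formula, the following sketch plugs in the 1000 ms blocking time and 100 ms compute time from the test. The class and method names are made up for illustration, and the core count of 2 is an assumption: the test machine's core count is not stated, and 2 is used only because it yields a value close to the roughly 20 threads mentioned.

```java
class ThreadPoolSizing {
    // (blocking time / compute time + 1) * number of CPU cores, rounded to the nearest integer.
    static int optimalThreads(long blockingMillis, long computeMillis, int cpuCores) {
        double perCore = (double) blockingMillis / computeMillis + 1;
        return (int) Math.round(perCore * cpuCores);
    }

    public static void main(String[] args) {
        // 1000 ms blocked and 100 ms computing per request, assuming 2 cores.
        System.out.println(optimalThreads(1000, 100, 2));
        // The same workload sized for the machine this code runs on.
        System.out.println(optimalThreads(1000, 100, Runtime.getRuntime().availableProcessors()));
    }
}
```

Each core can keep 11 threads busy for this workload, since a thread computes for 100 ms and then leaves the CPU free for 1000 ms.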
The multi-threaded server is configured as follows:
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// Hand events over to a pool of 20 worker threads so blocking business logic does not stall the I/O processor.
acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(20, 20));
acceptor.getFilterChain().addLast(
        "codec",
        new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
                LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
acceptor.setHandler(new MyIoHandler());
acceptor.bind(new InetSocketAddress(9123));
// Keep the main thread alive so the I/O threads can keep working.
while (true) {
    Thread.sleep(10000);
}
Business handler
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Server handler that simulates 100 ms of computation and 1000 ms of blocking before echoing.
public class MyIoHandler extends IoHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        log.error("Unexpected exception", cause);
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = message.toString();
        log.info("The request message received length is [" + str.length() * 2 + " bytes]");
        log.info("The request message received is [" + str + "]");
        if (str.endsWith("quit")) {
            session.close(true);
            return;
        }
        cptTime(100);
        blkTime(1000);
        session.write(message);
    }

    // Busy loop standing in for CPU-bound work; the iteration count was tuned by the
    // author so that cptTime(100) takes about 100 ms on the test machine.
    private void cptTime(long time) {
        long num = 1000000 * time;
        int temp = 0;
        // long start = System.currentTimeMillis();
        for (long i = 0; i < num; i++) {
            temp++;
        }
        // System.out.println(System.currentTimeMillis() - start);
    }

    // Sleeps to stand in for blocking work such as a database or remote call.
    private void blkTime(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the worker pool can react to it.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionClosed");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " sessionOpened");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        // System.out.println("IDLE " + session.getIdleCount(status));
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + " messageSent");
    }
}
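Adding acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(22, 22)) is enough to implement the SEDA model. Each phase of business processing is abstracted as a stage, every stage has its own thread pool sized to its optimal thread count, and consecutive stages are connected by an event queue. ExecutorFilter is the key piece that makes this possible.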
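The stage idea can be sketched without MINA, using only java.util.concurrent. This is an illustration of the SEDA structure, not of how ExecutorFilter is implemented: the SedaSketch and Stage names, the two example stages, and the pool sizes are all assumptions. Each stage's executor owns the event queue that feeds its threads.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

class SedaSketch {
    // One stage: a thread pool (whose internal queue is the stage's event queue),
    // the work done in this stage, and the hand-off to the next stage.
    static final class Stage<I, O> {
        private final ExecutorService pool;
        private final Function<I, O> work;
        private final Consumer<O> next;

        Stage(int threads, Function<I, O> work, Consumer<O> next) {
            this.pool = Executors.newFixedThreadPool(threads);
            this.work = work;
            this.next = next;
        }

        // Enqueue an event; a thread of this stage processes it and passes the result on.
        void submit(I event) {
            pool.execute(() -> next.accept(work.apply(event)));
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    // Runs one line through a decode stage and a business stage and returns the response.
    static String process(String line) throws InterruptedException {
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        // The business stage gets the larger pool because it is the one assumed to block.
        Stage<String, String> business =
                new Stage<String, String>(22, String::toUpperCase, responses::add);
        Stage<String, String> decode =
                new Stage<String, String>(2, String::trim, business::submit);
        try {
            decode.submit(line);
            return responses.poll(5, TimeUnit.SECONDS);
        } finally {
            decode.shutdown();
            business.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process("hello server-\r\n"));
    }
}
```

Sizing each pool independently is the point of the model: a cheap stage such as decoding can run on a couple of threads while a blocking stage gets as many threads as the sizing formula suggests.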