Java NIO非堵塞技术通常适用于I/O读写等场景。我们知道,系统运行的性能瓶颈通常在I/O读写上,包括对端口和文件的操作。过去,在打开一个I/O通道后,read()会一直等待在端口一边读取字节内容;如果没有内容进来,read()也只能傻傻地等,这会影响我们的程序继续做其他事情。改进的做法是开设线程,让线程去等待,但是这样做也是相当耗费资源的。
Java NIO非堵塞技术实际是采用Reactor模式,或者说是Observer模式,为我们监察I/O端口:如果有内容进来,会自动通知我们。这样,我们就不必开启多个线程死等;从外界看,实现了流畅的I/O读写,不再堵塞。
Java NIO的出现不只是一次技术性能上的提高,你会发现网络上到处都在介绍它,因为它具有里程碑意义:从JDK 1.4开始,Java开始增强与性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
?
图 1 类结构图
?
?
package cn.chenkangxian.nioconcurrent;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

/**
 * Echo server that services readable channels on a small pool of worker
 * threads instead of on the selector thread.
 *
 * <p>The selector loop is inherited from {@link SelectSockets}; this class only
 * overrides {@link #readDataFromSocket(SelectionKey)} to hand the key to an
 * idle worker.
 *
 * <p>Project: testNio
 *
 * @author chenkangxian
 * @since 2011-7-5
 *
 * Copyright: 2011 chenkangxian, All rights reserved.
 */
public class SelectSocketsThreadPool extends SelectSockets {

    /** Number of worker threads created for the pool. */
    private static final int MAX_THREADS = 5;

    private ThreadPool pool = new ThreadPool(MAX_THREADS);

    /**
     * Hands a readable key to an idle worker thread.
     *
     * <p>If no worker is idle the key is left untouched: its read interest is
     * still set, so the next {@code select()} reports it again and the hand-off
     * is retried.
     *
     * @param key selection key whose channel has data to read
     * @throws Exception declared for compatibility with the overridden method
     */
    @Override
    protected void readDataFromSocket(SelectionKey key) throws Exception {
        WorkerThread worker = pool.getWorker();
        if (worker == null) {
            return;
        }
        worker.serviceChannel(key);
    }

    /**
     * Minimal pool of idle worker threads. All access to the idle list is
     * guarded by the list's own monitor.
     *
     * <p>Project: concurrentnio
     *
     * @author chenkangxian
     * @since 2011-7-20
     *
     * Copyright: 2011 chenkangxian, All rights reserved.
     */
    private class ThreadPool {

        /** Workers that are currently waiting for a key. */
        private final List<WorkerThread> idle = new LinkedList<WorkerThread>();

        /**
         * Creates and starts the workers and marks all of them idle.
         *
         * @param poolSize number of worker threads to create
         */
        ThreadPool(int poolSize) {
            for (int i = 0; i < poolSize; i++) {
                WorkerThread thread = new WorkerThread(this);
                thread.setName("Worker" + (i + 1));
                thread.start();
                synchronized (idle) {
                    idle.add(thread);
                }
            }
        }

        /**
         * Takes an idle worker out of the pool.
         *
         * @return an idle worker, or {@code null} if every worker is busy
         */
        WorkerThread getWorker() {
            synchronized (idle) {
                if (idle.isEmpty()) {
                    return null;
                }
                return idle.remove(0);
            }
        }

        /**
         * Puts a worker back into the pool once it has finished its key.
         *
         * @param worker the worker that became idle
         */
        void returnWorker(WorkerThread worker) {
            synchronized (idle) {
                idle.add(worker);
            }
        }

        /**
         * Removes a worker that is terminating so it is never handed out again.
         *
         * @param worker the worker that is shutting down
         */
        void discardWorker(WorkerThread worker) {
            synchronized (idle) {
                idle.remove(worker);
            }
        }
    }

    /**
     * Worker that sleeps until it is given a key, echoes everything readable on
     * that key's channel, then returns itself to the pool.
     */
    private class WorkerThread extends Thread {

        /** Per-worker buffer, so workers never share buffer state. */
        private ByteBuffer buffer = ByteBuffer.allocate(1024);

        private ThreadPool pool;

        /** Key to service; {@code null} while idle. Guarded by this worker's monitor. */
        private SelectionKey key;

        WorkerThread(ThreadPool pool) {
            this.pool = pool;
        }

        /**
         * Main loop: wait for a key, drain its channel, return to the pool.
         * Interrupting the worker terminates it.
         */
        @Override
        public synchronized void run() {
            System.out.println(this.getName() + " is ready");
            while (true) {
                try {
                    // Wait on the condition, not just on the notification: a key
                    // assigned before this thread first reaches wait() must not
                    // be missed, and spurious wakeups must be ignored.
                    while (key == null) {
                        this.wait();
                    }
                } catch (InterruptedException e) {
                    // Treat interruption as a shutdown request. Looping back
                    // into wait() with the flag set would spin forever.
                    Thread.currentThread().interrupt();
                    this.pool.discardWorker(this);
                    return;
                }

                System.out.println(this.getName() + " has been awakened");

                try {
                    drainChannel(key);
                } catch (Exception e) {
                    System.out.println("Caught '" + e + "' closing channel");
                    try {
                        key.channel().close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                    // Closing the channel cancels the key; wake the selector so
                    // it notices.
                    key.selector().wakeup();
                }

                key = null;
                this.pool.returnWorker(this);
            }
        }

        /**
         * Assigns a key to this worker and wakes it up. Called by the selector
         * thread.
         *
         * @param key selection key whose channel should be drained
         */
        synchronized void serviceChannel(SelectionKey key) {
            // Stop selecting for reads while the worker owns the channel. This
            // runs first so that if it throws (for example on a cancelled key)
            // the worker is not left holding a key it was never woken for.
            key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
            this.key = key;
            this.notify();
        }

        /**
         * Echoes all currently readable data back to the peer. On end of stream
         * the channel is closed; otherwise read interest is restored and the
         * selector is woken so the change takes effect.
         *
         * @param key selection key of the channel to drain
         * @throws Exception if reading from or writing to the channel fails
         */
        void drainChannel(SelectionKey key) throws Exception {
            SocketChannel channel = (SocketChannel) key.channel();
            int count;

            buffer.clear();
            while ((count = channel.read(buffer)) > 0) {
                buffer.flip();
                // A non-blocking write may be partial; keep going until the
                // buffer is empty.
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                buffer.clear();
            }

            if (count < 0) {
                // Peer closed the connection.
                channel.close();
                return;
            }

            // Resume watching this channel for read events.
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            key.selector().wakeup();
        }
    }

    public static void main(String[] args) throws Exception {
        new SelectSocketsThreadPool().go(args);
    }
}
?
package cn.chenkangxian.nioconcurrent;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Single-threaded echo server built on one {@link Selector}: accepts
 * connections, greets each client, and echoes back whatever it receives.
 *
 * <p>Subclasses may override {@link #readDataFromSocket(SelectionKey)} to
 * change how readable channels are serviced.
 *
 * <p>Project: concurrentnio
 *
 * @author chenkangxian
 * @since 2011-7-11
 *
 * Copyright: 2011 chenkangxian, All rights reserved.
 */
public class SelectSockets {

    /** Port the server listens on. */
    public static int PORT_NUMBER = 1234;

    /** Scratch buffer used by the selector thread for the greeting and echo. */
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) throws Exception {
        new SelectSockets().go(args);
    }

    /**
     * Binds the listening socket and runs the select loop forever.
     *
     * @param args command-line arguments; currently unused because the port
     *             override below is commented out
     * @throws Exception if the server cannot be set up or channel I/O fails
     */
    public void go(String[] args) throws Exception {
        int port = PORT_NUMBER;
        // if (args.length > 0) {
        //     port = Integer.parseInt(args[0]);
        // }
        // System.out.println("Listening on port " + port);

        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverChannel.socket();
        Selector selector = Selector.open();

        serverSocket.bind(new InetSocketAddress(port));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // Single selector; blocks until a channel is ready or wakeup() is
            // called.
            int n = selector.select();
            if (n == 0) {
                continue;
            }

            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();

                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    // accept() on a non-blocking channel returns null when no
                    // connection is pending; greet only a real client.
                    if (channel != null) {
                        registerChannel(selector, channel, SelectionKey.OP_READ);
                        sayHello(channel);
                    }
                }

                if (key.isReadable()) {
                    readDataFromSocket(key);
                }

                // The selector never removes keys from the selected set itself.
                it.remove();
            }
        }
    }

    /**
     * Switches the channel to non-blocking mode and registers it with the
     * selector for the given interest set. A {@code null} channel is ignored.
     *
     * @param selector selector to register with
     * @param channel  channel to register; may be {@code null}
     * @param ops      interest set, e.g. {@link SelectionKey#OP_READ}
     * @throws Exception if configuring or registering the channel fails
     */
    protected void registerChannel(Selector selector,
            SelectableChannel channel, int ops) throws Exception {
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }

    /**
     * Echoes all currently readable data on the key's channel back to the peer
     * and closes the channel on end of stream.
     *
     * @param key selection key of a channel that has data available
     * @throws Exception if reading from or writing to the channel fails
     */
    protected void readDataFromSocket(SelectionKey key) throws Exception {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;

        buffer.clear(); // start from an empty buffer
        while ((count = socketChannel.read(buffer)) > 0) {
            buffer.flip();
            // A non-blocking write may be partial; keep going until the buffer
            // is empty.
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
            buffer.clear();
        }

        if (count < 0) {
            // Peer closed the connection.
            socketChannel.close();
        }
    }

    /**
     * Sends the greeting line to a newly accepted client.
     *
     * @param channel client channel to greet
     * @throws Exception if writing to the channel fails
     */
    private void sayHello(SocketChannel channel) throws Exception {
        buffer.clear();
        // NOTE(review): getBytes() uses the platform default charset, so the
        // bytes sent for the non-ASCII part depend on the host configuration.
        buffer.put("Hello 哈罗! \r\n".getBytes());
        buffer.flip();
        // The channel is non-blocking, so one write() may send only part of
        // the greeting.
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}
?
?
?
?