1.? 前言
现在很多做网络通讯中间代理层的通讯都是使用Java1.4以后推出的NIO进行编写,现在还有很多开源的框架也是封装了NIO的书写细节来帮助大家简写异步非阻塞通讯服务。像MySql的代理中间件amoeba-mysql-proxy就是采用NIO的方式处理client端过来的request,之后与Mysql-Server层的通讯也是采用NIO进行命令消息发送的。再看咱们JavaEye首页介绍的项目xmemcached,其中作者Dennis是其xmemcached的开发人,他也是通过NIO的方式与memcached的Server进行异步通讯的,Dennis的另一个项目yanf4j就是一个NIO框架,xmemcache也是借助这个NIO框架实现的异步非阻塞方式的网络通讯,Apache的MINA框架都是NIO的封装再实现。
那么我们就来回顾一下以前的处理方式,来看看为什么现在要使用NIO来进行异步非阻塞方式的通讯吧,网上很多文章都是几句话将NIO和原始的socket通讯的优劣一带而过,我们这次用一个简单的下载大文件的网络服务程序进行说明。使用3种模式来说明,分别是同步单独线程服务运行模式、传统阻塞多线程模式、使用NIO异步非阻塞模式。
我们设置服务器上有一个1.81GB的电影,格式为RMVB。使用Server进行服务监听,客户端请求到Server,建立网络通讯,进行电影下载。
2.? 同步单线程阻塞
使用同步单线程下载,是最原始的socket通讯,服务端的代码如下
package server;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* liuyan
*/
public class FilmServer {
public static void main(String[] args) {
FilmServer ms = new FilmServer();
try {
ms.server();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务器端响应请求
*
* @throws Exception
*/
public void server() throws Exception {
// 0.建立服务器端的server的socket
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 1.打开socket连接
// 等待客户端的请求
final Socket server = ss.accept();
System.out.println("服务-----------请求开始start");
// 2.打开socket的流信息,准备下面的操作
final InputStream is = server.getInputStream();
byte b[] = new byte[1024];
int readCount = is.read(b);
String str = new String(b);
str = str.trim();
final String serverFileName = str;
// 3.对流信息进行读写操作
System.out.println("客户端传过来的信息是:" + str);
System.out.println("线程" + Thread.currentThread().getName() + "启动");
try {
FileInputStream fileInputStream = new FileInputStream(
serverFileName);
// 3.1 服务器回复客户端信息(response)
OutputStream os = server.getOutputStream();
byte[] bfile = new byte[1024];
// 往客户端写
while (fileInputStream.read(bfile) > 0) {
os.write(bfile);
}
fileInputStream.close();
os.close();
// 4.关闭socket
// 先关闭输入流
is.close();
// 最后关闭socket
server.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("服务-----------请求结束over");
}
}
}
?服务端这么写代码会有什么问题?咱们先来看客户端代码,之后运行后就知道了。
package client;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* liuyan
* @version 1.0
*/
public class FilmClient {
public static void main(String[] args) {
for (int i = 1; i <= 2; i++) {
Client client = new Client();
client.i = i;
client.start();
}
}
}
class Client extends Thread {
int i;
@Override
public void run() {
// 1.建立scoket连接
Socket client;
try {
client = new Socket("127.0.0.1", 9999);
// 2.打开socket的流信息,准备下面的操作
OutputStream os = client.getOutputStream();
// 3.写信息
os.write(("d://film//2.rmvb").getBytes());
String filmName = "c://io"+i+".rmvb";
FileOutputStream fileOutputStream = new FileOutputStream(filmName);
// 3.1接收服务器端的反馈
InputStream is = client.getInputStream();
byte b[] = new byte[1024];
while(is.read(b)>0){
fileOutputStream.write(b);
}
// 4.关闭socket
// 先关闭输出流
os.close();
// 最后关闭socket
client.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
?客户端启动了2个线程进行下载电影的工作,先启动服务端,再运行客户端,会看笔者本地的硬盘C分区到有如下效果。
?可以看到线程2的下载任务一直是0字节,等第一个线程下载完成后呢,线程2的下载任务才能进行。
?
服务端的代码造成的问题就是使用传统的sokect网络通讯,那么另一个客户端的线程请求到server端的时候就发生了阻塞的情况,也就是说,服务端相当一个厕所,厕所就有只有一个坑位,来了一个人,相当于客户端请求,那这个人相当于就把坑位给占了,write操作和read操作会阻塞,这个人还没解决完问题呢,下个人就来了,没办法,哥们儿先在门外等等啊,等前一个客户爽完了再给您提供服务好吧。那么如何解决这个占着坑位不让别人用的情况呢?
3.? 阻塞的多线程
为了解决以上问题,那么之后很多Server肯定不可能像以上程序那么做,不过以前很多Server都是基于单线程服务改造一下,做成多线程的Server的通讯,修改一下上面的Server代码,如下
package server;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
*
*/
public class FilmServerNewThread {
public static void main(String[] args) {
FilmServerNewThread ms = new FilmServerNewThread();
try {
ms.server();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务器端响应请求
*
* @throws Exception
*/
public void server() throws Exception {
// 0.建立服务器端的server的socket
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 1.打开socket连接
// 等待客户端的请求
final Socket server = ss.accept();
System.out.println("服务-----------请求开始start");
// 2.打开socket的流信息,准备下面的操作
final InputStream is = server.getInputStream();
byte b[] = new byte[1024];
int readCount = is.read(b);
String str = new String(b);
str = str.trim();
final String serverFileName = str;
// 3.对流信息进行读写操作
System.out.println("客户端传过来的信息是:" + str);
if (readCount > 0) {
new Thread() {
@Override
public void run() {
System.out.println("线程"
+ Thread.currentThread().getName() + "启动");
try {
FileInputStream fileInputStream = new FileInputStream(
serverFileName);
// 3.1 服务器回复客户端信息(response)
OutputStream os = server.getOutputStream();
byte[] bfile = new byte[1024];
// 往客户端写
while (fileInputStream.read(bfile) > 0) {
os.write(bfile);
}
fileInputStream.close();
os.close();
// 4.关闭socket
// 先关闭输入流
is.close();
// 最后关闭socket
server.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}.start();
}
System.out.println("服务-----------请求结束over");
}
}
}
?以上的Server就是在原始的socket基础上加了线程,每一个Client请求过来后,整个Server主线程不必处于阻塞状态,接收请求后直接另起一个新的线程来处理和客户端的交互,就是往客户端发送二进制包。这个在新线程中虽然阻塞,但是对于服务主线程没有阻塞的影响,主线程依然通过死循环监听着客户端的一举一动。另一个客户端的线程发起请求后就再起一个新的线程对象去为客户端服务。执行效果如下
?
2个线程互不影响,各自下载各自的。当然从非常严格的意义来讲,str变量在十分高并发的情况下有线程安全问题,这个咱暂且忽略,就着眼于低并发的情况。这个问题是什么呢,就是如果客户端请求比较多了,那么为每一个客户端开辟一个新的线程对象来处理网络传输的请求,需要创建个线程对象,而且这个线程对象从时间上来讲还是处于长连接,这个就比较消费系统资源,这个打开进程管理器就可以看到。而且每一个线程内部都是阻塞的,也没有说完全利用好这个新创建的线程。还拿刚才上厕所举例子,好比现在不止一个坑位了,来了一个用户我这边就按照工程师的厕所坑位图建立一个新的坑位,客户来了,不用等待老坑位,用新创建的坑位就行了。等那个老坑位用完了,自然有垃圾回收器去消灭那个一次性的坑位的,腾出资源位置为了建立新的坑位。长时间连接的意思,相当于这个人上厕所的时间非常长,便秘??需要拉一天才能爽完……老的坑位一时半会儿回收不了,新的坑位需要有空间为其建造茅房以便满足客户端的“急切方便”需要。久而久之,线程数目一多,系统就挂了的概率就增多了(谁也别想上,全玩完了)。
4.? 异步非阻塞
使用JDK1.4的NIO可以适当的解决上面的问题,异步 I/O 是一种 没有阻塞地读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。异步 I/O 调用不会阻塞。相反,您将注册对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉您。异步 I/O 的一个优势在于,它允许您同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。还是举上公共厕所例子,虽然这个例子有点臭臭的。您现在有“便便”的需求了,不用那么麻烦,看看公共厕所是否有人占领,也不用给您另起个新坑位,您就拿一根我们服务端定制的容器和一个很粗管子,这个坐便器的大小因您那个地方的尺寸而定,坐便器往您的那个地方一放,再将坐便器和管子一连接,OK,您就敞开了“爽”吧。不用担心,这个管子自然会连接到相应的肥料厂家,将您的排泄物有效回收加以利用的。您完了事,擦擦屁股,关上管子该干嘛还干嘛就行了。另一个人也有这个需求,没问题,每个要我们提供服务的人都用这根管子,和自己的坐便器就行了,管子很粗,谁来连这个管子都行,有多少都行啊。下面我们来看基于NIO的网络下载程序
package server;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
/**
*
* @author liuyan
*
*/
public class NIOServer {
static int BLOCK = 500*1024;
/**
* 处理客户端的内部类,专门负责处理与用户的交互
*/
public class HandleClient {
protected FileChannel channel;
protected ByteBuffer buffer;
String filePath;
/**
* 构造函数,文件的管道初始化
* @param filePath
* @throws IOException
*/
public HandleClient(String filePath) throws IOException {
//文件的管道
this.channel = new FileInputStream(filePath).getChannel();
//建立缓存
this.buffer = ByteBuffer.allocate(BLOCK);
this.filePath = filePath;
}
/**
* 读取文件管道中数据到缓存中
* @return
*/
public ByteBuffer readBlock() {
try {
//清除缓冲区的内容,posistion设置为0,limit设置为缓冲的容量大小capacity
buffer.clear();
//读取
int count = channel.read(buffer);
//将缓存的中的posistion设置为0,将缓存中的limit设置在原始posistion位置上
buffer.flip();
if (count <= 0)
return null;
} catch (IOException e) {
e.printStackTrace();
}
return buffer;
}
/**
* 关闭服务端的文件管道
*/
public void close() {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
protected Selector selector;
protected String filename = "d:\\film\\60.rmvb"; // a big file
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
protected CharsetDecoder decoder;
//构造服务端口,服务管道等等
public NIOServer(int port) throws IOException {
selector = this.getSelector(port);
Charset charset = Charset.forName("GB2312");
decoder = charset.newDecoder();
}
// 获取Selector
//构造服务端口,服务管道等等
protected Selector getSelector(int port) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
Selector sel = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
//刚开始就注册链接事件
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
// 服务启动的开始入口
public void listen() {
try {
for (;;) {
//?
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator();
while (iter.hasNext()) {//首先是最先感兴趣的连接事件
SelectionKey key = iter.next();
//
iter.remove();
//处理事件
handleKey(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 处理事件
protected void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) { // 接收请求
//允许网络连接事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
//网络管道准备处理读事件
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 读信息
SocketChannel channel = (SocketChannel) key.channel();
//从客户端读过来的数据块
int count = channel.read(clientBuffer);
if (count > 0) {
//读取过来的缓存进行有效分割,posistion设置为0,保证从缓存的有效位置开始读取,limit设置为原先的posistion上
//这样一来从posistion~limit这段缓存数据是有效,可利用的
clientBuffer.flip();
//对客户端缓存块进行编码
CharBuffer charBuffer = decoder.decode(clientBuffer);
System.out.println("Client >>download>>" + charBuffer.toString());
//对网络管道注册写事件
SelectionKey wKey = channel.register(selector,
SelectionKey.OP_WRITE);
//将网络管道附着上一个处理类HandleClient,用于处理客户端事件的类
wKey.attach(new HandleClient(charBuffer.toString()));
} else{
//如客户端没有可读事件,关闭管道
channel.close();
}
clientBuffer.clear();
} else if (key.isWritable()) { // 写事件
SocketChannel channel = (SocketChannel) key.channel();
//从管道中将附着处理类对象HandleClient取出来
HandleClient handle = (HandleClient) key.attachment();
//读取文件管道,返回数据缓存
ByteBuffer block = handle.readBlock();
if (block != null){
//System.out.println("---"+new String(block.array()));
//写给客户端
channel.write(block);
}else {
handle.close();
channel.close();
}
}
}
public static void main(String[] args) {
int port = 12345;
try {
NIOServer server = new NIOServer(port);
System.out.println("Listernint on " + port);
while (true) {
server.listen();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
?ServerSocketChannel相当于我们说的那个大粗管子,在它上面注册了很多这个管子感兴趣的事件,比如大便、小便、酒醉后吐的污浊都是它关心的。至于谁来控制管道应该关心的事件,是由管道通过Selector注册事件完成的,Selector相当于一个大管道的维护员了。管道必须得有服务商的厂家维护吧,不能滥用吧。Selector就是个管家,负责管道的事件监听的。XXXXBuffer相当于咱们说的坐便器,它是以块为单位进行管道疏通的,假如您的尺寸特别大,估计您排出的那个玩意也小不了,就配置一个大点的缓存传给服务那边,当然,您这边得到的服务端返回的加工后肥料,返给您的也是和您配置的尺寸有关系的。客户端的代码如下
package client;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @author liuyan
*
*/
public class NIOClient {
static int SIZE = 2;
final static int bufferSize = 500 * 1024;
static InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
static class Download implements Runnable {
protected int index;
String outfile = null;
public Download(int index) {
this.index = index;
this.outfile = "c:\\" + index + ".rmvb";
}
public void run() {
FileOutputStream fout = null;
// FileChannel fcout = null;
try {
fout = new FileOutputStream(outfile);
// fcout = fout.getChannel();
} catch (FileNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
long start = System.currentTimeMillis();
// 打开客户端socket管道
SocketChannel client = SocketChannel.open();
// 客户端的管道的通讯模式
client.configureBlocking(false);
// 选择器
Selector selector = Selector.open();
// 往客户端管道上注册感兴趣的连接事件
client.register(selector, SelectionKey.OP_CONNECT);
// 配置IP
client.connect(ip);
// 配置缓存大小
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
int total = 0;
FOR: for (;;) {
// 阻塞,返回发生感兴趣事件的数量
selector.select();
// 相当于获得感兴趣事件的集合迭代
Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
System.out.println("-----Thread "
+ index + "------------------"+key.readyOps());
// 删除这个马上就要被处理的事件
iter.remove();
// 感兴趣的是可连接的事件
if (key.isConnectable()) {
// 获得该事件中的管道对象
SocketChannel channel = (SocketChannel) key
.channel();
// 如果该管道对象已经连接好了
if (channel.isConnectionPending())
channel.finishConnect();
// 往管道中写一些块信息
channel.write(encoder.encode(CharBuffer
.wrap("d://film//1.rmvb")));
// 之后为该客户端管道注册新的感兴趣的事件---读操作
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 由事件获得通讯管道
SocketChannel channel = (SocketChannel) key
.channel();
// 从管道中读取数据放到缓存中
int count = channel.read(buffer);
System.out.println("count:" + count);
if (count > 0) {
// 统计读取的字节数目
total += count;
// 这样一来从posistion~limit这段缓存数据是有效,可利用的
// buffer.flip();
buffer.clear();
// 往输出文件中去写了
if (count < bufferSize) {
byte[] overByte = new byte[count];
for (int index = 0; index < count; index++) {
overByte[index] = buffer.get(index);
}
fout.write(overByte);
} else {
fout.write(buffer.array());
}
} else {
// 关闭客户端通道
client.close();
// 退出大循环
break FOR;
}
}
}
}
// 计算时间
double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
System.out.println("Thread " + index + " downloaded " + total
/ 1024 + "kbytes in " + last + "s.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
// 启用线程池
ExecutorService exec = Executors.newFixedThreadPool(SIZE);
for (int index = 1; index <= SIZE; index++) {
exec.execute(new Download(index));
}
exec.shutdown();
long endTime = System.currentTimeMillis();
long timeLong = endTime - startTime;
System.out.println("下载时间:" + timeLong);
}
}
?效果和上一个程序的效果差不多,只是时间上和内存资源占有率上有所提高,当然本机仅仅启动了几个线程,如果客户端启动更多线程,NIO的方式节约资源的效果是明显的,宕机概率小于阻塞IO方式很多。
5.? 总结
其实NIO想写得更多,但是看到网络上已经有很多资料了,就不再展开了,非一篇所能尽述的了的。当然了,NIO也是有场景的,比如适合与长连接的请求,以为NIO维护管道、缓存块、时间选择器等等也需要资源消耗的,而且比传统IO的对象们要重量级。所以原始IO也并不是一无是处,现在还是有很多socket中间件还是已然使用第二种方式。
代码在附件中~~~