最近又开始需要使用netty进行网络通信方面的编程开发了。于是遇到了一些问题通过查找好多资料记录下来。
?
做的内容大致是:客户端向服务端发送一条命令,服务端接收到之后,根据命令里面的一些信息去读取服务器上的一些文件并把文件内容(文件的内容类似于数据库中的一行一行的数据,是以行存储的,每个字段值以\t分割,每条数据为一行)发送给客户端处理(我这里的样例暂以获取数据之后按行保存入文件中)。
?
1、客户端服务端的代码
class="java" name="code">cmdLog = getSearchCmd(); ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); final ClientBufferHandler clientHandler = new ClientBufferHandler(cmdLog, getEncoding()); final DelimiterBasedFrameDecoder clientDecoder = new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, false, true, ChannelBuffers.copiedBuffer("\r\n", Charset.defaultCharset())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(clientDecoder, clientHandler); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); ChannelFuture future = bootstrap.connect(new InetSocketAddress(serverHost, serverPort)); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();
?cmdLog是客户端要发送的命令,getEncoding()是因为每个服务端要读取的文件可能是不同的编码,客户端这边传过去之后通过这个来编码。
有两个handler,下面会介绍,其他的都是很常规的这里就不多说了。
?
2、服务端代码
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool() ,Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline( new ServerDecoderHandler(), new FileSearchHandler()); return pipeline; } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8027));
?服务端的代码也很简单,后面会详细介绍handler。
?
3、先看客户端的handler,一个是ClientBufferHandler,这个是用来发送命令并接收服务端响应handler。
ClientBufferHandler extends SimpleChannelHandler
?
private static final String testPath = "F:/ test/test"; private String cmd; private String encoding; public ClientBufferHandler(String cmd, String encoding) { this.cmd = cmd; this.encoding = encoding; } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { int cmdLength = cmd.getBytes().length; ChannelBuffer cmdBuffer = ChannelBuffers.buffer(cmdLength+4); cmdBuffer.writeInt(cmdLength); cmdBuffer.writeBytes(cmd.getBytes()); e.getChannel().write(cmdBuffer); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); if(!buf.readable()) { return; } FileHelper.writeFile(testPath, buf.toString(Charset.forName(encoding)));
?重写channelConnected方法,发送命令。我这里在发送命令前面跟了四个字节的命令长度,保证服务端一次接收到所有的命令信息。
?
重写messageReceived方法,接收服务端获取的信息存入文件中。FileHelper.writeFile是把字符串追加写入文件的工具方法我就不放出了,实现方法还是很多的。
?
在这个之前还有一个解码器,因为服务端发送过来的数据都是按行发送的(每行结尾是\r\n),所以使用netty提供的一个解码器DelimiterBasedFrameDecoder实现按分隔符分割接收到的数据,构造方法见客户端的代码。保证获取的数据每次都是完整的一行。这里感谢一下http://blog.163.com/linfenliang@126/blog/static/12785719520121082103807/提供的netty的分包、组包、粘包处理机制。
?
4、然后是服务端的handler。
?
首先是ServerDecoderHandler解码器,保证能够读取完整的命令并把命令前的四个字节用来标识命令长度的内容丢掉。
public class ServerDecoderHandler extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel c, ChannelBuffer buf) throws Exception { int length = 4; if(buf.readableBytes() < length) { return null; } byte[] header = new byte[length]; buf.markReaderIndex(); buf.readBytes(header); int cmdLength = (header[0] & 0xFF) << 24 | (header[1] & 0xFF) << 16 | (header[2] & 0xFF) << 8 | (header[3] & 0xFF); if (cmdLength != 0) { if (buf.readableBytes() < cmdLength) { buf.resetReaderIndex(); return null; } length += cmdLength; } buf.resetReaderIndex(); buf.readerIndex(4); return buf.readBytes(cmdLength); } }
?这部分代码内容比较简单就不多做说明了。
?
然后是FileSearchHandler的代码。
public class CdrFileSearchHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); String cmd = buf.toString(Charset.defaultCharset()); logger.info("查询命令:\r\n" + cmd); // 返回文件内容 Channel ch = e.getChannel(); ChannelFuture f = null; final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(getPath(cmd)))); String line = ""; while (line != null) { line = reader.readLine(); if (line != null) { ChannelBuffer returnBuf = ChannelBuffers.dynamicBuffer(); returnBuf.writeBytes((line + "\r\n").getBytes()); f = ch.write(returnBuf); } } if(line == null) { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { reader.close(); Channel ch = future.getChannel(); ch.close(); } }); } }
这部分代码其实也很简单,就是获取到命令,根据命令选择文件(这部分代码省略了),按行读取文件,然后加上\r\n然后写入发送。当line不为null时则读取到了数据,如果为null则说明没有读取到数据,跳出循环,并且添加监听器,当发送完关闭各种链接。(之所以会这样判断是因为netty本身是多次调用messageReceived的,需要在发送完最后一条数据的时候关闭连接。)
?
这种内容发送的方式只适用与文件内容比较小可以,但是youyu 一般的handler都是同步执行的,一旦文件内容很大,就会因为文件读取耗时较长导致Worker线程不能及时返回处理其它请求,对性能影响较高,从而导致内存溢出等问题。
?
这个时候就需要使用netty提供的一个handler,ExecutionHandler,ExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接返回。ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。
所以在服务端的代码处需要修改代码如下:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline( new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)), new ServerDecoderHandler(), new CdrFileSearchHandler()); return pipeline; } });
?增加一个ExecutionHandler的handler,即可处理。
?
?
注意ExecutionHandler一定要在不同的pipeline 之间共享。它的作用是自动从ExecutionHandler自己管理的一个线程池中拿出一个线程来处理排在它后面的业务逻辑handler。而 worker线程在经过ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收。
它的构造方法是ExecutionHandler(Executor executor) ,很显然executor就是ExecutionHandler内部管理的线程池了。netty额外给我们提供了两种线程池:
MemoryAwareThreadPoolExecutor和OrderedMemoryAwareThreadPoolExecutor,它们都在org.jboss.netty.handler.execution 包下。
MemoryAwareThreadPoolExecutor 确保jvm不会因为过多的线程而导致内存溢出错误,OrderedMemoryAwareThreadPoolExecutor是前一个线程池的子类,除 了保证没有内存溢出之外,还可以保证channel event的处理次序。具体可以查看API文档,上面有详细说明。
?
总结一下:写这些东西用了一天的时间,其实多数时候都是在测试大文件的问题,网上好多源码分析的文章,都很浅,真正说每种buffer怎么用,每种handler的用法的很少。而详细讲源码的文章多数也都是抄来抄去,但是也还是有好多不错的。所以遇到什么问题多找找资料,在自己探索一下应该都可以解决的。(其实api上面的东西还是不少的,有时间研究的话应该好好看看netty的源码有助于更好的使用netty开发)
?
最后说下这里使用的版本是3.6的。
?
?