引入Netty背景
?
目前使用JDK的NIO类库进行开发问题较多如下:
1,NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector,ServerSocketChannel,SocketChannel,ByteBuffer等。
2,需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。
3,可靠性能力补齐,工作量和难度都非常大。
4,JDK NIO的BUG,如epoll bug,它会导致Selector空轮询,最终导致CPU 100%。
?
Netty是业界最流行的NIO框架之一,它的健壮性,功能,性能,可定制性和可扩展性在同类框架中都是首屈一指的,已经得到成百上千的商用项目的验证。
Netty的优点总结如下:
API使用简单,开发门槛低。
功能强大,预置了多种编解码功能,支持多种主流协议。
性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优。
成熟,稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼。
经历了大规模的商业应用考验,质量得到验证。
?
Netty原理介绍
Netty是基于NIO的多线程设计的Reactors模式。增加线程扩展性,主要应用与多核处理器中。
Worker线程,Reactors要快速触发handlers。handlers的处理降低了Reactor的速度,需要将非I/O操作分离到其他线程中处理。
Reactor线程可以使用IO操作饱和,分布负载到其他reactors,负载均衡来匹配CPU和IO之间的速度差异。
?
原理图如下:
?
mainReactor:用于服务端接受客户端的连接。
subReactor:用于进行SocketChannel的网络读写。
queued tasks和worker threads:作为handler类,用于编解码(序列化和反序列化)以及进行数据的相关处理动作。
?
Netty实例
?
netty的服务端代码如下:
class="java" name="code">package com.huawei.netty.test; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by liuzhengqiu on 2017/10/15. */ public class NettyServer { public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannelHandler()); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } } public static void main(String[] args) throws Exception { new NettyServer().bind(8080); } }
?
package com.huawei.netty.test; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; /** * Created by liuzhengqiu on 2017/10/15. */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] req = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(req); String body = new String(req,"UTF-8"); System.out.println("The time server receive order :" + body); String currentTime = "hello world".equalsIgnoreCase(body)?new Date().toString():"bad order"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) { ctx.close(); } }
?
客户端代码如下:
?
package com.huawei.netty.test; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * Created by liuzhengqiu on 2017/10/15. */ public class NettyClient { public void connect(int port,String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture f = bootstrap.connect(host,port).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyClient().connect(8080,"127.0.0.1"); } }
?
package com.huawei.netty.test; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Created by liuzhengqiu on 2017/10/15. */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf msg; public NettyClientHandler() { byte[] req = "hello world".getBytes(); msg = Unpooled.buffer(req.length); msg.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(msg); } @Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req,"UTF-8"); System.out.println("Now is :"+body); } }
Netty服务端和客户端总结
netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的ID作为Map的Key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务器发送Ping消息,检测SocketChannel是否终断。
?