Netty是采用了Reactor模式的多
线程版本,建议先看下面这篇文章了解一下Reactor模式:
http://bylijinnan.iteye.com/blog/1992325
Netty的启动及
事件处理的流程,基本上是按照上面这篇文章来走的
文章里面提到的操作,每一步都能在Netty里面找到对应的代码
其中Reactor里面的Acceptor就对应Netty的ServerBootstrap.boss;
而Reactor里面的Handler就对应Netty里面各ChannelHandler(在worker里面跑)
由于流程涉及到比较多的类和方法,我提取一下Netty的骨架:
class="java" name="code">
ServerBootstrap.bind(localAddress)
|-->newServerSocketChannel & fireChannelOpen (得到ServerSocketChannel[server])
-->
Binder.channelOpen
|-->Channels.bind(that is : sendDownstream of ChannelState.BOUND)
-->
In DefaultChannelPipeline, No downstreamHandler, jump to
NioServerSocketPipelineSink.bind (关键)
|-->1.do the REAL java.net.ServerSocket.bind (server绑定端口)
2.start bossThread in bossExecutor
3.do "accept & dispatch" in a endless loop of bossThread(得到SocketChannel[client])
|--> registerAcceptedChannel, start worker in workerPool
|-->worker.run
|-->processSelectedKeys(selector.selectedKeys())
|--> read & fireMessageReceived (开始调用各Handler)
下面就对照上面的“骨架”,把关键的代码拿出来读一下
其中关键的步骤,我用“===[关键步骤]===”的形式标记出来了
Netty的Server端是从ServerBootstrap.bind方法开始的:
public class ServerBootstrap extends Bootstrap {
public Channel bind(final SocketAddress localAddress) {
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelPipeline bossPipeline = Channels.pipeline();
bossPipeline.addLast("binder", binder);
/*===OPEN===
NioServerSocketChannelFactory.newChannel返回一个NioServerSocketChannel
在NioServerSocketChannel的构造函数里,调用ServerSocketChannel.open()并触发channelOpen事件
这个事件由上面的“binder”来处理并返回Future(非阻塞),详见Binder
最后将Future放入futureQueue,以便在接下来的while循环里面取
*/
Channel channel = getFactory().newChannel(bossPipeline);
// Wait until the future is available.
ChannelFuture future = null;
boolean interrupted = false;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
interrupted = true;
}
} while (future == null);
//处理中断的一种方式,详见《Java并发编程实践》
if (interrupted) {
Thread.currentThread().interrupt();
}
// Wait for the future.
future.awaitUninterruptibly();
return channel;
}
//主要是处理channelOpen事件
private final class Binder extends SimpleChannelUpstreamHandler {
private final SocketAddress localAddress;
private final BlockingQueue<ChannelFuture> futureQueue;
private final Map<String, Object> childOptions =
new HashMap<String, Object>();
Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
this.localAddress = localAddress;
this.futureQueue = futureQueue;
}
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
//处理各种option,例如keep alive,nodelay等等,省略代码
} finally {
ctx.sendUpstream(evt);
}
/*
重点在这里
这里bind方法只是触发sendDownstream(ChannelState.BOUND)
而此时pipeline里面还没有ChannelDownstreamHandler(只有一个handler:“binder”):
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
}
}
sendDownstream(tail, e);
}
因此ChannelState.BOUND会去到pipeline里面的sink,在sink里面执行最终的java.net.ServerSocket.bind操作
详见NioServerSocketPipelineSink.bind
*/
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
assert finished;
}
}
}
NioServerSocketPipelineSink:
class NioServerSocketPipelineSink extends AbstractNioChannelSink {
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
//在这里执行真正的===BIND===
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
//java.net.ServerSocket.bind完成,接下来可以accept了,详见Boss类的run方法
//===BOSS start===,放入线程池里跑(bossExecutor)
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted = true;
}
}
private final class Boss implements Runnable {
private final Selector selector;
private final NioServerSocketChannel channel;
/*
===REGISTER[server]===
注意到每新建一个Boss,就会新建一个selector
*/
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
channel.selector = selector;
}
/*
===ACCEPT&DISPATCH===
boss不断地接受Client的连接并将连接成功的SocketChannel交由worker处理
*/
public void run() {
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
//把acceptedSocket交由worker处理
registerAcceptedChannel(acceptedSocket, currentThread);
}
}
/*
这里面的worker(implements Runnable)就相当于“Reactor Pattern”里面“Handler”
handler需要两方面信息:selector和acceptedSocket,其中后者已经传递过来了,而selector则
在worker.register里新创建一个
*/
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline();
//从WorkerPool里面取:workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
//可见worker是re-used的
NioWorker worker = nextWorker();
/*
值得注意的是new NioAcceptedSocketChannel(...)包含了一个关键操作:
将pipeline与channel关联起来,一对一;见AbstractChannel类:
protected AbstractChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
this.parent = parent;
this.factory = factory;
this.pipeline = pipeline;
id = allocateId(this);
pipeline.attach(this, sink);
}
*/
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
}
}
}
worker.register,主要工作是创建registerTask(implements Runnable)并放入registerTaskQueue
对应的类是NioWorker 和AbstractNioWorker:
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
//只是创建Runnable,未启动。在worker的run方法中,processRegisterTaskQueue时候才执行
Runnable registerTask = createRegisterTask(channel, future);
//在start()里面启动worker线程
Selector selector = start();
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
private Selector start() {
synchronized (startStopLock) {
if (!started) {
selector = Selector.open();
//===WORKER start===
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
}
}
return selector;
}
private final class RegisterTask implements Runnable {
private final NioSocketChannel channel;
private final ChannelFuture future;
private final boolean server;
public void run() {
try {
synchronized (channel.interestOpsLock) {
//===REGISTER[client]=== 初始的state(getRawInterestOps)是OP_READ
channel.channel.register(selector, channel.getRawInterestOps(), channel);
}
fireChannelConnected(channel, remoteAddress);
}
}
}
worker线程的run操作:
public void run() {
for (;;) {
//===SELECT===
SelectorUtil.select(selector);
processRegisterTaskQueue();
processEventQueue();
processWriteTaskQueue();
//在这里面,就会遍历selectedKey并调用相关联的handler
processSelectedKeys(selector.selectedKeys());
}
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
int readyOps = k.readyOps();
//下面的“与”操作等价于k.isReadable
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
//执行读操作
if (!read(k)) {
continue;
}
}
//执行写操作
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
}
}
/*
主要是两个操作:
1.从channel里面读取数据
2.读取完成后,fireMessageReceived,从channel(k.attachment)
可以得到与它关联的pipeline,从而触发pipeline里面的handler
*/
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
int ret = 0;
int readBytes = 0;
boolean failure = true;
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
}
if (readBytes > 0) {
bb.flip();
final ChannelBufferFactory bufferFactory =
channel.getConfig().getBufferFactory();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
recvBufferPool.release(bb);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
fireMessageReceived(channel, buffer);
}
return true;
}