ReplayingDecoder是FrameDecoder的子类,不熟悉FrameDecoder的,可以先看看
http://bylijinnan.iteye.com/blog/1982618
API说,ReplayingDecoder简化了操作,比如:
FrameDecoder在decode时,需要判断数据是否接收完全:
class="java">
public class IntegerHeaderFrameDecoder extends FrameDecoder {
protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buf) throws Exception {
if (buf.readableBytes() < 4) {
return null;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return null;
}
return buf.readBytes(length);
}
}
而ReplayingDecoder则隐藏了这些判断,好像数据已经接收完全了一样:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<VoidEnum> {
protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buf,
VoidEnum state) throws Exception {
int length = buf.readInt();
return buf.readBytes(length);
}
}
可见,ReplayingDecoder使用起来要简单、直观
除此之外,
很多时候frame的结构不会是“length + content”这么简单,如果结构复杂了,
FrameDecoder的
if语句就会非常多,且先到达的sub-frame就会被多次decode
ReplayingDecoder允许用户
自定义state,表示decode到哪一部分了,从而下
一次数据到达时,直接从nextState开始decode
那ReplayingDecoder这是怎么做到“好像数据已经接收完全了一样”?
事实上它跟FrameDecoder的思路是一样的,只是它在数据接收不完全时,
“悄悄地失败”。这主要是通过ReplayingDecoderBuffer来实现的:
class ReplayingDecoderBuffer implements ChannelBuffer {
private static final Error REPLAY = new ReplayError();
private final ReplayingDecoder<?> parent;
private boolean terminated;
//ReplayingDecoder作为参数
ReplayingDecoderBuffer(ReplayingDecoder<?> parent) {
this.parent = parent;
}
/*ReplayingDecoderBuffer实际上是FrameDecoder里面的cumulation(类型为ChannelBuffer)
FrameDecoder里面,对数据的累积接收,以及数据的读取,都是在cumulation,
而ReplayingDecoderBuffer对cumulation进行了封装,在数据不完全时,读取操作都会抛出Error
*/
private ChannelBuffer buf() {
return parent.internalBuffer();
}
public byte getByte(int index) {
checkIndex(index);
return buf().getByte(index);
}
//数据接收不完全时,抛出Error,在ReplayingDecoder里捕获
private void checkIndex(int index) {
if (index > buf().writerIndex()) {
throw REPLAY;
}
}
public abstract class ReplayingDecoder<T extends Enum<T>>
extends FrameDecoder {
//创建了ReplayingDecoderBuffer
private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
private T state;
//标记着下一次数据读取的开始位置,也代表着已经decode到哪一部分了
private int checkpoint;
protected void checkpoint() {
ChannelBuffer cumulation = this.cumulation;
if (cumulation != null) {
checkpoint = cumulation.readerIndex();
} else {
checkpoint = -1; // buffer not available (already cleaned up)
}
}
protected void checkpoint(T state) {
checkpoint();
setState(state);
}
protected T setState(T newState) {
T oldState = state;
state = newState;
return oldState;
}
//这个方法交由子类实现,子类可以定义更详细、语义更丰富的state
protected abstract Object decode(ChannelHandlerContext ctx,
Channel channel, ChannelBuffer buffer, T state) throws Exception;
//重点在这个方法,注意对ReplayError的处理
private void callDecode(
ChannelHandlerContext context, Channel channel,
ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (input.readable()) {
int oldReaderIndex = checkpoint = input.readerIndex();
Object result = null;
T oldState = state;
try {
//如果数据接收还不完全,decode方法就会抛出ReplayError
//因为decode方法会调用replayableInput(类型为ReplayingDecoderBuffer)的getXXX方法
result = decode(context, channel, replayableInput, state);
if (result == null) {
//省略
}
} catch (ReplayError replay) {
//数据不充足,回退到checkpoint,下一次messageReceived再试
// Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
input.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
}
if (result == null) {
// Seems like more data is required.
// Let us wait for the next notification.
break;
}
// A successful decode
unfoldAndFireMessageReceived(context, remoteAddress, result);
}
}
ReplayingDecoder的使用,API给出了一个简单的
例子:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buf,
MyDecoderState state) throws Exception {
switch (state) {
case READ_LENGTH:
length = buf.readInt();
//设置下一次的decode从哪一个state开始
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ChannelBuffer frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
return frame;
default:
throw new Error("Shouldn't reach here.");
}
}
}
注意到上面的switch语句是不需要break的,因为case READ_LENGTH后:
1.如果数据不充足,那就会在buf.readBytes(length)里面抛出ReplayError,相当于有了break语句
2.如果数据充足,那就接着decode,最终成功并 return frame
除了API以外,下面这两篇文章写得非常好:
http://biasedbit.com/netty-tutorial-replaying-decoder/
http://biasedbit.com/an-enhanced-version-of-replayingdecoder-for-netty/
其中第二个例子,文章中给的代码不是很完整,我补全并做了一个简单的测试:
https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/handler/replay