CompositeChannelBuffer体现了Netty的“Transparent Zero Copy”
查看API(http://docs.jboss.org/netty/3.2/api/org/jboss/netty/buffer/package-summary.html#package_description)
可以看到,所谓“Transparent Zero Copy”是通过 ChannelBuffers的wrappedBuffer方法来实现的:
class="java" name="code"> ChannelBuffer message = ChannelBuffers.wrappedBuffer(header, body);
ChannelBuffer messageWithFooter = ChannelBuffers.wrappedBuffer(message, footer);
这样的好处是:
1.性能更好。因为避免了memory copy
2.返回的是一个ChannelBuffer,这样就统一了
接口,保持了良好的封装和抽象
ChannelBuffers.wrappedBuffer的实现,要依靠CompositeChannelBuffer
先通过一个实例来说明CompositeChannelBuffer的机制
例如有以下两个ChannelBuffer要合并:
bufferA,容量为4,数据为“ABCD”:
+---+---+---+---+
| 0 | 1 | 2 | 3 |
------------------
| A | B | C | D |
+---+---+---+---+
bufferB,容量为3,数据为“EFG”:
+---+---+---+
| 0 | 1 | 2 |
--------------
| E | F | G |
+---+---+---+
合并后,“看起来”是下面这样,但实际上并不会开辟新的
内存空间:
bufferC,容量为7,数据为“ABCDEFG”
+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 |
--------------------------------
| A | B | C | D | E | F | G |
+---+---+---+---+---+---+---+
CompositeChannelBuffer是怎么实现“Zero Copy”呢?
首先,用“ChannelBuffer[] components”保存对bufferA和bufferB的引用
注意这是保存引用,而不是复制数据,也就是
components[0] = bufferA
components[1] = bufferB
然后,用“int[] indices”记录bufferA和bufferB的长度:
indices[i] = indices[ i - 1] + component[i -1].length,也就是
indices[0] = 0;
indices[1] = 0 + 4 = 4;
indices[2] = 4 + 3 = 7;
最后看看是如何访问指定位置的数据
例如
ChannelBuffer bufferC = ChannelBuffers.wrappedBuffer(bufferA, bufferB);
那么bufferC.getByte(5)应该取得bufferB里面的“F”
过程是怎样呢?
首先要确定数据是在bufferA还是bufferB,此时,indices就派上用场了
因为 indices[1] < 5 < indices[2]
因此,数据在component[1],也就是bufferB里面
然后要把下标映射到bufferB的下标
因为bufferA长度为4(indices[1]),所以
bufferC.getBytes(5) = bufferB.getBytes(5 - 4) = "F"
说到底,还是指针的运算、下标的映射
另外,还用“int lastAccessedComponentId”缓存了上一个访问的component
比如,访问bufferB的“F”之后,接下来很有可能会访问“G”,因此lastAccessedComponentId=1
(表示上一次访问了bufferB)
加快了查询速度
CompositeChannelBuffer的关键源码:
public class CompositeChannelBuffer extends AbstractChannelBuffer {
private ChannelBuffer[] components;
private int[] indices;
private int lastAccessedComponentId;
public CompositeChannelBuffer(ByteOrder endianness, List<ChannelBuffer> buffers, boolean gathering) {
setComponents(buffers);
}
private void setComponents(List<ChannelBuffer> newComponents) {
// Clear the cache.
lastAccessedComponentId = 0;
components = new ChannelBuffer[newComponents.size()];
for (int i = 0; i < components.length; i ++) {
ChannelBuffer c = newComponents.get(i);
//这两个assert表明了,只接受“满”的buffer
assert c.readerIndex() == 0;
assert c.writerIndex() == c.capacity();
components[i] = c; //保存引用
}
// Build the component lookup table.
indices = new int[components.length + 1];
indices[0] = 0;
for (int i = 1; i <= components.length; i ++) {
indices[i] = indices[i - 1] + components[i - 1].capacity();
}
// Reset the indexes.
setIndex(0, capacity()); //设置合并后的readerIndex=0, writerIndex = 7(以文章开头的为例)
}
public byte getByte(int index) {
int componentId = componentId(index);
//计算得到真实的下标:index - indices[componentId]
return components[componentId].getByte(index - indices[componentId]);
}
private int componentId(int index) {
//先在上次访问的那一个buffer开始查找,先往右(后)查找,找不到再往前
//例如,以文章开头的为例,访问“F”后,再访问“A”,往后查不到,那就需要往前
int lastComponentId = lastAccessedComponentId;
if (index >= indices[lastComponentId]) {
if (index < indices[lastComponentId + 1]) {
return lastComponentId;
}
// Search right
for (int i = lastComponentId + 1; i < components.length; i ++) {
if (index < indices[i + 1]) {
lastAccessedComponentId = i;
return i;
}
}
} else {
// Search left
for (int i = lastComponentId - 1; i >= 0; i --) {
if (index >= indices[i]) {
lastAccessedComponentId = i;
return i;
}
}
}
throw new IndexOutOfBoundsException("Invalid index: " + index + ", maximum: " + indices.length);
}
}