最近实现了基于netty4的beanstalkd的客户端, 实现此客户端的目的是为了学习netty。
beanstalkd是一个高性能、轻量级的分布式
内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,
协议也很简单。beanstalkd的
详细介绍 可见
https://wenku.baidu.com/view/b9654077f242336c1eb95e54.html 。
针对消息中间件的概念,分为消息的提供者和消息的
消费者,
结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
1. beanstalk协议的封装
协议的封装参考了dinstone,首先定义
接口表示协议的功能
class="java">
package com.haole.mq.beanstalk.command;
import io.netty.buffer.ByteBuf;
import java.nio.charset.Charset;
/**
* Created by shengjunzhao on 2017/5/27.
*/
public interface Command {
String getCommandLine(); // 拼接协议的命令
ByteBuf prepareRequest(ByteBuf sendBuf, Charset charset, String delimiter); // 编码命令到ByteBuf中
}
提取公共部分到抽象类中
package com.haole.mq.beanstalk.command;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
/**
* Created by shengjunzhao on 2017/5/27.
*/
public class AbstractCommand implements Command {
private String commandLine;
@Override
public String getCommandLine() {
return this.commandLine;
}
@Override
public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) {
sendBuf.writeBytes(this.commandLine.getBytes(charset));
sendBuf.writeBytes(delimiter.getBytes(charset));
return sendBuf;
}
public void setCommandLine(String commandLine) {
this.commandLine = commandLine;
}
}
beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
package com.haole.mq.beanstalk.command;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
/**
* put command
* Created by shengjunzhao on 2017/5/27.
*/
public class PutCommand extends AbstractCommand {
private final static Logger log = LoggerFactory.getLogger(PutCommand.class);
private byte[] data;
public PutCommand(int priority, int delay, int ttr, byte[] data) {
if (data == null) {
throw new IllegalArgumentException("data is null");
}
if (data.length > 65536) {
throw new IllegalArgumentException("data is too long than 65536");
}
setCommandLine("put " + priority + " " + delay + " " + ttr + " " + data.length);
this.data = data;
}
@Override
public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) {
sendBuf = super.prepareRequest(sendBuf,charset,delimiter);
sendBuf.writeBytes(data);
sendBuf.writeBytes(delimiter.getBytes(charset));
return sendBuf;
}
}
消费类reserve实现
package com.haole.mq.beanstalk.command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by shengjunzhao on 2017/5/29.
*/
public class ReserveCommand extends AbstractCommand {
private final static Logger log = LoggerFactory.getLogger(ReserveCommand.class);
public ReserveCommand(long timeout) {
if (timeout > 0)
setCommandLine("reserve-with-timeout " + timeout);
else
setCommandLine("reserve");
}
}
2. 编码解码的实现
编码用于把beanstalkd的每一个协议转换为netty的ByteBuf
package com.haole.mq.beanstalk.codec;
import com.haole.mq.beanstalk.command.Command;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
/**
* Created by shengjunzhao on 2017/5/28.
*/
public class CommandEncode extends MessageToByteEncoder<Command> {
private static final Logger log = LoggerFactory.getLogger(CommandEncode.class);
private Charset charset;
private String delimiter;
public CommandEncode(Charset charset) {
this(charset, "\r\n");
}
public CommandEncode(Charset charset, String delimiter) {
this.charset = charset;
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
if (null == msg) {
throw new Exception("The encode message is null");
}
ByteBuf sendBuf = ctx.channel().alloc().buffer(512);
log.debug("&&&&& command={}", msg.getCommandLine());
sendBuf = msg.prepareRequest(sendBuf, charset, delimiter);
out.writeBytes(sendBuf);
}
}
解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个列来表示
package com.haole.mq.beanstalk.command;
/**
* Created by shengjunzhao on 2017/5/27.
*/
public class Response {
private String statusLine;
private byte[] data;
public String getStatusLine() {
return statusLine;
}
public void setStatusLine(String statusLine) {
this.statusLine = statusLine;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
}
解码器
package com.haole.mq.beanstalk.codec;
import com.haole.mq.beanstalk.command.Response;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.List;
/**
* Created by shengjunzhao on 2017/5/28.
*/
public class CommandDecode extends ByteToMessageDecoder {
private final static Logger log = LoggerFactory.getLogger(CommandDecode.class);
private Charset charset;
private String delimiter;
public CommandDecode(Charset charset) {
this(charset, "\r\n");
}
public CommandDecode(Charset charset, String delimiter) {
this.charset = charset;
this.delimiter = delimiter;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.markReaderIndex();
int readableBytes = in.readableBytes();
Response response = new Response();
byte[] resp = new byte[readableBytes];
in.readBytes(resp);
log.debug("bytebuf in {}",resp);
byte previous = 0;
boolean isReset = true;
for (int i = 0; i < readableBytes; i++) {
byte current = resp[i];
if (previous == 13 && current == 10) {
String commandLine = new String(resp, 0, i - 1, charset);
String[] spilts = commandLine.split(" ");
String result = spilts[0];
if ("RESERVED".equals(result) || "FOUND".equals(result) || "OK".equals(result)) {
String bytesStr = spilts[spilts.length - 1];
if (bytesStr.matches("\\d+")) {
int bytes = Integer.valueOf(bytesStr);
if (bytes == readableBytes - i - 1 - 2) {
byte[] data = new byte[bytes];
System.arraycopy(resp, i + 1, data, 0, bytes);
response.setData(data);
isReset = false;
}
} else
isReset = false;
} else
isReset = false;
response.setStatusLine(commandLine);
break;
}
previous = current;
}
if (isReset)
in.resetReaderIndex();
else {
out.add(response);
}
}
}
3. netty
处理器
处理器接收netty解码后的响应保存到阻塞队列中
package com.haole.mq.beanstalk.handler;
import com.haole.mq.beanstalk.command.Command;
import com.haole.mq.beanstalk.command.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by shengjunzhao on 2017/5/28.
*/
public class BeanstalkHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(BeanstalkHandler.class);
private LinkedBlockingQueue<Response> queue = new LinkedBlockingQueue<>();
private Channel channel;
public BeanstalkHandler() {}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.channel=ctx.channel();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Response response = (Response) msg;
queue.put(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
public Response sendMessage(Command command) throws InterruptedException {
this.channel.writeAndFlush(command);
return queue.take();
}
}
此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多
线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。
4. netty的初始化
private void init() {
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("beanstalk decode", new CommandDecode(Charset.forName("UTF-8")));
pipeline.addLast("beanstalk encode", new CommandEncode(Charset.forName("UTF-8")));
pipeline.addLast("beanstalk client handler", new BeanstalkHandler());
}
});
}
5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例
/**
* 向队列中插入一个job
*
* @param channel
* @param priority 优先级 0~2**32的整数,最高优先级是0
* @param delay 是一个整形数,表示将job放入ready队列需要等待的秒数
* @param ttr time to run—是一个整形数,表示允许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。
* 如果该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。
* 最小ttr为1。如果客户端设置了0,服务端会默认将其增加到1。
* @param data 及job体,是一个长度为<byetes> 的字符序列
* @return 如果大于0,是新job的数字编号,如果小于0,错误,-1;未知;-2:BURIED;-3:EXPECTED_CRLF;-4:JOB_TOO_BIG;-5:DRAINING
* @throws InterruptedException
*/
public long put(Channel channel, int priority, int delay, int ttr, byte[] data) throws InterruptedException {
Command putCommand = new PutCommand(priority, delay, ttr, data);
Response response = channel.pipeline().get(BeanstalkHandler.class).sendMessage(putCommand);
log.debug("response status {}", response.getStatusLine());
String[] spilts = response.getStatusLine().split(" ");
if ("INSERTED".equals(spilts[0])) {
return Long.valueOf(spilts[1]).longValue();
} else if ("BURIED".equals(spilts[0])) {
return -2;
} else if ("EXPECTED_CRLF".equals(spilts[0])) {
return -3;
} else if ("JOB_TOO_BIG".equals(spilts[0])) {
return -4;
} else if ("DRAINING".equals(spilts[0])) {
return -5;
} else
return -1;
}
6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。
Set<String> servers = new HashSet<>();
servers.add("192.168.209.132:11300");
servers.add("192.168.209.133:11300");
servers.add("192.168.209.134:11300");
BeanstalkProvider provider1 = new DefaultBeanstalkProvider(servers, "beanstalks1");
BeanstalkConsumer consumer1 = new DefaultBeanstalkConsumer(servers, "beanstalks1");
初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接
provider1.quit();
consumer1.quit();
具体实现已经上传到GitHub中,[url]https://github.com/shengjunzhao/beanstalk4j [/url]
文中不足之处, 请各位大侠指正。