在zookeeper学习笔记(一)中记录了zookeeper相关概念,这一篇是使用zookeeper开发。
一、安装
zookeeper的安装参考 Kafka学习笔记【二】-安装 中的第二章节,注意zookeeper.propertis对应为zoo.properties.
二、Java编码
zookeeper java开发一般有
三种方式:
1) zookeeper 原生api, 封装层次较低,有很多功能需要自行优化,如断网重连,通知后自动watch等。
2) zkClient 高级api, 对重连,自动watch等进行了分装
3) curator 更加高级api, 除了提供zkClient的功能外, 还提供了选举,分布式锁等
高级功能的封装。
因此, 为了后续更快的实现扩展功能,我选择curator.
由于zookeeper我安装的是3.4.6, 因此选择curator 2.12.0
版本兼容性会好一些。
1. pom 依赖
class="xml" name="code">
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
2. java 代码实现服务节点监控功能
package cn.gov.zjport.gtw.gateway.in.service.impl;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import cn.gov.zjport.gtw.GtwConstants;
import cn.gov.zjport.gtw.gateway.in.service.ZkService;
import cn.gov.zjport.gtw.utils.IpUtil;
@Service("zkService")
public class ZkServiceImpl implements ZkService{
private Logger logger = LoggerFactory.getLogger(this.getClass());
//zookeeper集群地址
@Value("${gtw.zk.address}")
private String zkAddress;
//重连重试次数
@Value("${gtw.zk.retry.times}")
private int zkRetryTimes;
//重连间隔时间
@Value("${gtw.zk.retry.interval}")
private int zkRetryInterval;
private CuratorFramework client;
private final SimpleDateFormat sdf=new SimpleDateFormat("yyyyMMddHHmmss");
private static String zkNodePath;
@PostConstruct
public void init() throws Exception{
client =CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.retryPolicy(new RetryNTimes(zkRetryTimes, zkRetryInterval))
.namespace(GtwConstants.PROJECT_SIMPLE_CODE.toLowerCase())
.build();
client.start();
zkNodePath=GtwConstants.SERVER_NODES_PATH+GtwConstants.ZOOKEEPER_DELIMITER+IpUtil.getLocalRealIp();
}
@Override
public List<String> getLiveIps() throws Exception{
//查询节点下所有的子节点
return client.getChildren().forPath(GtwConstants.SERVER_NODES_PATH);
}
@Override
public void setLiveIp() throws Exception{
//检测节点是否存在
if(client.checkExists().forPath(zkNodePath)==null){
//创建临时节点
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(zkNodePath,sdf.format(new Date()).getBytes());
}else{
logger.error("启动时发现相同的ip[{}]已存在,请确认主机ip配置是否正确",IpUtil.getLocalRealIp());
//修改节点数据
client.setData().forPath(zkNodePath,sdf.format(new Date()).getBytes());
}
}
@Override
public void removeLiveIp() throws Exception{
//删除节点
client.delete().forPath(zkNodePath);
}
@PreDestroy
public void destroy(){
client.close();
}
}