因为用memcached集群缓存数据,所以增删服务器节点 对缓存key的影响需要考虑一种策略来实现 数据缓存key所映射的节点变动至最小值(这句话好长啊,就是缓存服务器的增减,对在已经缓存了的数据影响降到最小,比如“test”这个数据之前存在a1节点服务器上,那么增加删除了服务器节点,‘test’依然在 a1上(有可能不在,这个原因可以看以下代码),用10个数据来说明吧,感觉有点只可意会不可言传,10个数据,在节点变化时,尽量只有2个数据发生变动,ok)
下面代码示例:
class="java" name="code">
package com.xll;
//服务器对象
public class Server {
private String name;
private String password;
private String ip;
private String port;
public Server() {
}
public Server(String name, String password, String ip, String port) {
super();
this.name = name;
this.password = password;
this.ip = ip;
this.port = port;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
@Override
public String toString() {
return "name:" + name + "ip:" + ip + "port:" + port;
}
}
package com.xll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
//server注册,算法生成
public class ServerLoadBalance {
private TreeMap<Long, Server> nodes;
private List<Server> servers;
private final int NODE_NUM = 200;
public ServerLoadBalance(List<Server> servers) {
super();
this.servers = servers;
init();
}
private void init() { // 初始化一致性hash环
nodes = new TreeMap<Long, Server>();
for (int i = 0; i != servers.size(); ++i) { // 每个真实机器节点都需要关联虚拟节点
final Server shardInfo = servers.get(i);
for (int n = 0; n < NODE_NUM; n++){
// 一个真实机器节点关联NODE_NUM个虚拟节点
Long hash_value = hash("SHARD-" + shardInfo.getIp() + "-NODE-" + n);
//System.out.println("第"+i+"个server的hash 散列值="+hash_value);
nodes.put(hash_value, shardInfo);
}
}
System.out.println("Finish inited virtual node....");
}
/**
* 通过key的一致性hash值 得到 相应的 Server对象
* @param key
* @return
*/
public Server getShardInfo(String key) {
Long hash_value = hash(key);
//System.out.println("key="+key+"的hash值="+hash_value);
SortedMap<Long, Server> tail = nodes.tailMap(hash_value); // 沿环的顺时针找到一个虚拟节点
if (tail.size() == 0) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey()); // 返回该虚拟节点对应的真实机器节点的信息
}
/**
* 一致性hash算法
* @param key
* @return
*/
private Long hash(String key) {
ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
int seed = 0x1234ABCD;
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
long m = 0xc6a4a7935bd1e995L;
int r = 47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining() >= 8) {
k = buf.getLong();
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
// for big-endian version, do this first:
// finish.position(8-buf.remaining());
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}
h ^= h >>> r;
h *= m;
h ^= h >>> r;
buf.order(byteOrder);
return h;
}
}
package com.xll;
import java.util.ArrayList;
import java.util.List;
//主要的测试类
public class TestConsistenHash {
public static Server s1 = new Server("ser1", "123", "192.168.216.1", "1");
public static Server s2 = new Server("ser2", "123", "192.168.216.2", "2");
public static Server s3 = new Server("ser3", "123", "192.168.216.3", "3");
public static Server s4 = new Server("ser4", "123", "192.168.216.4", "4");
public static Server s5 = new Server("ser5", "123", "192.168.216.5", "5");
public static List<Server> list = new ArrayList<Server>();
public static int count1 = 0;
public static int count2 = 0;
public static int count3 = 0;
public static int count4 = 0;
public static int count5 = 0;
public static void main(String[] args) {
/**
* 可以通过 增加 / 剔除 server来测试 被点击量 的变化
*/
list.add(s1);
list.add(s2);
list.add(s3);
list.add(s4);
list.add(s5);
/**
* key的数量为1000个
*
*
*
* 在ServerLoadBalance 的 virtual node 数量NODE_NUM 为200时(这个值能达到负载均衡了,100或更小时还没达到)
* 测试的两组数据(5台server和剔除server3时的4台server)
*
*
* server1 hit count = 197
* server2 hit count = 193
* server3 hit count = 210
* server4 hit count = 170
* server5 hit count = 230
*
*
* server1 hit count = 265
* server2 hit count = 248
* server3 hit count = 0
* server4 hit count = 214
* server5 hit count = 273
*
*
*
*
* 在ServerLoadBalance 的 virtual node 数量NODE_NUM 为1时(负载均衡在100或更小时还没达到)
* 测试的两组数据(5台server和剔除server3时的4台server)
* server1 hit count = 359
* server2 hit count = 94
* server3 hit count = 387
* server4 hit count = 67
* server5 hit count = 93
*
*
* server1 hit count = 359
* server2 hit count = 481
* server3 hit count = 0
* server4 hit count = 67
* server5 hit count = 93
*
*
*
*
*/
ServerLoadBalance server = new ServerLoadBalance(list);
for (int i = 0; i < 1000; i++) {
String key = ""+(i+20);
Server s = (Server)server.getShardInfo(key);
if(s.toString().equals(s1.toString()))
count1 ++;
else if(s.toString().equals(s2.toString()))
count2 ++;
else if(s.toString().equals(s3.toString()))
count3 ++;
else if(s.toString().equals(s4.toString()))
count4 ++;
else
count5 ++;
//System.out.println("key" + i + ", server=" + s);
}
//得到各server的命中 情况
System.out.println("#############");
System.out.println("server1 hit count = "+TestConsistenHash.count1);
System.out.println("server2 hit count = "+TestConsistenHash.count2);
System.out.println("server3 hit count = "+TestConsistenHash.count3);
System.out.println("server4 hit count = "+TestConsistenHash.count4);
System.out.println("server5 hit count = "+TestConsistenHash.count5);
System.out.println("#############");
// String key = "test";
// TestThread tt = new TestThread(shard, key);
//
// new Thread(tt, "0").start();
// new Thread(tt, "10001").start();
// new Thread(tt, "20001").start();
// new Thread(tt, "30001").start();
// new Thread(tt, "40001").start();
// new Thread(tt, "50001").start();
}
}
package com.xll;
//用于跑大量数据
public class TestThread extends Thread{
ServerLoadBalance shard;
String key;
public TestThread(ServerLoadBalance shard,String key){
this.shard = shard;
this.key = key;
}
@Override
public void run() {
runHash();
printCountValue();
}
public void runHash(){
String name = currentThread().getName();
int start = Integer.parseInt(name);
for (int i = start; i < start+10000; i++) {
Server s = (Server)shard.getShardInfo(key+i);
increaseCount(s);
}
}
public void increaseCount(Server s){
if(s.toString().equals(TestConsistenHash.s1.toString()))
increase(1);
else if(s.toString().equals(TestConsistenHash.s2.toString()))
increase(2);
else if(s.toString().equals(TestConsistenHash.s3.toString()))
increase(3);
else if(s.toString().equals(TestConsistenHash.s4.toString()))
increase(4);
else
increase(5);
}
static synchronized void increase(int i){
switch (i) {
case 1:
TestConsistenHash.count1 ++;
break;
case 2:
TestConsistenHash.count2 ++;
break;
case 3:
TestConsistenHash.count3 ++;
break;
case 4:
TestConsistenHash.count4 ++;
break;
default:
TestConsistenHash.count5 ++;
break;
}
}
void printCountValue(){
System.out.println("#############");
System.out.println("server1 hit count = "+TestConsistenHash.count1);
System.out.println("server2 hit count = "+TestConsistenHash.count2);
System.out.println("server3 hit count = "+TestConsistenHash.count3);
System.out.println("server4 hit count = "+TestConsistenHash.count4);
System.out.println("server5 hit count = "+TestConsistenHash.count5);
System.out.println("#############");
}
}
在测试类中,有几组数据对一致性的测试,当每个真实的服务器节点只有一个虚拟节点时,去掉一台机器,那么这台机器的负荷只会倾倒性的转移到一台服务器上,其他节点数据和负荷不发生变化, 这个验证了:
一致性hash在某个节点变化时,其他节点上的原来的数据依然保持
当每个节点的虚拟节点数增加至200个时, 这时去掉一台服务器节点,这个节点的
访问量变为0, 其负载均衡的分摊至每个还存在的服务器节点上,这个验证了:
一致性hash还能达到负载均衡,而不会导致某台负荷过重而导致 多米诺(相继挂掉)的效应