分布式缓存的一致性 Hash 的 Java 实现
关于分布式缓存一致性 Hash 算法的原理,有很多书籍、博客都有详细的介绍。本文主要是想对一致性 Hash 算法进行一个小小的实现,方便自己更好的理解。
算法的具体原理如下:
先构造一个长度为 2^32 的整数环(这个环被称为一致性 Hash 环),根据节点名称的 Hash 值(其分布为 [0, 2^32-1])将服务器节点放置在这个 Hash 环上,然后根据数据的 Key 值计算得到其 Hash 值(其分布也为 [0, 2^32-1]),接着在 Hash 环上顺时针查找距离这个 Key 值的 Hash 值最近的服务器节点,完成 Key 到服务器的映射查找。
一致性 Hash 的原理图如下:
第一个版本:不使用虚拟节点的一致性 Hash 算法的实现
首先了,我们写一个 ServerNode 类,代表服务器节点。
这个类比较简单,包含两个属性:名字和哈希值
- package hash;
- public class ServerNode {
- private String serverNodeName;
- private long serverNodeHash;
- public ServerNode(String serverNodeName, long serverNodeHash) {
- super();
- this.serverNodeName = serverNodeName;
- this.serverNodeHash = serverNodeHash;
- }
- public String getServerNodeName() {
- return serverNodeName;
- }
- public void setServerNodeName(String serverNodeName) {
- this.serverNodeName = serverNodeName;
- }
- public long getServerNodeHash() {
- return serverNodeHash;
- }
- public void setServerNodeHash(long serverNodeHash) {
- this.serverNodeHash = serverNodeHash;
- }
- }
然后,我们就根据最上面所描述的算法进行实现。
可能首先要考虑的问题就是:用什么样的数据结构来存储服务器节点。本文采用的是 "排序 + List"。
上面所了存储结构的事,在 ConsistentHashWithoutVirtualNode 类中,包含以下几种方法:
1、addServer; 添加服务器节点的方法,这个是必须的
2、deleteServer; 如果有服务器节点宕机,则在集群中需要删除这个节点
3、hash(),hash 函数,这个是必须的。
4、getServerAccordKey;根据指定的 key 得到其路由到哪个服务器节点上。
具体实现代码如下:
- package hash;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.List;
- import java.util.zip.CRC32;
- public class ConsistentHashWithoutVirtualNode { //用来存储服务器节点对象 List<ServerNode> serverNodes= new ArrayList<ServerNode>(); //添加服务器节点 public void addServerNode(String serverNodeName){ if(serverNodeName==null){ return; } //利用Hash算法,求出服务器节点的Hash值 long serverNodeHash = getHash(serverNodeName); ServerNode serverNode = new ServerNode(serverNodeName,serverNodeHash); serverNodes.add(serverNode); //将serverNodes进行排序 Collections.sort(serverNodes,new Comparator<ServerNode>() { @Override public int compare(ServerNode node1, ServerNode node2) { if(node1.getServerNodeHash()<node2.getServerNodeHash()){ return -1; } return 1; } }); } public long getHash(String serverNodeName) { CRC32 crc32 = new CRC32(); crc32.update(serverNodeName.getBytes()); return crc32.getValue(); } //删除服务器节点 public void deleteServerNode(String serverName){ //这里假设所有服务器名字不一样,则直接遍历名字是否相同即可 int serverNum=serverNodes.size(); for(int i=0;i<serverNum;i++){ ServerNode node = serverNodes.get(i); if(node.getServerNodeName().equals(serverName)){ serverNodes.remove(node); return; } } } //得到应当路由到的服务器结点 public ServerNode getServerNode(String key){ //得到key的hash值 long hash = getHash(key); //在serverNodes中找到大于hash且离其最近的的那个ServerNode //由于serverNodes是升序排列的,因此,找到的第一个大于hash的就是目标节点 for(ServerNode node:serverNodes){ if(node.getServerNodeHash()>hash){ return node; } } //如果没有找到,则说明此key的hash值比所有服务器节点的hash值都大,因此返回最小hash值的那个Server节点 return serverNodes.get(0); } public void printServerNodes(){ System.out.println("所有的服务器节点信息如下:"); for(ServerNode node:serverNodes){ System.out.println(node.getServerNodeName()+":"+node.getServerNodeHash()); } } }
测试函数如下:
- public static void main(String[] args) {
- ConsistentHashWithoutVirtualNode ch = new ConsistentHashWithoutVirtualNode(); //添加一系列的服务器节点 String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111", "192.168.0.3:111", "192.168.0.4:111"}; for(String server:servers){ ch.addServerNode(server); } //打印输出一下服务器节点 ch.printServerNodes(); //看看下面的客户端节点会被路由到哪个服务器节点 String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"}; System.out.println("此时,各个客户端的路由情况如下:"); for(String node:nodes){ ServerNode serverNode = ch.getServerNode(node); System.out.println(node+","+ ch.getHash(node)+"------->"+ serverNode.getServerNodeName()+","+serverNode.getServerNodeHash()); } //如果由一个服务器节点宕机,即需要将这个节点从服务器集群中移除 ch.deleteServerNode("192.168.0.0:111"); System.out.println("删除节点后,再看看同样的客户端的路由情况,如下:"); for(String node:nodes){ ServerNode serverNode = ch.getServerNode(node); System.out.println(node+","+ ch.getHash(node)+"------->"+ serverNode.getServerNodeName()+","+serverNode.getServerNodeHash()); } }
运行结果如下:
从结果中可以看出,以上的代码就简单的模拟了下服务器端集群中有新的服务器添加到集群中,有服务器宕机的情况。
第二个版本:使用虚拟节点来实现一致性 Hash
上面的例子中,虽然我们的测试客户端只有三个,但也可以看到,有两个客户端映射到了同一个服务器端,有一个客户端映射到了另一个服务器端,而剩余的服务器端都没有测试客户端。
因此,从某种程度上来说,这失去了负载均衡的意义,因为负载均衡的目的本身就是为了使得目标服务器均分所有的请求。
因此,解决方法就是:引入 "虚拟节点"。其工作原理是:将一个物理节点拆分为多个虚拟节点,并且同一个物理节点的虚拟节点尽量均匀分布在 Hash 环上。采取这样的方式,就可以有效地解决增加或减少节点时候的负载不均衡的问题。
引入一个虚拟节点类
- package hash2;
- public class VirtualServerNode {
- private String serverNodeName; //这个名字指的是其对应的真实的物理服务器节点的名字 private long virtualServerNodeHash; public VirtualServerNode(String serverNodeName, long virtualServerNodeHash) { super(); this.serverNodeName = serverNodeName; this.virtualServerNodeHash = virtualServerNodeHash; } public String getServerNodeName() { return serverNodeName; } public void setServerNodeName(String serverNodeName) { this.serverNodeName = serverNodeName; } public long getVirtualServerNodeHash() { return virtualServerNodeHash; } public void setVirtualServerNodeHash(long virtualServerNodeHash) { this.virtualServerNodeHash = virtualServerNodeHash; } }
然后,按照第一个版本的思路,改吧改吧就形成了如下引入虚拟节点的代码。
- package hash2;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.List;
- import java.util.zip.CRC32;
- public class ConsistentHashWithVirtualNode { //保存虚拟服务器节点节点 List<virtualservernode> virtualServerNodes = new ArrayList<virtualservernode>(); //每个物理节点对应的虚拟节点的个数 private final static int VIRTUAL_NUM = 5; //添加服务器节点 public void addServerNode(String serverName){ if(serverName==null){ return; } for(int i=0;i<virtual_num;i++){ hash="getHash(virtualServerNodeName);" long="" new="" string="" virtualservernode="" virtualservernodename="serverName+"&&VN"+i;" vsnode="new">() { @Override public int compare(VirtualServerNode node1, VirtualServerNode node2) { if(node1.getVirtualServerNodeHash()<node2.getvirtualservernodehash()){ crc32="" hash="getHash(key);" i="0;i<virtualServerNodes.size();i++){" int="" long="" node="virtualServerNodes.get(i);" public="" return="" string="" virtualservernode="" void="">hash){ return node; } } //如果没有找到,则说明此key的hash值比所有服务器节点的hash值都大,因此返回最小hash值的那个Server节点 return virtualServerNodes.get(0); } public void printServerNodes(){ System.out.println("所有的服务器节点信息如下:"); for(VirtualServerNode node:virtualServerNodes){ System.out.println(node.getServerNodeName()+":"+node.getVirtualServerNodeHash()); } } public static void main(String[] args){ ConsistentHashWithVirtualNode ch = new ConsistentHashWithVirtualNode(); //添加一系列的服务器节点 String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111", "192.168.0.3:111", "192.168.0.4:111"}; for(String server:servers){ ch.addServerNode(server); } //打印输出一下服务器节点 ch.printServerNodes(); //看看下面的客户端节点会被路由到哪个服务器节点 String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"}; System.out.println("此时,各个客户端的路由情况如下:"); for(String node:nodes){ VirtualServerNode virtualServerNode = ch.getServerNode(node); System.out.println(node+","+ ch.getHash(node)+"------->"+ virtualServerNode.getServerNodeName()+","+virtualServerNode.getVirtualServerNodeHash()); } //如果由一个服务器节点宕机,即需要将这个节点从服务器集群中移除 String deleteNodeName="192.168.0.2:111"; ch.deleteServerNode(deleteNodeName); System.out.println("删除节点"+deleteNodeName+"后,再看看同样的客户端的路由情况,如下:"); for(String node:nodes){ VirtualServerNode virtualServerNode = ch.getServerNode(node); System.out.println(node+","+ ch.getHash(node)+"------->"+ virtualServerNode.getServerNodeName()+","+virtualServerNode.getVirtualServerNodeHash()); } } }
就爱阅读 www.92to.com 网友整理上传, 为您提供最全的知识大全, 期待您的分享,转载请注明出处。
来源: